Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
base.py5.22 kB
"""데이터 수집기 베이스 클래스""" import asyncio import logging from abc import ABC, abstractmethod from datetime import datetime from typing import Dict, Any, List, Optional from src.exceptions import DataCollectionError, DataValidationError class BaseCollector(ABC): """데이터 수집기 베이스 클래스""" def __init__(self, config: Dict[str, Any]): self.name = config.get("name", "unknown_collector") self.config = config self.retry_attempts = config.get("retry_attempts", 3) self.retry_delay = config.get("retry_delay", 1) self.timeout = config.get("timeout", 30) self.batch_size = config.get("batch_size", 100) # 상태 관리 self.is_running = False self.last_run: Optional[datetime] = None # 통계 self.collected_count = 0 self.error_count = 0 self.total_runs = 0 self._successful_runs = 0 # 로깅 self.logger = logging.getLogger(f"{__name__}.{self.name}") @abstractmethod async def collect_data(self) -> List[Dict[str, Any]]: """데이터 수집 구현 (서브클래스에서 구현)""" pass @abstractmethod def validate_data(self, data: List[Dict[str, Any]]) -> bool: """데이터 검증 (서브클래스에서 구현)""" pass async def process_batch(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """배치 데이터 처리 (필요시 서브클래스에서 오버라이드)""" return batch async def run(self) -> List[Dict[str, Any]]: """데이터 수집 실행""" if self.is_running: raise RuntimeError("Collector is already running") self.is_running = True self.total_runs += 1 try: # 재시도 로직으로 데이터 수집 data = await self._collect_with_retry() # 데이터 검증 if not self.validate_data(data): self.error_count += 1 raise DataValidationError(f"Data validation failed for {self.name}") # 배치 처리 processed_data = await self._process_in_batches(data) # 성공 통계 업데이트 self.collected_count += len(processed_data) self.last_run = datetime.now() self._successful_runs += 1 self.logger.info(f"Successfully collected {len(processed_data)} items") return processed_data finally: self.is_running = False async def _collect_with_retry(self) -> List[Dict[str, Any]]: """재시도 로직이 포함된 데이터 수집""" last_exception = None for attempt in range(self.retry_attempts): try: self.logger.debug(f"Collection attempt {attempt + 1}/{self.retry_attempts}") # 타임아웃 적용 data = await asyncio.wait_for( self.collect_data(), timeout=self.timeout ) return data except Exception as e: last_exception = e self.error_count += 1 self.logger.warning( f"Collection attempt {attempt + 1} failed: {e}" ) # 마지막 시도가 아니면 재시도 지연 if attempt < self.retry_attempts - 1: await asyncio.sleep(self.retry_delay * (2 ** attempt)) # 지수 백오프 # 모든 재시도 실패 raise DataCollectionError( f"Data collection failed after {self.retry_attempts} attempts: {last_exception}" ) async def _process_in_batches(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """데이터를 배치 단위로 처리""" if not data: return data processed_data = [] # 배치 단위로 나누어 처리 for i in range(0, len(data), self.batch_size): batch = data[i:i + self.batch_size] processed_batch = await self.process_batch(batch) processed_data.extend(processed_batch) return processed_data def get_stats(self) -> Dict[str, Any]: """수집기 통계 반환""" success_rate = 0.0 if self.total_runs > 0: success_rate = self._successful_runs / self.total_runs return { "name": self.name, "total_runs": self.total_runs, "total_collected": self.collected_count, "total_errors": self.error_count, "success_rate": success_rate, "last_run": self.last_run.isoformat() if self.last_run else None, "is_running": self.is_running } async def shutdown(self) -> None: """수집기 종료 및 정리""" self.is_running = False self.logger.info(f"Collector {self.name} shutdown completed")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server