base.py•5.22 kB
"""데이터 수집기 베이스 클래스"""
import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Any, List, Optional
from src.exceptions import DataCollectionError, DataValidationError
class BaseCollector(ABC):
"""데이터 수집기 베이스 클래스"""
def __init__(self, config: Dict[str, Any]):
self.name = config.get("name", "unknown_collector")
self.config = config
self.retry_attempts = config.get("retry_attempts", 3)
self.retry_delay = config.get("retry_delay", 1)
self.timeout = config.get("timeout", 30)
self.batch_size = config.get("batch_size", 100)
# 상태 관리
self.is_running = False
self.last_run: Optional[datetime] = None
# 통계
self.collected_count = 0
self.error_count = 0
self.total_runs = 0
self._successful_runs = 0
# 로깅
self.logger = logging.getLogger(f"{__name__}.{self.name}")
@abstractmethod
async def collect_data(self) -> List[Dict[str, Any]]:
"""데이터 수집 구현 (서브클래스에서 구현)"""
pass
@abstractmethod
def validate_data(self, data: List[Dict[str, Any]]) -> bool:
"""데이터 검증 (서브클래스에서 구현)"""
pass
async def process_batch(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""배치 데이터 처리 (필요시 서브클래스에서 오버라이드)"""
return batch
async def run(self) -> List[Dict[str, Any]]:
"""데이터 수집 실행"""
if self.is_running:
raise RuntimeError("Collector is already running")
self.is_running = True
self.total_runs += 1
try:
# 재시도 로직으로 데이터 수집
data = await self._collect_with_retry()
# 데이터 검증
if not self.validate_data(data):
self.error_count += 1
raise DataValidationError(f"Data validation failed for {self.name}")
# 배치 처리
processed_data = await self._process_in_batches(data)
# 성공 통계 업데이트
self.collected_count += len(processed_data)
self.last_run = datetime.now()
self._successful_runs += 1
self.logger.info(f"Successfully collected {len(processed_data)} items")
return processed_data
finally:
self.is_running = False
async def _collect_with_retry(self) -> List[Dict[str, Any]]:
"""재시도 로직이 포함된 데이터 수집"""
last_exception = None
for attempt in range(self.retry_attempts):
try:
self.logger.debug(f"Collection attempt {attempt + 1}/{self.retry_attempts}")
# 타임아웃 적용
data = await asyncio.wait_for(
self.collect_data(),
timeout=self.timeout
)
return data
except Exception as e:
last_exception = e
self.error_count += 1
self.logger.warning(
f"Collection attempt {attempt + 1} failed: {e}"
)
# 마지막 시도가 아니면 재시도 지연
if attempt < self.retry_attempts - 1:
await asyncio.sleep(self.retry_delay * (2 ** attempt)) # 지수 백오프
# 모든 재시도 실패
raise DataCollectionError(
f"Data collection failed after {self.retry_attempts} attempts: {last_exception}"
)
async def _process_in_batches(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""데이터를 배치 단위로 처리"""
if not data:
return data
processed_data = []
# 배치 단위로 나누어 처리
for i in range(0, len(data), self.batch_size):
batch = data[i:i + self.batch_size]
processed_batch = await self.process_batch(batch)
processed_data.extend(processed_batch)
return processed_data
def get_stats(self) -> Dict[str, Any]:
"""수집기 통계 반환"""
success_rate = 0.0
if self.total_runs > 0:
success_rate = self._successful_runs / self.total_runs
return {
"name": self.name,
"total_runs": self.total_runs,
"total_collected": self.collected_count,
"total_errors": self.error_count,
"success_rate": success_rate,
"last_run": self.last_run.isoformat() if self.last_run else None,
"is_running": self.is_running
}
async def shutdown(self) -> None:
"""수집기 종료 및 정리"""
self.is_running = False
self.logger.info(f"Collector {self.name} shutdown completed")