Skip to main content
Glama
JDeun

Unified Search MCP Server

by JDeun
base.py7.93 kB
# src/services/base.py """ 서비스 베이스 클래스 공통 기능 및 인터페이스 정의 """ from abc import ABC, abstractmethod from typing import Optional, Dict, Any, List, TypeVar, Generic import asyncio import logging from datetime import datetime import httpx from ..config import get_settings, get_security_config from ..models import ServiceError, ExternalAPIError, TimeoutError from ..cache import get_cache_manager, cached from ..utils import get_logger, PerformanceLogger, get_audit_logger from ..monitoring import MetricsCollector T = TypeVar('T') logger = get_logger(__name__) class BaseSearchService(ABC, Generic[T]): """검색 서비스 베이스 클래스""" def __init__(self): self.settings = get_settings() self.security_config = get_security_config() self.cache_manager = get_cache_manager() self.audit_logger = get_audit_logger() self._client: Optional[httpx.AsyncClient] = None self._client_lock = asyncio.Lock() @property @abstractmethod def service_name(self) -> str: """서비스 이름""" pass @property @abstractmethod def api_base_url(self) -> str: """API 베이스 URL""" pass async def get_client(self) -> httpx.AsyncClient: """HTTP 클라이언트 가져오기 (연결 풀링)""" if self._client is None: async with self._client_lock: if self._client is None: self._client = httpx.AsyncClient( timeout=httpx.Timeout(self.settings.http_timeout), limits=httpx.Limits( max_keepalive_connections=5, max_connections=10 ), headers=self._get_default_headers() ) return self._client def _get_default_headers(self) -> Dict[str, str]: """기본 헤더""" return { 'User-Agent': f'UnifiedSearchMCP/{self.settings.environment}', 'Accept': 'application/json', } @abstractmethod async def search(self, query: str, **kwargs) -> List[T]: """검색 수행""" pass async def _make_request( self, method: str, url: str, **kwargs ) -> httpx.Response: """HTTP 요청 수행""" client = await self.get_client() async with PerformanceLogger( f"{self.service_name}_request", logger ).add_context(method=method, url=url) as perf: try: response = await client.request(method, url, **kwargs) response.raise_for_status() # 메트릭 기록 MetricsCollector.record_api_call( api=self.service_name, endpoint=url, success=True, duration=perf.duration if hasattr(perf, 'duration') else 0 ) return response except httpx.TimeoutException as e: MetricsCollector.record_api_call( api=self.service_name, endpoint=url, success=False, duration=self.settings.http_timeout ) raise TimeoutError( service=self.service_name, timeout=self.settings.http_timeout, details={'url': url} ) except httpx.HTTPStatusError as e: MetricsCollector.record_api_call( api=self.service_name, endpoint=url, success=False, duration=perf.duration if hasattr(perf, 'duration') else 0 ) # 상태 코드별 처리 if e.response.status_code == 429: # Rate limit retry_after = e.response.headers.get('Retry-After', '60') raise ExternalAPIError( service=self.service_name, message=f"Rate limit exceeded. Retry after {retry_after}s", details={ 'status_code': 429, 'retry_after': retry_after } ) elif e.response.status_code >= 500: # 서버 에러 raise ExternalAPIError( service=self.service_name, message=f"Server error: {e.response.status_code}", details={'status_code': e.response.status_code} ) else: # 클라이언트 에러 raise ExternalAPIError( service=self.service_name, message=f"Client error: {e.response.status_code}", details={ 'status_code': e.response.status_code, 'response': e.response.text[:500] # 응답 일부 } ) except Exception as e: MetricsCollector.record_error( error_type=type(e).__name__, source=self.service_name ) raise async def close(self): """리소스 정리""" if self._client: await self._client.aclose() self._client = None def log_search( self, query: str, results_count: int, duration: float, **extra ): """검색 감사 로그""" self.audit_logger.log_search( query=query, source=self.service_name, results_count=results_count, duration=duration, **extra ) class RetryMixin: """재시도 믹스인""" async def retry_with_backoff( self, func, max_retries: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, backoff_factor: float = 2.0 ): """지수 백오프로 재시도""" delay = initial_delay last_exception = None for attempt in range(max_retries): try: return await func() except Exception as e: last_exception = e if attempt < max_retries - 1: logger.warning( f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s..." ) await asyncio.sleep(delay) delay = min(delay * backoff_factor, max_delay) else: logger.error(f"All {max_retries} attempts failed") raise last_exception class ConcurrentSearchMixin: """동시 검색 믹스인""" async def search_concurrently( self, search_funcs: List[tuple], max_concurrent: int = 5 ) -> Dict[str, Any]: """동시 검색 수행""" semaphore = asyncio.Semaphore(max_concurrent) async def limited_search(name: str, func): async with semaphore: try: return name, await func() except Exception as e: logger.error(f"Search {name} failed: {e}") return name, {'error': str(e)} tasks = [ limited_search(name, func) for name, func in search_funcs ] results = await asyncio.gather(*tasks) return dict(results)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JDeun/unified-search-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server