Skip to main content
Glama
JDeun

Unified Search MCP Server

by JDeun
unified.py8.13 kB
# src/services/unified.py """ 통합 검색 서비스 여러 소스에서 동시에 검색 수행 """ import asyncio from typing import List, Dict, Any, Optional, Union from datetime import datetime import logging from .base import ConcurrentSearchMixin from .scholar import GoogleScholarService from .web import GoogleWebService from .youtube import YouTubeService from ..models import ( SearchSource, SearchRequest, SearchResponse, ScholarResult, WebResult, YouTubeResult, APIUsageStats ) from ..config import get_settings from ..cache import get_cache_manager, cached from ..utils import PerformanceLogger, get_audit_logger from ..monitoring import MetricsCollector logger = logging.getLogger(__name__) class UnifiedSearchService(ConcurrentSearchMixin): """통합 검색 서비스""" def __init__(self): self.settings = get_settings() self.cache_manager = get_cache_manager() self.audit_logger = get_audit_logger() # 서비스 초기화 self.services = { SearchSource.SCHOLAR: GoogleScholarService(), SearchSource.WEB: GoogleWebService(), SearchSource.YOUTUBE: YouTubeService() } @cached(ttl=1800, source="unified") # 30분 캐시 async def search(self, request: SearchRequest) -> SearchResponse: """ 통합 검색 수행 Args: request: 검색 요청 객체 Returns: SearchResponse: 통합 검색 결과 """ start_time = datetime.utcnow() async with PerformanceLogger( "unified_search", logger ).add_context( query=request.query, sources=[s.value for s in request.sources] ): # 검색 작업 준비 search_tasks = [] for source in request.sources: if source in self.services: search_tasks.append(( source, self._search_source(source, request) )) # 동시 검색 실행 results_dict = await self.search_concurrently( search_tasks, max_concurrent=len(search_tasks) # 모든 소스 동시 실행 ) # 결과 정리 results: Dict[SearchSource, List[Union[ScholarResult, WebResult, YouTubeResult]]] = {} errors: Dict[SearchSource, str] = {} for source, result in results_dict.items(): if isinstance(result, dict) and 'error' in result: errors[source] = result['error'] results[source] = [] else: results[source] = result # 검색 시간 계산 search_time = (datetime.utcnow() - start_time).total_seconds() # 응답 생성 response = SearchResponse( query=request.query, results=results, search_time=search_time, errors=errors, metadata={ 'requested_sources': [s.value for s in request.sources], 'successful_sources': [s.value for s in results.keys() if s not in errors], 'cache_hit': False # 캐시 데코레이터가 처리 } ) # 감사 로그 self.audit_logger.log_search( query=request.query, source="unified", results_count=response.total_results, duration=search_time, metadata=response.metadata ) # 메트릭 기록 for source in request.sources: success = source not in errors MetricsCollector.record_search( source=source.value, success=success, duration=search_time / len(request.sources) # 평균 시간 ) return response async def _search_source( self, source: SearchSource, request: SearchRequest ) -> List[Union[ScholarResult, WebResult, YouTubeResult]]: """개별 소스 검색""" service = self.services[source] try: # 소스별 파라미터 준비 if source == SearchSource.SCHOLAR: results = await service.search( query=request.query, num_results=request.num_results, author=request.author, year_start=request.year_start, year_end=request.year_end ) elif source == SearchSource.WEB: results = await service.search( query=request.query, num_results=request.num_results, language=request.language, safe_search=request.safe_search ) elif source == SearchSource.YOUTUBE: results = await service.search( query=request.query, num_results=request.num_results, video_duration=request.video_duration, upload_date=request.upload_date, order=request.sort_order ) else: results = [] return results except Exception as e: logger.error(f"{source.value} 검색 오류: {e}") raise async def get_api_usage_stats(self) -> APIUsageStats: """API 사용량 통계 조회""" stats = APIUsageStats() # 캐시 통계 cache_stats = self.cache_manager.get_stats() for source in SearchSource: stats.cache_stats[source.value] = { 'size': cache_stats.get('total_requests', 0), 'hit_rate': cache_stats.get('hit_rate', 0) } # API 한도 정보 stats.limits['google_web'] = { 'daily_limit': self.settings.google_web_daily_limit, 'used': stats.usage.get('google_web', 0), 'remaining': max(0, self.settings.google_web_daily_limit - stats.usage.get('google_web', 0)) } stats.limits['youtube'] = { 'daily_limit': self.settings.youtube_daily_limit, 'used': stats.usage.get('youtube', 0), 'remaining': max(0, self.settings.youtube_daily_limit - stats.usage.get('youtube', 0)) } return stats async def get_service_status(self) -> Dict[str, Any]: """서비스 상태 조회""" status = {} for source, service in self.services.items(): try: # 서비스 헬스 체크 is_healthy = await service.health_check() if hasattr(service, 'health_check') else True status[source.value] = { 'status': 'healthy' if is_healthy else 'unhealthy', 'available': True } except Exception as e: status[source.value] = { 'status': 'error', 'available': False, 'error': str(e) } return status async def close(self): """리소스 정리""" # 모든 서비스 종료 close_tasks = [ service.close() for service in self.services.values() ] await asyncio.gather(*close_tasks, return_exceptions=True) # 싱글톤 인스턴스 _unified_service: Optional[UnifiedSearchService] = None def get_unified_service() -> UnifiedSearchService: """통합 검색 서비스 싱글톤""" global _unified_service if _unified_service is None: _unified_service = UnifiedSearchService() return _unified_service

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JDeun/unified-search-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server