
MCP Localization Project

by Skynotdie
web_search_main.py (19.7 kB)
#!/usr/bin/env python3 """ Web Search Main - 완전한 웹 검색 MCP 메인 클래스 모든 구성 요소를 통합하여 완전한 웹 검색 솔루션을 제공합니다. """ import asyncio import sqlite3 import json import time import logging from pathlib import Path from typing import Dict, List, Optional, Any from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from web_search_base import ( SearchEngine, SearchRequest, SearchResponse, SearchResult, SearchResultType, ContentType, QualityScore ) from web_search_playwright import PlaywrightManager from web_search_antibot import EnhancedAntiBotManager from web_search_analyzer import ContentAnalyzer from web_search_engine import AdvancedSearchEngineHandler logger = logging.getLogger(__name__) class SearchCache: """고급 검색 캐시 시스템""" def __init__(self, db_path: str = "search_cache.db", ttl_hours: int = 24): self.db_path = db_path self.ttl_hours = ttl_hours self._init_database() def _init_database(self): """데이터베이스 초기화""" try: with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS search_cache ( cache_key TEXT PRIMARY KEY, query TEXT NOT NULL, engines TEXT NOT NULL, results TEXT NOT NULL, metadata TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP NOT NULL ) """) # 인덱스 생성 conn.execute("CREATE INDEX IF NOT EXISTS idx_expires_at ON search_cache(expires_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_query ON search_cache(query)") conn.commit() logger.info("검색 캐시 데이터베이스 초기화 완료") except Exception as e: logger.error(f"캐시 데이터베이스 초기화 실패: {e}") def get_cached_results(self, cache_key: str) -> Optional[SearchResponse]: """캐시된 결과 조회""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(""" SELECT results, metadata FROM search_cache WHERE cache_key = ? AND expires_at > CURRENT_TIMESTAMP """, (cache_key,)) row = cursor.fetchone() if row: results_data = json.loads(row[0]) metadata = json.loads(row[1]) if row[1] else {} # SearchResponse 객체 재구성 response = SearchResponse( query=metadata.get('query', ''), results=[self._dict_to_search_result(r) for r in results_data], total_results=metadata.get('total_results', len(results_data)), search_time=metadata.get('search_time', 0.0), engines_used=[SearchEngine(e) for e in metadata.get('engines_used', [])], cached=True ) logger.debug(f"캐시 히트: {cache_key}") return response except Exception as e: logger.error(f"캐시 조회 실패: {e}") return None def save_results(self, cache_key: str, response: SearchResponse): """검색 결과 캐시 저장""" try: expires_at = datetime.now() + timedelta(hours=self.ttl_hours) results_data = [result.to_dict() for result in response.results] metadata = { 'query': response.query, 'total_results': response.total_results, 'search_time': response.search_time, 'engines_used': [e.value for e in response.engines_used] } with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT OR REPLACE INTO search_cache (cache_key, query, engines, results, metadata, expires_at) VALUES (?, ?, ?, ?, ?, ?) 
""", ( cache_key, response.query, ','.join([e.value for e in response.engines_used]), json.dumps(results_data), json.dumps(metadata), expires_at.isoformat() )) conn.commit() logger.debug(f"캐시 저장: {cache_key}") except Exception as e: logger.error(f"캐시 저장 실패: {e}") def cleanup_expired(self): """만료된 캐시 정리""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute("DELETE FROM search_cache WHERE expires_at <= CURRENT_TIMESTAMP") deleted_count = cursor.rowcount conn.commit() if deleted_count > 0: logger.info(f"만료된 캐시 {deleted_count}개 정리 완료") except Exception as e: logger.error(f"캐시 정리 실패: {e}") def _dict_to_search_result(self, data: Dict) -> SearchResult: """딕셔너리를 SearchResult 객체로 변환""" return SearchResult( title=data['title'], url=data['url'], snippet=data['snippet'], result_type=SearchResultType(data['result_type']), ranking=data['ranking'], relevance_score=data.get('relevance_score', 0.0), quality_score=QualityScore(data.get('quality_score', 3)), authority_score=data.get('authority_score', 0.0), freshness_score=data.get('freshness_score', 0.0), extracted_content=data.get('extracted_content'), content_summary=data.get('content_summary'), content_type=ContentType(data['content_type']) if data.get('content_type') else ContentType.TEXT, language=data.get('language', 'unknown'), domain=data.get('domain'), author=data.get('author'), publish_date=datetime.fromisoformat(data['publish_date']) if data.get('publish_date') else None, last_modified=datetime.fromisoformat(data['last_modified']) if data.get('last_modified') else None, tags=data.get('tags', []), response_time=data.get('response_time', 0.0), content_length=data.get('content_length', 0), status_code=data.get('status_code', 200), timestamp=datetime.fromisoformat(data['timestamp']) ) class CompleteWebSearchMCP: """완전한 웹 검색 MCP - 로컬 AI용 포괄적 인터넷 검색 기반 시스템""" def __init__(self, cache_db_path: str = "search_cache.db", cache_ttl_hours: int = 24): # 핵심 컴포넌트 초기화 self.playwright_manager = PlaywrightManager() self.antibot_manager = EnhancedAntiBotManager() self.content_analyzer = ContentAnalyzer() self.engine_handler = AdvancedSearchEngineHandler( self.antibot_manager, self.playwright_manager ) self.cache = SearchCache(cache_db_path, cache_ttl_hours) # 통계 및 모니터링 self.search_stats = { 'total_searches': 0, 'cache_hits': 0, 'engine_usage': {}, 'error_count': 0, 'avg_response_time': 0.0 } logger.info("CompleteWebSearchMCP 초기화 완료") async def search(self, request: SearchRequest) -> SearchResponse: """메인 검색 메서드""" start_time = time.time() try: # 통계 업데이트 self.search_stats['total_searches'] += 1 # 캐시 확인 cache_key = request.get_cache_key() cached_response = self.cache.get_cached_results(cache_key) if cached_response: self.search_stats['cache_hits'] += 1 return cached_response # 다중 엔진 검색 실행 all_results = [] engines_used = [] # 병렬 검색 실행 if len(request.engines) > 1: results = await self._search_multiple_engines_parallel(request) else: results = await self._search_single_engine(request, request.engines[0]) for engine, engine_results in results.items(): if engine_results: engines_used.append(engine) all_results.extend(engine_results) # 엔진 사용 통계 업데이트 engine_key = engine.value self.search_stats['engine_usage'][engine_key] = \ self.search_stats['engine_usage'].get(engine_key, 0) + 1 # 결과 후처리 processed_results = await self._post_process_results(all_results, request) # 응답 생성 search_time = time.time() - start_time response = SearchResponse( query=request.query, results=processed_results, total_results=len(processed_results), search_time=search_time, 
engines_used=engines_used, cached=False ) # 통계 업데이트 self._update_response_time(search_time) # 캐시 저장 self.cache.save_results(cache_key, response) return response except Exception as e: self.search_stats['error_count'] += 1 logger.error(f"검색 실패: {e}") raise async def _search_multiple_engines_parallel(self, request: SearchRequest) -> Dict[SearchEngine, List[SearchResult]]: """다중 엔진 병렬 검색""" tasks = [] for engine in request.engines: task = self.engine_handler.search_engine_async(request, engine) tasks.append((engine, task)) results = {} completed_tasks = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True) for i, (engine, _) in enumerate(tasks): result = completed_tasks[i] if isinstance(result, list): results[engine] = result else: logger.error(f"엔진 {engine.value} 검색 실패: {result}") results[engine] = [] return results async def _search_single_engine(self, request: SearchRequest, engine: SearchEngine) -> Dict[SearchEngine, List[SearchResult]]: """단일 엔진 검색""" results = await self.engine_handler.search_engine_async(request, engine) return {engine: results} async def _post_process_results(self, results: List[SearchResult], request: SearchRequest) -> List[SearchResult]: """검색 결과 후처리""" if not results: return results # 중복 제거 if request.deduplicate: results = self._deduplicate_results(results) # 품질 필터링 if request.quality_filter: results = await self._filter_by_quality(results) # 콘텐츠 추출 및 요약 if request.extract_content or request.summarize_content: results = await self._extract_and_summarize_content(results, request) # 정렬 (관련성 점수 기준) results.sort(key=lambda x: x.relevance_score, reverse=True) # 결과 수 제한 return results[:request.num_results] def _deduplicate_results(self, results: List[SearchResult]) -> List[SearchResult]: """중복 결과 제거""" seen_urls = set() unique_results = [] for result in results: if result.url not in seen_urls: seen_urls.add(result.url) unique_results.append(result) return unique_results async def _filter_by_quality(self, results: List[SearchResult]) -> List[SearchResult]: """품질 기반 필터링""" filtered_results = [] for result in results: # 도메인 권위 점수 계산 result.authority_score = self.content_analyzer._calculate_authority_score(result.domain) # 최소 품질 기준 (조정 가능) if result.authority_score >= 0.3 and result.relevance_score >= 0.2: filtered_results.append(result) return filtered_results async def _extract_and_summarize_content(self, results: List[SearchResult], request: SearchRequest) -> List[SearchResult]: """콘텐츠 추출 및 요약""" # 동시 처리를 위한 ThreadPoolExecutor 사용 with ThreadPoolExecutor(max_workers=5) as executor: tasks = [] for result in results[:10]: # 상위 10개만 처리 if request.extract_content: task = executor.submit(self._extract_content_sync, result) tasks.append((result, task)) # 결과 수집 for result, task in tasks: try: content = task.result(timeout=10) # 10초 타임아웃 if content: result.extracted_content = content[:request.max_content_length] # 품질 분석 quality, score, analysis = self.content_analyzer.analyze_content_quality( content, result.url ) result.quality_score = quality result.authority_score = max(result.authority_score, analysis.get('authority_score', 0)) # 요약 생성 if request.summarize_content: result.content_summary = self.content_analyzer.generate_summary( content, max_length=200 ) except Exception as e: logger.warning(f"콘텐츠 처리 실패 {result.url}: {e}") continue return results def _extract_content_sync(self, result: SearchResult) -> Optional[str]: """동기적 콘텐츠 추출 (ThreadPoolExecutor용)""" try: import requests session = self.antibot_manager.get_session(SearchEngine.GOOGLE) response = 
session.get(result.url, timeout=10) response.raise_for_status() # BeautifulSoup으로 텍스트 추출 from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') # 불필요한 요소 제거 for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']): element.decompose() text = soup.get_text() # 정리 및 압축 lines = [line.strip() for line in text.splitlines() if line.strip()] clean_text = '\n'.join(lines) return clean_text except Exception as e: logger.debug(f"콘텐츠 추출 실패 {result.url}: {e}") return None def _update_response_time(self, response_time: float): """평균 응답 시간 업데이트""" current_avg = self.search_stats['avg_response_time'] total_searches = self.search_stats['total_searches'] if total_searches == 1: self.search_stats['avg_response_time'] = response_time else: # 지수 이동 평균 alpha = 0.1 self.search_stats['avg_response_time'] = alpha * response_time + (1 - alpha) * current_avg def get_search_stats(self) -> Dict[str, Any]: """검색 통계 반환""" return self.search_stats.copy() async def cleanup(self): """리소스 정리""" await self.playwright_manager.cleanup() self.cache.cleanup_expired() logger.info("CompleteWebSearchMCP 정리 완료") # MCP 서버 인터페이스 class WebSearchMCPServer: """MCP 서버 인터페이스""" def __init__(self): self.search_engine = CompleteWebSearchMCP() async def search(self, **kwargs) -> Dict[str, Any]: """검색 실행 (MCP 호환)""" try: # 매개변수 파싱 query = kwargs.get('query', '') if not query: return {'error': 'Query is required'} # SearchRequest 생성 request = SearchRequest( query=query, engines=[SearchEngine(e) for e in kwargs.get('engines', ['google'])], num_results=kwargs.get('num_results', 10), language=kwargs.get('language', 'ko'), region=kwargs.get('region', 'KR'), safe_search=kwargs.get('safe_search', True), extract_content=kwargs.get('extract_content', True), summarize_content=kwargs.get('summarize_content', True), quality_filter=kwargs.get('quality_filter', True) ) # 검색 실행 response = await self.search_engine.search(request) return response.to_dict() except Exception as e: logger.error(f"MCP 검색 실패: {e}") return {'error': str(e)} async def get_stats(self) -> Dict[str, Any]: """검색 통계 조회""" return self.search_engine.get_search_stats() async def cleanup(self): """리소스 정리""" await self.search_engine.cleanup() # 실행 예제 async def main(): """테스트용 메인 함수""" search_mcp = CompleteWebSearchMCP() try: # 테스트 검색 request = SearchRequest( query="Python 머신러닝 라이브러리", engines=[SearchEngine.GOOGLE, SearchEngine.GITHUB], num_results=10, extract_content=True, summarize_content=True ) response = await search_mcp.search(request) print(f"검색 결과: {response.total_results}개") print(f"검색 시간: {response.search_time:.2f}초") print(f"사용된 엔진: {[e.value for e in response.engines_used]}") for i, result in enumerate(response.results[:5], 1): print(f"\n{i}. {result.title}") print(f" URL: {result.url}") print(f" 관련성: {result.relevance_score:.2f}") print(f" 요약: {result.content_summary[:100]}...") finally: await search_mcp.cleanup() if __name__ == "__main__": asyncio.run(main())
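The WebSearchMCPServer class above wraps the whole pipeline behind a keyword-argument search() call. Below is a minimal sketch of driving that interface directly, assuming this file is importable as web_search_main and that SearchResponse.to_dict() exposes the response fields by name (run_demo and the chosen query are illustrative, not part of the project):

# Minimal driver sketch for WebSearchMCPServer; the kwargs mirror those
# parsed in WebSearchMCPServer.search above. Names here are illustrative.
import asyncio

from web_search_main import WebSearchMCPServer


async def run_demo():
    server = WebSearchMCPServer()
    try:
        result = await server.search(
            query="open source vector databases",
            engines=["google"],        # engine names map onto the SearchEngine enum
            num_results=5,
            extract_content=False,     # skip page fetches for a faster demo
            summarize_content=False,
        )
        # to_dict() is assumed to include total_results, as defined on SearchResponse
        print(result.get("total_results"), "results")

        stats = await server.get_stats()
        print("cache hits:", stats["cache_hits"])
    finally:
        await server.cleanup()


if __name__ == "__main__":
    asyncio.run(run_demo())

Repeating the same query within the cache TTL (24 hours by default) should be answered from the SQLite cache rather than hitting the engines again.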

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.