#!/usr/bin/env python3
"""
Web Search Main - the complete web search MCP main class.
Integrates all components into a complete web search solution.
"""
import asyncio
import sqlite3
import json
import time
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor
from web_search_base import (
SearchEngine, SearchRequest, SearchResponse, SearchResult,
SearchResultType, ContentType, QualityScore
)
from web_search_playwright import PlaywrightManager
from web_search_antibot import EnhancedAntiBotManager
from web_search_analyzer import ContentAnalyzer
from web_search_engine import AdvancedSearchEngineHandler
logger = logging.getLogger(__name__)
class SearchCache:
"""고급 검색 캐시 시스템"""
def __init__(self, db_path: str = "search_cache.db", ttl_hours: int = 24):
self.db_path = db_path
self.ttl_hours = ttl_hours
self._init_database()
def _init_database(self):
"""데이터베이스 초기화"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS search_cache (
cache_key TEXT PRIMARY KEY,
query TEXT NOT NULL,
engines TEXT NOT NULL,
results TEXT NOT NULL,
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
)
""")
                # Create indexes to speed up expiry cleanup and query lookups
conn.execute("CREATE INDEX IF NOT EXISTS idx_expires_at ON search_cache(expires_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_query ON search_cache(query)")
conn.commit()
logger.info("검색 캐시 데이터베이스 초기화 완료")
except Exception as e:
logger.error(f"캐시 데이터베이스 초기화 실패: {e}")
def get_cached_results(self, cache_key: str) -> Optional[SearchResponse]:
"""캐시된 결과 조회"""
try:
with sqlite3.connect(self.db_path) as conn:
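                # expires_at is stored as a UTC 'YYYY-MM-DD HH:MM:SS' string, so it
                # compares correctly against SQLite's CURRENT_TIMESTAMP (also UTC).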
cursor = conn.execute("""
SELECT results, metadata FROM search_cache
WHERE cache_key = ? AND expires_at > CURRENT_TIMESTAMP
""", (cache_key,))
row = cursor.fetchone()
if row:
results_data = json.loads(row[0])
metadata = json.loads(row[1]) if row[1] else {}
                    # Rebuild the SearchResponse object
response = SearchResponse(
query=metadata.get('query', ''),
results=[self._dict_to_search_result(r) for r in results_data],
total_results=metadata.get('total_results', len(results_data)),
search_time=metadata.get('search_time', 0.0),
engines_used=[SearchEngine(e) for e in metadata.get('engines_used', [])],
cached=True
)
logger.debug(f"캐시 히트: {cache_key}")
return response
except Exception as e:
logger.error(f"캐시 조회 실패: {e}")
return None
def save_results(self, cache_key: str, response: SearchResponse):
"""검색 결과 캐시 저장"""
try:
            # Compute expiry in UTC so it matches SQLite's CURRENT_TIMESTAMP
            expires_at = datetime.now(timezone.utc) + timedelta(hours=self.ttl_hours)
results_data = [result.to_dict() for result in response.results]
metadata = {
'query': response.query,
'total_results': response.total_results,
'search_time': response.search_time,
'engines_used': [e.value for e in response.engines_used]
}
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO search_cache
(cache_key, query, engines, results, metadata, expires_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (
cache_key,
response.query,
','.join([e.value for e in response.engines_used]),
json.dumps(results_data),
json.dumps(metadata),
                    expires_at.strftime('%Y-%m-%d %H:%M:%S')  # 'YYYY-MM-DD HH:MM:SS' (UTC) to match CURRENT_TIMESTAMP
))
conn.commit()
logger.debug(f"캐시 저장: {cache_key}")
except Exception as e:
logger.error(f"캐시 저장 실패: {e}")
def cleanup_expired(self):
"""만료된 캐시 정리"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("DELETE FROM search_cache WHERE expires_at <= CURRENT_TIMESTAMP")
deleted_count = cursor.rowcount
conn.commit()
if deleted_count > 0:
logger.info(f"만료된 캐시 {deleted_count}개 정리 완료")
except Exception as e:
logger.error(f"캐시 정리 실패: {e}")
def _dict_to_search_result(self, data: Dict) -> SearchResult:
"""딕셔너리를 SearchResult 객체로 변환"""
return SearchResult(
title=data['title'],
url=data['url'],
snippet=data['snippet'],
result_type=SearchResultType(data['result_type']),
ranking=data['ranking'],
relevance_score=data.get('relevance_score', 0.0),
quality_score=QualityScore(data.get('quality_score', 3)),
authority_score=data.get('authority_score', 0.0),
freshness_score=data.get('freshness_score', 0.0),
extracted_content=data.get('extracted_content'),
content_summary=data.get('content_summary'),
content_type=ContentType(data['content_type']) if data.get('content_type') else ContentType.TEXT,
language=data.get('language', 'unknown'),
domain=data.get('domain'),
author=data.get('author'),
publish_date=datetime.fromisoformat(data['publish_date']) if data.get('publish_date') else None,
last_modified=datetime.fromisoformat(data['last_modified']) if data.get('last_modified') else None,
tags=data.get('tags', []),
response_time=data.get('response_time', 0.0),
content_length=data.get('content_length', 0),
status_code=data.get('status_code', 200),
timestamp=datetime.fromisoformat(data['timestamp'])
)
class CompleteWebSearchMCP:
"""완전한 웹 검색 MCP - 로컬 AI용 포괄적 인터넷 검색 기반 시스템"""
def __init__(self, cache_db_path: str = "search_cache.db", cache_ttl_hours: int = 24):
        # Initialize core components
self.playwright_manager = PlaywrightManager()
self.antibot_manager = EnhancedAntiBotManager()
self.content_analyzer = ContentAnalyzer()
self.engine_handler = AdvancedSearchEngineHandler(
self.antibot_manager,
self.playwright_manager
)
self.cache = SearchCache(cache_db_path, cache_ttl_hours)
        # Statistics and monitoring
self.search_stats = {
'total_searches': 0,
'cache_hits': 0,
'engine_usage': {},
'error_count': 0,
'avg_response_time': 0.0
}
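        # avg_response_time is maintained as an exponential moving average
        # (see _update_response_time below).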
logger.info("CompleteWebSearchMCP 초기화 완료")
async def search(self, request: SearchRequest) -> SearchResponse:
"""메인 검색 메서드"""
start_time = time.time()
try:
            # Update statistics
self.search_stats['total_searches'] += 1
            # Check the cache first
cache_key = request.get_cache_key()
cached_response = self.cache.get_cached_results(cache_key)
if cached_response:
self.search_stats['cache_hits'] += 1
return cached_response
            # Run the search across the requested engines
all_results = []
engines_used = []
            # Run engines in parallel when more than one is requested
if len(request.engines) > 1:
results = await self._search_multiple_engines_parallel(request)
else:
results = await self._search_single_engine(request, request.engines[0])
for engine, engine_results in results.items():
if engine_results:
engines_used.append(engine)
all_results.extend(engine_results)
                    # Update engine usage statistics
engine_key = engine.value
self.search_stats['engine_usage'][engine_key] = \
self.search_stats['engine_usage'].get(engine_key, 0) + 1
            # Post-process results
processed_results = await self._post_process_results(all_results, request)
            # Build the response
search_time = time.time() - start_time
response = SearchResponse(
query=request.query,
results=processed_results,
total_results=len(processed_results),
search_time=search_time,
engines_used=engines_used,
cached=False
)
            # Update response-time statistics
self._update_response_time(search_time)
            # Cache the response
self.cache.save_results(cache_key, response)
return response
except Exception as e:
self.search_stats['error_count'] += 1
logger.error(f"검색 실패: {e}")
raise
async def _search_multiple_engines_parallel(self, request: SearchRequest) -> Dict[SearchEngine, List[SearchResult]]:
"""다중 엔진 병렬 검색"""
tasks = []
for engine in request.engines:
task = self.engine_handler.search_engine_async(request, engine)
tasks.append((engine, task))
results = {}
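        # return_exceptions=True keeps one failing engine from cancelling the others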
completed_tasks = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)
for i, (engine, _) in enumerate(tasks):
result = completed_tasks[i]
if isinstance(result, list):
results[engine] = result
else:
logger.error(f"엔진 {engine.value} 검색 실패: {result}")
results[engine] = []
return results
async def _search_single_engine(self, request: SearchRequest, engine: SearchEngine) -> Dict[SearchEngine, List[SearchResult]]:
"""단일 엔진 검색"""
results = await self.engine_handler.search_engine_async(request, engine)
return {engine: results}
async def _post_process_results(self, results: List[SearchResult], request: SearchRequest) -> List[SearchResult]:
"""검색 결과 후처리"""
if not results:
return results
        # Remove duplicates
if request.deduplicate:
results = self._deduplicate_results(results)
        # Quality filtering
if request.quality_filter:
results = await self._filter_by_quality(results)
        # Content extraction and summarization
if request.extract_content or request.summarize_content:
results = await self._extract_and_summarize_content(results, request)
        # Sort by relevance score (highest first)
results.sort(key=lambda x: x.relevance_score, reverse=True)
        # Limit the number of results
return results[:request.num_results]
def _deduplicate_results(self, results: List[SearchResult]) -> List[SearchResult]:
"""중복 결과 제거"""
seen_urls = set()
unique_results = []
for result in results:
if result.url not in seen_urls:
seen_urls.add(result.url)
unique_results.append(result)
return unique_results
async def _filter_by_quality(self, results: List[SearchResult]) -> List[SearchResult]:
"""품질 기반 필터링"""
filtered_results = []
for result in results:
            # Compute the domain authority score
result.authority_score = self.content_analyzer._calculate_authority_score(result.domain)
            # Minimum quality thresholds (tunable)
if result.authority_score >= 0.3 and result.relevance_score >= 0.2:
filtered_results.append(result)
return filtered_results
    async def _extract_and_summarize_content(self, results: List[SearchResult], request: SearchRequest) -> List[SearchResult]:
        """Extract and summarize content for the top results."""
        # Run the blocking extraction in a thread pool and await the futures,
        # so the event loop is not blocked while pages are fetched.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=5) as executor:
            tasks = []
            for result in results[:10]:  # only process the top 10 results
                if request.extract_content:
                    future = loop.run_in_executor(executor, self._extract_content_sync, result)
                    tasks.append((result, future))
            # Collect the extraction results
            for result, future in tasks:
                try:
                    content = await asyncio.wait_for(future, timeout=10)  # 10-second timeout per page
                    if content:
                        result.extracted_content = content[:request.max_content_length]
                        # Quality analysis
                        quality, score, analysis = self.content_analyzer.analyze_content_quality(
                            content, result.url
                        )
                        result.quality_score = quality
                        result.authority_score = max(result.authority_score, analysis.get('authority_score', 0))
                        # Generate a summary
                        if request.summarize_content:
                            result.content_summary = self.content_analyzer.generate_summary(
                                content, max_length=200
                            )
                except Exception as e:
                    logger.warning(f"Content processing failed for {result.url}: {e}")
                    continue
        return results
def _extract_content_sync(self, result: SearchResult) -> Optional[str]:
"""동기적 콘텐츠 추출 (ThreadPoolExecutor용)"""
try:
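            # Lazy imports keep these optional HTTP/parsing dependencies off the module import path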
import requests
session = self.antibot_manager.get_session(SearchEngine.GOOGLE)
response = session.get(result.url, timeout=10)
response.raise_for_status()
            # Extract text with BeautifulSoup
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
            # Strip non-content elements
for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
element.decompose()
text = soup.get_text()
            # Clean up and compact the text
lines = [line.strip() for line in text.splitlines() if line.strip()]
clean_text = '\n'.join(lines)
return clean_text
except Exception as e:
logger.debug(f"콘텐츠 추출 실패 {result.url}: {e}")
return None
def _update_response_time(self, response_time: float):
"""평균 응답 시간 업데이트"""
current_avg = self.search_stats['avg_response_time']
total_searches = self.search_stats['total_searches']
if total_searches == 1:
self.search_stats['avg_response_time'] = response_time
else:
            # Exponential moving average with smoothing factor alpha
alpha = 0.1
self.search_stats['avg_response_time'] = alpha * response_time + (1 - alpha) * current_avg
def get_search_stats(self) -> Dict[str, Any]:
"""검색 통계 반환"""
return self.search_stats.copy()
async def cleanup(self):
"""리소스 정리"""
await self.playwright_manager.cleanup()
self.cache.cleanup_expired()
logger.info("CompleteWebSearchMCP 정리 완료")
# MCP server interface
class WebSearchMCPServer:
"""MCP 서버 인터페이스"""
def __init__(self):
self.search_engine = CompleteWebSearchMCP()
async def search(self, **kwargs) -> Dict[str, Any]:
"""검색 실행 (MCP 호환)"""
try:
            # Parse parameters
query = kwargs.get('query', '')
if not query:
return {'error': 'Query is required'}
            # Build the SearchRequest
request = SearchRequest(
query=query,
engines=[SearchEngine(e) for e in kwargs.get('engines', ['google'])],
num_results=kwargs.get('num_results', 10),
language=kwargs.get('language', 'ko'),
region=kwargs.get('region', 'KR'),
safe_search=kwargs.get('safe_search', True),
extract_content=kwargs.get('extract_content', True),
summarize_content=kwargs.get('summarize_content', True),
quality_filter=kwargs.get('quality_filter', True)
)
            # Run the search
response = await self.search_engine.search(request)
return response.to_dict()
except Exception as e:
logger.error(f"MCP 검색 실패: {e}")
return {'error': str(e)}
async def get_stats(self) -> Dict[str, Any]:
"""검색 통계 조회"""
return self.search_engine.get_search_stats()
async def cleanup(self):
"""리소스 정리"""
await self.search_engine.cleanup()
# Usage example
async def main():
"""테스트용 메인 함수"""
search_mcp = CompleteWebSearchMCP()
try:
        # Test search
request = SearchRequest(
query="Python 머신러닝 라이브러리",
engines=[SearchEngine.GOOGLE, SearchEngine.GITHUB],
num_results=10,
extract_content=True,
summarize_content=True
)
response = await search_mcp.search(request)
print(f"검색 결과: {response.total_results}개")
print(f"검색 시간: {response.search_time:.2f}초")
print(f"사용된 엔진: {[e.value for e in response.engines_used]}")
for i, result in enumerate(response.results[:5], 1):
print(f"\n{i}. {result.title}")
print(f" URL: {result.url}")
print(f" 관련성: {result.relevance_score:.2f}")
print(f" 요약: {result.content_summary[:100]}...")
finally:
await search_mcp.cleanup()
if __name__ == "__main__":
asyncio.run(main())