"""
Web search capability implementation.
"""
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus

import aiohttp

logger = logging.getLogger(__name__)

class WebSearchCapability:
    """Web search across multiple engines.

    DuckDuckGo's HTML endpoint needs no credentials. The Brave Search API
    requires an API key, sent in the X-Subscription-Token header; without
    one the Brave engine is skipped.
    """

    def __init__(self, brave_api_key: Optional[str] = None):
        self.session: Optional[aiohttp.ClientSession] = None
        self.brave_api_key = brave_api_key
async def _ensure_session(self):
"""Ensure HTTP session is available."""
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
headers={'User-Agent': 'Mozilla/5.0 (compatible; KatamariMCP/1.0)'}
)
async def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
"""Search using DuckDuckGo HTML version."""
await self._ensure_session()
try:
url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
async with self.session.get(url) as response:
if response.status != 200:
raise Exception(f"DuckDuckGo search failed: {response.status}")
html = await response.text()
return self._parse_duckduckgo_results(html, max_results)
except Exception as e:
logger.error(f"DuckDuckGo search error: {e}")
return []
def _parse_duckduckgo_results(self, html: str, max_results: int) -> List[Dict[str, Any]]:
"""Parse DuckDuckGo HTML results."""
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
results = []
# Find result divs
result_divs = soup.find_all('div', class_='result')
for div in result_divs[:max_results]:
try:
title_tag = div.find('a', class_='result__a')
snippet_tag = div.find('a', class_='result__snippet')
if title_tag:
title = title_tag.get_text(strip=True)
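                    # Note: DuckDuckGo's HTML endpoint often returns redirect
                    # links (//duckduckgo.com/l/?uddg=...) rather than the raw
                    # target URL; decode the uddg parameter if needed.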
url = title_tag.get('href', '')
snippet = snippet_tag.get_text(strip=True) if snippet_tag else ''
results.append({
'title': title,
'url': url,
'snippet': snippet,
'source': 'duckduckgo'
})
except Exception as e:
logger.warning(f"Error parsing result: {e}")
continue
return results
    async def search_brave(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Search using the Brave Search API (requires an API key)."""
        if not self.brave_api_key:
            logger.debug("Brave search skipped: no API key configured")
            return []
        await self._ensure_session()
        try:
            url = "https://api.search.brave.com/res/v1/web/search"
            params = {
                'q': query,
                'count': str(max_results),
                'text_decorations': 'false'
            }
            headers = {
                'Accept': 'application/json',
                'X-Subscription-Token': self.brave_api_key
            }
async with self.session.get(url, params=params, headers=headers) as response:
if response.status != 200:
raise Exception(f"Brave search failed: {response.status}")
data = await response.json()
return self._parse_brave_results(data)
except Exception as e:
logger.error(f"Brave search error: {e}")
return []
def _parse_brave_results(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse Brave Search API results."""
results = []
if 'web' in data and 'results' in data['web']:
for item in data['web']['results']:
results.append({
'title': item.get('title', ''),
'url': item.get('url', ''),
'snippet': item.get('description', ''),
'source': 'brave'
})
return results
async def search(self, query: str, max_results: int = 5) -> Dict[str, Any]:
"""Perform web search using multiple engines."""
if not query or not query.strip():
raise ValueError("Query cannot be empty")
        # Query both engines concurrently; each engine method already
        # swallows its own errors, but return_exceptions=True guards
        # against anything unexpected escaping.
        engine_names = ('duckduckgo', 'brave')
        tasks = [
            self.search_duckduckgo(query, max_results),
            self.search_brave(query, max_results)
        ]
        results_lists = await asyncio.gather(*tasks, return_exceptions=True)
all_results = []
sources_used = []
        for i, results in enumerate(results_lists):
            if isinstance(results, Exception):
                logger.warning(f"Search engine {engine_names[i]} failed: {results}")
                continue
            if results and isinstance(results, list):
                all_results.extend(results)
                sources_used.append(engine_names[i])
# Remove duplicates based on URL
seen_urls = set()
unique_results = []
for result in all_results:
url = result.get('url', '')
if url and url not in seen_urls:
seen_urls.add(url)
unique_results.append(result)
# Limit results
final_results = unique_results[:max_results]
return {
'query': query,
'results': final_results,
'total_results': len(final_results),
'sources_used': sources_used,
'status': 'success' if final_results else 'no_results'
}
async def close(self):
"""Close HTTP session."""
if self.session and not self.session.closed:
await self.session.close()
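
    # Optional convenience, a sketch rather than part of the original surface:
    # async-context-manager support so callers don't have to remember close().
    async def __aenter__(self) -> "WebSearchCapability":
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()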

# Shared module-level instance. Reading the key from BRAVE_API_KEY is this
# module's convention (any mechanism that supplies the key would do).
_web_search = WebSearchCapability(brave_api_key=os.environ.get("BRAVE_API_KEY"))
async def web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
    """Module-level convenience wrapper around the shared instance."""
    return await _web_search.search(query, max_results)


async def close_web_search():
    """Close the shared instance's HTTP session."""
    await _web_search.close()
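

# Minimal usage sketch (illustrative only; assumes network access, and the
# Brave engine additionally needs BRAVE_API_KEY set as configured above):
if __name__ == "__main__":
    async def _demo() -> None:
        try:
            payload = await web_search("python asyncio tutorial", max_results=3)
            for item in payload["results"]:
                print(f"[{item['source']}] {item['title']} -> {item['url']}")
            print(f"sources: {payload['sources_used']}, status: {payload['status']}")
        finally:
            await close_web_search()

    asyncio.run(_demo())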