
MCP MixSearch

by geosp
service.py (23 kB)
""" Web Search Service Provides web search functionality with multiple engines and content extraction. """ import asyncio import logging from typing import List, Dict, Any, Optional from dataclasses import dataclass from datetime import datetime, UTC import urllib.parse import httpx import aiohttp from bs4 import BeautifulSoup from playwright.async_api import async_playwright from fake_useragent import UserAgent import os import json from dotenv import load_dotenv from ddgs import DDGS # Load environment variables from .env file load_dotenv() # Configure logging to a file logging.basicConfig( filename="web_search_debug.log", level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Ensure logging to file is properly configured file_handler = logging.FileHandler("web_search_debug.log") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" )) if not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers): logger.addHandler(file_handler) @dataclass class SearchResult: title: str url: str description: str full_content: str = "" content_preview: str = "" word_count: int = 0 timestamp: str = "" fetch_status: str = "success" error: Optional[str] = None class WebSearchService: def __init__(self): self.ua = UserAgent() self.timeout = aiohttp.ClientTimeout(total=30) self.max_content_length = 500000 self.max_concurrent_requests = 5 self.brave_api_key = os.getenv("BRAVE_API_KEY") async def search_and_extract(self, query: str, limit: int = 5, max_content_length: Optional[int] = None, include_content: bool = True, **kwargs) -> Dict[str, Any]: """Full web search with content extraction""" logger.info(f"Starting comprehensive web search for: '{query}' with parameters: {kwargs}") results = await self._multi_engine_search(query, limit, **kwargs) # Extract full content concurrently if requested if include_content: await self._extract_content_concurrent(results, max_content_length) return { "query": query, "results": [self._result_to_dict(r) for r in results], "total_results": len(results), "status": "success" } async def search_summaries(self, query: str, limit: int = 5, **kwargs) -> Dict[str, Any]: """Lightweight search returning only summaries using multi-engine approach""" logger.info(f"Starting multi-engine search for summaries: '{query}' with parameters: {kwargs}") # Use multi-engine search instead of just Brave API results = await self._multi_engine_search(query, limit, **kwargs) # Format results as summaries (no content extraction needed) return { "query": query, "results": [ { "title": r.title, "url": r.url, "description": r.description, "timestamp": r.timestamp } for r in results ], "total_results": len(results), "status": "success" } async def extract_single_page(self, url: str, max_content_length: Optional[int] = None) -> str: """Extract content from a single webpage""" logger.info(f"Extracting content from: {url}") return await self._extract_page_content(url, max_content_length) async def _multi_engine_search(self, query: str, limit: int, **kwargs) -> List[SearchResult]: """Try multiple search engines in order of preference""" engines = [ ("DDGS", self._search_ddgs), # DDGS with Brave/DDG backends (highest priority) ("Brave", self._search_brave), # Browser-based Brave (fallback) ("DuckDuckGo", self._search_duckduckgo) # Browser-based DDG (last resort) ] for engine_name, engine_func in engines: try: logger.info(f"Trying 
{engine_name} search engine") results = await engine_func(query, limit, **kwargs) if results and self._assess_result_quality(results, query) >= 0.3: logger.info(f"Using results from {engine_name}: {len(results)} results") return results[:limit] except Exception as e: logger.warning(f"Search engine {engine_name} failed: {e}") continue logger.warning("All search engines failed, returning empty results") return [] async def _search_brave(self, query: str, limit: int, **kwargs) -> List[SearchResult]: """Browser-based Brave search""" async with async_playwright() as p: browser = await p.firefox.launch(headless=True) try: context = await browser.new_context( user_agent=self.ua.random, viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() # Build search URL with parameters search_params = {'q': query, 'spellcheck': '1'} # Add country parameter if specified if kwargs.get('country'): search_params['country'] = kwargs['country'].upper() # Add language parameter if specified if kwargs.get('language'): search_params['lang'] = kwargs['language'] # Add time filter if recency_days specified if kwargs.get('recency_days') is not None: recency = kwargs['recency_days'] if recency == 1: search_params['tf'] = 'd' # past day elif recency <= 7: search_params['tf'] = 'w' # past week elif recency <= 30: search_params['tf'] = 'm' # past month elif recency <= 365: search_params['tf'] = 'y' # past year param_string = '&'.join([f"{k}={urllib.parse.quote(str(v))}" for k, v in search_params.items()]) search_url = f"https://search.brave.com/search?{param_string}" logger.info(f"Browser-based Brave search URL: {search_url}") await page.goto(search_url) await page.wait_for_load_state('networkidle') # Extract results results_data = await page.evaluate(""" () => { const results = []; document.querySelectorAll('.snippet').forEach(el => { const titleEl = el.querySelector('a'); const descEl = el.querySelector('.snippet-description, .snippet-content'); if (titleEl && descEl) { results.push({ title: titleEl.textContent.trim(), url: titleEl.href, description: descEl.textContent.trim() }); } }); return results; } """) results = [] for r in results_data[:limit]: results.append(SearchResult( title=r['title'], url=r['url'], description=r['description'], timestamp=datetime.now(UTC).isoformat() )) return results finally: await browser.close() async def _search_duckduckgo(self, query: str, limit: int, **kwargs) -> List[SearchResult]: """HTTP-based DuckDuckGo search""" async with aiohttp.ClientSession(timeout=self.timeout) as session: headers = {'User-Agent': self.ua.random} # Build DuckDuckGo URL with parameters search_params = {'q': query} # DuckDuckGo supports some region filtering if kwargs.get('country'): # DuckDuckGo uses region codes like us-en, uk-en, etc. 
country = kwargs['country'].lower() language = kwargs.get('language', 'en') search_params['kl'] = f"{country}-{language}" param_string = '&'.join([f"{k}={urllib.parse.quote(str(v))}" for k, v in search_params.items()]) url = f"https://duckduckgo.com/html/?{param_string}" logger.info(f"DuckDuckGo search URL: {url}") async with session.get(url, headers=headers) as response: html = await response.text() soup = BeautifulSoup(html, 'lxml') results = [] for result in soup.select('.result')[:limit]: title_el = result.select_one('.result__title a') snippet_el = result.select_one('.result__snippet') if title_el and snippet_el: # Extract actual URL from DDG redirect href = title_el['href'] actual_url = href if href.startswith('//duckduckgo.com/l/?uddg='): # Parse the actual URL from DDG redirect try: actual_url = href.split('uddg=')[1].split('&')[0] actual_url = urllib.parse.unquote(actual_url) except: actual_url = href results.append(SearchResult( title=title_el.get_text().strip(), url=actual_url, description=snippet_el.get_text().strip(), timestamp=datetime.now(UTC).isoformat() )) return results async def _search_ddgs(self, query: str, limit: int, **kwargs) -> List[SearchResult]: """DDGS library-based search with full parameter support and Brave backend preference""" try: logger.info(f"Starting DDGS library search for: '{query}' with parameters: {kwargs}") # Prepare DDGS parameters search_params = { 'max_results': limit } # Try Brave backend first (preferred) with supported parameters brave_compatible = True brave_params = search_params.copy() # Brave backend supports language and time, but not region if kwargs.get('language'): brave_params['language'] = kwargs['language'] # Map recency_days to DDGS time parameter if kwargs.get('recency_days') is not None: recency = kwargs['recency_days'] if recency == 1: brave_params['time'] = 'day' elif recency <= 7: brave_params['time'] = 'week' elif recency <= 30: brave_params['time'] = 'month' elif recency <= 365: brave_params['time'] = 'year' # -1 means no time filter, so we don't set time parameter # If country/region is specified, Brave backend doesn't support it if kwargs.get('country'): brave_compatible = False logger.info(f"Country filter '{kwargs['country']}' not compatible with Brave backend, will try DuckDuckGo") # Choose search type based on source parameter search_method = 'text' # default if kwargs.get('source'): source = kwargs['source'].lower() if source == 'news': search_method = 'news' elif source in ['images', 'videos']: search_method = source # Try Brave backend first if compatible if brave_compatible: try: brave_params['backend'] = 'brave' logger.info(f"DDGS search - trying Brave backend, method: {search_method}, params: {brave_params}") ddgs = DDGS() search_func = getattr(ddgs, search_method) # Execute search in a thread pool to avoid blocking loop = asyncio.get_event_loop() raw_results = await loop.run_in_executor( None, lambda: list(search_func(query, **brave_params)) ) if raw_results: logger.info(f"DDGS Brave backend successful: {len(raw_results)} results") return self._convert_ddgs_results(raw_results, search_method) except Exception as e: logger.warning(f"DDGS Brave backend failed: {e}, trying DuckDuckGo backend") # Fallback to DuckDuckGo backend with full parameter support ddg_params = search_params.copy() if kwargs.get('country'): ddg_params['region'] = kwargs['country'].lower() if kwargs.get('language'): ddg_params['language'] = kwargs['language'] # Map recency_days to DDGS time parameter for DDG if kwargs.get('recency_days') is 
not None: recency = kwargs['recency_days'] if recency == 1: ddg_params['time'] = 'day' elif recency <= 7: ddg_params['time'] = 'week' elif recency <= 30: ddg_params['time'] = 'month' elif recency <= 365: ddg_params['time'] = 'year' ddg_params['backend'] = 'duckduckgo' logger.info(f"DDGS search - using DuckDuckGo backend, method: {search_method}, params: {ddg_params}") ddgs = DDGS() search_func = getattr(ddgs, search_method) # Execute search in a thread pool to avoid blocking loop = asyncio.get_event_loop() raw_results = await loop.run_in_executor( None, lambda: list(search_func(query, **ddg_params)) ) logger.info(f"DDGS DuckDuckGo backend completed: {len(raw_results)} results") return self._convert_ddgs_results(raw_results, search_method) except Exception as e: logger.error(f"DDGS search failed: {e}") raise e def _convert_ddgs_results(self, raw_results: list, search_method: str) -> List[SearchResult]: """Convert DDGS results to our SearchResult format""" results = [] for item in raw_results: if search_method == 'news': # News results have: date, title, body, url, image, source results.append(SearchResult( title=item.get('title', ''), url=item.get('url', ''), description=item.get('body', ''), timestamp=item.get('date', datetime.now(UTC).isoformat()) )) else: # Text results have: title, href, body results.append(SearchResult( title=item.get('title', ''), url=item.get('href', ''), description=item.get('body', ''), timestamp=datetime.now(UTC).isoformat() )) return results async def _extract_content_concurrent(self, results: List[SearchResult], max_content_length: Optional[int]) -> None: """Extract content from multiple URLs concurrently""" semaphore = asyncio.Semaphore(self.max_concurrent_requests) async def extract_with_semaphore(result: SearchResult): async with semaphore: try: content = await self._extract_page_content(result.url, max_content_length) result.full_content = content result.content_preview = content[:500] + "..." 
if len(content) > 500 else content result.word_count = len(content.split()) result.fetch_status = "success" except Exception as e: result.fetch_status = "error" result.error = str(e) logger.warning(f"Content extraction failed for {result.url}: {e}") await asyncio.gather(*[extract_with_semaphore(r) for r in results]) async def _extract_page_content(self, url: str, max_content_length: Optional[int]) -> str: """Extract readable content from a webpage""" try: # Try fast HTTP extraction first content = await self._extract_with_httpx(url, max_content_length) if self._is_meaningful_content(content): return content except Exception as e: logger.debug(f"HTTP extraction failed for {url}: {e}") # Fallback to browser extraction return await self._extract_with_browser(url, max_content_length) async def _extract_with_httpx(self, url: str, max_content_length: Optional[int]) -> str: """Fast HTTP-based content extraction""" async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: response = await client.get(url, headers={'User-Agent': self.ua.random}) response.raise_for_status() soup = BeautifulSoup(response.text, 'lxml') # Remove unwanted elements for tag in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'ads', '.ad', '.advertisement']): tag.decompose() # Extract main content content = "" for tag in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']): text = tag.get_text().strip() if text and len(text) > 20: # Filter short fragments content += text + "\n\n" content = content.strip() if max_content_length and len(content) > max_content_length: content = content[:max_content_length] return content async def _extract_with_browser(self, url: str, max_content_length: Optional[int]) -> str: """Browser-based content extraction for dynamic sites""" async with async_playwright() as p: browser = await p.chromium.launch(headless=True) try: context = await browser.new_context( user_agent=self.ua.random, viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() await page.goto(url, wait_until='networkidle') await page.wait_for_timeout(2000) # Wait for dynamic content # Extract readable text content content = await page.evaluate(""" () => { // Remove unwanted elements const elements = document.querySelectorAll('script, style, nav, header, footer, aside, .ad, .advertisement'); elements.forEach(el => el.remove()); // Extract main content const contentSelectors = ['main', 'article', '.content', '.post', '.entry', '#content', '#main']; let content = ''; for (const selector of contentSelectors) { const element = document.querySelector(selector); if (element) { content = element.textContent.trim(); break; } } // Fallback to body text if (!content) { content = document.body.textContent.trim(); } return content; } """) if max_content_length and len(content) > max_content_length: content = content[:max_content_length] return content finally: await browser.close() def _assess_result_quality(self, results: List[SearchResult], query: str) -> float: """Assess search result quality (0.0 to 1.0)""" if not results: return 0.0 # Simple quality metrics query_words = set(query.lower().split()) total_score = 0.0 for result in results: title_words = set(result.title.lower().split()) desc_words = set(result.description.lower().split()) # Relevance score based on word overlap relevance = len(query_words & (title_words | desc_words)) / len(query_words) total_score += relevance return min(total_score / len(results), 1.0) def _is_meaningful_content(self, content: str) -> bool: 
"""Check if extracted content is meaningful""" if len(content) < 100: return False # Check for common bot detection messages bot_indicators = ['captcha', 'blocked', 'access denied', 'robot', 'verification', '403', 'forbidden'] content_lower = content.lower() for indicator in bot_indicators: if indicator in content_lower: return False return True def _result_to_dict(self, result: SearchResult) -> Dict[str, Any]: """Convert SearchResult to dictionary""" return { "title": result.title, "url": result.url, "description": result.description, "full_content": result.full_content, "content_preview": result.content_preview, "word_count": result.word_count, "timestamp": result.timestamp, "fetch_status": result.fetch_status, "error": result.error }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/geosp/mcp-mixsearch'
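
The same endpoint can be queried from Python; a minimal sketch using httpx follows. The response schema is not documented here, so the snippet simply prints the raw JSON payload.

# Hypothetical client sketch for the MCP directory API endpoint shown above
import httpx

resp = httpx.get("https://glama.ai/api/mcp/v1/servers/geosp/mcp-mixsearch")
resp.raise_for_status()
print(resp.json())  # schema not documented here, so just print the payload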

If you have feedback or need assistance with the MCP directory API, please join our Discord server.