MCP DuckDuckGo Search Plugin

  • mcp_duckduckgo
"""
Search functionality for the DuckDuckGo search plugin.
"""

import logging
from typing import Dict, Any
import urllib.parse

import httpx
from mcp.server.fastmcp import Context
from bs4 import BeautifulSoup

# Configure logging
logger = logging.getLogger("mcp_duckduckgo.search")


def extract_domain(url: str) -> str:
    """
    Extract the domain name from a URL.

    Args:
        url: The URL to extract the domain from

    Returns:
        The network location (``netloc``) of the URL, or an empty string
        when the URL cannot be parsed.
    """
    try:
        return urllib.parse.urlparse(url).netloc
    except (ValueError, TypeError, AttributeError) as e:
        # urlparse raises ValueError for malformed hosts (e.g. a broken IPv6
        # literal) and TypeError/AttributeError for non-string input.
        logger.error("Error extracting domain from URL %s: %s", url, e)
        return ""


def _make_result(title: str, url: str, description: str) -> Dict[str, Any]:
    """Build one search-result dictionary in the shape returned to callers."""
    return {
        "title": title,
        "url": url,
        "description": description,
        "published_date": None,
        "domain": extract_domain(url),
    }


def _log_response_structure(soup: BeautifulSoup) -> None:
    """Log the page title and the classes of all tables, to help diagnose markup changes."""
    logger.info("HTML title: %s", soup.title.string if soup.title else "No title")

    tables = soup.find_all("table")
    logger.info("Found %d tables in the response", len(tables))
    for i, table in enumerate(tables):
        logger.info("Table %d class: %s", i, table.get("class", "No class"))


def _fallback_results(soup: BeautifulSoup, limit: int):
    """
    Best-effort extraction used when the expected result rows are missing.

    Treats every anchor whose href is neither a fragment (``#...``) nor a
    site-relative path (``/...``) as a potential search result.

    Returns:
        A ``(results, candidate_count)`` tuple: at most ``limit`` result
        dictionaries and the total number of candidate links found.
    """
    potential_results = [
        link
        for link in soup.find_all("a")
        if link.get("href") and not link.get("href").startswith(("#", "/"))
    ]
    logger.info(f"Found {len(potential_results)} potential result links")

    results = []
    for link in potential_results[:limit]:
        title = link.text.strip()
        href = link.get("href", "")

        # Try to find a description: prefer the parent's text when it holds
        # more than just the link title, otherwise use the next sibling.
        description = ""
        parent = link.parent
        if parent and parent.text and len(parent.text.strip()) > len(title):
            description = parent.text.strip()
        if not description and link.next_sibling:
            sibling = link.next_sibling
            description = sibling.text.strip() if hasattr(sibling, "text") else ""

        results.append(_make_result(title, href, description))

    return results, len(potential_results)


async def duckduckgo_search(params: Dict[str, Any], ctx: Context) -> Dict[str, Any]:
    """
    Perform a web search by scraping the DuckDuckGo Lite HTML endpoint.

    Args:
        params: Dictionary containing search parameters. Keys read here:
            ``query`` (required), ``count`` (default 10), ``offset``
            (default 0, sent to DuckDuckGo as the start index) and ``page``
            (default 1, used only in the progress message).
        ctx: MCP context object providing access to lifespan resources.
            ``lifespan_context["http_client"]`` is reused when present;
            ``info``, ``error`` and ``report_progress`` are called only if
            the context exposes them.

    Returns:
        Dictionary with ``results`` (a list of dictionaries with ``title``,
        ``url``, ``description``, ``published_date`` and ``domain``) and
        ``total_results`` (an estimate; DuckDuckGo gives no exact count).

    Raises:
        ValueError: If ``query`` is missing or empty, or if the request or
            the parsing fails (the original exception is chained as the cause).
    """
    query = params.get("query")
    count = params.get("count", 10)
    offset = params.get("offset", 0)
    page = params.get("page", 1)

    if not query:
        logger.error("Query parameter is required")
        raise ValueError("Query parameter is required")

    logger.info(f"Searching DuckDuckGo for: {query}")

    # We'll use the DuckDuckGo Lite endpoint which doesn't require an API key.
    # This is for demonstration purposes. For production, consider using a
    # proper search API.
    search_url = "https://lite.duckduckgo.com/lite/"

    http_client = None
    close_client = False

    try:
        # Reuse the shared HTTP client from the lifespan context when there is
        # one; a missing or None lifespan_context falls back to a private client.
        lifespan_context = getattr(ctx, "lifespan_context", None)
        if lifespan_context is not None and "http_client" in lifespan_context:
            logger.info("Using HTTP client from lifespan context")
            http_client = lifespan_context["http_client"]
        else:
            logger.info("Creating new HTTP client")
            http_client = httpx.AsyncClient(
                timeout=10.0,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
                },
            )
            close_client = True

        # Log the search operation
        if hasattr(ctx, "info"):
            await ctx.info(f"Searching for: {query} (page {page})")

        response = await http_client.post(
            search_url,
            data={
                "q": query,
                "kl": "wt-wt",  # No region localization
                "s": offset,  # Start index for pagination
            },
            timeout=10.0,
        )
        response.raise_for_status()

        logger.info(
            f"Response status: {response.status_code}, Content length: {len(response.text)}"
        )

        # Parse the HTML response to extract search results.
        # Note: This is a simplified implementation and might break if
        # DuckDuckGo changes their HTML structure.
        soup = BeautifulSoup(response.text, "html.parser")
        _log_response_structure(soup)

        # Find all result rows in the HTML
        result_rows = soup.find_all("tr", class_="result-link")
        result_snippets = soup.find_all("tr", class_="result-snippet")
        logger.info(
            f"Found {len(result_rows)} result rows and {len(result_snippets)} result snippets"
        )

        if not result_rows:
            # Log a sample of the links so the markup can be diagnosed.
            logger.info("No results found with expected classes, trying alternative parsing")
            all_links = soup.find_all("a")
            logger.info(f"Found {len(all_links)} links in the document")
            for i, link in enumerate(all_links[:5]):
                logger.info(f"Link {i}: text='{link.text.strip()}', href='{link.get('href', '')}'")

        # A negative count yields no results rather than a negative slice.
        limit = max(count, 0)

        # The request already asked DuckDuckGo to start at `offset` (the "s"
        # field), so the rows on this page are indexed from zero. Applying the
        # offset again here would skip results or return none at all.
        page_rows = result_rows[:limit]
        total_results = offset + len(result_rows)

        # Report progress against the rows actually processed so it can reach 100%.
        can_report = hasattr(ctx, "report_progress")
        if can_report:
            await ctx.report_progress(0, len(page_rows))

        results = []
        for index, row in enumerate(page_rows):
            title_elem = row.find("a")
            if not title_elem:
                continue

            description = ""
            if index < len(result_snippets):
                description = result_snippets[index].text.strip()

            results.append(
                _make_result(
                    title_elem.text.strip(),
                    title_elem.get("href", ""),
                    description,
                )
            )

            if can_report:
                await ctx.report_progress(index + 1, len(page_rows))

        # If we still don't have results, try an alternative approach
        if not results:
            logger.info("No results found with standard parsing, trying alternative approach")
            results, total_results = _fallback_results(soup, limit)

        # DuckDuckGo doesn't provide exact total counts, so estimate from the
        # pagination position and the number of results on this page.
        estimated_total = max(total_results, offset + len(results))

        # For pagination purposes, claim there are more results unless we
        # received fewer than requested.
        if len(results) >= count:
            estimated_total = max(estimated_total, offset + count + 1)

        return {
            "results": results,
            "total_results": estimated_total,
        }

    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error occurred: {e}")
        if hasattr(ctx, "error"):
            await ctx.error(f"HTTP error: {str(e)}")
        raise ValueError(f"HTTP error: {str(e)}") from e

    except httpx.RequestError as e:
        logger.error(f"Request error occurred: {e}")
        if hasattr(ctx, "error"):
            await ctx.error(f"Request error: {str(e)}")
        raise ValueError(f"Request error: {str(e)}") from e

    except Exception as e:
        # Top-level boundary: callers only expect ValueError from this function.
        logger.error(f"An unexpected error occurred: {e}")
        if hasattr(ctx, "error"):
            await ctx.error(f"Unexpected error: {str(e)}")
        raise ValueError(f"Unexpected error: {str(e)}") from e

    finally:
        # Close the HTTP client only if we created it
        if close_client and http_client:
            await http_client.aclose()