mcp-google

by Ejb503
Verified
  • src
from mcp.server.fastmcp import FastMCP import httpx import time import asyncio from typing import Optional, Dict, List, Any, Tuple from dataclasses import dataclass from enum import Enum import os import sys import io api_key = os.getenv("BRAVE_API_KEY") if not api_key: raise ValueError("BRAVE_API_KEY environment variable required") class RateLimitError(Exception): pass @dataclass class RateLimit: per_second: int = 1 per_month: int = 2000 _requests: Dict[str, int] = None _last_reset: float = 0.0 def __post_init__(self): self._requests = {"second": 0, "month": 0} self._last_reset = time.time() def check(self): now = time.time() if now - self._last_reset > 1: self._requests["second"] = 0 self._last_reset = now if (self._requests["second"] >= self.per_second or self._requests["month"] >= self.per_month): raise RateLimitError("Rate limit exceeded") self._requests["second"] += 1 self._requests["month"] += 1 class BraveSearchServer: def __init__(self, api_key: str): # Configure stdout for UTF-8 if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') self.mcp = FastMCP( "brave-search", dependencies=["httpx", "asyncio"] ) self.api_key = api_key self.base_url = "https://api.search.brave.com/res/v1" self.rate_limit = RateLimit() self._client = None self._setup_tools() def get_client(self): if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient( headers={ "X-Subscription-Token": self.api_key, "Accept": "application/json", "Accept-Encoding": "gzip" }, timeout=30.0 ) return self._client async def _get_web_results(self, query: str, min_results: int) -> List[Dict]: """Fetch web results with pagination until minimum count is reached""" client = self.get_client() self.rate_limit.check() try: # Make a single request with the maximum allowed count response = await client.get( f"{self.base_url}/web/search", params={ "q": query, "count": min_results } ) response.raise_for_status() data = response.json() results = data.get("web", {}).get("results", []) return results except httpx.HTTPStatusError as e: if e.response.status_code == 422: # If we get a 422, try with a smaller count response = await client.get( f"{self.base_url}/web/search", params={ "q": query, "count": 10 # Fall back to smaller count } ) response.raise_for_status() data = response.json() return data.get("web", {}).get("results", []) raise # Re-raise other HTTP errors def _format_web_results(self, data: Dict, min_results: int = 10) -> str: """Format web search results with enhanced information""" results = [] web_results = data.get("web", {}).get("results", []) for result in web_results[:max(min_results, len(web_results))]: # Strip or replace any potential Unicode characters title = result.get('title', 'N/A').encode('ascii', 'replace').decode() desc = result.get('description', 'N/A').encode('ascii', 'replace').decode() formatted_result = [ f"Title: {title}", f"Description: {desc}", f"URL: {result.get('url', 'N/A')}" ] # Add additional metadata if available if "meta_url" in result: formatted_result.append(f"Source: {result['meta_url']}") if "age" in result: formatted_result.append(f"Age: {result['age']}") if "language" in result: formatted_result.append(f"Language: {result['language']}") results.append("\n".join(formatted_result)) return "\n\n".join(results) def _setup_tools(self): @self.mcp.tool() async def brave_web_search( query: str, count: Optional[int] = 20 ) -> str: """Execute web search using Brave Search API with improved results Args: query: Search terms count: Desired number of results (10-20) """ min_results = max(10, min(count, 20)) # Ensure between 10 and 20 all_results = await self._get_web_results(query, min_results) if not all_results: return "No results found for the query." formatted_results = [] for result in all_results[:min_results]: formatted_result = [ f"Title: {result.get('title', 'N/A')}", f"Description: {result.get('description', 'N/A')}", f"URL: {result.get('url', 'N/A')}" ] # Include additional context if available if result.get('extra_snippets'): formatted_result.append("Additional Context:") formatted_result.extend([f"- {snippet}" for snippet in result['extra_snippets'][:2]]) formatted_results.append("\n".join(formatted_result)) return "\n\n".join(formatted_results) @self.mcp.tool() async def brave_local_search( query: str, count: Optional[int] = 20 # Changed default from 5 to 20 ) -> str: """Search for local businesses and places Args: query: Location terms count: Results (1-20 """ self.rate_limit.check() # Initial location search params = { "q": query, "search_lang": "en", "result_filter": "locations", "count": 20 # Always request maximum results } client = self.get_client() response = await client.get( f"{self.base_url}/web/search", params=params ) response.raise_for_status() data = response.json() location_ids = self._extract_location_ids(data) if not location_ids: # If no local results found, fallback to web search # with minimum 10 results return await brave_web_search(query, 20) # If we have less than 10 location IDs, try to get more offset = 0 while len(location_ids) < 10 and offset < 40: offset += 20 additional_response = await client.get( f"{self.base_url}/web/search", params={ "q": query, "search_lang": "en", "result_filter": "locations", "count": 20, "offset": offset } ) additional_data = additional_response.json() location_ids.extend(self._extract_location_ids(additional_data)) # Get details for at least 10 locations pois, descriptions = await self._get_location_details( location_ids[:max(10, len(location_ids))] ) return self._format_local_results(pois, descriptions) async def _get_location_details( self, ids: List[str] ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Fetch POI and description data for locations""" client = self.get_client() pois_response, desc_response = await asyncio.gather( client.get( f"{self.base_url}/local/pois", params={"ids": ids} ), client.get( f"{self.base_url}/local/descriptions", params={"ids": ids} ) ) return ( pois_response.json(), desc_response.json() ) def _extract_location_ids(self, data: Dict) -> List[str]: """Extract location IDs from search response""" return [ result["id"] for result in data.get("locations", {}).get("results", []) if "id" in result ] def _format_local_results( self, pois: Dict[str, Any], descriptions: Dict[str, Any] ) -> str: """Format local search results with details""" results = [] for poi in pois.get("results", []): location = { "name": poi.get("name", "N/A"), "address": self._format_address(poi.get("address", {})), "phone": poi.get("phone", "N/A"), "rating": self._format_rating(poi.get("rating", {})), "price": poi.get("priceRange", "N/A"), "hours": ", ".join(poi.get("openingHours", [])) or "N/A", "description": descriptions.get("descriptions", {}).get( poi["id"], "No description available" ) } results.append( f"Name: {location['name']}\n" f"Address: {location['address']}\n" f"Phone: {location['phone']}\n" f"Rating: {location['rating']}\n" f"Price Range: {location['price']}\n" f"Hours: {location['hours']}\n" f"Description: {location['description']}" ) return "\n---\n".join(results) or "No local results found" def _format_address(self, addr: Dict) -> str: """Format address components""" components = [ addr.get("streetAddress", ""), addr.get("addressLocality", ""), addr.get("addressRegion", ""), addr.get("postalCode", "") ] return ", ".join(filter(None, components)) or "N/A" def _format_rating(self, rating: Dict) -> str: """Format rating information""" if not rating: return "N/A" # Use ASCII star (*) instead of Unicode star stars = "*" * int(float(rating.get('ratingValue', 0))) return f"{rating.get('ratingValue', 'N/A')} {stars} ({rating.get('ratingCount', 0)} reviews)" def run(self): """Start the MCP server""" self.mcp.run() if __name__ == "__main__": server = BraveSearchServer(api_key) server.run()