metasearch-mcp

by YeonwooSung
Verified
MIT License
  • Apple
  • Linux
  • src
# MCP server that exposes Tavily web search as a single ``search`` tool.

import asyncio
import json
import logging
import os
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Optional

from dotenv import load_dotenv
from mcp.server import Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    EmptyResult,
)
from pydantic import AnyUrl
from tavily import AsyncTavilyClient

load_dotenv()

# create logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("metasearch-mcp")

# API key for Tavily API; the module refuses to load without it.
API_KEY = os.getenv("TAVILY_API_KEY")
if not API_KEY:
    logger.error("TAVILY_API_KEY environment variable not found")
    raise ValueError("TAVILY_API_KEY environment variable required")

# Prepare the server
app = Server("metasearch-mcp")


@app.list_resources()
async def list_resources() -> list[Resource]:
    """Return the single example web-search resource advertised by the server."""
    logger.info("Listing available resources")
    resources = [
        Resource(
            uri=AnyUrl("websearch://query=`who is current Prime Minister of Japan 2024`,search_depth=`basic`"),
            # Adjacent literals instead of a backslash continuation, so no
            # source indentation ends up inside the runtime string.
            name=(
                "Web Search about `who is current Prime Minister of Japan 2024`. "
                "There are two types of search_depth: 'basic' and 'advanced', "
                "with 'advanced' searching deeper."
            ),
            mimeType="application/json",
            description="General web search using Tavily API"
        )
    ]
    logger.debug(f"Returning resources with full content: {resources}")
    return resources


@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return a list of available tools"""
    logger.info("Listing available tools")
    tools = [
        Tool(
            name="search",
            description="Search the web using Tavily API",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "search_depth": {
                        "type": "string",
                        "description": "Search depth (basic or advanced)",
                        "enum": ["basic", "advanced"]
                    }
                },
                "required": ["query"]
            }
        )
    ]
    logger.debug(f"Returning tools: {tools}")
    return tools


async def process_search_results(results: dict) -> TextContent:
    """Process search results and return TextContent.

    Args:
        results: Response mapping from the search call. The optional
            ``answer`` entry and the ``results`` list are rendered.

    Returns:
        A text block with the AI answer (when present) followed by a numbered
        list of results, or a fallback message when nothing usable was found.
    """
    if not results:
        logger.warning("Empty search results received")
        return TextContent(
            type="text",
            text="No results were found for your query. Please try a different search term."
        )

    response_text = []

    answer = results.get('answer')
    if answer:
        logger.info("Search successful - Answer generated")
        response_text.append("AI Answer:")
        response_text.append(answer)
        response_text.append("\n")

    hits = results.get('results')
    if hits:
        logger.info("Search successful - Results available")
        response_text.append("\nSearch Results:")
        for i, result in enumerate(hits, 1):
            # Tavily puts the result text under 'content'; 'snippet' is kept
            # as a fallback for payloads that use the older key name.
            summary = result.get('content') or result.get('snippet', 'Summary not found')
            response_text.append(f"\n{i}. {result.get('title', 'Title not found')}")
            response_text.append(f"URL: {result.get('url', 'URL not found')}")
            response_text.append(f"Summary: {summary}\n")

    if response_text:
        return TextContent(type="text", text="\n".join(response_text))

    logger.warning("No answer or results found in search results")
    return TextContent(
        type="text",
        text="The search was completed but no relevant information was found. Please try refining your query."
    )


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]:
    """Run the ``search`` tool and return its output as text content.

    Args:
        name: Tool name requested by the client; only ``"search"`` is handled.
        arguments: Tool arguments; must be a dict with a non-blank string
            ``query`` and may carry ``search_depth`` (defaults to ``"basic"``).

    Returns:
        A one-element list holding either the formatted search results or a
        human-readable error message. Failures are reported as text rather
        than raised.
    """
    logger.info(f"TOOL_CALL_DEBUG: Tool called - name: {name}, arguments: {arguments}")

    if name != "search":
        logger.error(f"Unknown tool requested: {name}")
        return [TextContent(
            type="text",
            text=f"Error: Unknown tool '{name}'. Only 'search' is supported."
        )]

    query = arguments.get("query") if isinstance(arguments, dict) else None
    # Reject missing, non-string and blank queries before spending an API call.
    if not isinstance(query, str) or not query.strip():
        logger.error(f"Invalid arguments provided: {arguments}")
        return [TextContent(
            type="text",
            text="Error: Invalid arguments. A 'query' parameter is required."
        )]

    try:
        client = AsyncTavilyClient(API_KEY)
        logger.info(f"Executing search with query: '{query}'")

        search_task = client.search(
            query=query,
            search_depth=arguments.get("search_depth", "basic"),
            include_images=False,
            include_answer=True,
            max_results=3,
            topic="general"
        )

        # Execute the search with a timeout
        results = await asyncio.wait_for(search_task, timeout=30.0)
        logger.debug(f"Raw search results: {results}")

        # Process the results
        return [await process_search_results(results)]

    except asyncio.TimeoutError:
        logger.error("Search operation timed out after 30 seconds")
        return [TextContent(
            type="text",
            text="The search operation timed out. Please try again with a more specific query or check your internet connection."
        )]

    except Exception as e:
        error_message = str(e)
        logger.error(f"Search failed: {error_message}", exc_info=True)

        # Convert error message to user-friendly format
        lowered = error_message.lower()
        if "api_key" in lowered:
            user_text = "Authentication error occurred. Please check the API key configuration."
        elif "rate limit" in lowered:
            user_text = "Rate limit exceeded. Please wait a moment before trying again."
        else:
            user_text = (
                "An unexpected error occurred during the search. "
                f"Please try again later. Error: {error_message}"
            )
        return [TextContent(type="text", text=user_text)]


# start the server
async def main():
    """Serve the MCP app over stdio until the streams close.

    Raises:
        Exception: Any startup or runtime failure is logged and re-raised.
    """
    logger.info("Starting metasearch server")
    try:
        from mcp.server.stdio import stdio_server

        async with stdio_server() as (read_stream, write_stream):
            logger.info("Server initialized, starting main loop")
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options()
            )
    except Exception as e:
        logger.error(f"Server failed to start: {str(e)}", exc_info=True)
        raise


def main_entry():
    """Synchronous entry point: run ``main`` and treat Ctrl-C as a clean exit."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
    except Exception as e:
        logger.error(f"Server error: {e}", exc_info=True)
        raise


if __name__ == "__main__":
    main_entry()