Skip to main content
Glama
server.py13.6 kB
import os import logging from collections.abc import Sequence from typing import Any, Dict, Union from tavily import AsyncTavilyClient from dotenv import load_dotenv from mcp.server import Server from mcp.types import ( Resource, Tool, TextContent, ImageContent, ) from pydantic import AnyUrl import asyncio import aiohttp load_dotenv() # create logger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("metasearch-mcp") # API key for Tavily API API_KEY = os.getenv("TAVILY_API_KEY", "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") if not API_KEY: logger.error("TAVILY_API_KEY environment variable not found") raise ValueError("TAVILY_API_KEY environment variable required") # SearXNG URL SEARXNG_URL = os.getenv("SEARXNG_URL", "http://searxng:8080") logger.info(f"Using SearXNG URL: {SEARXNG_URL}") # Prepare the server app = Server("metasearch-mcp") @app.list_resources() async def list_resources() -> list[Resource]: logger.info("Listing available resources") resources = [ Resource( uri=AnyUrl("websearch://query=`who is current Prime Minister of Japan 2024`,search_depth=`basic`"), name="Web Search about `who is current Prime Minister of Japan 2024`.\ There are two types of search_depth: 'basic' and 'advanced', with 'advanced' searching deeper.'", mimeType="application/json", description="General web search using Tavily API" ), Resource( uri=AnyUrl("imagesearch://query=`Tokyo skyline`,limit=5"), name="Image Search for `Tokyo skyline` with limit of 5 results", mimeType="application/json", description="Image search using SearXNG API" ) ] logger.debug(f"Returning resources with full content: {resources}") return resources @app.list_tools() async def list_tools() -> list[Tool]: """Return a list of available tools""" logger.info("Listing available tools") tools = [ Tool( name="search", description="Search the web using Tavily API", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" }, "search_depth": { "type": "string", "description": "Search depth (basic or advanced)", "enum": ["basic", "advanced"] } }, "required": ["query"] } ), Tool( name="image_search", description="Search for images using SearXNG API", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Image search query" }, "limit": { "type": "integer", "description": "Maximum number of images to return (default: 5)", "default": 5 } }, "required": ["query"] } ) ] logger.debug(f"Returning tools: {tools}") return tools async def process_search_results(results: dict) -> TextContent: """Process search results and return TextContent""" if not results: logger.warning("Empty search results received") return TextContent( type="text", text="No results were found for your query. Please try a different search term." ) response_text = [] if 'answer' in results and results['answer']: logger.info("Search successful - Answer generated") response_text.append("AI Answer:") response_text.append(results['answer']) response_text.append("\n") if 'results' in results and results['results']: logger.info("Search successful - Results available") response_text.append("\nSearch Results:") for i, result in enumerate(results['results'], 1): response_text.append(f"\n{i}. {result.get('title', 'Title not found')}") response_text.append(f"URL: {result.get('url', 'URL not found')}") response_text.append(f"Summary: {result.get('snippet', 'Summary not found')}\n") if response_text: return TextContent(type="text", text="\n".join(response_text)) logger.warning("No answer or results found in search results") return TextContent( type="text", text="The search was completed but no relevant information was found. Please try refining your query." ) async def perform_searxng_image_search(query: str, limit: int = 5) -> Dict: """Perform image search using SearXNG API""" logger.info(f"Performing SearXNG image search for: '{query}' with limit {limit}") params = { "q": query, "format": "json", "categories": "images", "language": "en", "pageno": 1, "time_range": "", "safesearch": 1, "theme": "simple" } try: async with aiohttp.ClientSession() as session: async with session.get(f"{SEARXNG_URL}/search", params=params, timeout=30) as response: if response.status != 200: logger.error(f"SearXNG API returned status code {response.status}") return {"error": f"SearXNG API returned status code {response.status}"} data = await response.json() logger.debug(f"SearXNG raw response: {data}") # Extract only relevant image results results = [] if "results" in data: for item in data["results"][:limit]: result = { "title": item.get("title", "No title"), "url": item.get("url", ""), "img_src": item.get("img_src", ""), "source": item.get("source", "Unknown source"), "thumbnail": item.get("thumbnail", ""), "height": item.get("img_height", 0), "width": item.get("img_width", 0) } results.append(result) return {"query": query, "results": results} except aiohttp.ClientError as e: logger.error(f"Error connecting to SearXNG: {str(e)}") return {"error": f"Connection error: {str(e)}"} except asyncio.TimeoutError: logger.error("SearXNG request timed out") return {"error": "Request timed out"} except Exception as e: logger.error(f"Error during SearXNG image search: {str(e)}", exc_info=True) return {"error": f"Error during search: {str(e)}"} async def process_image_search_results(results: Dict) -> Sequence[Union[TextContent, ImageContent]]: """Process image search results and return a sequence of content items""" contents = [] if "error" in results: return [TextContent( type="text", text=f"Error in image search: {results['error']}" )] if not results.get("results"): return [TextContent( type="text", text="No image results were found for your query. Please try a different search term." )] # Add text summary of results summary_text = f"Found {len(results['results'])} images matching '{results['query']}':\n\n" for idx, img in enumerate(results['results'], 1): summary_text += f"{idx}. {img['title']}\n" summary_text += f" Source: {img['source']}\n" summary_text += f" URL: {img['url']}\n" if img['height'] and img['width']: summary_text += f" Size: {img['width']}x{img['height']}\n" summary_text += "\n" contents.append(TextContent(type="text", text=summary_text)) # Add actual images for img in results['results']: if img.get('img_src'): contents.append(ImageContent( type="image", url=img['img_src'], title=img['title'] )) return contents @app.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[Union[TextContent, ImageContent]]: """Calling a tool based on name""" logger.info(f"TOOL_CALL_DEBUG: Tool called - name: {name}, arguments: {arguments}") if name == "search": return await handle_tavily_search(arguments) elif name == "image_search": return await handle_searxng_image_search(arguments) else: logger.error(f"Unknown tool requested: {name}") return [TextContent( type="text", text=f"Error: Unknown tool '{name}'. Supported tools are 'search' and 'image_search'." )] async def handle_tavily_search(arguments: Dict) -> Sequence[TextContent]: """Handle Tavily web search""" if not isinstance(arguments, dict) or "query" not in arguments: logger.error(f"Invalid arguments provided: {arguments}") return [TextContent( type="text", text="Error: Invalid arguments. A 'query' parameter is required." )] try: client = AsyncTavilyClient(API_KEY) query = arguments["query"] logger.info(f"Executing Tavily search with query: '{query}'") search_task = client.search( query=query, search_depth=arguments.get("search_depth", "basic"), include_images=False, include_answer=True, max_results=3, topic="general" ) try: # Execute the search with a timeout results = await asyncio.wait_for(search_task, timeout=30.0) logger.debug(f"Raw search results: {results}") # Process the results return [await process_search_results(results)] except asyncio.TimeoutError: logger.error("Search operation timed out after 30 seconds") return [TextContent( type="text", text="The search operation timed out. Please try again with a more specific query or check your internet connection." )] except Exception as e: error_message = str(e) logger.error(f"Search failed: {error_message}", exc_info=True) # Convert error message to user-friendly format if "api_key" in error_message.lower(): return [TextContent( type="text", text="Authentication error occurred. Please check the API key configuration." )] elif "rate limit" in error_message.lower(): return [TextContent( type="text", text="Rate limit exceeded. Please wait a moment before trying again." )] else: return [TextContent( type="text", text=f"An unexpected error occurred during the search. Please try again later. Error: {error_message}" )] async def handle_searxng_image_search(arguments: Dict) -> Sequence[Union[TextContent, ImageContent]]: """Handle SearXNG image search""" if not isinstance(arguments, dict) or "query" not in arguments: logger.error(f"Invalid arguments provided for image search: {arguments}") return [TextContent( type="text", text="Error: Invalid arguments. A 'query' parameter is required for image search." )] try: query = arguments["query"] limit = int(arguments.get("limit", 5)) # Ensure limit is reasonable if limit < 1: limit = 1 elif limit > 20: limit = 20 logger.info(f"Executing SearXNG image search with query: '{query}', limit: {limit}") # Perform the search search_results = await perform_searxng_image_search(query, limit) # Process the results return await process_image_search_results(search_results) except Exception as e: error_message = str(e) logger.error(f"Image search failed: {error_message}", exc_info=True) return [TextContent( type="text", text=f"An unexpected error occurred during image search. Please try again later. Error: {error_message}" )] # start the server async def main(): logger.info("Starting metasearch server") try: from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): logger.info("Server initialized, starting main loop") await app.run( read_stream, write_stream, app.create_initialization_options() ) except Exception as e: logger.error(f"Server failed to start: {str(e)}", exc_info=True) raise def main_entry(): try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) raise if __name__ == "__main__": main_entry()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YeonwooSung/metasearch-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server