Skip to main content
Glama

MCP Web Search Server

by undici77
server.py3.31 kB
"""MCP Search Server core implementation.""" import logging from typing import Optional from models import MCPMessage from search_engine import SearchEngine from archive_service import ArchiveService from handlers import ToolHandlers from config import SEARCH_ENGINES, WEB_ARCHIVES_SEARCH logger = logging.getLogger(__name__) class MCPSearchServer: """MCP server for web search across multiple engines, social media, and web archive.""" def __init__(self): self.search_engine = SearchEngine() self.archive_service = ArchiveService() self.handlers = ToolHandlers(self.search_engine, self.archive_service) logger.info("Search server initialized with %d search engines and %d web archive", len(SEARCH_ENGINES), len(WEB_ARCHIVES_SEARCH)) async def close(self): """Close all services.""" await self.search_engine.close() await self.archive_service.close() async def handle_message(self, message: MCPMessage) -> Optional[MCPMessage]: """Handle incoming RPC messages.""" try: if message.method == "initialize": return self.handlers.handle_initialize(message) elif message.method == "tools/list": return self.handlers.handle_tools_list(message) elif message.method == "tools/call": return await self._handle_tool_call(message) else: return self.handlers._create_error_response( message.id, -32601, f"Method not found: {message.method}" ) except Exception as e: logger.error(f"Error handling message: {e}", exc_info=True) return self.handlers._create_error_response( message.id, -32603, f"Internal error: {str(e)}" ) async def _handle_tool_call(self, message: MCPMessage) -> MCPMessage: """Handle a tool call.""" try: params = message.params tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name == "web_search": return await self.handlers.handle_web_search(message.id, arguments) elif tool_name == "social_search": return await self.handlers.handle_social_search(message.id, arguments) elif tool_name == "archives_search": return await self.handlers.handle_archives_search(message.id, arguments) elif tool_name == "list_engines": return await self.handlers.handle_list_engines(message.id, arguments) elif tool_name == "list_archives_services": return await self.handlers.handle_list_archives_services(message.id, arguments) elif tool_name == "clear_cache": return await self.handlers.handle_clear_cache(message.id, arguments) else: return self.handlers._create_error_response( message.id, -32602, f"Unknown tool: {tool_name}" ) except Exception as e: logger.error(f"Error executing tool: {e}", exc_info=True) return self.handlers._create_error_response( message.id, -32603, f"Error executing tool: {str(e)}" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/undici77/MCPWebSearch'

If you have feedback or need assistance with the MCP directory API, please join our Discord server