Skip to main content
Glama
mcp_protocol_handler.py20 kB
""" MCPProtocolHandler module for mcpware Handles MCP protocol operations and message routing """ import logging from typing import Dict, List, Any, Optional import asyncio import uuid import json from .config import ConfigurationManager, BackendMCPConfig from .backend_forwarder import BackendForwarder logger = logging.getLogger(__name__) class MCPProtocolHandler: """Handles MCP protocol operations""" def __init__(self, config_manager: ConfigurationManager, backend_forwarder: BackendForwarder): self.config_manager = config_manager self.backend_forwarder = backend_forwarder self._backend_capabilities = {} self._backend_tools = {} # Cache tools by backend # Track request sessions self._request_sessions: Dict[str, str] = {} # request_id -> session_id async def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialize request""" # Initialize all backends and cache their capabilities for backend_name, backend_config in self.config_manager.backends.items(): try: # Get backend capabilities init_request = { "jsonrpc": "2.0", "method": "initialize", "params": params, "id": f"init-{backend_name}" } backend_response = await self.backend_forwarder.forward_request( backend_name, init_request ) if "result" in backend_response: capabilities = backend_response["result"].get("capabilities", {}) self._backend_capabilities[backend_name] = capabilities except Exception as e: logger.error(f"Failed to initialize backend {backend_name}: {e}") # Return gateway capabilities with only our single tool return { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} # We support tools }, "serverInfo": { "name": "mcpware", "version": "1.0.0", "vendor": "MCP Gateway" } } async def handle_initialized_notification(self) -> None: """Handle initialized notification by forwarding to all backends""" logger.info("Forwarding initialized notification to all backends") # Send initialized notification to each backend for backend_name in self.config_manager.backends.keys(): try: # Send notification (no id, no response expected) notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } # Forward without expecting response await self.backend_forwarder.send_notification(backend_name, notification) logger.info(f"Sent initialized notification to backend {backend_name}") except Exception as e: logger.error(f"Failed to send initialized notification to backend {backend_name}: {e}") async def handle_list_tools(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Handle tools/list request - return only the gateway's routing tool""" # Define the gateway's single routing tool use_tool = { "name": "use_tool", "description": "Route a tool call to a specific backend MCP server", "inputSchema": { "type": "object", "properties": { "backend_server": { "type": "string", "description": f"The backend server to use. Available servers: {', '.join(self.config_manager.backends.keys())}", "enum": list(self.config_manager.backends.keys()) }, "server_tool": { "type": "string", "description": "The name of the tool to call on the backend server" }, "tool_arguments": { "type": "object", "description": "Arguments to pass to the backend server's tool", "additionalProperties": True } }, "required": ["backend_server", "server_tool"], "additionalProperties": False } } # Optionally add a discovery tool discover_tools = { "name": "discover_backend_tools", "description": "Discover available tools on backend MCP servers", "inputSchema": { "type": "object", "properties": { "backend_server": { "type": "string", "description": f"The backend server to query for available tools. Available servers: {', '.join(self.config_manager.backends.keys())}", "enum": list(self.config_manager.backends.keys()) } }, "required": ["backend_server"], "additionalProperties": False }, "outputSchema": { "type": "object", "properties": { "tools": { "type": "array", "description": "List of available tools from backend servers" } }, "required": ["tools"], "additionalProperties": False } } return {"tools": [use_tool, discover_tools]} async def handle_tool_call(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tools/call request""" tool_name = params.get("name", "") arguments = params.get("arguments", {}) # Get or create session ID for this request context request_id = params.get("_request_id", str(uuid.uuid4())) session_id = self._get_or_create_session(request_id) if tool_name == "use_tool": return await self._handle_use_tool(arguments, session_id) elif tool_name == "discover_backend_tools": return await self._handle_discover_tools(arguments) else: return self._create_error_response(f"Unknown tool: {tool_name}") def _get_or_create_session(self, request_id: str) -> str: """Get or create a session ID for the request""" if request_id not in self._request_sessions: # Create a new session ID session_id = str(uuid.uuid4()) self._request_sessions[request_id] = session_id logger.info(f"Created new session {session_id} for request {request_id}") return self._request_sessions[request_id] async def _handle_use_tool(self, arguments: Dict[str, Any], session_id: str) -> Dict[str, Any]: """Handle the use_tool routing call with security validation""" backend_server = arguments.get("backend_server") server_tool = arguments.get("server_tool") tool_arguments = arguments.get("tool_arguments", {}) # Validate backend server if not backend_server: return self._create_error_response("Missing required parameter: backend_server") if backend_server not in self.config_manager.backends: available = ", ".join(self.config_manager.backends.keys()) return self._create_error_response( f"Unknown backend server: {backend_server}. Available servers: {available}" ) # Validate tool name if not server_tool: return self._create_error_response("Missing required parameter: server_tool") try: # Forward the tool call to the backend tool_request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": server_tool, "arguments": tool_arguments }, "id": f"tool-call-{backend_server}-{server_tool}" } response = await self.backend_forwarder.forward_request( backend_server, tool_request ) if "result" in response: # Pass through the backend response as-is return response["result"] elif "error" in response: return self._create_error_response( f"Backend error from {backend_server}: {response['error'].get('message', 'Unknown error')}" ) else: return self._create_error_response("Invalid response from backend") except Exception as e: logger.error(f"Error calling tool {server_tool} on {backend_server}: {e}") return self._create_error_response(f"Error: {str(e)}") async def _handle_discover_tools(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle the discover_backend_tools call""" backend_server = arguments.get("backend_server") # backend_server is now required if not backend_server: return self._create_error_response("Missing required parameter: backend_server") if backend_server not in self.config_manager.backends: available = ", ".join(self.config_manager.backends.keys()) return self._create_error_response( f"Unknown backend server: {backend_server}. Available servers: {available}" ) return await self._discover_single_backend_tools(backend_server) async def _get_backend_tools(self, backend_name: str) -> List[Dict[str, Any]]: """Get tools from a specific backend, returns empty list on error""" try: # Request tools from backend list_request = { "jsonrpc": "2.0", "method": "tools/list", "id": f"list-tools-{backend_name}" } response = await self.backend_forwarder.forward_request( backend_name, list_request ) if "result" in response and "tools" in response["result"]: tools = response["result"]["tools"] # Cache tools self._backend_tools[backend_name] = tools # Return tools as-is without backend prefixing clean_tools = [] for tool in tools: clean_tool = { "name": tool['name'], "description": tool.get('description', 'No description'), "inputSchema": tool.get("inputSchema", { "type": "object", "properties": {}, "additionalProperties": True }) } clean_tools.append(clean_tool) return clean_tools else: return [] except Exception as e: logger.error(f"Error discovering tools from {backend_name}: {e}") return [] async def _discover_single_backend_tools(self, backend_name: str) -> Dict[str, Any]: """Discover tools for a single backend""" if backend_name not in self.config_manager.backends: return self._create_error_response(f"Unknown backend server: {backend_name}") tools = await self._get_backend_tools(backend_name) if not tools: return self._create_error_response(f"Failed to list tools from {backend_name}") # Return with both structured data and text content return { "content": [{ "type": "text", "text": json.dumps({"tools": tools}) }], "structuredContent": { "tools": tools } } async def handle_list_resources(self) -> Dict[str, List[Dict[str, Any]]]: """Handle resources/list request - aggregate resources from all backends""" all_resources = [] # Get resources from each backend for backend_name in self.config_manager.backends.keys(): if backend_name not in self._backend_capabilities: continue capabilities = self._backend_capabilities[backend_name] if "resources" not in capabilities: continue try: # Request resources from backend list_request = { "jsonrpc": "2.0", "method": "resources/list", "id": f"list-resources-{backend_name}" } response = await self.backend_forwarder.forward_request( backend_name, list_request ) if "result" in response and "resources" in response["result"]: backend_resources = response["result"]["resources"] # Prefix resource URIs with backend name for resource in backend_resources: prefixed_resource = { "uri": f"{backend_name}:{resource['uri']}", "name": f"[{backend_name}] {resource.get('name', '')}", "description": resource.get("description", ""), "mimeType": resource.get("mimeType", "text/plain") } all_resources.append(prefixed_resource) except Exception as e: logger.error(f"Failed to list resources from backend {backend_name}: {e}") return {"resources": all_resources} async def handle_read_resource(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle resources/read request - forward to appropriate backend""" uri = params.get("uri", "") # Extract backend name from prefixed URI if ":" not in uri: return self._create_error_response(f"Invalid resource URI format: {uri}") backend_name, original_uri = uri.split(":", 1) if backend_name not in self.config_manager.backends: return self._create_error_response(f"Unknown backend: {backend_name}") try: # Forward the resource read to the backend read_request = { "jsonrpc": "2.0", "method": "resources/read", "params": { "uri": original_uri }, "id": f"read-resource-{backend_name}" } response = await self.backend_forwarder.forward_request( backend_name, read_request ) if "result" in response: return response["result"] elif "error" in response: return self._create_error_response( f"Backend error: {response['error'].get('message', 'Unknown error')}" ) else: return self._create_error_response("Invalid response from backend") except Exception as e: logger.error(f"Error reading resource {uri}: {e}") return self._create_error_response(str(e)) async def handle_list_prompts(self) -> Dict[str, List[Dict[str, Any]]]: """Handle prompts/list request - aggregate prompts from all backends""" all_prompts = [] # Get prompts from each backend for backend_name in self.config_manager.backends.keys(): if backend_name not in self._backend_capabilities: continue capabilities = self._backend_capabilities[backend_name] if "prompts" not in capabilities: continue try: # Request prompts from backend list_request = { "jsonrpc": "2.0", "method": "prompts/list", "id": f"list-prompts-{backend_name}" } response = await self.backend_forwarder.forward_request( backend_name, list_request ) if "result" in response and "prompts" in response["result"]: backend_prompts = response["result"]["prompts"] # Prefix prompt names with backend name for prompt in backend_prompts: prefixed_prompt = { "name": f"{backend_name}_{prompt['name']}", "description": f"[{backend_name}] {prompt.get('description', '')}", "arguments": prompt.get("arguments", []) } all_prompts.append(prefixed_prompt) except Exception as e: logger.error(f"Failed to list prompts from backend {backend_name}: {e}") return {"prompts": all_prompts} async def handle_get_prompt(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle prompts/get request - forward to appropriate backend""" prompt_name = params.get("name", "") arguments = params.get("arguments", {}) # Extract backend name from prefixed prompt name parts = prompt_name.split("_", 1) if len(parts) < 2: return self._create_error_response(f"Invalid prompt name format: {prompt_name}") backend_name, original_prompt_name = parts if backend_name not in self.config_manager.backends: return self._create_error_response(f"Unknown backend: {backend_name}") try: # Forward the prompt get to the backend get_request = { "jsonrpc": "2.0", "method": "prompts/get", "params": { "name": original_prompt_name, "arguments": arguments }, "id": f"get-prompt-{backend_name}" } response = await self.backend_forwarder.forward_request( backend_name, get_request ) if "result" in response: return response["result"] elif "error" in response: return self._create_error_response( f"Backend error: {response['error'].get('message', 'Unknown error')}" ) else: return self._create_error_response("Invalid response from backend") except Exception as e: logger.error(f"Error getting prompt {prompt_name}: {e}") return self._create_error_response(str(e)) def _create_error_response(self, error_message: str) -> Dict[str, Any]: """Create an error response""" return { "content": [{ "type": "text", "text": f"Error: {error_message}" }], "isError": True }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/delexw/mcpware'

If you have feedback or need assistance with the MCP directory API, please join our Discord server