Skip to main content
Glama
mcp_protocol.py9.7 kB
""" MCP Protocol Handler - Core implementation of Model Context Protocol """ import json import uuid import logging from typing import Dict, Any, List, Optional, Callable, Awaitable from datetime import datetime from models import ( MCPRequest, MCPResponse, MCPNotification, MCPError, ToolDefinition, InitializeResult, ServerCapabilities, ServerInfo ) logger = logging.getLogger(__name__) class MCPProtocolHandler: """ Handles MCP protocol message processing and tool management """ def __init__(self): self.tools: Dict[str, Dict[str, Any]] = {} self.initialized = False self.client_info: Optional[Dict[str, Any]] = None self.session_id = str(uuid.uuid4()) logger.info(f"MCP Protocol Handler initialized with session: {self.session_id}") def register_tool( self, name: str, description: str, input_schema: Dict[str, Any], handler: Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]] ) -> None: """ Register a tool with the MCP server Args: name: Tool name description: Tool description input_schema: JSON schema for tool input handler: Async function to execute the tool """ self.tools[name] = { "definition": ToolDefinition( name=name, description=description, inputSchema=input_schema ), "handler": handler } logger.info(f"Registered tool: {name}") async def handle_message(self, message_str: str) -> str: """ Process incoming MCP message and return response Args: message_str: JSON-RPC message string Returns: JSON-RPC response string """ try: message = json.loads(message_str) # Validate JSON-RPC version if message.get("jsonrpc") != "2.0": return self._error_response( message.get("id"), -32600, "Invalid JSON-RPC version" ) # Check if it's a request or notification if "id" in message: # Request - needs response return await self._handle_request(message) else: # Notification - no response needed await self._handle_notification(message) return "" except json.JSONDecodeError as e: logger.error(f"Invalid JSON: {e}") return self._error_response(None, -32700, "Parse error") except Exception as e: logger.error(f"Error handling message: {e}", exc_info=True) return self._error_response(None, -32603, f"Internal error: {str(e)}") async def _handle_request(self, message: Dict[str, Any]) -> str: """Handle JSON-RPC request""" request_id = message.get("id") method = message.get("method") params = message.get("params", {}) logger.debug(f"Handling request: {method} (id: {request_id})") try: # Route to appropriate handler if method == "initialize": result = await self._handle_initialize(params) elif method == "initialized": # Client confirmation of initialization return "" elif method == "tools/list": result = await self._handle_tools_list(params) elif method == "tools/call": result = await self._handle_tools_call(params) elif method == "ping": result = {"status": "pong", "timestamp": datetime.utcnow().isoformat()} else: return self._error_response( request_id, -32601, f"Method not found: {method}" ) # Build success response response = MCPResponse( id=request_id, result=result ) return response.model_dump_json() except Exception as e: logger.error(f"Error executing method {method}: {e}", exc_info=True) return self._error_response( request_id, -32603, f"Internal error: {str(e)}" ) async def _handle_notification(self, message: Dict[str, Any]) -> None: """Handle JSON-RPC notification (no response)""" method = message.get("method") params = message.get("params", {}) logger.debug(f"Handling notification: {method}") if method == "notifications/cancelled": # Handle cancellation operation_id = params.get("id") logger.info(f"Operation cancelled: {operation_id}") elif method == "notifications/progress": # Handle progress update logger.debug(f"Progress update: {params}") async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle initialize request Returns server capabilities and info """ self.client_info = params self.initialized = True logger.info(f"Client initialized: {params.get('clientInfo', {})}") result = InitializeResult( protocolVersion="2024-11-05", capabilities=ServerCapabilities( tools={"list": True, "call": True}, resources={}, prompts={}, logging={} ), serverInfo=ServerInfo( name="mcp-sse-server-python", version="1.0.0" ) ) return result.model_dump() async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle tools/list request Returns list of available tools """ if not self.initialized: raise Exception("Server not initialized") tools_list = [ tool["definition"].model_dump() for tool in self.tools.values() ] logger.debug(f"Listing {len(tools_list)} tools") return { "tools": tools_list } async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle tools/call request Executes the requested tool and returns result """ if not self.initialized: raise Exception("Server not initialized") tool_name = params.get("name") tool_params = params.get("arguments", {}) if tool_name not in self.tools: raise Exception(f"Tool not found: {tool_name}") logger.info(f"Calling tool: {tool_name}") # Execute tool handler tool = self.tools[tool_name] start_time = datetime.utcnow() try: result = await tool["handler"](tool_params) execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000 logger.info(f"Tool {tool_name} completed in {execution_time:.2f}ms") return { "content": [ { "type": "text", "text": json.dumps(result, indent=2, default=str) } ], "isError": False } except Exception as e: execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000 logger.error(f"Tool {tool_name} failed after {execution_time:.2f}ms: {e}", exc_info=True) return { "content": [ { "type": "text", "text": f"Error executing tool: {str(e)}" } ], "isError": True } def _error_response( self, request_id: Optional[Any], code: int, message: str, data: Optional[Dict[str, Any]] = None ) -> str: """ Build JSON-RPC error response Args: request_id: Request ID (can be None) code: Error code message: Error message data: Additional error data Returns: JSON-RPC error response string """ error = MCPError(code=code, message=message, data=data) response = MCPResponse( id=request_id if request_id is not None else 0, error=error ) return response.model_dump_json() def create_notification(self, method: str, params: Dict[str, Any]) -> str: """ Create a notification message (server to client) Args: method: Notification method name params: Notification parameters Returns: JSON-RPC notification string """ notification = MCPNotification( method=method, params=params ) return notification.model_dump_json() def get_tool_list(self) -> List[Dict[str, Any]]: """Get list of registered tools""" return [ tool["definition"].model_dump() for tool in self.tools.values() ] def get_server_info(self) -> Dict[str, Any]: """Get server information""" return { "name": "mcp-sse-server-python", "version": "1.0.0", "session_id": self.session_id, "initialized": self.initialized, "tools_count": len(self.tools), "client_info": self.client_info }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nguyenxtan/mcpwn8n'

If you have feedback or need assistance with the MCP directory API, please join our Discord server