
MaverickMCP

by wshobson
MIT License
inspector_compatible_sse.py (11.4 kB)
""" MCP Inspector-compatible SSE implementation. This implements a proper bidirectional SSE handler that works with MCP Inspector, handling JSON-RPC messages directly over the SSE connection. """ import asyncio import json import logging from typing import Any from uuid import uuid4 from starlette.requests import Request from starlette.responses import StreamingResponse from maverick_mcp.api.server import mcp logger = logging.getLogger(__name__) class InspectorCompatibleSSE: """SSE handler that properly implements MCP protocol for Inspector.""" def __init__(self): self.sessions: dict[str, dict[str, Any]] = {} self.message_queues: dict[str, asyncio.Queue] = {} async def handle_sse(self, request: Request): """Handle SSE connection from MCP Inspector.""" session_id = str(uuid4()) logger.info(f"New Inspector SSE connection: {session_id}") # Create a message queue for this session message_queue: asyncio.Queue = asyncio.Queue() self.message_queues[session_id] = message_queue # Create a simple session state tracker session_state = { "initialized": False, "server_name": "MaverickMCP", "server_version": "1.0.0", "protocol_version": "2024-11-05", } self.sessions[session_id] = session_state async def event_generator(): """Generate SSE events and handle bidirectional communication.""" try: # Send initial connection event with session info connection_msg = { "type": "connection", "sessionId": session_id, "endpoint": f"/inspector/message?session_id={session_id}", } yield f"data: {json.dumps(connection_msg)}\n\n" # Process incoming messages from the queue while True: try: # Wait for messages with timeout for keepalive message = await asyncio.wait_for( message_queue.get(), timeout=30.0 ) # Process the message through MCP session if isinstance(message, dict) and "jsonrpc" in message: # Handle the JSON-RPC request response = await self._process_message( session_state, message ) if response: yield f"data: {json.dumps(response)}\n\n" except TimeoutError: # Send keepalive yield ": keepalive\n\n" except Exception as e: logger.error(f"Error processing message: {e}") error_response = { "jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": None, } yield f"data: {json.dumps(error_response)}\n\n" finally: # Cleanup on disconnect if session_id in self.sessions: del self.sessions[session_id] if session_id in self.message_queues: del self.message_queues[session_id] logger.info(f"Inspector SSE connection closed: {session_id}") return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "*", }, ) async def handle_message(self, request: Request): """Handle incoming JSON-RPC messages from Inspector.""" session_id = request.query_params.get("session_id") if not session_id or session_id not in self.message_queues: return {"error": "Invalid or missing session_id"} try: message = await request.json() logger.info(f"Inspector message for session {session_id}: {message}") # Put message in queue for processing await self.message_queues[session_id].put(message) # Return acknowledgment return {"status": "queued"} except Exception as e: logger.error(f"Failed to process message: {e}") return {"error": str(e)} async def _process_message( self, session_state: dict[str, Any], message: dict[str, Any] ) -> dict[str, Any] | None: """Process a JSON-RPC message through the MCP session.""" method = 
message.get("method") params = message.get("params", {}) msg_id = message.get("id") try: # Handle different MCP methods if method == "initialize": # Mark session as initialized session_state["initialized"] = True protocol_version = params.get("protocolVersion", "2024-11-05") # Get server capabilities return { "jsonrpc": "2.0", "id": msg_id, "result": { "protocolVersion": protocol_version, "capabilities": { "tools": {"listChanged": True} if hasattr(mcp, "_tool_manager") and hasattr(mcp._tool_manager, "tools") and mcp._tool_manager.tools else {}, "resources": {"listChanged": True} if hasattr(mcp, "_resource_manager") and hasattr(mcp._resource_manager, "resources") and mcp._resource_manager.resources else {}, "prompts": {"listChanged": True} if hasattr(mcp, "_prompt_manager") and hasattr(mcp._prompt_manager, "prompts") and mcp._prompt_manager.prompts else {}, }, "serverInfo": { "name": session_state["server_name"], "version": session_state["server_version"], }, }, } elif method == "tools/list": # List available tools tools = [] if ( hasattr(mcp, "_tool_manager") and hasattr(mcp._tool_manager, "tools") and hasattr(mcp._tool_manager.tools, "items") ): for tool_name, tool_func in mcp._tool_manager.tools.items(): # type: ignore[attr-defined] tools.append( { "name": tool_name, "description": tool_func.__doc__ or "No description", "inputSchema": getattr(tool_func, "input_schema", {}), } ) return {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": tools}} elif method == "resources/list": # List available resources resources = [] if ( hasattr(mcp, "_resource_manager") and hasattr(mcp._resource_manager, "resources") and hasattr(mcp._resource_manager.resources, "items") ): for ( resource_uri, resource_func, ) in mcp._resource_manager.resources.items(): # type: ignore[attr-defined] resources.append( { "uri": resource_uri, "name": getattr( resource_func, "__name__", str(resource_func) ), "description": getattr(resource_func, "__doc__", None) or "No description", } ) return { "jsonrpc": "2.0", "id": msg_id, "result": {"resources": resources}, } elif method == "tools/call": # Call a tool tool_name = params.get("name") tool_args = params.get("arguments", {}) if ( hasattr(mcp, "_tool_manager") and hasattr(mcp._tool_manager, "tools") and hasattr(mcp._tool_manager.tools, "__contains__") and tool_name in mcp._tool_manager.tools # type: ignore[operator] ): tool_func = mcp._tool_manager.tools[tool_name] # type: ignore[index] try: # Execute the tool result = await tool_func(**tool_args) return { "jsonrpc": "2.0", "id": msg_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, default=str), } ] }, } except Exception as tool_error: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32603, "message": f"Tool execution error: {str(tool_error)}", }, } else: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Tool not found: {tool_name}", }, } else: # Method not found return { "jsonrpc": "2.0", "id": msg_id, "error": {"code": -32601, "message": f"Method not found: {method}"}, } except Exception as e: logger.error(f"Error processing {method}: {e}") return { "jsonrpc": "2.0", "id": msg_id, "error": {"code": -32603, "message": str(e)}, } # Create global handler instance inspector_sse = InspectorCompatibleSSE()
