Skip to main content
Glama
http_server.py•31.8 kB
# SPDX-License-Identifier: GPL-3.0-only OR MIT """ HTTP Server for MCP using SSE (Server-Sent Events) transport. This module provides HTTP transport support for Smithery and other HTTP-based MCP clients, while keeping STDIO transport for Docker MCP Catalog. """ import asyncio import logging import os from typing import Any try: from starlette.applications import Starlette from starlette.routing import Route from starlette.responses import Response from starlette.requests import Request from starlette.middleware.cors import CORSMiddleware import uvicorn STARLETTE_AVAILABLE = True except ImportError: STARLETTE_AVAILABLE = False try: from mcp.server.sse import SseServerTransport SSE_AVAILABLE = True except ImportError: SSE_AVAILABLE = False from .server import server from . import __version__ logger = logging.getLogger(__name__) # Import handlers - try to get them from server if direct import fails try: from .server import list_tools, list_resources, list_prompts, call_tool, read_resource, get_prompt logger.info("Successfully imported all handlers") except ImportError as e: logger.warning(f"Could not import handlers directly: {e}") # Fallback: we'll access them through server handlers if needed list_tools = None list_resources = None list_prompts = None call_tool = None read_resource = None get_prompt = None except Exception as e: logger.error(f"Unexpected error importing handlers: {e}", exc_info=True) # Set to None to prevent crashes list_tools = None list_resources = None list_prompts = None call_tool = None read_resource = None get_prompt = None async def _handle_direct_mcp_request(request_data: dict) -> dict: """ Handle MCP request directly without SSE session. This is used when Smithery POSTs directly without establishing SSE connection. Processes requests using the server's registered handlers. Args: request_data: JSON-RPC request data Returns: JSON-RPC response data """ import json try: method = request_data.get("method", "") params = request_data.get("params", {}) request_id = request_data.get("id") if method == "initialize": # Handle initialize request - return proper capabilities # Indicate that we support tools, resources, and prompts result = { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": params.get("protocolVersion", "2025-06-18"), "capabilities": { "tools": { "listChanged": False }, "resources": { "subscribe": False, "listChanged": False }, "prompts": { "listChanged": False } }, "serverInfo": { "name": "arch-ops-server", "version": __version__ } } } return result elif method == "tools/list": # Call the server's list_tools handler directly logger.info("Handling tools/list request") try: tools = await list_tools() logger.info(f"Got {len(tools)} tools") # Convert Tool objects to dicts tools_list = [] for tool in tools: tools_list.append({ "name": tool.name, "description": tool.description, "inputSchema": tool.inputSchema }) logger.info(f"Returning {len(tools_list)} tools") return { "jsonrpc": "2.0", "id": request_id, "result": { "tools": tools_list } } except Exception as e: logger.error(f"Error in tools/list: {e}", exc_info=True) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Failed to list tools: {str(e)}" }, "id": request_id } elif method == "resources/list": # Call the server's list_resources handler directly logger.info("Handling resources/list request") try: # Ensure we await the async function properly resources = await list_resources() logger.info(f"Got {len(resources)} resources") # Convert Resource objects to dicts resources_list = [] for resource in resources: try: resources_list.append({ "uri": str(resource.uri), "name": str(resource.name) if resource.name else "", "mimeType": str(resource.mimeType) if resource.mimeType else "text/plain", "description": str(resource.description) if resource.description else "" }) except Exception as e: logger.error(f"Error converting resource to dict: {e}", exc_info=True) # Skip this resource but continue with others continue logger.info(f"Returning {len(resources_list)} resources") return { "jsonrpc": "2.0", "id": request_id, "result": { "resources": resources_list } } except Exception as e: logger.error(f"Error in resources/list: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Failed to list resources: {str(e)}" }, "id": request_id } elif method == "prompts/list": # Call the server's list_prompts handler directly logger.info("Handling prompts/list request - starting") try: # Try to get the handler function if list_prompts is None: # Fallback: try to get it from server's registered handlers logger.warning("list_prompts not imported, trying to access via server") # The server object should have the handler registered # We can't easily access it, so return an error return { "jsonrpc": "2.0", "error": { "code": -32603, "message": "list_prompts handler not available" }, "id": request_id } # Verify list_prompts is callable if not callable(list_prompts): logger.error("list_prompts is not callable!") return { "jsonrpc": "2.0", "error": { "code": -32603, "message": "list_prompts function is not callable" }, "id": request_id } # Log before calling the function logger.info("About to call list_prompts()") # Ensure we await the async function properly prompts = await list_prompts() logger.info(f"Got {len(prompts)} prompts from list_prompts()") # Convert Prompt objects to dicts prompts_list = [] for idx, prompt in enumerate(prompts): try: logger.debug(f"Processing prompt {idx+1}/{len(prompts)}: {getattr(prompt, 'name', 'unknown')}") # Safely extract prompt fields prompt_dict = { "name": str(prompt.name) if hasattr(prompt, 'name') and prompt.name else "", } # Handle description (may be None) if hasattr(prompt, 'description'): prompt_dict["description"] = str(prompt.description) if prompt.description else "" else: prompt_dict["description"] = "" # Handle arguments (may be None or empty list) if hasattr(prompt, 'arguments') and prompt.arguments: # Ensure arguments is a list if isinstance(prompt.arguments, list): prompt_dict["arguments"] = prompt.arguments else: # Try to convert to list if it's not prompt_dict["arguments"] = list(prompt.arguments) if prompt.arguments else [] else: prompt_dict["arguments"] = [] prompts_list.append(prompt_dict) logger.debug(f"Added prompt: {prompt_dict['name']}") except Exception as e: logger.error(f"Error converting prompt {idx+1} to dict: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) # Skip this prompt but continue with others continue logger.info(f"Returning {len(prompts_list)} prompts") result = { "jsonrpc": "2.0", "id": request_id, "result": { "prompts": prompts_list } } logger.info("Successfully prepared prompts/list response") return result except Exception as e: logger.error(f"Error in prompts/list: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Failed to list prompts: {str(e)}" }, "id": request_id } elif method == "tools/call": # Call tool execution logger.info(f"Direct HTTP tools/call: {params.get('name')}") tool_name = params.get("name", "") tool_arguments = params.get("arguments", {}) try: # Execute the tool result_content = await call_tool(tool_name, tool_arguments) # Convert content objects to dicts content_list = [] for content in result_content: if hasattr(content, 'type') and content.type == "text": content_list.append({ "type": "text", "text": content.text }) elif hasattr(content, 'type') and content.type == "image": content_list.append({ "type": "image", "data": content.data, "mimeType": content.mimeType }) elif hasattr(content, 'type') and content.type == "resource": content_list.append({ "type": "resource", "resource": { "uri": content.resource.uri, "mimeType": content.resource.mimeType, "text": content.resource.text if hasattr(content.resource, 'text') else None, "blob": content.resource.blob if hasattr(content.resource, 'blob') else None, } }) return { "jsonrpc": "2.0", "id": request_id, "result": { "content": content_list } } except ValueError as e: # Tool not found or invalid arguments logger.error(f"Tool error: {e}") return { "jsonrpc": "2.0", "error": { "code": -32602, "message": str(e) }, "id": request_id } except Exception as e: # Other errors during tool execution logger.error(f"Tool execution error: {e}", exc_info=True) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Tool execution failed: {str(e)}" }, "id": request_id } elif method == "resources/read": # Read resource logger.info(f"Direct HTTP resources/read: {params.get('uri')}") uri = params.get("uri", "") try: # Read the resource resource_content = await read_resource(uri) return { "jsonrpc": "2.0", "id": request_id, "result": { "contents": [ { "uri": uri, "mimeType": "text/plain", "text": resource_content } ] } } except ValueError as e: # Resource not found or invalid URI logger.error(f"Resource error: {e}") return { "jsonrpc": "2.0", "error": { "code": -32602, "message": str(e) }, "id": request_id } except Exception as e: # Other errors during resource read logger.error(f"Resource read error: {e}", exc_info=True) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Resource read failed: {str(e)}" }, "id": request_id } elif method == "prompts/get": # Get prompt logger.info(f"Direct HTTP prompts/get: {params.get('name')}") prompt_name = params.get("name", "") prompt_arguments = params.get("arguments", {}) try: # Get the prompt prompt_result = await get_prompt(prompt_name, prompt_arguments) # Convert PromptMessage objects to dicts messages_list = [] for message in prompt_result.messages: msg_dict = { "role": message.role, "content": { "type": "text", "text": message.content.text } } messages_list.append(msg_dict) return { "jsonrpc": "2.0", "id": request_id, "result": { "description": prompt_result.description, "messages": messages_list } } except ValueError as e: # Prompt not found or invalid arguments logger.error(f"Prompt error: {e}") return { "jsonrpc": "2.0", "error": { "code": -32602, "message": str(e) }, "id": request_id } except Exception as e: # Other errors during prompt generation logger.error(f"Prompt generation error: {e}", exc_info=True) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Prompt generation failed: {str(e)}" }, "id": request_id } else: # For other methods, return method not found return { "jsonrpc": "2.0", "error": { "code": -32601, "message": f"Method '{method}' not supported in direct HTTP mode. Please use SSE connection." }, "id": request_id } except Exception as e: logger.error(f"Error handling direct MCP request: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Internal error: {str(e)}" }, "id": request_data.get("id") } # Initialize SSE transport at module level sse: Any = None if SSE_AVAILABLE: sse = SseServerTransport("/messages") async def handle_sse_raw(scope: dict, receive: Any, send: Any) -> None: """ Raw ASGI handler for Server-Sent Events (SSE) endpoint for MCP. This is the main MCP endpoint that Smithery will connect to. Args: scope: ASGI scope dictionary receive: ASGI receive callable send: ASGI send callable """ if not SSE_AVAILABLE or sse is None: logger.error("SSE transport not available - MCP package needs SSE support") await send({ "type": "http.response.start", "status": 500, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": b"SSE transport not available. Install mcp package with SSE support.", }) return logger.info("New SSE connection established") try: async with sse.connect_sse(scope, receive, send) as streams: await server.run( streams[0], streams[1], server.create_initialization_options() ) except Exception as e: logger.error(f"SSE connection error: {e}", exc_info=True) raise async def handle_sse(request: Request) -> None: """ Starlette request handler wrapper for SSE endpoint. Args: request: Starlette Request object """ await handle_sse_raw(request.scope, request.receive, request._send) async def handle_messages_raw(scope: dict, receive: Any, send: Any) -> None: """ Raw ASGI handler for POST requests to /messages endpoint for SSE transport. Args: scope: ASGI scope dictionary receive: ASGI receive callable send: ASGI send callable """ if not SSE_AVAILABLE or sse is None: await send({ "type": "http.response.start", "status": 500, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": b"SSE transport not available.", }) return try: await sse.handle_post_message(scope, receive, send) except Exception as e: logger.error(f"Message handling error: {e}", exc_info=True) await send({ "type": "http.response.start", "status": 500, "headers": [[b"content-type", b"application/json"]], }) await send({ "type": "http.response.body", "body": f'{{"jsonrpc": "2.0", "error": {{"code": -32603, "message": "Internal error: {str(e)}"}}, "id": null}}'.encode(), }) async def handle_messages(request: Request) -> None: """ Starlette request handler wrapper for messages endpoint. Args: request: Starlette Request object """ await handle_messages_raw(request.scope, request.receive, request._send) async def handle_mcp_raw(scope: dict, receive: Any, send: Any) -> None: """ Raw ASGI handler for /mcp endpoint (Smithery requirement). Smithery expects a single /mcp endpoint that handles: - GET: Establish SSE connection (streamable HTTP) - POST: Send messages - DELETE: Close connection Args: scope: ASGI scope dictionary receive: ASGI receive callable send: ASGI send callable """ async def handle_mcp_raw(scope: dict, receive: Any, send: Any) -> None: """ Raw ASGI handler for /mcp endpoint (Smithery requirement). Smithery expects a single /mcp endpoint that handles: - GET: Establish SSE connection (streamable HTTP) - POST: Send messages - DELETE: Close connection Args: scope: ASGI scope dictionary receive: ASGI receive callable send: ASGI send callable """ method = scope.get("method", "") # Wrap everything in a try-except to catch any unhandled exceptions try: if method == "GET": # GET /mcp establishes SSE connection # The SSE transport might check the path, so we ensure compatibility logger.info("GET /mcp - Establishing SSE connection") # For GET, the path doesn't matter for connect_sse, but we keep original await handle_sse_raw(scope, receive, send) elif method == "POST": # POST /mcp sends messages # Check if session_id exists in query string query_string = scope.get("query_string", b"").decode("utf-8") has_session_id = "session_id" in query_string if not has_session_id: # Smithery POSTs directly without establishing SSE connection first # Handle as regular HTTP request-response (non-SSE) logger.info("POST /mcp without session_id - handling as regular HTTP request") request_data = None try: # Read request body body = b"" more_body = True while more_body: message = await receive() if message["type"] == "http.request": body += message.get("body", b"") more_body = message.get("more_body", False) # Parse JSON-RPC request import json request_data = json.loads(body.decode("utf-8")) logger.info(f"Processing MCP request: {request_data.get('method', 'unknown')}") # Handle it as a direct HTTP request-response response = await _handle_direct_mcp_request(request_data) await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"application/json"], [b"access-control-allow-origin", b"*"]], }) await send({ "type": "http.response.body", "body": json.dumps(response).encode("utf-8"), }) return except json.JSONDecodeError as e: logger.error(f"JSON decode error: {e}", exc_info=True) import json await send({ "type": "http.response.start", "status": 400, "headers": [[b"content-type", b"application/json"]], }) await send({ "type": "http.response.body", "body": json.dumps({ "jsonrpc": "2.0", "error": {"code": -32700, "message": f"Parse error: {str(e)}"}, "id": None }).encode("utf-8"), }) return except Exception as e: logger.error(f"Error handling direct POST request: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) import json await send({ "type": "http.response.start", "status": 500, "headers": [[b"content-type", b"application/json"]], }) error_response = { "jsonrpc": "2.0", "error": {"code": -32603, "message": f"Internal error: {str(e)}"}, "id": request_data.get("id") if request_data else None } await send({ "type": "http.response.body", "body": json.dumps(error_response).encode("utf-8"), }) return # The SSE transport expects /messages path, so we modify the scope logger.info("POST /mcp - Handling message with session_id") # Create a modified scope with /messages path for SSE transport compatibility modified_scope = dict(scope) modified_scope["path"] = "/messages" # Preserve query string (includes session_id) modified_scope["query_string"] = scope.get("query_string", b"") await handle_messages_raw(modified_scope, receive, send) elif method == "DELETE": # DELETE /mcp closes connection logger.info("DELETE /mcp - Closing connection") # SSE connections are closed when the stream ends, so just return 200 await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": b"Connection closed", }) else: await send({ "type": "http.response.start", "status": 405, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": f"Method {method} not allowed".encode(), }) except Exception as e: # Catch any unhandled exceptions at the top level logger.error(f"Unhandled exception in handle_mcp_raw: {e}", exc_info=True) import traceback logger.error(traceback.format_exc()) import json try: await send({ "type": "http.response.start", "status": 500, "headers": [[b"content-type", b"application/json"]], }) await send({ "type": "http.response.body", "body": json.dumps({ "jsonrpc": "2.0", "error": {"code": -32603, "message": f"Internal server error: {str(e)}"}, "id": None }).encode("utf-8"), }) except Exception as send_error: logger.error(f"Failed to send error response: {send_error}", exc_info=True) async def handle_mcp(request: Request) -> None: """ Starlette request handler wrapper for /mcp endpoint. Args: request: Starlette Request object """ await handle_mcp_raw(request.scope, request.receive, request._send) def create_app() -> Any: """ Create Starlette application with MCP SSE endpoints. Returns: Starlette application instance Raises: ImportError: If starlette is not installed """ if not STARLETTE_AVAILABLE: raise ImportError( "Starlette and uvicorn are required for HTTP transport. " "Install with: pip install 'arch-ops-server[http]'" ) if not SSE_AVAILABLE or sse is None: raise ImportError( "MCP SSE transport not available. Install mcp package with SSE support." ) # Create routes # - /mcp: Required by Smithery (handles GET/POST/DELETE for streamable HTTP) # - /sse and /messages: Alternative endpoints for other clients routes = [ Route("/mcp", endpoint=handle_mcp, methods=["GET", "POST", "DELETE"]), Route("/sse", endpoint=handle_sse), Route("/messages", endpoint=handle_messages, methods=["POST"]), ] # Create app app = Starlette(debug=False, routes=routes) # Add CORS middleware for browser-based clients app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["GET", "POST", "OPTIONS", "DELETE"], allow_headers=["*"], expose_headers=["*"], ) logger.info("MCP HTTP Server initialized with SSE transport") logger.info("Endpoints: GET/POST/DELETE /mcp (Smithery), GET /sse, POST /messages") return app async def run_http_server(host: str = "0.0.0.0", port: int = 8080) -> None: """ Run MCP server with HTTP transport. Args: host: Host to bind to (default: 0.0.0.0) port: Port to listen on (default: 8080, or PORT env var) """ if not STARLETTE_AVAILABLE: logger.error("HTTP transport requires starlette and uvicorn packages") logger.error("Install with: pip install starlette uvicorn") raise ImportError("starlette not available") # Get port from environment if specified (Smithery sets this) port = int(os.getenv("PORT", port)) logger.info(f"Starting Arch Linux MCP HTTP Server on {host}:{port}") logger.info("Transport: Server-Sent Events (SSE)") logger.info("Endpoints: GET/POST/DELETE /mcp (Smithery), GET /sse, POST /messages") # Create app app = create_app() # Configure uvicorn config = uvicorn.Config( app=app, host=host, port=port, log_level="info", access_log=True, ) # Run server server_instance = uvicorn.Server(config) await server_instance.serve() def main_http(): """Synchronous wrapper for HTTP server.""" asyncio.run(run_http_server()) if __name__ == "__main__": main_http()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nihalxkumar/arch-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server