# Milestone 2: Production Readiness

**Status:** Not Started

**Target:** Production-grade gateway with HTTP transport, health checks, and robust error handling

---

## Overview

M2 transforms the gateway from a development tool into a production-ready service. This milestone adds:

- HTTP transport with Streamable HTTP/SSE support
- Health check endpoints for monitoring
- Comprehensive error handling with all defined error codes
- HTTP session management with session IDs
- Performance optimizations and monitoring
- Security hardening

**Key Success Metric:** Gateway can run as a production HTTP service with <100ms P95 latency, proper error handling, and monitoring capabilities.

---

## Core Components

### 1. HTTP Transport Implementation

### 2. Health Check Endpoints

### 3. Error Handling System

### 4. Session Management

### 5. Security Hardening

### 6. Performance Optimization

---

## Detailed Task Checklist

### HTTP Transport Setup

- [ ] Configure gateway for HTTP transport
  - [ ] Add HTTP transport support to gateway.run()
  - [ ] Configure host and port from environment variables
  - [ ] Support both stdio and HTTP concurrently (if needed)
  - [ ] Implement Streamable HTTP with SSE support
  - [ ] Add CORS handling for web clients
- [ ] Environment variable configuration
  - [ ] `GATEWAY_TRANSPORT` (stdio|http)
  - [ ] `GATEWAY_HTTP_HOST` (default: 127.0.0.1)
  - [ ] `GATEWAY_HTTP_PORT` (default: 8000)
  - [ ] `GATEWAY_CORS_ORIGINS` (comma-separated allowed origins)
- [ ] Test HTTP transport
  - [ ] Verify HTTP POST for sending messages
  - [ ] Verify SSE streaming for server messages
  - [ ] Test session creation and reuse
  - [ ] Test concurrent HTTP connections

**Code Reference:**

```python
# main.py updates
import os


def main():
    # ... existing setup ...

    # Configure transport
    transport = os.getenv("GATEWAY_TRANSPORT", "stdio")

    if transport == "http":
        host = os.getenv("GATEWAY_HTTP_HOST", "127.0.0.1")
        port = int(os.getenv("GATEWAY_HTTP_PORT", "8000"))

        # Run with HTTP transport
        gateway.run(
            transport="http",
            host=host,
            port=port
        )
    else:
        # Run with stdio transport (default)
        gateway.run()


if __name__ == "__main__":
    main()
```

**Documentation Reference:**

- FastMCP HTTP Transport - https://gofastmcp.com/servers/server#running-the-server
- MCP Streamable HTTP - https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#streamable-http

### Health Check Endpoints

- [ ] Implement health check endpoint
  - [ ] Add `/health` endpoint using `@gateway.custom_route`
  - [ ] Return 200 OK with basic status
  - [ ] Include gateway version info
  - [ ] Check downstream server connectivity (optional)
- [ ] Implement readiness check endpoint
  - [ ] Add `/ready` endpoint
  - [ ] Verify all proxy connections are initialized
  - [ ] Return 503 if not ready
  - [ ] Return 200 when ready
- [ ] Implement metrics endpoint
  - [ ] Add `/metrics` endpoint
  - [ ] Return metrics in JSON format
  - [ ] Include operation counts, latencies, error rates
  - [ ] Support Prometheus format (optional)

**Code Reference:**

```python
import time

from starlette.requests import Request
from starlette.responses import JSONResponse

# `gateway` is the FastMCP instance created in main.py

# Store startup time
STARTUP_TIME = time.time()


@gateway.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
    """Basic health check endpoint."""
    return JSONResponse({
        "status": "healthy",
        "service": "agent-mcp-gateway",
        "version": "1.0.0",
        "uptime_seconds": time.time() - STARTUP_TIME
    })


@gateway.custom_route("/ready", methods=["GET"])
async def readiness_check(request: Request) -> JSONResponse:
    """Readiness check - verifies gateway is ready to serve requests."""
    # Get proxy clients from state
    # In production, you'd store this in a way accessible to routes
    proxy_clients = getattr(gateway, "_proxy_clients", {})

    if not proxy_clients:
        return JSONResponse(
            {"status": "not_ready", "reason": "no_proxy_clients"},
            status_code=503
        )

    return JSONResponse({
        "status": "ready",
        "servers_configured": len(proxy_clients)
    })


@gateway.custom_route("/metrics", methods=["GET"])
async def metrics_endpoint(request: Request) -> JSONResponse:
    """Metrics endpoint for monitoring."""
    metrics_collector = getattr(gateway, "_metrics_collector", None)

    if not metrics_collector:
        return JSONResponse({"error": "metrics not available"}, status_code=503)

    summary = metrics_collector.get_summary()

    return JSONResponse({
        "timestamp": time.time(),
        "operations": summary,
        "uptime_seconds": time.time() - STARTUP_TIME
    })
```

**Documentation Reference:**

- FastMCP Custom Routes - https://gofastmcp.com/servers/server#custom-routes

### Comprehensive Error Handling

- [ ] Define error code constants
  - [ ] `DENIED_BY_POLICY` (-32000)
  - [ ] `SERVER_UNAVAILABLE` (-32001)
  - [ ] `TOOL_NOT_FOUND` (-32002)
  - [ ] `INVALID_AGENT_ID` (-32003)
  - [ ] `TIMEOUT` (-32004)
- [ ] Implement custom exception classes
  - [ ] `DeniedByPolicyError` with rule reference
  - [ ] `ServerUnavailableError` with server name
  - [ ] `ToolNotFoundError` with tool and server
  - [ ] `InvalidAgentIdError` with validation details
  - [ ] `GatewayTimeoutError` with duration info (named to avoid shadowing the built-in `TimeoutError`)
- [ ] Update all tools to use error codes
  - [ ] Standardize error responses
  - [ ] Include helpful error messages
  - [ ] Add rule references for policy denials
  - [ ] Log all errors appropriately
- [ ] Add error response formatting
  - [ ] Consistent JSON structure
  - [ ] Include error code, message, data
  - [ ] Support MCP error format

**Code Reference:**

```python
# src/errors.py
from fastmcp.exceptions import ToolError


class GatewayError(ToolError):
    """Base class for gateway errors."""
    code: int = -32000

    def __init__(self, message: str, data: dict | None = None):
        # Exception.__init__ accepts no keyword arguments, so keep the
        # structured payload on the instance instead of passing it up
        super().__init__(message)
        self.data = data or {}


class DeniedByPolicyError(GatewayError):
    """Raised when operation is denied by policy."""
    code = -32000

    def __init__(self, agent_id: str, operation: str, rule: str):
        self.agent_id = agent_id
        self.operation = operation
        self.rule = rule
        super().__init__(
            f"Agent '{agent_id}' denied {operation}",
            data={
                "code": "DENIED_BY_POLICY",
                "agent_id": agent_id,
                "operation": operation,
                "rule": rule
            }
        )


class ServerUnavailableError(GatewayError):
    """Raised when downstream server is unavailable."""
    code = -32001

    def __init__(self, server: str, reason: str = ""):
        self.server = server
        self.reason = reason
        super().__init__(
            f"Server '{server}' is unavailable" + (f": {reason}" if reason else ""),
            data={
                "code": "SERVER_UNAVAILABLE",
                "server": server,
                "reason": reason
            }
        )


class ToolNotFoundError(GatewayError):
    """Raised when requested tool doesn't exist."""
    code = -32002

    def __init__(self, tool: str, server: str):
        self.tool = tool
        self.server = server
        super().__init__(
            f"Tool '{tool}' not found on server '{server}'",
            data={
                "code": "TOOL_NOT_FOUND",
                "tool": tool,
                "server": server
            }
        )


class InvalidAgentIdError(GatewayError):
    """Raised when agent_id is invalid or missing."""
    code = -32003

    def __init__(self, reason: str):
        self.reason = reason
        super().__init__(
            f"Invalid agent_id: {reason}",
            data={
                "code": "INVALID_AGENT_ID",
                "reason": reason
            }
        )


class GatewayTimeoutError(GatewayError):
    """Raised when operation times out."""
    code = -32004

    def __init__(self, operation: str, timeout_ms: int):
        self.operation = operation
        self.timeout_ms = timeout_ms
        super().__init__(
            f"Operation '{operation}' timed out after {timeout_ms}ms",
            data={
                "code": "TIMEOUT",
                "operation": operation,
                "timeout_ms": timeout_ms
            }
        )
```

- [ ] Update tools to use new error classes
  - [ ] Replace generic exceptions with specific errors
  - [ ] Include all required context in errors
  - [ ] Test error responses

### Session Management

- [ ] Implement HTTP session tracking
  - [ ] Generate secure session IDs (UUID4)
  - [ ] Store session state (agent_id, created_at, last_used)
  - [ ] Return `Mcp-Session-Id` header on initialization
  - [ ] Validate `Mcp-Session-Id` on subsequent requests
  - [ ] Support session termination via DELETE
- [ ] Add session timeout
  - [ ] Configure session TTL (default: 1 hour)
  - [ ] Clean up expired sessions
  - [ ] Return 404 for expired sessions
- [ ] Implement session storage
  - [ ] In-memory session store
  - [ ] Thread-safe access
  - [ ] Support session persistence (optional)

**Code Reference:**

```python
# src/session.py
import threading
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional


def _now() -> datetime:
    """Timezone-aware current time (datetime.utcnow() is deprecated)."""
    return datetime.now(timezone.utc)


class Session:
    def __init__(self, session_id: str, agent_id: str = ""):
        self.session_id = session_id
        self.agent_id = agent_id
        self.created_at = _now()
        self.last_used = _now()
        self.data = {}

    def touch(self):
        """Update last used timestamp."""
        self.last_used = _now()

    def is_expired(self, ttl_seconds: int) -> bool:
        """Check if session has expired."""
        return (_now() - self.last_used).total_seconds() > ttl_seconds


class SessionManager:
    """Manage HTTP sessions for the gateway."""

    def __init__(self, ttl_seconds: int = 3600):
        self.sessions: Dict[str, Session] = {}
        self.ttl_seconds = ttl_seconds
        self._lock = threading.Lock()

    def create_session(self, agent_id: str = "") -> Session:
        """Create a new session."""
        session_id = str(uuid.uuid4())
        session = Session(session_id, agent_id)

        with self._lock:
            self.sessions[session_id] = session

        return session

    def get_session(self, session_id: str) -> Optional[Session]:
        """Get session by ID."""
        with self._lock:
            session = self.sessions.get(session_id)

            if session:
                if session.is_expired(self.ttl_seconds):
                    del self.sessions[session_id]
                    return None

                session.touch()

            return session

    def delete_session(self, session_id: str):
        """Delete a session."""
        with self._lock:
            self.sessions.pop(session_id, None)

    def cleanup_expired(self):
        """Remove all expired sessions."""
        with self._lock:
            expired = [
                sid for sid, session in self.sessions.items()
                if session.is_expired(self.ttl_seconds)
            ]

            for sid in expired:
                del self.sessions[sid]
```
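The checklist asks for expired sessions to be cleaned up, but `cleanup_expired()` only takes effect when something calls it. One possible approach is a background asyncio task that sweeps on a fixed interval. The sketch below is illustrative only: `TinyStore`, `sweep_expired`, and `demo` are hypothetical names, and `TinyStore` is a simplified stand-in for `SessionManager` so the example runs on its own.

```python
import asyncio
import time


class TinyStore:
    """Hypothetical stand-in for SessionManager: session ID -> last-used time."""

    def __init__(self, ttl_seconds: float):
        self.ttl_seconds = ttl_seconds
        self.last_used: dict[str, float] = {}

    def cleanup_expired(self) -> int:
        """Drop expired entries and return how many were removed."""
        now = time.monotonic()
        expired = [
            sid for sid, ts in self.last_used.items()
            if now - ts > self.ttl_seconds
        ]
        for sid in expired:
            del self.last_used[sid]
        return len(expired)


async def sweep_expired(store: TinyStore, interval_seconds: float) -> None:
    """Call cleanup_expired() repeatedly, sleeping between sweeps."""
    while True:
        await asyncio.sleep(interval_seconds)
        store.cleanup_expired()


async def demo() -> int:
    """Start the sweeper, let one entry expire, then shut the sweeper down."""
    store = TinyStore(ttl_seconds=0.05)
    store.last_used["abc"] = time.monotonic()

    sweeper = asyncio.create_task(sweep_expired(store, interval_seconds=0.02))
    await asyncio.sleep(0.2)  # long enough for the entry to expire and be swept

    sweeper.cancel()  # stop the background task, e.g. during graceful shutdown
    try:
        await sweeper
    except asyncio.CancelledError:
        pass

    return len(store.last_used)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Cancelling the task explicitly keeps shutdown deterministic, which fits the graceful shutdown item later in this checklist. A real deployment would pick an interval well below the session TTL.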
**Documentation Reference:**

- MCP Session Management - https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#session-management

### Security Hardening

- [ ] Implement Origin validation for HTTP
  - [ ] Validate `Origin` header on all HTTP requests
  - [ ] Prevent DNS rebinding attacks
  - [ ] Configure allowed origins via environment
- [ ] Add rate limiting (optional)
  - [ ] Limit requests per agent per minute
  - [ ] Return 429 Too Many Requests when exceeded
  - [ ] Configurable rate limits
- [ ] Secure session IDs
  - [ ] Use cryptographically secure random generation
  - [ ] Validate session ID format
  - [ ] Prevent session fixation attacks
- [ ] Add request validation
  - [ ] Validate all input parameters
  - [ ] Sanitize error messages
  - [ ] Prevent information leakage

**Code Reference:**

```python
# src/security.py
from urllib.parse import urlparse

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

LOCAL_HOSTS = {"127.0.0.1", "localhost"}


class OriginValidationMiddleware(BaseHTTPMiddleware):
    """Validate Origin header to prevent DNS rebinding attacks."""

    def __init__(self, app, allowed_origins: list[str]):
        super().__init__(app)
        self.allowed_origins = set(allowed_origins)

    async def dispatch(self, request, call_next):
        origin = request.headers.get("origin")

        # If running locally, only allow localhost
        if request.url.hostname in LOCAL_HOSTS:
            # Compare the parsed hostname: a prefix check would also accept
            # origins such as http://localhost.example.com
            if origin and urlparse(origin).hostname not in LOCAL_HOSTS:
                return JSONResponse(
                    {"error": "Invalid origin for local server"},
                    status_code=403
                )

        # For remote servers, validate against allowed origins
        elif origin and origin not in self.allowed_origins:
            return JSONResponse(
                {"error": "Origin not allowed"},
                status_code=403
            )

        return await call_next(request)
```

### Performance Optimization

- [ ] Implement connection pooling
  - [ ] Reuse ProxyClient connections when safe
  - [ ] Configure pool size limits
  - [ ] Monitor pool utilization
- [ ] Add caching layer
  - [ ] Cache get_server_tools results (short TTL)
  - [ ] Cache policy evaluations
  - [ ] Invalidate on configuration changes
- [ ] Optimize policy evaluation
  - [ ] Pre-compile wildcard patterns
  - [ ] Cache policy decisions
  - [ ] Index rules for faster lookup
- [ ] Add performance monitoring
  - [ ] Track operation durations
  - [ ] Identify slow operations
  - [ ] Alert on performance degradation

**Code Reference:**

```python
# src/cache.py
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


class Cache:
    """Simple TTL-based cache."""

    def __init__(self, ttl_seconds: int = 60):
        self.ttl_seconds = ttl_seconds
        self._cache = {}

    def _make_key(self, *args, **kwargs) -> str:
        """Generate cache key from arguments."""
        data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        # The hash only needs to be stable; it is not used for security
        return hashlib.sha256(data.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache if not expired."""
        if key in self._cache:
            value, expires_at = self._cache[key]
            if datetime.now(timezone.utc) < expires_at:
                return value
            else:
                del self._cache[key]
        return None

    def set(self, key: str, value: Any):
        """Store value in cache with TTL."""
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
        self._cache[key] = (value, expires_at)

    def clear(self):
        """Clear all cache entries."""
        self._cache.clear()
```

### Integration & Testing

- [ ] Update main.py for production
  - [ ] Add HTTP transport configuration
  - [ ] Initialize session manager
  - [ ] Add security middleware
  - [ ] Configure health check endpoints
  - [ ] Add graceful shutdown handling
- [ ] Create HTTP integration tests
  - [ ] Test HTTP POST for tool calls
  - [ ] Test SSE streaming
  - [ ] Test session management
  - [ ] Test health check endpoints
  - [ ] Test error responses
- [ ] Performance testing
  - [ ] Load test with multiple concurrent clients
  - [ ] Verify P95 latency <100ms
  - [ ] Test under sustained load
  - [ ] Monitor memory usage
- [ ] Security testing
  - [ ] Test Origin validation
  - [ ] Test session security
  - [ ] Test rate limiting (if implemented)
  - [ ] Verify no information leakage in errors

**Code Reference:**

```python
# tests/test_http.py
import httpx
import pytest

# The Streamable HTTP spec requires clients to accept both content types
MCP_ACCEPT = "application/json, text/event-stream"


@pytest.mark.asyncio
async def test_http_transport():
    """Test gateway over HTTP transport."""
    base_url = "http://127.0.0.1:8000"
    # Assumes FastMCP's default endpoint path for the HTTP transport
    mcp_url = f"{base_url}/mcp"

    async with httpx.AsyncClient() as client:
        # Test health check
        response = await client.get(f"{base_url}/health")
        assert response.status_code == 200
        assert response.json()["status"] == "healthy"

        # Test initialize (MCP protocol)
        # Assumes the server answers with a plain JSON body; an SSE-framed
        # reply would have to be parsed from its `data:` lines instead
        response = await client.post(
            mcp_url,
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2025-06-18",
                    "capabilities": {},
                    "clientInfo": {"name": "test-client", "version": "1.0.0"}
                }
            },
            headers={"Accept": MCP_ACCEPT}
        )
        assert response.status_code == 200
        assert "result" in response.json()

        # Get session ID
        session_id = response.headers.get("Mcp-Session-Id")
        assert session_id is not None

        session_headers = {"Mcp-Session-Id": session_id, "Accept": MCP_ACCEPT}

        # Complete the handshake before sending further requests
        await client.post(
            mcp_url,
            json={"jsonrpc": "2.0", "method": "notifications/initialized"},
            headers=session_headers
        )

        # Test tool call with session
        response = await client.post(
            mcp_url,
            json={
                "jsonrpc": "2.0",
                "id": 2,
                "method": "tools/call",
                "params": {
                    "name": "list_servers",
                    "arguments": {"agent_id": "researcher"}
                }
            },
            headers=session_headers
        )
        assert response.status_code == 200
```

---

## Success Criteria

### Functional Requirements

- [ ] Gateway runs as HTTP service
- [ ] Health check endpoints respond correctly
- [ ] All error codes implemented and tested
- [ ] Session management works correctly
- [ ] Security validations in place

### Performance Requirements

- [ ] P95 latency <100ms for all operations
- [ ] Handle 100+ concurrent connections
- [ ] No memory leaks under sustained load
- [ ] Graceful degradation under high load

### Security Requirements

- [ ] Origin validation prevents DNS rebinding
- [ ] Session IDs are cryptographically secure
- [ ] No sensitive information in error messages
- [ ] Rate limiting prevents abuse (if implemented)

### Operational Requirements

- [ ] Health checks enable monitoring
- [ ] Metrics expose operational insights
- [ ] Logs are structured and parseable
- [ ] Graceful shutdown handling

---

## Performance Targets

| Metric | Target | Measurement |
|--------|--------|-------------|
| HTTP request latency | <100ms (P95) | End-to-end including downstream |
| Gateway overhead | <30ms (P95) | Gateway processing only |
| Concurrent connections | 100+ | Simultaneous active connections |
| Session lookup | <1ms | Session retrieval from store |

---

## Dependencies

**External:**

- FastMCP 2.13.0.1+
- httpx (for testing)
- Starlette (included with FastMCP)

**Internal:**

- M0 (Foundation) - Config, policy, audit
- M1 (Core) - All three gateway tools, middleware

---

## Documentation References

- **MCP Streamable HTTP:** https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#streamable-http
- **MCP Session Management:** https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#session-management
- **FastMCP Custom Routes:** https://gofastmcp.com/servers/server#custom-routes
- **FastMCP HTTP Transport:** https://gofastmcp.com/servers/server#running-the-server

---

## Notes

- HTTP transport is critical for remote access and multi-client scenarios
- Session management enables stateful interactions over HTTP
- Security is paramount - Origin validation prevents DNS rebinding attacks
- Health checks are essential for production monitoring
- Performance optimization should be data-driven, based on actual usage patterns
- Consider implementing Prometheus metrics format for better monitoring integration
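
The latency targets above are stated as P95 values, so verifying them means computing a percentile over recorded request durations. A minimal sketch using the nearest-rank method follows; `percentile` and `meets_target` are hypothetical helper names and not part of the gateway, and other percentile definitions (for example, interpolating ones) can give slightly different numbers.

```python
import math


def percentile(samples_ms: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample that has at least
    pct percent of all samples at or below it."""
    if not samples_ms:
        raise ValueError("no samples recorded")
    ordered = sorted(samples_ms)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[max(rank, 1) - 1]


def meets_target(samples_ms: list[float], target_ms: float = 100.0) -> bool:
    """True if the P95 latency is strictly below the target."""
    return percentile(samples_ms, 95) < target_ms
```

With this definition a single slow outlier among 100 requests does not fail the check, while a slow 10% of requests does, which matches the intent of a P95 target.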
