Skip to main content
Glama
main.py11.5 kB
""" FastAPI Main Application MCP Server with SSE support and n8n integration """ import asyncio import logging import os from contextlib import asynccontextmanager from typing import Dict, Any import structlog from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from sse_starlette.sse import EventSourceResponse from prometheus_client import Counter, Histogram, Gauge, generate_latest from prometheus_client import CONTENT_TYPE_LATEST from starlette.responses import Response from dotenv import load_dotenv from models import ( AppConfig, ServerConfig, ABCSystemConfig, N8NConfig, SSEConfig, N8NWebhookRequest ) from mcp_protocol import MCPProtocolHandler from sse_handler import SSEHandler, SSEConnectionManager from n8n_integration import N8NIntegration from tools.system_check import SystemCheckTool # Load environment variables load_dotenv() # Configure logging structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="iso"), structlog.processors.add_log_level, structlog.processors.JSONRenderer() ] ) logger = structlog.get_logger() # Prometheus metrics REQUEST_COUNT = Counter( 'mcp_requests_total', 'Total number of requests', ['method', 'endpoint', 'status'] ) REQUEST_DURATION = Histogram( 'mcp_request_duration_seconds', 'Request duration in seconds', ['method', 'endpoint'] ) ACTIVE_SSE_CONNECTIONS = Gauge( 'mcp_sse_connections_active', 'Number of active SSE connections' ) TOOL_EXECUTIONS = Counter( 'mcp_tool_executions_total', 'Total number of tool executions', ['tool_name', 'status'] ) # Global instances mcp_handler: MCPProtocolHandler sse_manager: SSEConnectionManager sse_handler: SSEHandler n8n_integration: N8NIntegration app_config: AppConfig def load_config() -> AppConfig: """Load configuration from environment variables""" return AppConfig( server=ServerConfig( host=os.getenv("MCP_SERVER_HOST", "0.0.0.0"), port=int(os.getenv("MCP_SERVER_PORT", "3001")), workers=int(os.getenv("MCP_SERVER_WORKERS", "4")), log_level=os.getenv("LOG_LEVEL", "INFO") ), abc_system=ABCSystemConfig( base_url=os.getenv("ABC_SYSTEM_BASE_URL", "https://api.abc.com"), api_key=os.getenv("ABC_API_KEY", ""), timeout=int(os.getenv("ABC_TIMEOUT", "30")), max_retries=int(os.getenv("ABC_MAX_RETRIES", "3")), retry_backoff=float(os.getenv("ABC_RETRY_BACKOFF", "1.0")) ), n8n=N8NConfig( instance_url=os.getenv("N8N_INSTANCE_URL", "n8n-prod.iconiclogs.com"), api_key=os.getenv("N8N_API_KEY"), webhook_path=os.getenv("N8N_WEBHOOK_PATH", "mcp-system-check") ), sse=SSEConfig( heartbeat_interval=int(os.getenv("SSE_HEARTBEAT_INTERVAL", "30")), reconnect_timeout=int(os.getenv("SSE_RECONNECT_TIMEOUT", "5")), max_connections=int(os.getenv("SSE_MAX_CONNECTIONS", "100")) ) ) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager""" global mcp_handler, sse_manager, sse_handler, n8n_integration, app_config logger.info("Starting MCP SSE Server...") # Load configuration app_config = load_config() # Initialize MCP protocol handler mcp_handler = MCPProtocolHandler() # Initialize SSE connection manager sse_manager = SSEConnectionManager(app_config.sse) # Initialize SSE handler sse_handler = SSEHandler(mcp_handler, sse_manager, app_config.sse) # Initialize n8n integration n8n_integration = N8NIntegration(app_config.n8n, app_config.abc_system) # Register tools system_check_tool = SystemCheckTool(app_config.abc_system) mcp_handler.register_tool( name=system_check_tool.name, description=system_check_tool.description, input_schema=system_check_tool.input_schema.model_dump(), handler=system_check_tool.execute ) logger.info( "MCP SSE Server started", port=app_config.server.port, tools=len(mcp_handler.tools), n8n_instance=app_config.n8n.instance_url ) yield # Cleanup logger.info("Shutting down MCP SSE Server...") # Create FastAPI app app = FastAPI( title="MCP SSE Server", description="Model Context Protocol Server with SSE transport and n8n integration", version="1.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Middleware for metrics @app.middleware("http") async def metrics_middleware(request: Request, call_next): """Record request metrics""" method = request.method endpoint = request.url.path with REQUEST_DURATION.labels(method=method, endpoint=endpoint).time(): response = await call_next(request) REQUEST_COUNT.labels( method=method, endpoint=endpoint, status=response.status_code ).inc() return response # ==================== SSE Endpoints ==================== @app.get("/sse") async def sse_endpoint(request: Request): """ SSE endpoint for MCP protocol communication Clients connect to this endpoint to establish SSE connection and communicate with MCP protocol """ connection_id = request.query_params.get("connection_id") logger.info("New SSE connection request", connection_id=connection_id) ACTIVE_SSE_CONNECTIONS.inc() try: return EventSourceResponse( sse_handler.handle_sse_connection(connection_id), headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no" } ) finally: ACTIVE_SSE_CONNECTIONS.dec() # ==================== n8n Webhook Endpoints ==================== @app.post("/n8n/webhook/{webhook_id}") async def n8n_webhook(webhook_id: str, request: Request): """ n8n webhook endpoint Receives requests from n8n workflows and executes MCP tools """ try: # Validate webhook if not await n8n_integration.validate_webhook(webhook_id): raise HTTPException(status_code=400, detail="Invalid webhook ID") # Parse request body request_data = await request.json() logger.info( "n8n webhook request", webhook_id=webhook_id, tool=request_data.get("tool") ) # Handle webhook response = await n8n_integration.handle_webhook_request( webhook_id, request_data ) # Record metrics status = "success" if response.success else "error" TOOL_EXECUTIONS.labels( tool_name=response.tool, status=status ).inc() return JSONResponse(content=response.model_dump()) except Exception as e: logger.error("Webhook error", webhook_id=webhook_id, error=str(e)) raise HTTPException(status_code=500, detail=str(e)) @app.get("/n8n/tools") async def list_n8n_tools(): """ List available tools for n8n integration """ tools = n8n_integration.get_available_tools() return JSONResponse(content={"tools": tools}) # ==================== Health & Monitoring Endpoints ==================== @app.get("/health") async def health_check(): """Health check endpoint""" return JSONResponse(content={ "status": "healthy", "server": "mcp-sse-server-python", "version": "1.0.0", "active_connections": len(sse_manager.get_all_connections()), "registered_tools": len(mcp_handler.tools) }) @app.get("/health/live") async def liveness_probe(): """Kubernetes liveness probe""" return JSONResponse(content={"status": "alive"}) @app.get("/health/ready") async def readiness_probe(): """Kubernetes readiness probe""" # Check if server is ready to accept connections ready = mcp_handler is not None and sse_manager is not None if not ready: raise HTTPException(status_code=503, detail="Not ready") return JSONResponse(content={"status": "ready"}) @app.get("/metrics") async def metrics(): """Prometheus metrics endpoint""" return Response( content=generate_latest(), media_type=CONTENT_TYPE_LATEST ) # ==================== Info Endpoints ==================== @app.get("/") async def root(): """Root endpoint with server info""" return JSONResponse(content={ "name": "MCP SSE Server", "version": "1.0.0", "description": "Model Context Protocol Server with SSE transport", "endpoints": { "sse": "/sse", "n8n_webhook": "/n8n/webhook/{webhook_id}", "n8n_tools": "/n8n/tools", "health": "/health", "metrics": "/metrics", "info": "/info" } }) @app.get("/info") async def server_info(): """Detailed server information""" return JSONResponse(content={ "server": mcp_handler.get_server_info(), "connections": { "active": len(sse_manager.get_all_connections()), "max": app_config.sse.max_connections, "details": sse_manager.get_all_connections() }, "tools": mcp_handler.get_tool_list(), "config": { "sse_heartbeat_interval": app_config.sse.heartbeat_interval, "n8n_instance": app_config.n8n.instance_url, "abc_system_url": app_config.abc_system.base_url } }) # ==================== Admin Endpoints ==================== @app.get("/connections") async def list_connections(): """List all active SSE connections""" connections = sse_manager.get_all_connections() return JSONResponse(content={ "total": len(connections), "connections": connections }) @app.get("/tools") async def list_tools(): """List all registered MCP tools""" tools = mcp_handler.get_tool_list() return JSONResponse(content={"tools": tools}) # ==================== Error Handlers ==================== @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """Handle HTTP exceptions""" logger.warning( "HTTP exception", status_code=exc.status_code, detail=exc.detail, path=request.url.path ) return JSONResponse( status_code=exc.status_code, content={"error": exc.detail} ) @app.exception_handler(Exception) async def general_exception_handler(request: Request, exc: Exception): """Handle general exceptions""" logger.error( "Unhandled exception", error=str(exc), path=request.url.path, exc_info=True ) return JSONResponse( status_code=500, content={"error": "Internal server error"} ) # ==================== Main Entry Point ==================== if __name__ == "__main__": import uvicorn config = load_config() uvicorn.run( "main:app", host=config.server.host, port=config.server.port, workers=config.server.workers, log_level=config.server.log_level.lower(), reload=False # Set to True for development )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nguyenxtan/mcpwn8n'

If you have feedback or need assistance with the MCP directory API, please join our Discord server