Skip to main content
Glama

Multi-Cloud Infrastructure MCP Server

by nomadslayer
main.py (14.4 kB)
""" MCP Orchestrator - Main FastAPI Application Entry point with: - SkyPilot Python SDK initialization - AI Envoy Gateway integration - Agent initialization (RunPod & Vast.ai) - MCPOrchestrator initialization - Telemetry and observability """ import logging import os import sys from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from prometheus_client import make_asgi_app # Add src to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from src.api.routes import router as api_router from src.agents.registry import AgentRegistry from src.core.clickhouse_client import ClickhouseClient from src.agents.engine import RulesEngine from src.core.envoy_client import EnvoyClient from src.core.orchestrator import MCPOrchestrator from src.core.redis_queue import RedisQueue from src.core.telemetry import TelemetryClient, initialize_telemetry # Configure logging logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Check SkyPilot availability try: import sky logger.info(f"✓ SkyPilot version {sky.__version__} loaded successfully") SKY_AVAILABLE = True except ImportError as e: logger.error(f"✗ SkyPilot not available: {e}") logger.error("Install with: pip install 'skypilot[runpod]'") SKY_AVAILABLE = False @asynccontextmanager async def lifespan(app: FastAPI): """Lifecycle manager for startup and shutdown""" # ============================================================ # STARTUP # ============================================================ logger.info("=" * 70) logger.info("🚀 Starting MCP Orchestrator with SkyPilot & AI Envoy") logger.info("=" * 70) # 1. 
Initialize ClickHouse logger.info("📊 Initializing ClickHouse...") ch = ClickhouseClient( host=os.getenv("CLICKHOUSE_HOST", "localhost"), port=int(os.getenv("CLICKHOUSE_PORT", 9000)), database=os.getenv("CLICKHOUSE_DB", "mcp") ) app.state.clickhouse = ch try: await ch.insert_event("mcp_startup", { "status": "initializing", "skypilot_available": SKY_AVAILABLE }, source="main") logger.info("✓ ClickHouse connected") except Exception as e: logger.warning(f"⚠ ClickHouse connection issue: {e}") # 2. Initialize AI Envoy Gateway client logger.info("🌐 Connecting to AI Envoy Gateway...") envoy_url = os.getenv("ENVOY_GATEWAY_URL", "http://envoy:8080") envoy_client = EnvoyClient(gateway_url=envoy_url) envoy_healthy = await envoy_client.health_check() if envoy_healthy: logger.info("✓ AI Envoy Gateway connected") app.state.envoy = envoy_client else: logger.warning("⚠ AI Envoy Gateway not available - running in direct mode") app.state.envoy = None # 3. Initialize Redis Queue (optional) logger.info("🔴 Connecting to Redis...") redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") try: redis_queue = RedisQueue(redis_url) await redis_queue.connect() app.state.redis = redis_queue logger.info("✓ Redis queue connected") except Exception as e: logger.warning(f"⚠ Redis not available: {e}") app.state.redis = None # 4. Verify SkyPilot if SKY_AVAILABLE: logger.info("☁️ SkyPilot SDK ready") else: logger.error("✗ SkyPilot SDK not available - deployments will fail!") # ============================================================ # 5. 
Initialize AgentRegistry with AGENTS # ============================================================ logger.info("🤖 Initializing Agent Registry...") # Prepare credentials for agent preloading preload_creds = {} # RunPod Agent - can be preloaded if API key available if os.getenv("RUNPOD_API_KEY"): preload_creds["runpod"] = {"api_key": os.getenv("RUNPOD_API_KEY")} logger.info(" → RunPod API key loaded") else: logger.warning(" ⚠ RUNPOD_API_KEY not set") # Vast.ai Agent - typically used per-request for security # But can be preloaded if you want a persistent agent if os.getenv("VASTAI_API_KEY"): # Option 1: Preload for persistent agent (credentials stay in memory) # preload_creds["vastai"] = {"api_key": os.getenv("VASTAI_API_KEY")} # logger.info(" → Vast.ai agent preloaded (persistent)") # Option 2: Don't preload - create transient agents per request (more secure) logger.info(" → Vast.ai API key available (per-request mode)") else: logger.warning(" ⚠ VASTAI_API_KEY not set") # Initialize registry with agents AgentRegistry.init(preload=preload_creds) app.state.registry = AgentRegistry # Verify agents were added persistent_agents = list(AgentRegistry._persistent_agents.keys()) logger.info(f"✓ Agent Registry initialized") logger.info(f" Persistent agents: {persistent_agents}") if "runpod" in persistent_agents: logger.info(" ✓ RunPod agent ready for deployments") else: logger.warning(" ⚠ RunPod agent not initialized") if "vastai" in persistent_agents: logger.info(" ✓ Vast.ai agent ready (persistent mode)") else: logger.info(" → Vast.ai agent will be created per-request (transient mode)") # ============================================================ # 6. 
Load GPU Rules Engine # ============================================================ logger.info("⚙️ Loading GPU Rules Engine...") rules_engine = RulesEngine() await rules_engine.load_rules() app.state.rules_engine = rules_engine logger.info(f"✓ Rules engine loaded ({len(rules_engine._rules_cache)} rules)") # ============================================================ # 7. Initialize Telemetry # ============================================================ logger.info("📡 Initializing Telemetry...") telemetry = initialize_telemetry( clickhouse_client=ch, envoy_client=app.state.envoy if envoy_healthy else None, enable_prometheus=True, enable_otel=False # Set to True if you want OpenTelemetry ) app.state.telemetry = telemetry logger.info("✓ Telemetry initialized") # ============================================================ # 8. Initialize MCPOrchestrator # ============================================================ logger.info("🎯 Initializing MCPOrchestrator...") orchestrator = MCPOrchestrator( clickhouse_client=ch, envoy_client=app.state.envoy if envoy_healthy else None, redis_queue=app.state.redis ) await orchestrator.initialize() app.state.orchestrator = orchestrator logger.info("✓ MCPOrchestrator initialized") # ============================================================ # 9. 
Register upstreams with Envoy (if available) # ============================================================ if envoy_healthy: logger.info("🔗 Registering provider upstreams with Envoy...") try: await envoy_client.register_upstream( "runpod-api", "https://api.runpod.io", {"path": "/", "interval": 60} ) logger.info(" ✓ RunPod upstream registered") await envoy_client.register_upstream( "vastai-api", "https://api.vast.ai", {"path": "/", "interval": 60} ) logger.info(" ✓ Vast.ai upstream registered") except Exception as e: logger.warning(f" ⚠ Failed to register upstreams: {e}") # ============================================================ # Startup Complete # ============================================================ await ch.insert_event("mcp_startup_complete", { "status": "healthy", "agents": persistent_agents, "skypilot": SKY_AVAILABLE, "envoy": envoy_healthy }, source="main") logger.info("=" * 70) logger.info("✅ MCP Orchestrator started successfully!") logger.info("=" * 70) logger.info(f"📍 API: http://localhost:8000") logger.info(f"📖 Docs: http://localhost:8000/docs") logger.info(f"💚 Health: http://localhost:8000/health") logger.info(f"📊 Metrics: http://localhost:8000/metrics") if envoy_healthy: logger.info(f"🌐 Envoy: {envoy_url}") logger.info(f"🔧 Envoy Admin: {envoy_url.replace('8080', '10000')}/") logger.info("=" * 70) logger.info(f"🤖 Active Agents: {', '.join(persistent_agents)}") logger.info("=" * 70) yield # ============================================================ # SHUTDOWN # ============================================================ logger.info("🛑 Shutting down MCP Orchestrator...") # Shutdown orchestrator if hasattr(app.state, "orchestrator"): await app.state.orchestrator.shutdown() try: await ch.insert_event("mcp_shutdown", {"status": "graceful"}, source="main") except Exception: pass logger.info("✅ MCP Orchestrator stopped") # Create FastAPI app app = FastAPI( title="MCP Orchestrator API", description="Multi-Cloud Provisioning Orchestrator with 
SkyPilot & AI Envoy", version="1.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=os.getenv("CORS_ORIGINS", "*").split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Middleware to add request ID @app.middleware("http") async def add_request_id(request: Request, call_next): """Add request ID for tracing""" import uuid request_id = str(uuid.uuid4()) request.state.request_id = request_id response = await call_next(request) response.headers["X-Request-ID"] = request_id # Add Envoy routing header if available if hasattr(app.state, "envoy") and app.state.envoy: response.headers["X-Envoy-Routed"] = "true" return response # Exception handlers @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.exception(f"Unhandled exception: {exc}") # Log to telemetry if hasattr(app.state, "clickhouse"): try: await app.state.clickhouse.insert_event( "exception", { "path": str(request.url), "method": request.method, "error": str(exc), "request_id": getattr(request.state, "request_id", "unknown") }, source="api" ) except Exception: pass return JSONResponse( status_code=500, content={"error": "Internal server error", "detail": str(exc)} ) # Health check endpoint @app.get("/health") async def health_check(): """Health check endpoint for monitoring""" # Get list of active agents active_agents = list(AgentRegistry._persistent_agents.keys()) if hasattr(AgentRegistry, '_persistent_agents') else [] health = { "status": "healthy", "service": "mcp-orchestrator", "version": "1.0.0", "components": { "skypilot": SKY_AVAILABLE, "envoy": hasattr(app.state, "envoy") and app.state.envoy is not None, "clickhouse": hasattr(app.state, "clickhouse"), "redis": hasattr(app.state, "redis") and app.state.redis is not None, "registry": hasattr(app.state, "registry"), "orchestrator": hasattr(app.state, "orchestrator"), "telemetry": hasattr(app.state, "telemetry") }, "agents": { 
"active": active_agents, "count": len(active_agents) } } # Check if all critical components are available critical_missing = [] if not health["components"]["skypilot"]: critical_missing.append("SkyPilot SDK") if not health["components"]["orchestrator"]: critical_missing.append("Orchestrator") if not active_agents: critical_missing.append("No agents initialized") if critical_missing: health["status"] = "degraded" if len(critical_missing) < 3 else "unhealthy" health["warnings"] = critical_missing return health @app.get("/") async def root(): """Root endpoint""" active_agents = list(AgentRegistry._persistent_agents.keys()) if hasattr(AgentRegistry, '_persistent_agents') else [] return { "service": "MCP Orchestrator API", "version": "1.0.0", "powered_by": ["PantheonLab"], "providers": { "supported": ["RunPod", "Vast.ai"], "active_agents": active_agents }, "endpoints": { "docs": "/docs", "health": "/health", "metrics": "/metrics", "deploy": "/api/v1/deploy", "providers": "/api/v1/providers/{provider}/deploy", "register": "/api/v1/register" } } # Prometheus metrics endpoint metrics_app = make_asgi_app() app.mount("/metrics", metrics_app) # Include API routes app.include_router(api_router, prefix="/api/v1", tags=["mcp"]) if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", 8000)) workers = int(os.getenv("WORKERS", 1)) # Note: In production, use uvicorn or gunicorn with multiple workers # For development with reload, workers must be 1 reload = os.getenv("RELOAD", "false").lower() == "true" if reload and workers > 1: logger.warning("Reload mode requires workers=1, adjusting") workers = 1 uvicorn.run( "main:app", host="0.0.0.0", port=port, workers=workers, log_level=os.getenv("LOG_LEVEL", "info").lower(), reload=reload )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nomadslayer/infra-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.