# main.py (14.4 kB)
"""
MCP Orchestrator - Main FastAPI Application
Entry point with:
- SkyPilot Python SDK initialization
- AI Envoy Gateway integration
- Agent initialization (RunPod & Vast.ai)
- MCPOrchestrator initialization
- Telemetry and observability
"""
import logging
import os
import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import make_asgi_app
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from src.api.routes import router as api_router
from src.agents.registry import AgentRegistry
from src.core.clickhouse_client import ClickhouseClient
from src.agents.engine import RulesEngine
from src.core.envoy_client import EnvoyClient
from src.core.orchestrator import MCPOrchestrator
from src.core.redis_queue import RedisQueue
from src.core.telemetry import TelemetryClient, initialize_telemetry
# Configure logging
# LOG_LEVEL is upper-cased because the logging module only recognises
# upper-case level names ("INFO", "DEBUG", ...), while the uvicorn launcher at
# the bottom of this file lower-cases the same variable. Without this,
# LOG_LEVEL=info would raise ValueError at import time.
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Check SkyPilot availability
# SKY_AVAILABLE records whether the optional `sky` package could be imported;
# the rest of this module reads the flag instead of importing `sky` again.
try:
    import sky
except ImportError as e:
    logger.error(f"✗ SkyPilot not available: {e}")
    logger.error("Install with: pip install 'skypilot[runpod]'")
    SKY_AVAILABLE = False
else:
    # Only the import itself is guarded; a missing __version__ attribute must
    # not turn a successful import into a crash.
    logger.info(
        f"✓ SkyPilot version {getattr(sky, '__version__', 'unknown')} loaded successfully"
    )
    SKY_AVAILABLE = True
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle manager for startup and shutdown.

    Startup (everything before ``yield``) builds the service's collaborators
    and stores them on ``app.state``:

    - ``clickhouse``: the ``ClickhouseClient`` used for event logging
    - ``envoy``: the ``EnvoyClient``, or ``None`` when the gateway health
      check fails or raises ("direct mode")
    - ``redis``: the connected ``RedisQueue``, or ``None`` when connecting fails
    - ``registry``: the ``AgentRegistry`` class, initialised with any
      preloaded provider credentials
    - ``rules_engine``, ``telemetry``, ``orchestrator``

    Shutdown (everything after ``yield``) shuts the orchestrator down and
    records an ``mcp_shutdown`` event.

    ClickHouse event inserts, the Envoy health check, the Redis connection
    and Envoy upstream registration are all best-effort: a failure is logged
    and startup continues. Errors from the rules engine, telemetry or the
    orchestrator still propagate and abort startup.
    """
    banner = "=" * 70

    # ============================================================
    # STARTUP
    # ============================================================
    logger.info(banner)
    logger.info("🚀 Starting MCP Orchestrator with SkyPilot & AI Envoy")
    logger.info(banner)

    # 1. Initialize ClickHouse
    logger.info("📊 Initializing ClickHouse...")
    ch = ClickhouseClient(
        host=os.getenv("CLICKHOUSE_HOST", "localhost"),
        port=int(os.getenv("CLICKHOUSE_PORT", 9000)),
        database=os.getenv("CLICKHOUSE_DB", "mcp"),
    )
    app.state.clickhouse = ch
    try:
        await ch.insert_event("mcp_startup", {
            "status": "initializing",
            "skypilot_available": SKY_AVAILABLE
        }, source="main")
        logger.info("✓ ClickHouse connected")
    except Exception as e:
        logger.warning(f"⚠ ClickHouse connection issue: {e}")

    # 2. Initialize AI Envoy Gateway client
    logger.info("🌐 Connecting to AI Envoy Gateway...")
    envoy_url = os.getenv("ENVOY_GATEWAY_URL", "http://envoy:8080")
    envoy_client = EnvoyClient(gateway_url=envoy_url)
    try:
        envoy_healthy = await envoy_client.health_check()
    except Exception as e:
        # An unreachable gateway is handled the same way as an unhealthy one:
        # fall back to direct mode rather than aborting startup.
        logger.warning(f"⚠ AI Envoy Gateway health check failed: {e}")
        envoy_healthy = False
    if envoy_healthy:
        logger.info("✓ AI Envoy Gateway connected")
        app.state.envoy = envoy_client
    else:
        logger.warning("⚠ AI Envoy Gateway not available - running in direct mode")
        app.state.envoy = None

    # 3. Initialize Redis Queue (optional)
    logger.info("🔴 Connecting to Redis...")
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    try:
        redis_queue = RedisQueue(redis_url)
        await redis_queue.connect()
        app.state.redis = redis_queue
        logger.info("✓ Redis queue connected")
    except Exception as e:
        logger.warning(f"⚠ Redis not available: {e}")
        app.state.redis = None

    # 4. Verify SkyPilot
    if SKY_AVAILABLE:
        logger.info("☁️ SkyPilot SDK ready")
    else:
        logger.error("✗ SkyPilot SDK not available - deployments will fail!")

    # ============================================================
    # 5. Initialize AgentRegistry with AGENTS
    # ============================================================
    logger.info("🤖 Initializing Agent Registry...")

    # Credentials for agents that should be created once and kept around.
    preload_creds = {}

    # RunPod Agent - can be preloaded if API key available
    runpod_api_key = os.getenv("RUNPOD_API_KEY")
    if runpod_api_key:
        preload_creds["runpod"] = {"api_key": runpod_api_key}
        logger.info(" → RunPod API key loaded")
    else:
        logger.warning(" ⚠ RUNPOD_API_KEY not set")

    # Vast.ai Agent - deliberately NOT preloaded: agents are created per
    # request so the credentials do not stay in memory. To switch to a
    # persistent agent instead, add
    #     preload_creds["vastai"] = {"api_key": os.getenv("VASTAI_API_KEY")}
    # here.
    if os.getenv("VASTAI_API_KEY"):
        logger.info(" → Vast.ai API key available (per-request mode)")
    else:
        logger.warning(" ⚠ VASTAI_API_KEY not set")

    # Initialize registry with agents
    AgentRegistry.init(preload=preload_creds)
    app.state.registry = AgentRegistry

    # Verify agents were added
    persistent_agents = list(AgentRegistry._persistent_agents.keys())
    logger.info("✓ Agent Registry initialized")
    logger.info(f" Persistent agents: {persistent_agents}")
    if "runpod" in persistent_agents:
        logger.info(" ✓ RunPod agent ready for deployments")
    else:
        logger.warning(" ⚠ RunPod agent not initialized")
    if "vastai" in persistent_agents:
        logger.info(" ✓ Vast.ai agent ready (persistent mode)")
    else:
        logger.info(" → Vast.ai agent will be created per-request (transient mode)")

    # ============================================================
    # 6. Load GPU Rules Engine
    # ============================================================
    logger.info("⚙️ Loading GPU Rules Engine...")
    rules_engine = RulesEngine()
    await rules_engine.load_rules()
    app.state.rules_engine = rules_engine
    logger.info(f"✓ Rules engine loaded ({len(rules_engine._rules_cache)} rules)")

    # ============================================================
    # 7. Initialize Telemetry
    # ============================================================
    logger.info("📡 Initializing Telemetry...")
    # app.state.envoy is already None when the gateway is unhealthy, so it can
    # be passed through as-is.
    telemetry = initialize_telemetry(
        clickhouse_client=ch,
        envoy_client=app.state.envoy,
        enable_prometheus=True,
        enable_otel=False  # Set to True if you want OpenTelemetry
    )
    app.state.telemetry = telemetry
    logger.info("✓ Telemetry initialized")

    # ============================================================
    # 8. Initialize MCPOrchestrator
    # ============================================================
    logger.info("🎯 Initializing MCPOrchestrator...")
    orchestrator = MCPOrchestrator(
        clickhouse_client=ch,
        envoy_client=app.state.envoy,
        redis_queue=app.state.redis
    )
    await orchestrator.initialize()
    app.state.orchestrator = orchestrator
    logger.info("✓ MCPOrchestrator initialized")

    # ============================================================
    # 9. Register upstreams with Envoy (if available)
    # ============================================================
    if envoy_healthy:
        logger.info("🔗 Registering provider upstreams with Envoy...")
        try:
            await envoy_client.register_upstream(
                "runpod-api",
                "https://api.runpod.io",
                {"path": "/", "interval": 60}
            )
            logger.info(" ✓ RunPod upstream registered")
            await envoy_client.register_upstream(
                "vastai-api",
                "https://api.vast.ai",
                {"path": "/", "interval": 60}
            )
            logger.info(" ✓ Vast.ai upstream registered")
        except Exception as e:
            logger.warning(f" ⚠ Failed to register upstreams: {e}")

    # ============================================================
    # Startup Complete
    # ============================================================
    # Guarded like the first ClickHouse insert above: an unavailable event
    # store must not abort a startup that has otherwise succeeded.
    try:
        await ch.insert_event("mcp_startup_complete", {
            "status": "healthy",
            "agents": persistent_agents,
            "skypilot": SKY_AVAILABLE,
            "envoy": envoy_healthy
        }, source="main")
    except Exception as e:
        logger.warning(f"⚠ Could not record startup event in ClickHouse: {e}")

    logger.info(banner)
    logger.info("✅ MCP Orchestrator started successfully!")
    logger.info(banner)
    logger.info("📍 API: http://localhost:8000")
    logger.info("📖 Docs: http://localhost:8000/docs")
    logger.info("💚 Health: http://localhost:8000/health")
    logger.info("📊 Metrics: http://localhost:8000/metrics")
    if envoy_healthy:
        logger.info(f"🌐 Envoy: {envoy_url}")
        # Informational only: derives the admin URL by swapping the port text.
        logger.info(f"🔧 Envoy Admin: {envoy_url.replace('8080', '10000')}/")
    logger.info(banner)
    logger.info(f"🤖 Active Agents: {', '.join(persistent_agents)}")
    logger.info(banner)

    yield

    # ============================================================
    # SHUTDOWN
    # ============================================================
    logger.info("🛑 Shutting down MCP Orchestrator...")

    # Shutdown orchestrator
    if hasattr(app.state, "orchestrator"):
        await app.state.orchestrator.shutdown()

    # Best-effort: failing to record the shutdown event must not block
    # shutdown, but the failure is still logged.
    try:
        await ch.insert_event("mcp_shutdown", {"status": "graceful"}, source="main")
    except Exception as e:
        logger.debug(f"Could not record shutdown event in ClickHouse: {e}")

    logger.info("✅ MCP Orchestrator stopped")
# Create FastAPI app
app = FastAPI(
    title="MCP Orchestrator API",
    description="Multi-Cloud Provisioning Orchestrator with SkyPilot & AI Envoy",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware
# CORS_ORIGINS is a comma-separated list of allowed origins (default "*").
# Entries are stripped and empty ones dropped, so "https://a.example, https://b.example"
# matches both origins instead of producing " https://b.example", which can
# never equal a browser's Origin header.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "*").split(",")
    if origin.strip()
]
# NOTE(review): with the default "*" this combines wildcard origins with
# allow_credentials=True. The FastAPI CORS documentation says origins must be
# listed explicitly when credentials are allowed - set CORS_ORIGINS in
# production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Middleware to add request ID
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Tag every request and its response with a freshly generated UUID.

    The ID is stored on ``request.state.request_id`` before the request is
    handled and echoed back in the ``X-Request-ID`` response header. When an
    Envoy client is present on ``app.state``, the response additionally
    carries ``X-Envoy-Routed: true``.
    """
    from uuid import uuid4

    rid = str(uuid4())
    request.state.request_id = rid

    response = await call_next(request)

    extra_headers = {"X-Request-ID": rid}
    # Add Envoy routing header if available
    if getattr(app.state, "envoy", None):
        extra_headers["X-Envoy-Routed"] = "true"
    for header_name, header_value in extra_headers.items():
        response.headers[header_name] = header_value
    return response
# Exception handlers
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: log the error, record it in ClickHouse, return a 500.

    Args:
        request: The request whose handling raised ``exc``.
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status 500 and the keys ``error`` and
        ``detail``.
    """
    # Pass the exception explicitly so the traceback is logged even when this
    # handler is not running inside an active ``except`` block.
    logger.error(f"Unhandled exception: {exc}", exc_info=exc)

    # Log to telemetry (best-effort: a telemetry failure must never mask the
    # original error response).
    if hasattr(app.state, "clickhouse"):
        try:
            await app.state.clickhouse.insert_event(
                "exception",
                {
                    "path": str(request.url),
                    "method": request.method,
                    "error": str(exc),
                    "request_id": getattr(request.state, "request_id", "unknown")
                },
                source="api"
            )
        except Exception as telemetry_error:
            logger.warning(
                f"Failed to record exception event in ClickHouse: {telemetry_error}"
            )

    # NOTE(review): "detail" exposes the raw exception text to the client.
    # Kept for backward compatibility; consider removing it for public deployments.
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error", "detail": str(exc)}
    )
# Health check endpoint
@app.get("/health")
async def health_check():
    """Report service status, per-component availability and active agents.

    ``status`` is "healthy" when SkyPilot, the orchestrator and at least one
    persistent agent are all present; "degraded" when one or two of those are
    missing; "unhealthy" when all three are. The missing items are listed
    under ``warnings``.
    """
    # Names of the persistent agents currently held by the registry.
    if hasattr(AgentRegistry, '_persistent_agents'):
        active_agents = list(AgentRegistry._persistent_agents.keys())
    else:
        active_agents = []

    state = app.state
    components = {
        "skypilot": SKY_AVAILABLE,
        "envoy": getattr(state, "envoy", None) is not None,
        "clickhouse": hasattr(state, "clickhouse"),
        "redis": getattr(state, "redis", None) is not None,
        "registry": hasattr(state, "registry"),
        "orchestrator": hasattr(state, "orchestrator"),
        "telemetry": hasattr(state, "telemetry"),
    }

    # (is_ok, label reported when it is not) for each critical requirement.
    critical_checks = (
        (components["skypilot"], "SkyPilot SDK"),
        (components["orchestrator"], "Orchestrator"),
        (bool(active_agents), "No agents initialized"),
    )
    critical_missing = [label for is_ok, label in critical_checks if not is_ok]

    health = {
        "status": "healthy",
        "service": "mcp-orchestrator",
        "version": "1.0.0",
        "components": components,
        "agents": {
            "active": active_agents,
            "count": len(active_agents),
        },
    }
    if not critical_missing:
        return health

    if len(critical_missing) < 3:
        health["status"] = "degraded"
    else:
        health["status"] = "unhealthy"
    health["warnings"] = critical_missing
    return health
@app.get("/")
async def root():
    """Describe the service: version, supported providers and endpoint paths."""
    # Names of the persistent agents currently held by the registry.
    if hasattr(AgentRegistry, '_persistent_agents'):
        active_agents = list(AgentRegistry._persistent_agents.keys())
    else:
        active_agents = []

    providers = {
        "supported": ["RunPod", "Vast.ai"],
        "active_agents": active_agents,
    }
    endpoints = {
        "docs": "/docs",
        "health": "/health",
        "metrics": "/metrics",
        "deploy": "/api/v1/deploy",
        "providers": "/api/v1/providers/{provider}/deploy",
        "register": "/api/v1/register",
    }
    return {
        "service": "MCP Orchestrator API",
        "version": "1.0.0",
        "powered_by": ["PantheonLab"],
        "providers": providers,
        "endpoints": endpoints,
    }
# Prometheus metrics endpoint
# make_asgi_app() comes from prometheus_client; the resulting ASGI app is
# mounted as a sub-application, so requests under /metrics are served by it
# rather than by this app's own routes.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# Include API routes
# Everything defined on api_router is exposed under the /api/v1 prefix and
# tagged "mcp".
app.include_router(api_router, prefix="/api/v1", tags=["mcp"])
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn, configured
    # from the PORT, WORKERS, RELOAD and LOG_LEVEL environment variables.
    import uvicorn

    port = int(os.getenv("PORT", 8000))
    workers = int(os.getenv("WORKERS", 1))

    # Note: In production, use uvicorn or gunicorn with multiple workers
    # For development with reload, workers must be 1
    reload = os.getenv("RELOAD", "false").lower() == "true"
    if workers > 1 and reload:
        logger.warning("Reload mode requires workers=1, adjusting")
        workers = 1

    server_options = {
        "host": "0.0.0.0",
        "port": port,
        "workers": workers,
        "log_level": os.getenv("LOG_LEVEL", "info").lower(),
        "reload": reload,
    }
    uvicorn.run("main:app", **server_options)