"""
FastAPI Server for MCP-MAF
Provides HTTP/SSE endpoints for MCP protocol communication.
"""
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Any, Optional
from datetime import datetime
import os
from dotenv import load_dotenv
from .agent import MCPMAFAgent
from .mcp.protocol import MCPProtocol
from .mcp.sse_transport import SSETransport
from .utils.logger import setup_logger
# Load variables from a .env file into the process environment so the
# os.getenv() lookups further down can see them
load_dotenv()
# Module-level logger shared by every handler in this file
logger = setup_logger(__name__)
# Create the FastAPI application; interactive API docs are exposed at /docs and /redoc
app = FastAPI(
title="MCP-MAF Server",
description="Microsoft Agent Framework with MCP Protocol (SSE/HTTP Transport)",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS middleware: currently accepts every origin, method and header, with
# credentials enabled.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True - confirm and restrict the
# origin list before production use.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify allowed origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global component handles; each stays None until startup() assigns it, so
# request handlers must cope with the None case
agent: Optional[MCPMAFAgent] = None
protocol: Optional[MCPProtocol] = None
sse_transport: Optional[SSETransport] = None
@app.on_event("startup")
async def startup():
    """
    Server startup initialization

    Initializes, in this order:
    - MCP protocol handler
    - SSE transport layer
    - Microsoft Agent Framework agent

    Environment variables read:
    - AZURE_OPENAI_ENDPOINT (required)
    - AZURE_OPENAI_API_KEY (required)
    - AZURE_AI_MODEL_DEPLOYMENT_NAME (optional, defaults to "gpt-4.1")
    - SERVER_PORT (optional, defaults to "8000"; only used here to build the
      endpoint URLs written to the log)

    A failure while creating or initializing the agent is logged and the
    server keeps starting; only missing credentials abort startup.

    Raises:
        ValueError: If the Azure OpenAI endpoint or API key is not set.
    """
    global agent, protocol, sse_transport

    logger.info("🚀 Starting MCP-MAF Server...")
    logger.info("=" * 60)

    # Check environment variables
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_AI_MODEL_DEPLOYMENT_NAME", "gpt-4.1")
    if not azure_endpoint or not api_key:
        logger.error("❌ Missing required environment variables")
        logger.error(" Required: AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY")
        raise ValueError("Missing Azure OpenAI credentials")

    # Initialize MCP protocol handler
    protocol = MCPProtocol()
    logger.info("✅ MCP Protocol initialized")

    # Initialize SSE transport
    sse_transport = SSETransport(protocol)
    logger.info("✅ SSE Transport initialized")

    # Initialize Agent; a failure here is deliberately non-fatal so the
    # protocol endpoints stay reachable
    try:
        agent = MCPMAFAgent(
            azure_endpoint=azure_endpoint,
            api_key=api_key,
            deployment_name=deployment
        )
        await agent.initialize()
        logger.info("✅ Microsoft Agent Framework initialized")
    except Exception as e:
        logger.error(f"❌ Failed to initialize agent: {e}")
        logger.warning("⚠️ Server will continue without agent functionality")

    # Build the advertised URLs from the same SERVER_PORT variable that main()
    # uses, so the log does not point at port 8000 when another port is
    # configured
    base_url = f"http://localhost:{os.getenv('SERVER_PORT', '8000')}"

    logger.info("=" * 60)
    logger.info("🎉 MCP-MAF Server Ready!")
    logger.info("=" * 60)
    logger.info(f"📍 SSE Endpoint: {base_url}/sse")
    logger.info(f"📍 Messages Endpoint: {base_url}/messages")
    logger.info(f"📍 Health Check: {base_url}/health")
    logger.info(f"📍 Chat Endpoint: {base_url}/chat")
    logger.info(f"📚 API Docs: {base_url}/docs")
    logger.info("=" * 60)
@app.on_event("shutdown")
async def shutdown():
    """
    Server shutdown cleanup

    Disconnects all SSE clients and then closes the agent. The agent is
    closed even when disconnecting the clients raises; in that case the
    exception still propagates after the agent has been closed.
    """
    logger.info("🛑 Shutting down MCP-MAF Server...")
    try:
        # Disconnect all SSE clients. Compare against None instead of relying
        # on truthiness: the transport is sized with len() in the health
        # check, so an instance with no connected clients may be falsy.
        if sse_transport is not None:
            await sse_transport.disconnect_all()
    finally:
        # Close agent, regardless of whether the disconnect above succeeded
        if agent:
            await agent.close()
    logger.info("✅ Server shutdown complete")
@app.get("/")
async def root():
    """
    Root endpoint - Server information

    Describes the server: its name, version, transport/protocol/framework
    labels, the paths of the available endpoints and the current local time
    in ISO 8601 format.
    """
    endpoint_paths = {
        "sse": "/sse",
        "messages": "/messages",
        "chat": "/chat",
        "health": "/health",
        "docs": "/docs",
    }
    server_info = {
        "name": "MCP-MAF Server",
        "version": "1.0.0",
        "description": "Microsoft Agent Framework with MCP Protocol",
        "endpoints": endpoint_paths,
        "transport": "SSE (Server-Sent Events)",
        "protocol": "MCP (Model Context Protocol)",
        "framework": "Microsoft Agent Framework",
        "timestamp": datetime.now().isoformat(),
    }
    return server_info
@app.get("/health")
async def health():
    """
    Health check endpoint

    Reports a fixed "healthy" status, the current local time, and for each
    component whether it has been set up, plus the number of connected SSE
    clients.
    """
    checked_at = datetime.now().isoformat()

    # The agent counts as ready only when it exists and reports itself initialized
    agent_ready = False
    if agent:
        agent_ready = agent.is_initialized()

    # The client count comes from the transport's len(); 0 when there is no transport
    client_count = 0
    if sse_transport:
        client_count = len(sse_transport)

    component_status = {
        "agent_initialized": agent_ready,
        "protocol_initialized": protocol is not None,
        "sse_transport_initialized": sse_transport is not None,
        "connected_clients": client_count,
    }
    return {
        "status": "healthy",
        "timestamp": checked_at,
        "components": component_status,
    }
@app.get("/sse")
async def sse_endpoint(request: Request):
    """
    SSE (Server-Sent Events) endpoint

    Main MCP transport endpoint; MCP clients such as Codex CLI connect here.

    Args:
        request: Incoming request, handed to the transport's event stream.

    Returns:
        StreamingResponse with SSE events

    Raises:
        HTTPException: 500 when the SSE transport has not been set up.
    """
    if sse_transport is None:
        raise HTTPException(500, "SSE transport not initialized")

    # request.client can be absent, so fall back to a placeholder for the log
    peer = request.client
    peer_host = peer.host if peer else 'unknown'
    logger.info(f"📡 New SSE connection from {peer_host}")

    event_stream = sse_transport.stream_events(request)
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Disable Nginx buffering
    }
    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=stream_headers,
    )
@app.post("/messages")
async def handle_message(request: Request):
    """
    Messages endpoint - JSON-RPC 2.0 handler

    Handles MCP protocol messages:
    - initialize: Client initialization
    - tools/list: List available tools (empty list when no agent is available)
    - tools/call: Execute a tool
    - resources/list: List resources

    Any other method is answered with a "method not found" error payload.

    Args:
        request: FastAPI Request with JSON-RPC payload

    Returns:
        JSONResponse carrying the JSON-RPC response. The HTTP status is 400
        when the payload is not a JSON object or fails protocol validation,
        500 when an unexpected exception occurs while handling the message,
        and the default status otherwise (including tool failures and
        unknown methods, which are reported inside the payload).

    Raises:
        HTTPException: 500 when the MCP protocol handler has not been set up.
    """
    # Every response below is built by the protocol handler, so without it
    # not even an error payload can be produced.
    if protocol is None:
        raise HTTPException(500, "MCP protocol not initialized")

    # Kept outside the try block so the catch-all error response can echo the
    # request id once it has been read from the payload.
    request_id = None
    try:
        data = await request.json()

        # A request must be a JSON object. Reject arrays and scalars here
        # rather than failing on .get() below and reporting an internal error.
        if not isinstance(data, dict):
            logger.warning("⚠️ Invalid request: payload is not a JSON object")
            return JSONResponse(
                protocol.handle_error(
                    None,
                    protocol.ERROR_INVALID_REQUEST,
                    "Request payload must be a JSON object"
                ),
                status_code=400
            )

        # Extract request components
        request_id = data.get("id")
        method = data.get("method")
        logger.debug(f"📨 Received message: method={method}, id={request_id}")

        # Validate request
        error = protocol.validate_request(data)
        if error:
            logger.warning(f"⚠️ Invalid request: {error}")
            return JSONResponse(
                protocol.handle_error(
                    request_id,
                    protocol.ERROR_INVALID_REQUEST,
                    error
                ),
                status_code=400
            )

        # An explicit "params": null is treated the same as an omitted member
        params = data.get("params") or {}

        # Route to appropriate handler
        if method == "initialize":
            response = protocol.handle_initialize(request_id)
            logger.info("🔧 Client initialized")
        elif method == "tools/list":
            tools = await agent.get_tools_list() if agent else []
            response = protocol.handle_tools_list(request_id, tools)
            logger.info(f"📋 Tools list requested ({len(tools)} tools)")
        elif method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
            logger.info(f"🔨 Tool call: {tool_name}({arguments})")
            # Tool failures are reported inside the JSON-RPC payload rather
            # than as an HTTP error.
            try:
                if not agent:
                    raise ValueError("Agent not initialized")
                result = await agent.call_tool(tool_name, arguments)
                response = protocol.handle_tools_call(request_id, result)
                logger.info("✅ Tool executed successfully")
            except Exception as e:
                logger.error(f"❌ Tool execution failed: {e}")
                response = protocol.handle_tools_call(
                    request_id,
                    None,
                    error=str(e)
                )
        elif method == "resources/list":
            response = protocol.handle_resources_list(request_id)
            logger.debug("📦 Resources list requested")
        else:
            logger.warning(f"⚠️ Unknown method: {method}")
            response = protocol.handle_error(
                request_id,
                protocol.ERROR_METHOD_NOT_FOUND,
                f"Method not found: {method}"
            )
        return JSONResponse(response)
    except Exception as e:
        logger.error(f"❌ Message handling failed: {e}")
        return JSONResponse(
            protocol.handle_error(
                request_id,
                protocol.ERROR_INTERNAL_ERROR,
                str(e)
            ),
            status_code=500
        )
@app.post("/chat")
async def chat(request: Request):
    """
    Direct chat endpoint (non-MCP)

    Provides a simple HTTP interface for chat without MCP protocol.
    Useful for testing and simple integrations.

    Request body:
        {
            "message": "Your message here"
        }

    Returns:
        {
            "response": "Agent response",
            "timestamp": "2025-11-27T..."
        }

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, has no (or an empty) 'message' field, or 'message' is
            not a string; 500 when the agent is not available or the agent
            call fails.
    """
    try:
        try:
            data = await request.json()
        except ValueError:
            # JSON and text decoding errors are ValueError subclasses; a
            # malformed body is the client's fault, not a server failure.
            raise HTTPException(400, "Request body must be valid JSON") from None

        # Only a JSON object can carry the 'message' field
        if not isinstance(data, dict):
            raise HTTPException(400, "Request body must be a JSON object")

        message = data.get("message")
        if not message:
            raise HTTPException(400, "Missing 'message' field")
        # Guard the slice below (and the agent call) against non-text payloads
        if not isinstance(message, str):
            raise HTTPException(400, "'message' must be a string")

        logger.info(f"💬 Chat request: {message[:100]}...")

        if not agent:
            raise HTTPException(500, "Agent not initialized")

        response = await agent.run(message)
        return {
            "response": response,
            "timestamp": datetime.now().isoformat()
        }
    except HTTPException:
        # Already carries the intended status code; do not wrap it as a 500
        raise
    except Exception as e:
        logger.error(f"❌ Chat failed: {e}")
        raise HTTPException(500, f"Chat failed: {str(e)}")
def main():
    """
    Main entry point for running the server

    Serves the application with uvicorn. The bind address comes from
    SERVER_HOST (default "0.0.0.0") and the port from SERVER_PORT
    (default "8000").
    """
    # Imported here so uvicorn is only required when the module is run directly
    import uvicorn

    listen_port = int(os.getenv("SERVER_PORT", "8000"))
    listen_host = os.getenv("SERVER_HOST", "0.0.0.0")
    logger.info(f"🚀 Starting server on {listen_host}:{listen_port}")

    server_options = {
        "host": listen_host,
        "port": listen_port,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)


if __name__ == "__main__":
    main()