"""
FastAPI server with MCP protocol support.
This is the main entry point for the MCP server.
Start here - this file just handles HTTP routing.
The actual MCP logic is in mcp_handler.py.
"""
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import structlog
from typing import AsyncIterator
import json
from config import settings
from src.server.mcp_handler import MCPHandler
from src.auth.oauth import get_oauth_manager
# Setup logging
logger = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""
Runs once when server starts and once when it stops.
Think of it like:
- Startup: Turn on the lights, open the doors
- Shutdown: Turn off the lights, lock the doors
"""
# STARTUP
logger.info("Starting MCP server",
version=settings.mcp_version,
server_name=settings.mcp_server_name)
# Create the MCP handler (handles all the protocol logic)
app.state.mcp_handler = MCPHandler()
await app.state.mcp_handler.initialize()
logger.info("MCP server ready")
yield # Server runs here
# SHUTDOWN
logger.info("Shutting down MCP server")
await app.state.mcp_handler.cleanup()
logger.info("Goodbye")
# Create the FastAPI app
app = FastAPI(
title="MCP Enhanced Data Retrieval System",
description="GitHub data retrieval for AI assistants via MCP protocol",
version=settings.mcp_version,
lifespan=lifespan
)
# Add CORS (so Claude can call us from different domains)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # TODO: Restrict in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
"""
Simple health check.
Visit http://localhost:8000/ to see if server is running.
"""
return {
"name": settings.mcp_server_name,
"version": settings.mcp_version,
"status": "running",
"message": "MCP server is alive!"
}
@app.get("/health")
async def health_check():
"""
Detailed health check.
Shows what features are available.
"""
return {
"status": "healthy",
"server": settings.mcp_server_name,
"version": settings.mcp_version,
"features": {
"github_integration": True,
"vector_search": True,
"oauth_2_1": True,
"context_chunking": True
}
}
@app.post("/mcp/message")
async def handle_mcp_message(request: Request):
"""
THE MAIN ENDPOINT - Claude calls this!
Receives JSON-RPC 2.0 requests from Claude.
Passes them to MCPHandler for processing.
Returns responses back to Claude.
Example request from Claude:
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {"name": "github_get_repo", "arguments": {...}},
"id": 1
}
"""
try:
# Get the JSON body
body = await request.json()
logger.info("Received MCP request", method=body.get("method"))
# Get the MCP handler
mcp_handler: MCPHandler = request.app.state.mcp_handler
# Process the request
response = await mcp_handler.handle_request(body)
# Return response to Claude
return JSONResponse(content=response)
except json.JSONDecodeError as e:
logger.error("Invalid JSON", error=str(e))
raise HTTPException(status_code=400, detail="Invalid JSON")
except Exception as e:
logger.error("Error processing request", error=str(e))
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# OAUTH 2.1 ENDPOINTS
# ============================================================================
@app.get("/auth/login")
async def oauth_login():
"""
Initiate OAuth 2.1 login flow.
User visits this endpoint → Gets redirected to GitHub → Approves access
Example: http://localhost:8000/auth/login
"""
oauth_manager = get_oauth_manager()
# Generate authorization URL
auth_url = oauth_manager.get_authorization_url()
logger.info("Redirecting to GitHub for OAuth login")
# Redirect user to GitHub
from fastapi.responses import RedirectResponse
return RedirectResponse(url=auth_url)
@app.get("/auth/callback")
async def oauth_callback(code: str = None, error: str = None):
"""
OAuth callback endpoint.
GitHub redirects here after user approves/denies access.
We exchange the code for an access token.
Args:
code: Authorization code from GitHub (if approved)
error: Error message from GitHub (if denied)
"""
oauth_manager = get_oauth_manager()
# Check if user denied access
if error:
logger.warning("OAuth authorization denied", error=error)
return {
"status": "error",
"message": f"Authorization denied: {error}"
}
# Check if code is missing
if not code:
logger.error("No authorization code received")
raise HTTPException(status_code=400, detail="No authorization code received")
try:
# Exchange code for token
token_data = await oauth_manager.exchange_code_for_token(code)
logger.info("OAuth login successful")
return {
"status": "success",
"message": "Successfully authenticated with GitHub!",
"token_type": token_data.get("token_type"),
"scope": token_data.get("scope")
}
except Exception as e:
logger.error("OAuth callback failed", error=str(e))
raise HTTPException(status_code=500, detail=f"OAuth failed: {str(e)}")
@app.get("/auth/status")
async def oauth_status():
"""
Check OAuth authentication status.
Returns whether user is authenticated and token details.
"""
oauth_manager = get_oauth_manager()
is_authenticated = oauth_manager.has_valid_token()
if is_authenticated:
token_data = oauth_manager.storage.load_token()
return {
"authenticated": True,
"token_type": token_data.get("token_type"),
"scope": token_data.get("scope"),
"stored_at": token_data.get("stored_at")
}
else:
return {
"authenticated": False,
"message": "Not authenticated. Visit /auth/login to authenticate."
}
@app.post("/auth/logout")
async def oauth_logout():
"""
Logout by deleting stored OAuth token.
User will need to re-authenticate next time.
"""
oauth_manager = get_oauth_manager()
oauth_manager.logout()
logger.info("User logged out")
return {
"status": "success",
"message": "Successfully logged out"
}
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""
Handles HTTP errors (400, 404, 500, etc.)
Returns nice JSON error responses.
"""
logger.warning("HTTP error",
status_code=exc.status_code,
detail=exc.detail)
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.status_code,
"message": exc.detail
}
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""
Catches any unexpected errors.
Prevents server from crashing.
"""
logger.error("Unexpected error", error=str(exc), exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": {
"code": 500,
"message": "Internal server error"
}
}
)
# Run the server if this file is executed directly
if __name__ == "__main__":
import uvicorn
print("Starting MCP Enhanced Data Retrieval Server...")
print(f"Server will run at http://{settings.server_host}:{settings.server_port}")
print(f"API docs at http://{settings.server_host}:{settings.server_port}/docs")
print()
uvicorn.run(
"src.server.main:app",
host=settings.server_host,
port=settings.server_port,
reload=settings.server_reload,
log_level=settings.log_level.lower()
)