Skip to main content
Glama
kalpalathika

MCP Enhanced Data Retrieval System

by kalpalathika
main.py8.53 kB
""" FastAPI server with MCP protocol support. This is the main entry point for the MCP server. Start here - this file just handles HTTP routing. The actual MCP logic is in mcp_handler.py. """ from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import structlog from typing import AsyncIterator import json from config import settings from src.server.mcp_handler import MCPHandler from src.auth.oauth import get_oauth_manager # Setup logging logger = structlog.get_logger() @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """ Runs once when server starts and once when it stops. Think of it like: - Startup: Turn on the lights, open the doors - Shutdown: Turn off the lights, lock the doors """ # STARTUP logger.info("Starting MCP server", version=settings.mcp_version, server_name=settings.mcp_server_name) # Create the MCP handler (handles all the protocol logic) app.state.mcp_handler = MCPHandler() await app.state.mcp_handler.initialize() logger.info("MCP server ready") yield # Server runs here # SHUTDOWN logger.info("Shutting down MCP server") await app.state.mcp_handler.cleanup() logger.info("Goodbye") # Create the FastAPI app app = FastAPI( title="MCP Enhanced Data Retrieval System", description="GitHub data retrieval for AI assistants via MCP protocol", version=settings.mcp_version, lifespan=lifespan ) # Add CORS (so Claude can call us from different domains) app.add_middleware( CORSMiddleware, allow_origins=["*"], # TODO: Restrict in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): """ Simple health check. Visit http://localhost:8000/ to see if server is running. """ return { "name": settings.mcp_server_name, "version": settings.mcp_version, "status": "running", "message": "MCP server is alive!" } @app.get("/health") async def health_check(): """ Detailed health check. Shows what features are available. """ return { "status": "healthy", "server": settings.mcp_server_name, "version": settings.mcp_version, "features": { "github_integration": True, "vector_search": True, "oauth_2_1": True, "context_chunking": True } } @app.post("/mcp/message") async def handle_mcp_message(request: Request): """ THE MAIN ENDPOINT - Claude calls this! Receives JSON-RPC 2.0 requests from Claude. Passes them to MCPHandler for processing. Returns responses back to Claude. Example request from Claude: { "jsonrpc": "2.0", "method": "tools/call", "params": {"name": "github_get_repo", "arguments": {...}}, "id": 1 } """ try: # Get the JSON body body = await request.json() logger.info("Received MCP request", method=body.get("method")) # Get the MCP handler mcp_handler: MCPHandler = request.app.state.mcp_handler # Process the request response = await mcp_handler.handle_request(body) # Return response to Claude return JSONResponse(content=response) except json.JSONDecodeError as e: logger.error("Invalid JSON", error=str(e)) raise HTTPException(status_code=400, detail="Invalid JSON") except Exception as e: logger.error("Error processing request", error=str(e)) raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # OAUTH 2.1 ENDPOINTS # ============================================================================ @app.get("/auth/login") async def oauth_login(): """ Initiate OAuth 2.1 login flow. User visits this endpoint → Gets redirected to GitHub → Approves access Example: http://localhost:8000/auth/login """ oauth_manager = get_oauth_manager() # Generate authorization URL auth_url = oauth_manager.get_authorization_url() logger.info("Redirecting to GitHub for OAuth login") # Redirect user to GitHub from fastapi.responses import RedirectResponse return RedirectResponse(url=auth_url) @app.get("/auth/callback") async def oauth_callback(code: str = None, error: str = None): """ OAuth callback endpoint. GitHub redirects here after user approves/denies access. We exchange the code for an access token. Args: code: Authorization code from GitHub (if approved) error: Error message from GitHub (if denied) """ oauth_manager = get_oauth_manager() # Check if user denied access if error: logger.warning("OAuth authorization denied", error=error) return { "status": "error", "message": f"Authorization denied: {error}" } # Check if code is missing if not code: logger.error("No authorization code received") raise HTTPException(status_code=400, detail="No authorization code received") try: # Exchange code for token token_data = await oauth_manager.exchange_code_for_token(code) logger.info("OAuth login successful") return { "status": "success", "message": "Successfully authenticated with GitHub!", "token_type": token_data.get("token_type"), "scope": token_data.get("scope") } except Exception as e: logger.error("OAuth callback failed", error=str(e)) raise HTTPException(status_code=500, detail=f"OAuth failed: {str(e)}") @app.get("/auth/status") async def oauth_status(): """ Check OAuth authentication status. Returns whether user is authenticated and token details. """ oauth_manager = get_oauth_manager() is_authenticated = oauth_manager.has_valid_token() if is_authenticated: token_data = oauth_manager.storage.load_token() return { "authenticated": True, "token_type": token_data.get("token_type"), "scope": token_data.get("scope"), "stored_at": token_data.get("stored_at") } else: return { "authenticated": False, "message": "Not authenticated. Visit /auth/login to authenticate." } @app.post("/auth/logout") async def oauth_logout(): """ Logout by deleting stored OAuth token. User will need to re-authenticate next time. """ oauth_manager = get_oauth_manager() oauth_manager.logout() logger.info("User logged out") return { "status": "success", "message": "Successfully logged out" } # Error handlers @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """ Handles HTTP errors (400, 404, 500, etc.) Returns nice JSON error responses. """ logger.warning("HTTP error", status_code=exc.status_code, detail=exc.detail) return JSONResponse( status_code=exc.status_code, content={ "error": { "code": exc.status_code, "message": exc.detail } } ) @app.exception_handler(Exception) async def general_exception_handler(request: Request, exc: Exception): """ Catches any unexpected errors. Prevents server from crashing. """ logger.error("Unexpected error", error=str(exc), exc_info=True) return JSONResponse( status_code=500, content={ "error": { "code": 500, "message": "Internal server error" } } ) # Run the server if this file is executed directly if __name__ == "__main__": import uvicorn print("Starting MCP Enhanced Data Retrieval Server...") print(f"Server will run at http://{settings.server_host}:{settings.server_port}") print(f"API docs at http://{settings.server_host}:{settings.server_port}/docs") print() uvicorn.run( "src.server.main:app", host=settings.server_host, port=settings.server_port, reload=settings.server_reload, log_level=settings.log_level.lower() )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kalpalathika/MCP-Enhanced-Data-Retrieval-System'

If you have feedback or need assistance with the MCP directory API, please join our Discord server