"""Main FastAPI application entry point for Codebase MCP Server.
This module initializes the FastAPI application with MCP server integration,
SSE transport, database connections, and all tool registrations.
Constitutional Compliance:
- Principle III: Protocol Compliance (MCP via SSE, no stdout/stderr)
- Principle IV: Performance (async operations, connection pooling)
- Principle V: Production Quality (graceful shutdown, health checks, validation)
- Principle VIII: Type Safety (mypy --strict compliance)
"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator
from fastapi import Depends, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from src.config.settings import get_settings
from src.mcp.mcp_logging import configure_logging, get_logger
from src.mcp.server import (
MCPError,
NotFoundError,
OperationError,
ValidationError,
create_mcp_server,
get_sse_transport,
)
from src.mcp.tools import (
create_task_tool,
get_task_tool,
index_repository_tool,
list_tasks_tool,
search_code_tool,
update_task_tool,
)
from src.models.database import create_engine, create_session_factory, init_database
from src.services import validate_ollama_connection
# ==============================================================================
# Constants
# ==============================================================================
logger = get_logger(__name__)
# Global state (initialized in lifespan)
engine: AsyncEngine | None = None
SessionFactory: async_sessionmaker[AsyncSession] | None = None
# ==============================================================================
# Database Dependency
# ==============================================================================
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
"""FastAPI dependency for database sessions.
Yields:
AsyncSession instance for database operations
Raises:
HTTPException: If database is not initialized
"""
if SessionFactory is None:
raise HTTPException(
status_code=500,
detail="Database not initialized",
)
async with SessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# ==============================================================================
# Application Lifecycle
# ==============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Application lifecycle manager.
Handles startup and shutdown operations:
- Startup: Configure logging, validate Ollama, initialize database
- Shutdown: Close database connections, cleanup resources
Args:
app: FastAPI application instance
Yields:
None (control to application runtime)
"""
global engine, SessionFactory
# ==============================================================================
# STARTUP
# ==============================================================================
logger.info("=" * 80)
logger.info("Starting Codebase MCP Server")
logger.info("=" * 80)
try:
# Load settings
settings = get_settings()
# Configure structured logging
configure_logging(
log_file=settings.log_file,
level=getattr(logging, settings.log_level.value),
)
logger.info(
"Settings loaded successfully",
extra={
"context": {
"database_url": str(settings.database_url).split("@")[-1], # Hide credentials
"ollama_base_url": str(settings.ollama_base_url),
"ollama_model": settings.ollama_embedding_model,
"log_level": settings.log_level.value,
"log_file": settings.log_file,
}
},
)
# Validate Ollama connection
logger.info(f"Validating Ollama connection: {settings.ollama_base_url}")
try:
await validate_ollama_connection()
logger.info("Ollama connection validated successfully")
except Exception as e:
logger.error(
"Ollama connection validation failed",
extra={"context": {"error": str(e)}},
)
raise
# Initialize database engine
logger.info("Initializing database connection")
engine = create_engine(
str(settings.database_url),
echo=False, # Disable SQL query logging (use structured logs)
)
SessionFactory = create_session_factory(engine)
logger.info(
"Database engine created",
extra={
"context": {
"pool_size": settings.db_pool_size,
"max_overflow": settings.db_max_overflow,
}
},
)
# Initialize database schema (create tables if needed)
logger.info("Initializing database schema")
await init_database(engine)
logger.info("Database schema initialized successfully")
# Create MCP server and register tools
logger.info("Creating MCP server and registering tools")
mcp_server = create_mcp_server()
# Register all tools with MCP server
# Note: Tool registration uses decorators, so we need to define
# tool handlers that call our service functions with dependency injection
@mcp_server.list_tools()
async def list_tools() -> list[dict[str, Any]]:
"""List all available MCP tools."""
return [
{
"name": "search_code",
"description": "Semantic code search across indexed repositories",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"repository_id": {"type": "string", "format": "uuid"},
"file_type": {"type": "string"},
"directory": {"type": "string"},
"limit": {"type": "integer", "minimum": 1, "maximum": 50, "default": 10},
},
"required": ["query"],
},
},
{
"name": "index_repository",
"description": "Index a code repository for semantic search",
"inputSchema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"name": {"type": "string"},
"force_reindex": {"type": "boolean", "default": False},
},
"required": ["path", "name"],
},
},
{
"name": "get_task",
"description": "Retrieve a development task by ID",
"inputSchema": {
"type": "object",
"properties": {
"task_id": {"type": "string", "format": "uuid"},
},
"required": ["task_id"],
},
},
{
"name": "list_tasks",
"description": "List development tasks with optional filters",
"inputSchema": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["need to be done", "in-progress", "complete"],
},
"branch": {"type": "string"},
"limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
},
},
},
{
"name": "create_task",
"description": "Create a new development task",
"inputSchema": {
"type": "object",
"properties": {
"title": {"type": "string", "minLength": 1, "maxLength": 200},
"description": {"type": "string"},
"notes": {"type": "string"},
"planning_references": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["title"],
},
},
{
"name": "update_task",
"description": "Update an existing development task",
"inputSchema": {
"type": "object",
"properties": {
"task_id": {"type": "string", "format": "uuid"},
"title": {"type": "string"},
"description": {"type": "string"},
"notes": {"type": "string"},
"status": {
"type": "string",
"enum": ["need to be done", "in-progress", "complete"],
},
"branch": {"type": "string"},
"commit": {"type": "string", "pattern": "^[a-f0-9]{40}$"},
},
"required": ["task_id"],
},
},
]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[dict[str, Any]]:
"""Handle MCP tool calls.
Args:
name: Tool name
arguments: Tool arguments
Returns:
List of tool results
Raises:
MCPError: If tool execution fails
"""
logger.info(
f"Tool call: {name}",
extra={"context": {"tool": name, "arguments": arguments}},
)
# Get database session
async for session in get_db_session():
try:
# Route to appropriate tool handler
if name == "search_code":
result = await search_code_tool(
query=arguments["query"],
db=session,
repository_id=arguments.get("repository_id"),
file_type=arguments.get("file_type"),
directory=arguments.get("directory"),
limit=arguments.get("limit", 10),
)
elif name == "index_repository":
result = await index_repository_tool(
path=arguments["path"],
name=arguments["name"],
db=session,
force_reindex=arguments.get("force_reindex", False),
)
elif name == "get_task":
result = await get_task_tool(
task_id=arguments["task_id"],
db=session,
)
elif name == "list_tasks":
result = await list_tasks_tool(
db=session,
status=arguments.get("status"),
branch=arguments.get("branch"),
limit=arguments.get("limit", 20),
)
elif name == "create_task":
result = await create_task_tool(
title=arguments["title"],
db=session,
description=arguments.get("description"),
notes=arguments.get("notes"),
planning_references=arguments.get("planning_references"),
)
elif name == "update_task":
result = await update_task_tool(
task_id=arguments["task_id"],
db=session,
title=arguments.get("title"),
description=arguments.get("description"),
notes=arguments.get("notes"),
status=arguments.get("status"),
branch=arguments.get("branch"),
commit=arguments.get("commit"),
)
else:
raise ValidationError(
f"Unknown tool: {name}",
details={"tool": name},
)
logger.info(
f"Tool call completed: {name}",
extra={"context": {"tool": name}},
)
# Return result wrapped in list format expected by MCP
return [{"type": "text", "text": str(result)}]
except MCPError:
# Re-raise MCP errors (will be handled by error handler)
raise
except Exception as e:
logger.error(
f"Tool call failed: {name}",
extra={"context": {"tool": name, "error": str(e)}},
)
raise OperationError(
f"Tool execution failed: {e}",
details={"tool": name, "error": str(e)},
) from e
# Store MCP server in app state
app.state.mcp_server = mcp_server
logger.info("MCP server initialized with 6 tools registered")
logger.info("=" * 80)
logger.info("Codebase MCP Server started successfully")
logger.info("=" * 80)
except Exception as e:
logger.critical(
"Failed to start server",
extra={"context": {"error": str(e)}},
)
raise
# ==============================================================================
# YIELD TO APPLICATION RUNTIME
# ==============================================================================
yield
# ==============================================================================
# SHUTDOWN
# ==============================================================================
logger.info("=" * 80)
logger.info("Shutting down Codebase MCP Server")
logger.info("=" * 80)
# Close database connections
if engine is not None:
logger.info("Closing database connections")
await engine.dispose()
logger.info("Database connections closed")
logger.info("Codebase MCP Server shutdown complete")
# ==============================================================================
# FastAPI Application
# ==============================================================================
app = FastAPI(
title="Codebase MCP Server",
version="1.0.0",
description="Production-grade MCP server for semantic code search",
lifespan=lifespan,
)
# ==============================================================================
# Exception Handlers
# ==============================================================================
@app.exception_handler(ValidationError)
async def validation_error_handler(request: Any, exc: ValidationError) -> JSONResponse:
"""Handle MCP validation errors."""
logger.warning(
"Validation error",
extra={"context": exc.to_dict()},
)
return JSONResponse(
status_code=400,
content={"error": exc.to_dict()},
)
@app.exception_handler(NotFoundError)
async def not_found_error_handler(request: Any, exc: NotFoundError) -> JSONResponse:
"""Handle MCP not found errors."""
logger.warning(
"Resource not found",
extra={"context": exc.to_dict()},
)
return JSONResponse(
status_code=404,
content={"error": exc.to_dict()},
)
@app.exception_handler(OperationError)
async def operation_error_handler(request: Any, exc: OperationError) -> JSONResponse:
"""Handle MCP operation errors."""
logger.error(
"Operation error",
extra={"context": exc.to_dict()},
)
return JSONResponse(
status_code=500,
content={"error": exc.to_dict()},
)
# ==============================================================================
# Health Check Endpoint
# ==============================================================================
@app.get("/health")
async def health_check() -> dict[str, str]:
"""Health check endpoint.
Returns:
Dictionary with health status
Example:
>>> curl http://localhost:3000/health
{"status": "healthy"}
"""
return {"status": "healthy"}
# ==============================================================================
# MCP SSE Endpoint
# ==============================================================================
@app.post("/messages")
async def mcp_messages() -> dict[str, str]:
"""MCP SSE endpoint for tool execution.
This endpoint handles MCP protocol messages via Server-Sent Events.
Tool calls are routed through the MCP server registered in app.state.
Returns:
SSE response stream
Note:
The actual SSE streaming is handled by the MCP SDK's SseServerTransport.
This endpoint serves as the FastAPI integration point.
"""
logger.info("MCP messages endpoint called")
return {"message": "MCP SSE endpoint (integration with transport required)"}
# ==============================================================================
# Application Entry Point
# ==============================================================================
if __name__ == "__main__":
import uvicorn
# Run the server
uvicorn.run(
"src.main:app",
host="0.0.0.0",
port=3000,
log_config=None, # Disable uvicorn logging (use our structured logger)
)