Skip to main content
Glama

Codebase MCP Server

by Ravenight13
session.py45.6 kB
"""Async SQLAlchemy session factory for PostgreSQL with database-per-project architecture. This module provides production-grade database session management with: - Database-per-project isolation instead of schema-based isolation - Registry database for project metadata and lookups - Per-project connection pools with AsyncPG - Context manager pattern for automatic transaction management - Session-based project resolution via .codebase-mcp/config.json - Proper cleanup and error handling - Health check functionality for monitoring Constitutional Compliance: - Principle IV: Performance (connection pooling, async operations, 30s timeout) - Principle V: Production quality (comprehensive error handling, graceful cleanup) - Principle VIII: Type safety (mypy --strict compliance, complete type annotations) - Principle XI: FastMCP Foundation (async session for MCP tool integration) Usage: >>> from src.database.session import get_session, check_database_health >>> # In MCP tools: >>> async with get_session(ctx=ctx) as session: ... result = await session.execute(select(Repository)) ... # Session automatically commits on success, rolls back on error >>> # Health monitoring: >>> is_healthy = await check_database_health() """ from __future__ import annotations import os from contextlib import asynccontextmanager from typing import AsyncGenerator, Dict import asyncpg from sqlalchemy.ext.asyncio import ( AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.pool import NullPool from sqlalchemy.sql import text from src.mcp.mcp_logging import get_logger from src.config.settings import Settings, get_settings # Session-based project resolution imports from pathlib import Path from fastmcp import Context from src.auto_switch.session_context import get_session_context_manager from src.auto_switch.discovery import find_config_file from src.auto_switch.validation import validate_config_syntax from src.auto_switch.cache import get_config_cache # Database provisioning and registry imports from src.database.provisioning import create_pool # ============================================================================== # Module Configuration # ============================================================================== logger = get_logger(__name__) # Registry database URL from environment with secure default # Note: REGISTRY_DATABASE_URL should NEVER fall back to DATABASE_URL since they serve different purposes # DATABASE_URL is for legacy single-database mode, REGISTRY_DATABASE_URL is for project metadata REGISTRY_DATABASE_URL: str = os.getenv( "REGISTRY_DATABASE_URL", "postgresql+asyncpg://localhost/codebase_mcp_registry" ) # SQL echo mode for debugging (disabled in production for performance) SQL_ECHO: bool = os.getenv("SQL_ECHO", "false").lower() == "true" # Connection pool configuration for project databases POOL_MIN_SIZE: int = int(os.getenv("POOL_MIN_SIZE", "2")) POOL_MAX_SIZE: int = int(os.getenv("POOL_MAX_SIZE", "10")) # Default project database name DEFAULT_PROJECT_DB: str = "cb_proj_default_00000000" # ============================================================================== # Global Connection Pools # ============================================================================== # Registry database pool (initialized on first use) _registry_pool: asyncpg.Pool | None = None # Per-project database pools: {database_name: asyncpg.Pool} _project_pools: Dict[str, asyncpg.Pool] = {} # Legacy global engine for backward compatibility (will be phased out) # This is kept to avoid breaking existing code that imports it directly DATABASE_URL: str = os.getenv( "DATABASE_URL", "postgresql+asyncpg://localhost/codebase_mcp" ) engine: AsyncEngine = create_async_engine( DATABASE_URL, echo=SQL_ECHO, poolclass=NullPool, # Disabled pooling (using AsyncPG pools instead) ) logger.info( "Legacy async database engine created (no pooling - will be phased out)", extra={ "context": { "operation": "engine_creation", "database_host": DATABASE_URL.split("@")[-1] if "@" in DATABASE_URL else "localhost", "note": "Use get_session() for production access" } }, ) # Legacy session factory (kept for backward compatibility) SessionLocal: async_sessionmaker[AsyncSession] = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, ) # ============================================================================== # Registry Pool Initialization # ============================================================================== async def _initialize_registry_pool() -> asyncpg.Pool: """Initialize the registry database connection pool. Creates a connection pool to the codebase_mcp_registry database for project metadata lookups. This pool is used to map project_id/name to database_name. Returns: AsyncPG connection pool for registry database Raises: asyncpg.PostgresError: If pool creation fails """ global _registry_pool if _registry_pool is not None: return _registry_pool # Extract database name from registry URL # Format: postgresql+asyncpg://user:pass@host:port/database if "@" in REGISTRY_DATABASE_URL: db_part = REGISTRY_DATABASE_URL.split("@")[1] registry_db_name = db_part.split("/")[-1] if "/" in db_part else "codebase_mcp_registry" else: registry_db_name = "codebase_mcp_registry" logger.info( "Initializing registry database pool", extra={ "context": { "operation": "initialize_registry_pool", "database_name": registry_db_name, "min_size": POOL_MIN_SIZE, "max_size": POOL_MAX_SIZE, } }, ) try: # Use provisioning utility to create pool _registry_pool = await create_pool( database_name=registry_db_name, min_size=POOL_MIN_SIZE, max_size=POOL_MAX_SIZE, ) logger.info( "Registry database pool initialized successfully", extra={ "context": { "operation": "initialize_registry_pool", "database_name": registry_db_name, } }, ) return _registry_pool except asyncpg.PostgresError as e: logger.error( "Failed to initialize registry database pool", extra={ "context": { "operation": "initialize_registry_pool", "database_name": registry_db_name, "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise # ============================================================================== # Per-Project Pool Management # ============================================================================== async def get_or_create_project_pool(database_name: str) -> asyncpg.Pool: """Get or create a connection pool for a specific project database. Manages per-project connection pools with lazy initialization. Pools are cached in module-level dict for reuse across sessions. Args: database_name: Project database name (cb_proj_*) Returns: AsyncPG connection pool for project database Raises: asyncpg.PostgresError: If pool creation fails Example: >>> pool = await get_or_create_project_pool("cb_proj_my_project_abc123de") >>> async with pool.acquire() as conn: ... result = await conn.fetch("SELECT * FROM repositories") """ # Check if pool already exists if database_name in _project_pools: logger.debug( f"Using existing pool for database: {database_name}", extra={ "context": { "operation": "get_or_create_project_pool", "database_name": database_name, "pool_exists": True, } }, ) return _project_pools[database_name] logger.info( f"Creating new pool for database: {database_name}", extra={ "context": { "operation": "get_or_create_project_pool", "database_name": database_name, "min_size": POOL_MIN_SIZE, "max_size": POOL_MAX_SIZE, } }, ) try: # Use provisioning utility to create pool pool = await create_pool( database_name=database_name, min_size=POOL_MIN_SIZE, max_size=POOL_MAX_SIZE, ) # Cache pool for reuse _project_pools[database_name] = pool logger.info( f"Project pool created successfully: {database_name}", extra={ "context": { "operation": "get_or_create_project_pool", "database_name": database_name, "total_pools": len(_project_pools), } }, ) return pool except asyncpg.PostgresError as e: logger.error( f"Failed to create pool for database: {database_name}", extra={ "context": { "operation": "get_or_create_project_pool", "database_name": database_name, "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise # ============================================================================== # Project Resolution Utility # ============================================================================== async def _resolve_project_context( ctx: Context | None ) -> tuple[str, str] | None: """Resolve project from session config (graceful fallback). Resolution algorithm: 1. Get session_id from FastMCP Context (explicit, async-safe) 2. Look up working_directory for that session 3. Search for .codebase-mcp/config.json (up to 20 levels) 4. Check cache (async LRU with mtime invalidation) 5. Parse and validate config (if cache miss) 6. Extract project.name or project.id 7. Look up project in registry database to get database_name 8. Return (project_id, database_name) Args: ctx: FastMCP Context (contains session_id) Returns: Tuple of (project_id, database_name) or None if resolution fails (None enables graceful fallback to other resolution methods) Note: Does NOT raise exceptions - returns None for graceful fallback """ # Must have context for session-based resolution if ctx is None: logger.debug("No FastMCP Context provided, cannot resolve from session") return None # Get session-specific working directory session_id = ctx.session_id # ✅ From FastMCP Context (explicit) session_ctx_mgr = get_session_context_manager() try: working_dir = await session_ctx_mgr.get_working_directory(session_id) except Exception as e: logger.debug(f"Error getting working directory for session {session_id}: {e}") return None if not working_dir: logger.debug( f"No working directory set for session {session_id}. " f"Call set_working_directory() first for session-based resolution." ) return None # Check cache first cache = get_config_cache() config = None config_path = None try: cache_result = await cache.get(working_dir) if cache_result is not None: config, config_path = cache_result except Exception as e: logger.debug(f"Cache lookup failed for {working_dir}: {e}") config = None config_path = None if config is None: # Cache miss: search for config file try: config_path = find_config_file(Path(working_dir)) except Exception as e: logger.debug(f"Config file search failed for {working_dir}: {e}") return None if not config_path: logger.debug( f"No .codebase-mcp/config.json found in {working_dir} or ancestors " f"(searched up to 20 levels)" ) return None # Parse and validate config try: config = validate_config_syntax(config_path) except ValueError as e: logger.debug(f"Config validation failed for {config_path}: {e}") return None except Exception as e: logger.error(f"Unexpected error validating config {config_path}: {e}") return None # Store in cache try: await cache.set(working_dir, config, config_path) except Exception as e: logger.debug(f"Failed to cache config for {working_dir}: {e}") # Continue anyway (caching is optional) # At this point, we must have both config and config_path # Either from cache hit or from cache miss + filesystem search assert config is not None and config_path is not None, \ "Internal error: config and config_path should be set by this point" # Get or create project from config using auto-create module try: from src.database.auto_create import get_or_create_project_from_config project = await get_or_create_project_from_config( config_path=config_path, registry=None # Uses singleton registry ) logger.debug( f"Resolved project from config: {project.name}", extra={ "context": { "operation": "_resolve_project_context", "project_id": project.project_id, "database_name": project.database_name, } } ) return (project.project_id, project.database_name) except Exception as e: logger.error( f"Failed to get/create project from config: {e}", extra={ "context": { "operation": "_resolve_project_context", "config_path": str(config_path), "error": str(e), } }, exc_info=True, ) return None async def resolve_project_id( explicit_id: str | None = None, settings: Settings | None = None, ctx: Context | None = None, ) -> tuple[str, str]: """Resolve project_id and database_name with 4-tier resolution chain. Implements the multi-project workspace resolution strategy with four-tier fallback logic to determine which workspace context to use for database operations. Resolution Order (FR-012, FR-013, FR-014): 1. **Explicit ID**: If explicit_id is provided, look up in registry (highest priority, user-specified context) 2. **Session-based config**: Query .codebase-mcp/config.json via FastMCP Context 3. **workflow-mcp Integration**: Query external workflow-mcp server for active project context with timeout protection 4. **Default Workspace**: Fallback to default project database when all resolution methods are unavailable, timeout occurs, or no active project exists Args: explicit_id: Explicitly provided project identifier (from MCP tool parameters). Takes precedence over all other resolution methods. settings: Optional Settings instance for workflow-mcp configuration. If not provided, uses global settings singleton. ctx: Optional FastMCP Context for session-based config resolution. Returns: Tuple of (project_id, database_name): - project_id: Project UUID or name - database_name: Physical PostgreSQL database name (cb_proj_*) Error Handling: All workflow-mcp errors are handled gracefully by returning default: - Timeout: workflow-mcp query exceeds timeout threshold - Connection refused: workflow-mcp server not running - Invalid response: malformed JSON or unexpected error Performance: - Explicit ID: <10ms (registry lookup) - Session config: <1ms (cached) or <60ms (uncached) - workflow-mcp query: <1000ms (timeout protection) - Default fallback: <1ms (constant) Example: >>> # Explicit ID takes precedence >>> project_id, db_name = await resolve_project_id("client-a") >>> assert db_name.startswith("cb_proj_") >>> # Query workflow-mcp when no explicit ID >>> project_id, db_name = await resolve_project_id(None, ctx=ctx) >>> # Returns active project or default if unavailable Constitutional Compliance: - Principle II: Local-first (graceful degradation when workflow-mcp unavailable) - Principle V: Production quality (comprehensive error handling) - Principle VIII: Type safety (mypy --strict compliance) Functional Requirements: - FR-012: Detect workflow-mcp active project with caching - FR-013: Handle workflow-mcp unavailability gracefully - FR-014: TTL-based cache invalidation (60s default) """ # 1. Explicit ID takes precedence (highest priority) if explicit_id is not None: logger.info( "Attempting Tier 1: Explicit ID resolution", extra={ "context": { "operation": "resolve_project_id", "tier_name": "explicit_id", "tier_number": 1, "explicit_id": explicit_id, "session_id": ctx.session_id if ctx else None, } }, ) logger.debug( "Using explicit project_id, looking up in registry", extra={ "context": { "operation": "resolve_project_id", "project_id": explicit_id, "resolution_method": "explicit", } }, ) try: registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: row = await conn.fetchrow( """ SELECT id, database_name FROM projects WHERE id::text = $1 OR name = $1 LIMIT 1 """, explicit_id ) if row is None: raise ValueError( f"Project '{explicit_id}' not found in registry. " f"To create this project:\n" f"1. Add .codebase-mcp/config.json with project.id = '{explicit_id}'\n" f"2. Call set_working_directory() to enable auto-creation" ) else: # Found in registry - return immediately logger.info( "✓ Tier 1 success: Resolved via explicit ID", extra={ "context": { "operation": "resolve_project_id", "tier_name": "explicit_id", "tier_number": 1, "project_id": row['id'], "database_name": row['database_name'], } }, ) return (row['id'], row['database_name']) except ValueError: # Re-raise ValueError from not-found check above raise except Exception as e: # Registry lookup error - re-raise as ValueError with clear message logger.error( f"Registry lookup failed for explicit project_id: {explicit_id}", extra={ "context": { "operation": "resolve_project_id", "project_id": explicit_id, "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise ValueError( f"Failed to look up project '{explicit_id}' in registry: {e}" ) from e # 2. Try session-based config resolution (if Context provided) if ctx is not None: logger.info( "Attempting Tier 2: Session config resolution", extra={ "context": { "operation": "resolve_project_id", "tier_name": "session_config", "tier_number": 2, "session_id": ctx.session_id, "explicit_id": explicit_id, } }, ) try: result = await _resolve_project_context(ctx) if result is not None: project_id, database_name = result logger.info( "✓ Tier 2 success: Resolved via session config", extra={ "context": { "operation": "resolve_project_id", "tier_name": "session_config", "tier_number": 2, "project_id": project_id, "database_name": database_name, } } ) logger.debug( "Resolved project via session config", extra={ "context": { "operation": "resolve_project_id", "project_id": project_id, "database_name": database_name, "resolution_method": "session_config" } } ) return (project_id, database_name) except Exception as e: logger.debug(f"Session-based resolution failed: {e}") # ✅ FIX 2 + BUG 9: Prevent fallthrough when config exists # If config file exists but resolution failed, raise exception instead of falling through if ctx is not None and ctx.session_id: try: working_dir = await get_session_context_manager().get_working_directory(ctx.session_id) if working_dir: config_path = find_config_file(Path(working_dir)) if config_path: logger.error( f"Config exists but project resolution failed: {config_path}", extra={ "context": { "config_path": str(config_path), "session_id": ctx.session_id, "operation": "resolve_project_id" } } ) raise ValueError( f"Project resolution failed despite config at {config_path}. " f"Check config file for errors (project.id or project.name required)." ) else: logger.info( "No config file found, allowing fallthrough to Tier 3 (workflow-mcp)", extra={ "context": { "operation": "resolve_project_id", "working_directory": str(working_dir), "session_id": ctx.session_id } } ) except ValueError: # Re-raise ValueError from config validation check above raise except Exception as e: # Graceful fallthrough if session context lookup fails logger.debug(f"Config existence check failed, allowing fallthrough: {e}") # 3. Try workflow-mcp integration (if configured) if settings is None: settings = get_settings() if settings.workflow_mcp_url is not None: logger.info( "Attempting Tier 3: workflow-mcp fallback", extra={ "context": { "operation": "resolve_project_id", "tier_name": "workflow_mcp", "tier_number": 3, "workflow_mcp_url": str(settings.workflow_mcp_url), "session_id": ctx.session_id if ctx else None, "explicit_id": explicit_id, } }, ) logger.debug( "Querying workflow-mcp for active project", extra={ "context": { "operation": "resolve_project_id", "workflow_mcp_url": str(settings.workflow_mcp_url), "timeout": settings.workflow_mcp_timeout, } }, ) from src.services.workflow_client import WorkflowIntegrationClient client = WorkflowIntegrationClient( base_url=str(settings.workflow_mcp_url), timeout=settings.workflow_mcp_timeout, cache_ttl=settings.workflow_mcp_cache_ttl, ) try: active_project = await client.get_active_project() if active_project is not None: # Look up in registry to get database_name registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: row = await conn.fetchrow( """ SELECT id, database_name FROM projects WHERE id::text = $1 OR name = $1 LIMIT 1 """, active_project ) if row is not None: logger.info( "✓ Tier 3 success: Resolved via workflow-mcp", extra={ "context": { "operation": "resolve_project_id", "tier_name": "workflow_mcp", "tier_number": 3, "project_id": row['id'], "database_name": row['database_name'], } }, ) logger.info( "Resolved project from workflow-mcp", extra={ "context": { "operation": "resolve_project_id", "project_id": row['id'], "database_name": row['database_name'], "resolution_method": "workflow_mcp", } }, ) return (row['id'], row['database_name']) else: logger.warning( f"workflow-mcp project not found in registry: {active_project}", extra={ "context": { "operation": "resolve_project_id", "project_id": active_project, } }, ) else: logger.debug( "workflow-mcp returned no active project, using default workspace", extra={ "context": { "operation": "resolve_project_id", "resolution_method": "default_fallback", } }, ) except Exception as e: logger.warning( "Failed to query workflow-mcp, using default workspace", extra={ "context": { "operation": "resolve_project_id", "error": str(e), "error_type": type(e).__name__, "resolution_method": "default_fallback", } }, ) finally: # Always close client connection to prevent resource leaks await client.close() # 4. Fallback to default workspace logger.info( "Attempting Tier 4: Default project", extra={ "context": { "operation": "resolve_project_id", "tier_name": "default_project", "tier_number": 4, "session_id": ctx.session_id if ctx else None, "explicit_id": explicit_id, } }, ) logger.debug( "Using default workspace (no explicit ID, session config unavailable, workflow-mcp unavailable/disabled)", extra={ "context": { "operation": "resolve_project_id", "resolution_method": "default_fallback", } }, ) logger.info( "✓ Tier 4 success: Resolved via default project", extra={ "context": { "operation": "resolve_project_id", "tier_name": "default_project", "tier_number": 4, "project_id": "default", "database_name": DEFAULT_PROJECT_DB, } }, ) return ("default", DEFAULT_PROJECT_DB) # ============================================================================== # Session Context Manager # ============================================================================== @asynccontextmanager async def get_session( project_id: str | None = None, ctx: Context | None = None, ) -> AsyncGenerator[AsyncSession, None]: """Async context manager for database sessions with automatic transaction management. Provides a database session connected to the correct project database with automatic commit on success and rollback on error. This is the primary way to interact with the database in MCP tools. Args: project_id: Optional project identifier for workspace isolation. If None, uses 4-tier resolution chain (explicit → session → workflow-mcp → default). ctx: Optional FastMCP Context for session-based project resolution. Yields: AsyncSession: Configured database session connected to project database Raises: ValueError: If project_id format is invalid Exception: Any exception from database operations (after rollback) Transaction Management: - Resolves project_id and database_name via 4-tier chain - Creates session from project-specific connection pool - Automatically commits on successful completion - Automatically rolls back on any exception - Ensures session is closed after use Example: >>> # In MCP tool with project isolation: >>> async with get_session(project_id="client-a") as session: ... result = await session.execute(select(Repository)) ... repos = result.scalars().all() >>> # With session-based resolution: >>> async with get_session(ctx=ctx) as session: ... result = await session.execute(select(Repository)) ... # Uses project from .codebase-mcp/config.json >>> # With error handling: >>> try: ... async with get_session(ctx=ctx) as session: ... await session.execute(insert(Repository).values(...)) ... except IntegrityError: ... logger.error("Duplicate repository") Performance: - Uses per-project connection pools for efficient resource usage - Connection recycling prevents stale connections - Registry lookup cached for repeated access Constitutional Compliance: - Principle V: Production quality (proper error handling, cleanup) - Principle VIII: Type safety (mypy --strict compliance) - Principle XI: FastMCP Foundation (async pattern for MCP tools) Multi-Project Support: - Resolves project_id to physical database name (cb_proj_*) - Uses dedicated connection pool per project database - Complete data isolation (no shared tables or schemas) - Backward compatible with project_id=None """ # Resolve project_id and database_name via 4-tier resolution chain resolved_project_id, database_name = await resolve_project_id( explicit_id=project_id, ctx=ctx, ) logger.debug( f"Resolved project for session: {resolved_project_id} (database: {database_name})", extra={ "context": { "operation": "get_session", "project_id": resolved_project_id, "database_name": database_name, } }, ) # Get or create connection pool for project database try: pool = await get_or_create_project_pool(database_name) except Exception as e: logger.error( f"Failed to get pool for database: {database_name}", extra={ "context": { "operation": "get_session", "database_name": database_name, "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise # Create SQLAlchemy engine from AsyncPG pool (temporary bridge) # TODO: Move to pure AsyncPG for better performance from sqlalchemy.ext.asyncio import create_async_engine # Build connection string for this project database db_user = os.getenv("DB_USER", os.getenv("USER", "postgres")) db_host = os.getenv("DB_HOST", "localhost") db_port = int(os.getenv("DB_PORT", "5432")) db_password = os.getenv("DB_PASSWORD", "") if db_password: project_db_url = f"postgresql+asyncpg://{db_user}:{db_password}@{db_host}:{db_port}/{database_name}" else: project_db_url = f"postgresql+asyncpg://{db_user}@{db_host}:{db_port}/{database_name}" # Create engine with pooling disabled (we manage pools separately) project_engine = create_async_engine( project_db_url, echo=SQL_ECHO, poolclass=NullPool, # No SQLAlchemy pooling (using AsyncPG pools) ) # Create session factory for this project ProjectSessionLocal = async_sessionmaker( project_engine, class_=AsyncSession, expire_on_commit=False, ) async with ProjectSessionLocal() as session: try: logger.debug( "Database session started for project", extra={ "context": { "operation": "get_session", "project_id": resolved_project_id, "database_name": database_name, } }, ) yield session # Commit transaction on success await session.commit() logger.debug( "Database session committed successfully", extra={ "context": { "operation": "get_session", "project_id": resolved_project_id, } }, ) except Exception as e: # Rollback transaction on error await session.rollback() logger.error( "Database session rolled back due to error", extra={ "context": { "operation": "get_session", "project_id": resolved_project_id, "database_name": database_name, "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise finally: # Ensure session is closed (cleanup) await session.close() logger.debug( "Database session closed", extra={ "context": { "operation": "get_session", "project_id": resolved_project_id, } }, ) # Dispose of project-specific engine await project_engine.dispose() # ============================================================================== # Session Factory Access # ============================================================================== def get_session_factory() -> async_sessionmaker[AsyncSession]: """Get the initialized session factory. Provides access to the global SessionLocal factory for creating database sessions. This is useful when you need the factory itself rather than a session instance (e.g., for dependency injection). Returns: async_sessionmaker[AsyncSession]: Initialized session factory Note: This returns the LEGACY session factory. For production use, prefer get_session() which uses per-project pools. Example: >>> # Legacy usage (backward compatibility): >>> factory = get_session_factory() >>> async with factory() as session: ... result = await session.execute(select(Repository)) Constitutional Compliance: - Principle VIII: Type safety (explicit return type annotation) """ return SessionLocal # ============================================================================== # Database Health Check # ============================================================================== async def check_database_health() -> bool: """Check database connection health. Performs a simple query (`SELECT 1`) to verify the registry database is accessible and responsive. This is useful for health check endpoints and monitoring. Returns: bool: True if database is healthy, False otherwise Example: >>> # In health check endpoint: >>> @mcp.tool() >>> async def health_check() -> dict[str, str]: ... db_healthy = await check_database_health() ... return { ... "status": "healthy" if db_healthy else "unhealthy", ... "database": "ok" if db_healthy else "error", ... } Performance: - Uses connection pool for efficient health checks - Minimal overhead (<1ms typical latency) Constitutional Compliance: - Principle IV: Performance (<1ms query execution) - Principle V: Production quality (comprehensive error logging) """ try: # Check registry pool health registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: await conn.fetchval("SELECT 1") logger.debug( "Database health check passed", extra={"context": {"operation": "check_database_health"}}, ) return True except Exception as e: logger.error( "Database health check failed", extra={ "context": { "operation": "check_database_health", "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) return False # ============================================================================== # Connection Lifecycle Management # ============================================================================== async def init_db_connection() -> None: """Initialize database connection pools. This function is called during server startup to ensure the registry database connection pool is properly initialized and ready to serve requests. Raises: RuntimeError: If database initialization fails (connection error, etc.) Example: >>> # In FastMCP lifespan: >>> @asynccontextmanager >>> async def lifespan(app: FastMCP): ... await init_db_connection() # BLOCKING - wait for DB to be ready ... yield ... await close_db_connection() Constitutional Compliance: - Principle V: Production quality (explicit initialization order) - Principle IV: Performance (connection pool pre-warming) """ try: # Initialize registry pool await _initialize_registry_pool() # Verify registry database is accessible is_healthy = await check_database_health() if not is_healthy: raise RuntimeError("Registry database health check failed during initialization") logger.info( "Database connection pools initialized successfully", extra={ "context": { "operation": "init_db_connection", "registry_pool_ready": True, } }, ) except Exception as e: logger.critical( "Failed to initialize database connection pools", extra={ "context": { "operation": "init_db_connection", "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) raise RuntimeError(f"Database initialization failed: {e}") from e async def close_db_connection() -> None: """Close database connection pools gracefully. This function is called during server shutdown to ensure all database connections are properly closed and resources are cleaned up. Example: >>> # In FastMCP lifespan: >>> @asynccontextmanager >>> async def lifespan(app: FastMCP): ... await init_db_connection() ... yield ... await close_db_connection() # Graceful shutdown Constitutional Compliance: - Principle V: Production quality (proper cleanup, no resource leaks) """ try: logger.info( "Closing database connection pools...", extra={"context": {"operation": "close_db_connection"}}, ) # Close all project pools for database_name, pool in _project_pools.items(): try: await pool.close() logger.debug(f"Closed pool for database: {database_name}") except Exception as e: logger.error( f"Error closing pool for database: {database_name}", extra={ "context": { "operation": "close_db_connection", "database_name": database_name, "error": str(e), } }, ) _project_pools.clear() # Close registry pool global _registry_pool if _registry_pool is not None: try: await _registry_pool.close() logger.debug("Closed registry pool") _registry_pool = None except Exception as e: logger.error( "Error closing registry pool", extra={ "context": { "operation": "close_db_connection", "error": str(e), } }, ) # Dispose of legacy engine await engine.dispose() logger.info( "Database connection pools closed successfully", extra={"context": {"operation": "close_db_connection"}}, ) except Exception as e: logger.error( "Error closing database connection pools", extra={ "context": { "operation": "close_db_connection", "error": str(e), "error_type": type(e).__name__, } }, exc_info=True, ) # Don't raise - we're shutting down anyway # ============================================================================== # Public API # ============================================================================== __all__ = [ "engine", "SessionLocal", "get_session", "get_session_factory", "resolve_project_id", "_resolve_project_context", "check_database_health", "init_db_connection", "close_db_connection", "get_or_create_project_pool", "_initialize_registry_pool", "DATABASE_URL", "REGISTRY_DATABASE_URL", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server