Skip to main content
Glama

Codebase MCP Server

by Ravenight13
data-model.md20.2 kB
# Data Model: Production-Grade Connection Management **Branch**: 009-v2-connection-mgmt | **Date**: 2025-10-13 **Phase**: 1 (Design & Contracts) ## Overview This document defines all data models, configuration structures, and state representations for the connection pool implementation. All models use Pydantic for runtime validation and type safety (Constitutional Principle VIII). ## Configuration Models ### PoolConfig **Purpose**: Connection pool configuration with environment variable support and validation. **Pydantic Model**: ```python from pydantic_settings import BaseSettings from pydantic import Field, field_validator class PoolConfig(BaseSettings): """Connection pool configuration with environment variable overrides.""" min_size: int = Field( default=2, ge=1, le=100, description="Minimum number of connections to maintain" ) max_size: int = Field( default=10, ge=1, le=100, description="Maximum number of connections allowed" ) max_queries: int = Field( default=50000, ge=1000, description="Queries per connection before recycling" ) max_idle_time: float = Field( default=60.0, ge=10.0, description="Seconds before idle connections are closed" ) timeout: float = Field( default=30.0, gt=0.0, lt=300.0, description="Seconds to wait for connection acquisition" ) command_timeout: float = Field( default=60.0, gt=0.0, description="Seconds for query execution timeout" ) max_connection_lifetime: float = Field( default=3600.0, ge=60.0, description="Maximum age of connection in seconds" ) leak_detection_timeout: float = Field( default=30.0, ge=0.0, description="Seconds before connection leak warning (0 = disabled)" ) enable_leak_detection: bool = Field( default=True, description="Toggle leak detection for production/development" ) database_url: str = Field( ..., description="PostgreSQL connection string" ) @field_validator("max_size") @classmethod def validate_max_size_greater_than_min(cls, v, info): """Ensure max_size >= min_size.""" if "min_size" in info.data and v < info.data["min_size"]: raise ValueError( f"max_size ({v}) must be >= min_size ({info.data['min_size']}). " f"Suggestion: Increase POOL_MAX_SIZE to {info.data['min_size']} or reduce POOL_MIN_SIZE to {v}" ) return v class Config: env_prefix = "POOL_" # POOL_MIN_SIZE, POOL_MAX_SIZE, etc. env_file = ".env" frozen = True # Immutable after initialization ``` **Validation Rules**: - min_size: 1 <= value <= 100 - max_size: 1 <= value <= 100, must be >= min_size - max_queries: >= 1000 - max_idle_time: >= 10.0 seconds - timeout: 0 < value < 300 seconds - command_timeout: > 0 seconds - max_connection_lifetime: >= 60 seconds - leak_detection_timeout: >= 0 (0 = disabled) - database_url: Required, must be valid PostgreSQL connection string **Environment Variables**: - `POOL_MIN_SIZE`: Minimum pool size (default: 2) - `POOL_MAX_SIZE`: Maximum pool size (default: 10) - `POOL_MAX_QUERIES`: Queries before recycling (default: 50000) - `POOL_MAX_IDLE_TIME`: Idle timeout in seconds (default: 60.0) - `POOL_TIMEOUT`: Acquisition timeout (default: 30.0) - `POOL_COMMAND_TIMEOUT`: Query timeout (default: 60.0) - `POOL_MAX_CONNECTION_LIFETIME`: Connection max age (default: 3600.0) - `POOL_LEAK_DETECTION_TIMEOUT`: Leak warning threshold (default: 30.0) - `POOL_ENABLE_LEAK_DETECTION`: Enable leak detection (default: true) - `POOL_DATABASE_URL` or `DATABASE_URL`: PostgreSQL connection string (required) --- ## Statistics Models ### PoolStatistics **Purpose**: Immutable snapshot of connection pool state for monitoring and observability. **Pydantic Model**: ```python from pydantic import BaseModel, Field from datetime import datetime class PoolStatistics(BaseModel): """Real-time connection pool statistics.""" total_connections: int = Field( ge=0, description="Current total connections in pool" ) idle_connections: int = Field( ge=0, description="Connections available for use" ) active_connections: int = Field( ge=0, description="Connections currently in use" ) waiting_requests: int = Field( ge=0, description="Requests waiting for connection" ) total_acquisitions: int = Field( ge=0, description="Lifetime connection acquisitions" ) total_releases: int = Field( ge=0, description="Lifetime connection releases" ) avg_acquisition_time_ms: float = Field( ge=0.0, description="Average time to acquire connection (milliseconds)" ) peak_active_connections: int = Field( ge=0, description="Max active connections since server start" ) peak_wait_time_ms: float = Field( ge=0.0, description="Max time request waited for connection (milliseconds)" ) pool_created_at: datetime = Field( description="When pool was initialized" ) last_health_check: datetime = Field( description="Last successful health check" ) last_error: str | None = Field( default=None, description="Last error message if any" ) last_error_time: datetime | None = Field( default=None, description="Timestamp of last error" ) class Config: frozen = True # Immutable snapshot ``` **Invariants**: - total_connections = idle_connections + active_connections - total_acquisitions >= total_releases - peak_active_connections >= active_connections - All counts >= 0 **Serialization**: - JSON serializable for health check API responses - datetime fields use ISO 8601 format --- ## Health Check Models ### PoolHealthStatus **Purpose**: Enum for health status values. **Enum Definition**: ```python from enum import Enum class PoolHealthStatus(str, Enum): """Connection pool health status.""" HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" ``` **Status Determination Logic**: - **HEALTHY**: total_connections > 0, idle >= 80% capacity, no errors in last 60s - **DEGRADED**: total_connections > 0, idle 50-79% capacity, some recent errors - **UNHEALTHY**: total_connections == 0 OR idle < 50% capacity OR initialization failed --- ### DatabaseStatus **Purpose**: Database connectivity status details. **Pydantic Model**: ```python from pydantic import BaseModel, Field class DatabaseStatus(BaseModel): """Database connectivity status.""" status: str = Field( description="Connection status (connected/disconnected)" ) pool: dict = Field( description="Pool statistics (total, idle, active, waiting)" ) latency_ms: float | None = Field( default=None, description="Last query latency in milliseconds" ) last_error: str | None = Field( default=None, description="Last error message" ) ``` **Usage**: Embedded in HealthStatus response for detailed database information. --- ### HealthStatus **Purpose**: Health check response with pool status and diagnostic information. **Pydantic Model**: ```python from pydantic import BaseModel, Field from datetime import datetime class HealthStatus(BaseModel): """Complete health check response.""" status: PoolHealthStatus = Field( description="Overall health status" ) timestamp: datetime = Field( description="Health check execution time" ) database: DatabaseStatus = Field( description="Database connectivity and pool state" ) uptime_seconds: float = Field( ge=0.0, description="Server uptime in seconds" ) class Config: json_schema_extra = { "example": { "status": "healthy", "timestamp": "2025-10-13T14:30:00.000Z", "database": { "status": "connected", "pool": { "total": 5, "idle": 3, "active": 2, "waiting": 0 }, "latency_ms": 2.3, "last_error": None }, "uptime_seconds": 3600.5 } } ``` **Health Status Calculation Function**: ```python def calculate_health_status( stats: PoolStatistics, config: PoolConfig ) -> PoolHealthStatus: """ Calculate health status from pool statistics. Rules: - UNHEALTHY: No connections available OR initialization failed OR critical error - DEGRADED: Partial capacity (50-79%) OR recent errors OR high wait times - HEALTHY: Full capacity (≥80%) AND no recent errors AND normal wait times """ if stats.total_connections == 0: return PoolHealthStatus.UNHEALTHY capacity_ratio = stats.idle_connections / stats.total_connections # Check for critical conditions if capacity_ratio < 0.5: return PoolHealthStatus.UNHEALTHY # Check for degraded conditions if capacity_ratio < 0.8: return PoolHealthStatus.DEGRADED # Check for recent errors (within last 60 seconds) if stats.last_error_time: time_since_error = (datetime.utcnow() - stats.last_error_time).total_seconds() if time_since_error < 60: return PoolHealthStatus.DEGRADED # Check for high wait times (>100ms average) if stats.peak_wait_time_ms > 100: return PoolHealthStatus.DEGRADED return PoolHealthStatus.HEALTHY ``` --- ## State Management Models ### PoolState **Purpose**: Internal pool lifecycle state tracking. **Enum Definition**: ```python from enum import Enum class PoolState(str, Enum): """Connection pool lifecycle states.""" INITIALIZING = "initializing" HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" RECOVERING = "recovering" SHUTTING_DOWN = "shutting_down" TERMINATED = "terminated" ``` **State Transitions**: - `INITIALIZING` → `HEALTHY`: Pool created min_size connections successfully - `HEALTHY` → `DEGRADED`: Some connections failed, 50-79% capacity - `DEGRADED` → `UNHEALTHY`: <50% capacity or critical failure - `UNHEALTHY` → `RECOVERING`: Reconnection attempt in progress - `RECOVERING` → `HEALTHY`: Successful reconnection - `HEALTHY/DEGRADED/UNHEALTHY` → `SHUTTING_DOWN`: Graceful shutdown initiated - `SHUTTING_DOWN` → `TERMINATED`: All connections closed, resources released **State Diagram**: ``` ┌──────────────┐ │ INITIALIZING │ └──────┬───────┘ │ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ HEALTHY │────►│ DEGRADED │────►│ UNHEALTHY │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ RECOVERING │ │ │ └──────┬───────┘ │ │ │ │◄───────────────────┴─────────────────────┘ │ │ (shutdown signal) ▼ ┌──────────────┐ │SHUTTING_DOWN │ └──────┬───────┘ │ ▼ ┌──────────────┐ │ TERMINATED │ └──────────────┘ ``` --- ## Error Models ### ConnectionPoolError **Purpose**: Base exception for connection pool errors. **Exception Hierarchy**: ```python class ConnectionPoolError(Exception): """Base exception for connection pool errors.""" pass class PoolConfigurationError(ConnectionPoolError): """Configuration validation failed.""" pass class PoolInitializationError(ConnectionPoolError): """Pool initialization failed.""" pass class PoolTimeoutError(ConnectionPoolError): """Connection acquisition timeout.""" pass class ConnectionValidationError(ConnectionPoolError): """Connection validation failed.""" pass class PoolClosedError(ConnectionPoolError): """Pool is closed and cannot accept requests.""" pass ``` **Error Context**: All exceptions include: - Error message with context - Suggested action for resolution - Pool state at time of error (if applicable) **Example Error Messages**: ```python # PoolConfigurationError raise PoolConfigurationError( "max_size (5) must be >= min_size (10). " "Suggestion: Increase POOL_MAX_SIZE to 10 or reduce POOL_MIN_SIZE to 5" ) # PoolTimeoutError raise PoolTimeoutError( f"Connection acquisition timeout after {timeout}s. " f"Pool state: {stats.total_connections} total, {stats.active_connections} active, " f"{stats.waiting_requests} waiting. " "Suggestion: Increase POOL_MAX_SIZE or optimize query performance" ) # ConnectionValidationError raise ConnectionValidationError( f"Connection validation failed: {error_details}. " "Connection has been recycled. Retrying acquisition." ) ``` --- ## Internal Data Structures ### ConnectionMetadata **Purpose**: Track connection lifecycle and leak detection. **Structure** (not exposed in API): ```python from dataclasses import dataclass from datetime import datetime @dataclass class ConnectionMetadata: """Internal connection tracking metadata.""" connection_id: str acquired_at: datetime acquisition_stack_trace: str query_count: int created_at: datetime ``` **Usage**: - Stored in weakref dictionary keyed by connection object - Used for leak detection warnings - Tracks query count for recycling decisions - Captures stack trace for debugging **Leak Detection Warning Format**: ```python logger.warning( f"Potential connection leak detected: {metadata.connection_id}", extra={ "connection_id": metadata.connection_id, "held_duration_seconds": (datetime.utcnow() - metadata.acquired_at).total_seconds(), "stack_trace": metadata.acquisition_stack_trace, } ) ``` --- ## Model Relationships ``` PoolConfig ↓ (validates & configures) ConnectionPoolManager ↓ (creates & manages) ConnectionPool (asyncpg.Pool) ↓ (tracks via) ConnectionMetadata ↓ (aggregates into) PoolStatistics ↓ (includes in) HealthStatus ``` --- ## Validation Examples ### Valid Configuration ```python config = PoolConfig( min_size=2, max_size=10, timeout=30.0, database_url="postgresql+asyncpg://localhost/codebase_mcp" ) # ✅ Passes validation ``` ### Invalid Configuration (max_size < min_size) ```python config = PoolConfig( min_size=15, max_size=10, database_url="postgresql+asyncpg://localhost/codebase_mcp" ) # ❌ Raises ValidationError: # "max_size (10) must be >= min_size (15). # Suggestion: Increase POOL_MAX_SIZE to 15 or reduce POOL_MIN_SIZE to 10" ``` ### Invalid Configuration (timeout too high) ```python config = PoolConfig( timeout=600.0, # 10 minutes database_url="postgresql+asyncpg://localhost/codebase_mcp" ) # ❌ Raises ValidationError: # "timeout must be less than 300.0 seconds (5 minutes)" ``` ### Invalid Configuration (timeout zero) ```python config = PoolConfig( timeout=0.0, database_url="postgresql+asyncpg://localhost/codebase_mcp" ) # ❌ Raises ValidationError: # "timeout must be greater than 0.0" ``` --- ## Serialization Formats ### PoolStatistics JSON ```json { "total_connections": 5, "idle_connections": 3, "active_connections": 2, "waiting_requests": 0, "total_acquisitions": 1523, "total_releases": 1521, "avg_acquisition_time_ms": 2.8, "peak_active_connections": 8, "peak_wait_time_ms": 45.2, "pool_created_at": "2025-10-13T10:00:00Z", "last_health_check": "2025-10-13T14:30:00Z", "last_error": null, "last_error_time": null } ``` ### HealthStatus JSON ```json { "status": "healthy", "timestamp": "2025-10-13T14:30:00.000Z", "database": { "status": "connected", "pool": { "total": 5, "idle": 3, "active": 2, "waiting": 0 }, "latency_ms": 2.3, "last_error": null }, "uptime_seconds": 3600.5 } ``` ### HealthStatus JSON (Degraded State) ```json { "status": "degraded", "timestamp": "2025-10-13T15:00:00.000Z", "database": { "status": "connected", "pool": { "total": 10, "idle": 6, "active": 4, "waiting": 2 }, "latency_ms": 45.8, "last_error": "Connection validation failed" }, "uptime_seconds": 5400.2 } ``` ### HealthStatus JSON (Unhealthy State) ```json { "status": "unhealthy", "timestamp": "2025-10-13T16:00:00.000Z", "database": { "status": "disconnected", "pool": { "total": 0, "idle": 0, "active": 0, "waiting": 5 }, "latency_ms": null, "last_error": "Database connection refused" }, "uptime_seconds": 7200.5 } ``` --- ## Type Safety Enforcement All models enforce type safety through: 1. **Pydantic validation**: Runtime type checking and constraint validation 2. **mypy --strict**: Static type checking during development 3. **Field constraints**: Explicit bounds (ge, le, gt, lt) on numeric fields 4. **Custom validators**: Cross-field validation (e.g., max_size >= min_size) 5. **Immutability**: PoolStatistics uses frozen=True to prevent modification 6. **Type hints**: All function signatures include complete type annotations **mypy --strict Compliance Checklist**: - ✅ All class attributes have type annotations - ✅ All function parameters have type hints - ✅ All function return types specified explicitly - ✅ No implicit Optional types - ✅ No use of `Any` type (except in justified cases with comments) - ✅ All Pydantic models inherit from BaseModel or BaseSettings - ✅ Field validators use proper type annotations --- ## Constitutional Compliance - **Principle VIII (Pydantic Type Safety)**: All models use Pydantic BaseModel ✅ - **Principle V (Production Quality)**: Comprehensive validation with actionable error messages ✅ - **Principle III (Protocol Compliance)**: Models are JSON serializable for MCP transport ✅ - **Principle IV (Performance)**: Statistics model optimized for <10ms health checks ✅ --- ## Summary of Models ### Configuration (1 model) - **PoolConfig**: Environment-driven configuration with Pydantic validation ### Statistics & Monitoring (4 models) - **PoolStatistics**: Immutable statistics snapshot - **PoolHealthStatus**: Three-tier health status enum - **DatabaseStatus**: Database connectivity details - **HealthStatus**: Complete health check response ### State Management (1 enum) - **PoolState**: Connection pool lifecycle states ### Error Handling (6 exception classes) - **ConnectionPoolError**: Base exception - **PoolConfigurationError**: Configuration validation failures - **PoolInitializationError**: Pool creation failures - **PoolTimeoutError**: Acquisition timeout failures - **ConnectionValidationError**: Connection validation failures - **PoolClosedError**: Operations on closed pool ### Internal Tracking (1 dataclass) - **ConnectionMetadata**: Connection lifecycle and leak detection metadata **Total Data Structures**: 13 models (1 config, 4 monitoring, 1 state, 6 errors, 1 internal)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server