Codebase MCP Server

by Ravenight13
test_observability.py (24.2 kB)
"""Observability endpoint integration tests for Phase 7 User Story 5. Validates health check and metrics endpoints meet constitutional requirements for response time, schema compliance, and structured logging. Test Coverage: - T043: Health check response time (<50ms per SC-010) - T044: Health check schema validation (OpenAPI contract compliance) - T045: Metrics endpoint format (JSON and Prometheus text per FR-012) - T046: Structured logging validation (JSON format with required fields) Constitutional Compliance: - Principle IV: Performance guarantees (SC-010: <50ms health checks) - Principle V: Production quality observability - Principle VIII: Type safety (mypy --strict compliance) - SC-010: Health checks respond within 50ms - FR-011: Health endpoint <50ms response time requirement - FR-012: Prometheus-compatible metrics format FR References: - FR-011: Health check endpoint with database connectivity check (<50ms) - FR-012: Prometheus metrics endpoint (counters, histograms, text format) """ from __future__ import annotations import json import time from datetime import datetime from decimal import Decimal from pathlib import Path from typing import Any, AsyncGenerator import pytest import pytest_asyncio from src.connection_pool.config import PoolConfig from src.connection_pool.manager import ConnectionPoolManager from src.models.health import ConnectionPoolStats, HealthCheckResponse from src.models.metrics import ( LatencyHistogram, MetricCounter, MetricHistogram, MetricsResponse, ) from src.services.health_service import HealthService from src.services.metrics_service import MetricsService @pytest_asyncio.fixture async def pool_manager() -> AsyncGenerator[ConnectionPoolManager, None]: """Create connection pool manager for health checks. Yields: Initialized ConnectionPoolManager for testing health service Note: Uses test database connection from environment """ config = PoolConfig( database_url="postgresql+asyncpg://postgres:postgres@localhost:5432/codebase_mcp_test", min_size=2, max_size=5, timeout=5.0, ) manager = ConnectionPoolManager() await manager.initialize(config) yield manager # Cleanup await manager.shutdown() @pytest_asyncio.fixture async def health_service(pool_manager: ConnectionPoolManager) -> HealthService: """Create health service for testing. Args: pool_manager: Connection pool manager fixture Returns: Initialized HealthService instance """ return HealthService(pool_manager) @pytest.fixture def metrics_service() -> MetricsService: """Create metrics service for testing. Returns: Fresh MetricsService instance with empty metrics """ service = MetricsService() service.reset_metrics() return service @pytest.mark.asyncio async def test_health_check_response_time( health_service: HealthService, ) -> None: """Validate health check responds within 50ms per SC-010. Test validates: 1. Health check completes in <50ms (p95 requirement per FR-011) 2. Response contains all required fields 3. Status values are valid (healthy/degraded/unhealthy) 4. Database status is reported correctly 5. 
Connection pool statistics are present Acceptance Criteria: - quickstart.md lines 413-430 - SC-010: Health checks respond within 50ms - FR-011: <50ms response time requirement Constitutional Compliance: - Principle IV: Performance guarantees (<50ms health check) - Principle V: Production quality health monitoring - Principle VIII: Type safety (HealthCheckResponse model) Args: health_service: Health service fixture Test Strategy: Execute health check 10 times and validate: - p95 latency <50ms - All responses have correct schema - Response times are consistent """ # Execute health checks and measure latency latencies: list[float] = [] responses: list[HealthCheckResponse] = [] for _ in range(10): start_time = time.time() response = await health_service.check_health() elapsed_ms = (time.time() - start_time) * 1000 latencies.append(elapsed_ms) responses.append(response) # Calculate p95 latency sorted_latencies = sorted(latencies) p95_index = int(len(sorted_latencies) * 0.95) p95_latency = sorted_latencies[p95_index] # Validate p95 latency meets constitutional requirement assert p95_latency < 50.0, ( f"Health check p95 latency {p95_latency:.2f}ms exceeds 50ms limit (SC-010)" ) # Validate all responses have correct structure for response in responses: # Validate status is one of allowed values assert response.status in ["healthy", "degraded", "unhealthy"], ( f"Invalid status: {response.status}" ) # Validate timestamp is present and recent assert isinstance(response.timestamp, datetime), ( f"Invalid timestamp type: {type(response.timestamp)}" ) # Validate database_status assert response.database_status in ["connected", "disconnected", "degraded"], ( f"Invalid database_status: {response.database_status}" ) # Validate connection pool stats are present assert isinstance(response.connection_pool, ConnectionPoolStats), ( f"Invalid connection_pool type: {type(response.connection_pool)}" ) assert response.connection_pool.size >= 0 assert response.connection_pool.min_size >= 0 assert response.connection_pool.max_size > 0 assert response.connection_pool.free >= 0 # Validate uptime assert isinstance(response.uptime_seconds, Decimal), ( f"Invalid uptime_seconds type: {type(response.uptime_seconds)}" ) assert response.uptime_seconds >= Decimal("0") # Log performance summary for monitoring avg_latency = sum(latencies) / len(latencies) max_latency = max(latencies) min_latency = min(latencies) print(f"\nHealth Check Performance Summary:") print(f" Iterations: {len(latencies)}") print(f" p95 Latency: {p95_latency:.2f}ms (target: <50ms)") print(f" Avg Latency: {avg_latency:.2f}ms") print(f" Min Latency: {min_latency:.2f}ms") print(f" Max Latency: {max_latency:.2f}ms") @pytest.mark.asyncio async def test_health_check_response_schema( health_service: HealthService, ) -> None: """Validate health check response conforms to OpenAPI contract. Test validates: 1. Response matches HealthCheckResponse Pydantic model 2. All required fields are present 3. Field types match contract specification 4. Nested connection pool stats are valid 5. 
Response can be serialized to JSON Acceptance Criteria: - contracts/health-endpoint.yaml compliance - HealthCheckResponse model validation Constitutional Compliance: - Principle VIII: Type safety with Pydantic models - Principle V: Production quality API contracts - FR-011: Health check endpoint specification Args: health_service: Health service fixture Test Strategy: Execute health check and validate: - Pydantic model validation passes - JSON serialization works correctly - All contract fields present """ # Execute health check response = await health_service.check_health() # Validate response is HealthCheckResponse instance assert isinstance(response, HealthCheckResponse), ( f"Response must be HealthCheckResponse, got {type(response)}" ) # Validate all required fields are present assert hasattr(response, "status"), "Missing field: status" assert hasattr(response, "timestamp"), "Missing field: timestamp" assert hasattr(response, "database_status"), "Missing field: database_status" assert hasattr(response, "connection_pool"), "Missing field: connection_pool" assert hasattr(response, "uptime_seconds"), "Missing field: uptime_seconds" assert hasattr(response, "details"), "Missing field: details" # Validate connection pool nested structure pool_stats = response.connection_pool assert hasattr(pool_stats, "size"), "Missing connection_pool field: size" assert hasattr(pool_stats, "min_size"), "Missing connection_pool field: min_size" assert hasattr(pool_stats, "max_size"), "Missing connection_pool field: max_size" assert hasattr(pool_stats, "free"), "Missing connection_pool field: free" # Validate pool utilization calculation utilization = pool_stats.utilization_percent assert isinstance(utilization, Decimal), ( f"Utilization must be Decimal, got {type(utilization)}" ) assert Decimal("0") <= utilization <= Decimal("100"), ( f"Utilization must be 0-100%, got {utilization}" ) # Validate JSON serialization (required for MCP resource response) health_dict: dict[str, Any] = response.model_dump(mode="json") json_str = json.dumps(health_dict) assert len(json_str) > 0, "JSON serialization failed" # Validate deserialized JSON contains all fields deserialized = json.loads(json_str) assert "status" in deserialized, "JSON missing field: status" assert "timestamp" in deserialized, "JSON missing field: timestamp" assert "database_status" in deserialized, "JSON missing field: database_status" assert "connection_pool" in deserialized, "JSON missing field: connection_pool" assert "uptime_seconds" in deserialized, "JSON missing field: uptime_seconds" # Validate connection_pool nested structure in JSON pool_json = deserialized["connection_pool"] assert "size" in pool_json, "JSON connection_pool missing field: size" assert "min_size" in pool_json, "JSON connection_pool missing field: min_size" assert "max_size" in pool_json, "JSON connection_pool missing field: max_size" assert "free" in pool_json, "JSON connection_pool missing field: free" # Validate status values are contract-compliant assert deserialized["status"] in ["healthy", "degraded", "unhealthy"], ( f"Invalid status in JSON: {deserialized['status']}" ) assert deserialized["database_status"] in ["connected", "disconnected", "degraded"], ( f"Invalid database_status in JSON: {deserialized['database_status']}" ) @pytest.mark.asyncio async def test_metrics_prometheus_format( metrics_service: MetricsService, ) -> None: """Validate metrics endpoint supports both JSON and Prometheus text formats. Test validates: 1. Metrics are collected in memory 2. 
JSON format contains counters and histograms 3. Prometheus text format conforms to exposition format spec 4. Counter values are monotonically increasing 5. Histogram buckets are sorted and cumulative Acceptance Criteria: - quickstart.md lines 435-468 - FR-012: Prometheus-compatible metrics format - contracts/metrics-endpoint.yaml compliance Constitutional Compliance: - Principle V: Production quality observability - Principle VIII: Type safety (MetricsResponse model) - FR-012: Prometheus format with counters and histograms Args: metrics_service: Metrics service fixture Test Strategy: Record test metrics and validate: - JSON format with MetricsResponse schema - Prometheus text format compliance - Bucket ordering and cumulative counts """ # Record test counter metrics metrics_service.increment_counter( name="codebase_mcp_requests_total", help_text="Total number of requests", value=100, ) metrics_service.increment_counter( name="codebase_mcp_errors_total", help_text="Total number of errors", value=5, ) # Record test histogram metrics (search latencies in seconds) search_latencies = [0.05, 0.15, 0.25, 0.35, 0.45, 0.75, 0.85, 1.5, 2.5] for latency in search_latencies: metrics_service.observe_histogram( name="codebase_mcp_search_latency_seconds", help_text="Search query latency", value=latency, buckets=[0.1, 0.5, 1.0, 2.0, 5.0], ) # === Test JSON Format === metrics_response = metrics_service.get_metrics() # Validate MetricsResponse type assert isinstance(metrics_response, MetricsResponse), ( f"Response must be MetricsResponse, got {type(metrics_response)}" ) # Validate counters structure assert len(metrics_response.counters) == 2, ( f"Expected 2 counters, got {len(metrics_response.counters)}" ) # Find request counter request_counter = next( (c for c in metrics_response.counters if c.name == "codebase_mcp_requests_total"), None, ) assert request_counter is not None, "Missing counter: codebase_mcp_requests_total" assert isinstance(request_counter, MetricCounter), ( f"Counter must be MetricCounter, got {type(request_counter)}" ) assert request_counter.value == 100, ( f"Expected counter value 100, got {request_counter.value}" ) assert request_counter.help_text == "Total number of requests" # Find error counter error_counter = next( (c for c in metrics_response.counters if c.name == "codebase_mcp_errors_total"), None, ) assert error_counter is not None, "Missing counter: codebase_mcp_errors_total" assert error_counter.value == 5 # Validate histograms structure assert len(metrics_response.histograms) == 1, ( f"Expected 1 histogram, got {len(metrics_response.histograms)}" ) # Validate search latency histogram search_histogram = metrics_response.histograms[0] assert isinstance(search_histogram, MetricHistogram), ( f"Histogram must be MetricHistogram, got {type(search_histogram)}" ) assert search_histogram.name == "codebase_mcp_search_latency_seconds" assert search_histogram.help_text == "Search query latency" assert search_histogram.count == len(search_latencies), ( f"Expected histogram count {len(search_latencies)}, got {search_histogram.count}" ) # Validate histogram buckets are sorted bucket_les = [bucket.bucket_le for bucket in search_histogram.buckets] assert bucket_les == sorted(bucket_les), ( f"Histogram buckets must be sorted, got {bucket_les}" ) # Validate bucket counts are cumulative (monotonically increasing) bucket_counts = [bucket.count for bucket in search_histogram.buckets] for i in range(1, len(bucket_counts)): assert bucket_counts[i] >= bucket_counts[i - 1], ( f"Bucket counts must be 
cumulative, but bucket {i} has count " f"{bucket_counts[i]} < previous count {bucket_counts[i - 1]}" ) # Validate total count equals final bucket count assert search_histogram.count == search_histogram.buckets[-1].count, ( f"Total count ({search_histogram.count}) must equal final bucket count " f"({search_histogram.buckets[-1].count})" ) # Validate sum calculation expected_sum = sum(search_latencies) assert abs(float(search_histogram.sum) - expected_sum) < 0.01, ( f"Expected histogram sum {expected_sum}, got {search_histogram.sum}" ) # Validate JSON serialization metrics_dict: dict[str, Any] = metrics_response.model_dump(mode="json") json_str = json.dumps(metrics_dict) assert len(json_str) > 0, "JSON serialization failed" deserialized = json.loads(json_str) assert "counters" in deserialized, "JSON missing field: counters" assert "histograms" in deserialized, "JSON missing field: histograms" assert len(deserialized["counters"]) == 2 assert len(deserialized["histograms"]) == 1 # === Test Prometheus Text Format === prometheus_text = metrics_response.to_prometheus() # Validate Prometheus text format structure assert isinstance(prometheus_text, str), ( f"Prometheus text must be str, got {type(prometheus_text)}" ) assert len(prometheus_text) > 0, "Prometheus text is empty" # Validate counter format (HELP, TYPE, value) assert "# HELP codebase_mcp_requests_total Total number of requests" in prometheus_text, ( "Missing HELP line for requests_total counter" ) assert "# TYPE codebase_mcp_requests_total counter" in prometheus_text, ( "Missing TYPE line for requests_total counter" ) assert "codebase_mcp_requests_total 100" in prometheus_text, ( "Missing value line for requests_total counter" ) # Validate error counter assert "# HELP codebase_mcp_errors_total Total number of errors" in prometheus_text assert "# TYPE codebase_mcp_errors_total counter" in prometheus_text assert "codebase_mcp_errors_total 5" in prometheus_text # Validate histogram format assert "# HELP codebase_mcp_search_latency_seconds Search query latency" in prometheus_text, ( "Missing HELP line for search_latency histogram" ) assert "# TYPE codebase_mcp_search_latency_seconds histogram" in prometheus_text, ( "Missing TYPE line for search_latency histogram" ) # Validate histogram buckets assert 'codebase_mcp_search_latency_seconds_bucket{le="0.1"}' in prometheus_text, ( "Missing 0.1s bucket" ) assert 'codebase_mcp_search_latency_seconds_bucket{le="0.5"}' in prometheus_text, ( "Missing 0.5s bucket" ) assert 'codebase_mcp_search_latency_seconds_bucket{le="1.0"}' in prometheus_text, ( "Missing 1.0s bucket" ) assert 'codebase_mcp_search_latency_seconds_bucket{le="+Inf"}' in prometheus_text, ( "Missing +Inf bucket" ) # Validate histogram count and sum assert f"codebase_mcp_search_latency_seconds_count {len(search_latencies)}" in prometheus_text, ( "Missing or incorrect histogram count" ) assert "codebase_mcp_search_latency_seconds_sum" in prometheus_text, ( "Missing histogram sum" ) @pytest.mark.asyncio async def test_structured_logging_format(tmp_path: Path) -> None: """Validate structured logs are JSON with required fields. Test validates: 1. Log entries are valid JSON 2. Each entry contains timestamp, level, event fields 3. Timestamps are ISO 8601 format 4. Log levels are valid (DEBUG, INFO, WARNING, ERROR, CRITICAL) 5. 
Log entries contain contextual information Acceptance Criteria: - quickstart.md lines 473-489 - FR-013: Structured logging with JSON format Constitutional Compliance: - Principle V: Production quality logging - Principle VIII: Type safety in log structure - FR-013: Structured logging for monitoring Args: tmp_path: Pytest temporary directory fixture Test Strategy: Generate test log entries and validate: - JSON format compliance - Required fields present - Timestamp and level validity """ # Create test log file log_file = tmp_path / "test_structured_logging.log" # Write test log entries in JSON format test_logs = [ { "timestamp": "2025-10-13T10:30:00.123456Z", "level": "INFO", "event": "server_started", "message": "Codebase MCP Server started successfully", "context": {"port": 8020, "version": "2.0.0"}, }, { "timestamp": "2025-10-13T10:30:01.456789Z", "level": "DEBUG", "event": "health_check_executed", "message": "Health check completed", "context": {"status": "healthy", "latency_ms": 12.5}, }, { "timestamp": "2025-10-13T10:30:02.789012Z", "level": "WARNING", "event": "pool_utilization_high", "message": "Connection pool utilization exceeds 80%", "context": {"utilization_percent": 85.5, "pool_size": 10, "free": 2}, }, { "timestamp": "2025-10-13T10:30:03.012345Z", "level": "ERROR", "event": "database_connection_failed", "message": "Failed to connect to database", "context": {"error": "Connection timeout", "retry_count": 3}, }, ] # Write test logs to file with open(log_file, "w") as f: for log_entry in test_logs: f.write(json.dumps(log_entry) + "\n") # Read and validate log file log_entries = [] with open(log_file, "r") as f: for line in f: try: log_entry = json.loads(line.strip()) log_entries.append(log_entry) except json.JSONDecodeError as e: pytest.fail(f"Log entry is not valid JSON: {line!r}, error: {e}") # Validate we have all test log entries assert len(log_entries) == 4, ( f"Expected 4 log entries, got {len(log_entries)}" ) # Validate each log entry structure valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} for idx, log_entry in enumerate(log_entries): # Validate required fields are present assert "timestamp" in log_entry, f"Log entry {idx} missing field: timestamp" assert "level" in log_entry, f"Log entry {idx} missing field: level" assert "event" in log_entry, f"Log entry {idx} missing field: event" assert "message" in log_entry, f"Log entry {idx} missing field: message" # Validate timestamp is ISO 8601 format timestamp_str = log_entry["timestamp"] try: # Parse ISO 8601 timestamp (handle both with and without microseconds) if timestamp_str.endswith("Z"): # UTC timezone indicator datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) else: datetime.fromisoformat(timestamp_str) except ValueError as e: pytest.fail( f"Log entry {idx} has invalid timestamp format: {timestamp_str}, error: {e}" ) # Validate level is valid level = log_entry["level"] assert level in valid_levels, ( f"Log entry {idx} has invalid level: {level}, must be one of {valid_levels}" ) # Validate event and message are non-empty strings assert isinstance(log_entry["event"], str) and len(log_entry["event"]) > 0, ( f"Log entry {idx} event must be non-empty string" ) assert isinstance(log_entry["message"], str) and len(log_entry["message"]) > 0, ( f"Log entry {idx} message must be non-empty string" ) # Validate context field (if present) is a dict if "context" in log_entry: assert isinstance(log_entry["context"], dict), ( f"Log entry {idx} context must be dict, got {type(log_entry['context'])}" ) # 
Validate specific test log entries assert log_entries[0]["event"] == "server_started" assert log_entries[0]["level"] == "INFO" assert "port" in log_entries[0]["context"] assert log_entries[0]["context"]["port"] == 8020 assert log_entries[1]["event"] == "health_check_executed" assert log_entries[1]["level"] == "DEBUG" assert "status" in log_entries[1]["context"] assert log_entries[2]["event"] == "pool_utilization_high" assert log_entries[2]["level"] == "WARNING" assert "utilization_percent" in log_entries[2]["context"] assert log_entries[3]["event"] == "database_connection_failed" assert log_entries[3]["level"] == "ERROR" assert "error" in log_entries[3]["context"] # Validate log entries are sorted by timestamp (chronological order) timestamps = [log_entry["timestamp"] for log_entry in log_entries] assert timestamps == sorted(timestamps), ( "Log entries must be in chronological order" ) __all__ = [ "test_health_check_response_time", "test_health_check_response_schema", "test_metrics_prometheus_format", "test_structured_logging_format", ]
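Note that test_structured_logging_format validates synthetic JSON lines written to a temp file rather than output captured from the server's real logger. For reference, a minimal sketch of a stdlib logging formatter that would emit the same shape (timestamp, level, event, message, context) appears below; the class name JSONLogFormatter and the use of the extra= mapping are illustrative assumptions, not the project's actual logging configuration.

    # Minimal sketch (assumption): a stdlib logging formatter producing the log shape
    # asserted by test_structured_logging_format. Not the project's real logging setup.
    import json
    import logging
    from datetime import datetime, timezone


    class JSONLogFormatter(logging.Formatter):
        """Render each record as one JSON line with timestamp/level/event/message/context."""

        def format(self, record: logging.LogRecord) -> str:
            entry = {
                # ISO 8601 UTC timestamp with trailing "Z", matching the test fixtures
                "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                "level": record.levelname,
                # "event" and "context" are supplied via logger's extra= mapping (assumption)
                "event": getattr(record, "event", record.name),
                "message": record.getMessage(),
                "context": getattr(record, "context", {}),
            }
            return json.dumps(entry)


    # Usage: attach the formatter and pass event/context through extra=
    handler = logging.StreamHandler()
    handler.setFormatter(JSONLogFormatter())
    logger = logging.getLogger("codebase_mcp")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info(
        "Codebase MCP Server started successfully",
        extra={"event": "server_started", "context": {"port": 8020, "version": "2.0.0"}},
    )

Running this sketch prints one JSON object per log call, with the same required fields the test asserts (timestamp, level, event, message, plus an optional context dict).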

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'
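The same endpoint can also be queried from Python using only the standard library. A minimal sketch, assuming the endpoint needs no authentication and returns JSON:

    import json
    from urllib.request import urlopen

    # Fetch the directory entry for this server (same endpoint as the curl example above)
    url = "https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp"
    with urlopen(url) as response:
        server_info = json.loads(response.read().decode("utf-8"))
    print(server_info)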

If you have feedback or need assistance with the MCP directory API, please join our Discord server.