MaverickMCP

by wshobson
MIT License
monitoring.py (26.4 kB)
""" Monitoring and health check endpoints for MaverickMCP. This module provides endpoints for: - Prometheus metrics exposure - Health checks (basic, detailed, readiness) - System status and diagnostics - Monitoring dashboard data """ import time from typing import Any from fastapi import APIRouter, HTTPException, Response from pydantic import BaseModel from maverick_mcp.config.settings import settings from maverick_mcp.monitoring.metrics import ( get_backtesting_metrics, get_metrics_for_prometheus, ) from maverick_mcp.utils.database_monitoring import ( get_cache_monitor, get_database_monitor, ) from maverick_mcp.utils.logging import get_logger from maverick_mcp.utils.monitoring import get_metrics, get_monitoring_service logger = get_logger(__name__) router = APIRouter() class HealthStatus(BaseModel): """Health check response model.""" status: str timestamp: float version: str environment: str uptime_seconds: float class DetailedHealthStatus(BaseModel): """Detailed health check response model.""" status: str timestamp: float version: str environment: str uptime_seconds: float services: dict[str, dict[str, Any]] metrics: dict[str, Any] class SystemMetrics(BaseModel): """System metrics response model.""" cpu_usage_percent: float memory_usage_mb: float open_file_descriptors: int active_connections: int database_pool_status: dict[str, Any] redis_info: dict[str, Any] class ServiceStatus(BaseModel): """Individual service status.""" name: str status: str last_check: float details: dict[str, Any] = {} # Track server start time for uptime calculation _server_start_time = time.time() @router.get("/health", response_model=HealthStatus) async def health_check(): """ Basic health check endpoint. Returns basic service status and uptime information. Used by load balancers and orchestration systems. """ return HealthStatus( status="healthy", timestamp=time.time(), version="1.0.0", # You might want to get this from a version file environment=settings.environment, uptime_seconds=time.time() - _server_start_time, ) @router.get("/health/detailed", response_model=DetailedHealthStatus) async def detailed_health_check(): """ Detailed health check endpoint. 

    Returns comprehensive health information including:
    - Service dependencies status
    - Database connectivity
    - Redis connectivity
    - Performance metrics
    """
    services = {}

    # Check database health
    try:
        db_monitor = get_database_monitor()
        pool_status = db_monitor.get_pool_status()
        services["database"] = {
            "status": "healthy" if pool_status else "unknown",
            "details": pool_status,
            "last_check": time.time(),
        }
    except Exception as e:
        services["database"] = {
            "status": "unhealthy",
            "details": {"error": str(e)},
            "last_check": time.time(),
        }

    # Check Redis health
    try:
        cache_monitor = get_cache_monitor()
        redis_info = (
            await cache_monitor.redis_monitor.get_redis_info()
            if cache_monitor.redis_monitor
            else {}
        )
        services["redis"] = {
            "status": "healthy" if redis_info else "unknown",
            "details": redis_info,
            "last_check": time.time(),
        }
    except Exception as e:
        services["redis"] = {
            "status": "unhealthy",
            "details": {"error": str(e)},
            "last_check": time.time(),
        }

    # Check monitoring services
    try:
        monitoring = get_monitoring_service()
        services["monitoring"] = {
            "status": "healthy",
            "details": {
                "sentry_enabled": monitoring.sentry_enabled,
            },
            "last_check": time.time(),
        }
    except Exception as e:
        services["monitoring"] = {
            "status": "unhealthy",
            "details": {"error": str(e)},
            "last_check": time.time(),
        }

    # Overall status
    overall_status = "healthy"
    for service in services.values():
        if service["status"] == "unhealthy":
            overall_status = "unhealthy"
            break
        elif service["status"] == "unknown" and overall_status == "healthy":
            overall_status = "degraded"

    return DetailedHealthStatus(
        status=overall_status,
        timestamp=time.time(),
        version="1.0.0",
        environment=settings.environment,
        uptime_seconds=time.time() - _server_start_time,
        services=services,
        metrics=await _get_basic_metrics(),
    )


@router.get("/health/readiness")
async def readiness_check():
    """
    Readiness check endpoint.

    Indicates whether the service is ready to handle requests.
    Used by Kubernetes and other orchestration systems.
    """
    try:
        # Check critical dependencies
        checks = []

        # Database readiness
        try:
            db_monitor = get_database_monitor()
            pool_status = db_monitor.get_pool_status()
            if pool_status and pool_status.get("pool_size", 0) > 0:
                checks.append(True)
            else:
                checks.append(False)
        except Exception:
            checks.append(False)

        # Redis readiness (if configured)
        try:
            cache_monitor = get_cache_monitor()
            if cache_monitor.redis_monitor:
                redis_info = await cache_monitor.redis_monitor.get_redis_info()
                checks.append(bool(redis_info))
            else:
                checks.append(True)  # Redis not required
        except Exception:
            checks.append(False)

        if all(checks):
            return {"status": "ready", "timestamp": time.time()}
        else:
            raise HTTPException(status_code=503, detail="Service not ready")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Readiness check failed: {e}")
        raise HTTPException(status_code=503, detail="Readiness check failed")


@router.get("/health/liveness")
async def liveness_check():
    """
    Liveness check endpoint.

    Indicates whether the service is alive and should not be restarted.
    Used by Kubernetes and other orchestration systems.
    """
    # Simple check - if we can respond, we're alive
    return {"status": "alive", "timestamp": time.time()}


@router.get("/metrics")
async def prometheus_metrics():
    """
    Prometheus metrics endpoint.

    Returns comprehensive metrics in Prometheus text format for scraping.
    Includes both system metrics and backtesting-specific metrics.
""" try: # Get standard system metrics system_metrics = get_metrics() # Get backtesting-specific metrics backtesting_metrics = get_metrics_for_prometheus() # Combine all metrics combined_metrics = system_metrics + "\n" + backtesting_metrics return Response( content=combined_metrics, media_type="text/plain; version=0.0.4; charset=utf-8", ) except Exception as e: logger.error(f"Failed to generate metrics: {e}") raise HTTPException(status_code=500, detail="Failed to generate metrics") @router.get("/metrics/backtesting") async def backtesting_metrics(): """ Specialized backtesting metrics endpoint. Returns backtesting-specific metrics in Prometheus text format. Useful for dedicated backtesting monitoring and alerting. """ try: backtesting_metrics_text = get_metrics_for_prometheus() return Response( content=backtesting_metrics_text, media_type="text/plain; version=0.0.4; charset=utf-8", ) except Exception as e: logger.error(f"Failed to generate backtesting metrics: {e}") raise HTTPException( status_code=500, detail="Failed to generate backtesting metrics" ) @router.get("/metrics/json") async def metrics_json(): """ Get metrics in JSON format for dashboards and monitoring. Returns structured metrics data suitable for consumption by monitoring dashboards and alerting systems. """ try: return { "timestamp": time.time(), "system": await _get_system_metrics(), "application": await _get_application_metrics(), "business": await _get_business_metrics(), "backtesting": await _get_backtesting_metrics(), } except Exception as e: logger.error(f"Failed to generate JSON metrics: {e}") raise HTTPException(status_code=500, detail="Failed to generate JSON metrics") @router.get("/status", response_model=SystemMetrics) async def system_status(): """ Get current system status and performance metrics. Returns real-time system performance data including: - CPU and memory usage - Database connection pool status - Redis connection status - File descriptor usage """ try: import psutil process = psutil.Process() memory_info = process.memory_info() # Get database pool status db_monitor = get_database_monitor() pool_status = db_monitor.get_pool_status() # Get Redis info cache_monitor = get_cache_monitor() redis_info = {} if cache_monitor.redis_monitor: redis_info = await cache_monitor.redis_monitor.get_redis_info() return SystemMetrics( cpu_usage_percent=process.cpu_percent(), memory_usage_mb=memory_info.rss / 1024 / 1024, open_file_descriptors=process.num_fds() if hasattr(process, "num_fds") else 0, active_connections=0, # This would come from your connection tracking database_pool_status=pool_status, redis_info=redis_info, ) except Exception as e: logger.error(f"Failed to get system status: {e}") raise HTTPException(status_code=500, detail="Failed to get system status") @router.get("/diagnostics") async def system_diagnostics(): """ Get comprehensive system diagnostics. 

    Returns detailed diagnostic information for troubleshooting:
    - Environment configuration
    - Feature flags
    - Service dependencies
    - Performance metrics
    - Recent errors
    """
    try:
        diagnostics = {
            "timestamp": time.time(),
            "environment": {
                "name": settings.environment,
                "auth_enabled": False,  # Disabled for personal use
                "debug_mode": settings.api.debug,
            },
            "uptime_seconds": time.time() - _server_start_time,
            "services": await _get_service_diagnostics(),
            "performance": await _get_performance_diagnostics(),
            "configuration": _get_configuration_diagnostics(),
        }

        return diagnostics
    except Exception as e:
        logger.error(f"Failed to generate diagnostics: {e}")
        raise HTTPException(status_code=500, detail="Failed to generate diagnostics")


async def _get_basic_metrics() -> dict[str, Any]:
    """Get basic performance metrics."""
    try:
        import psutil

        process = psutil.Process()
        memory_info = process.memory_info()

        return {
            "cpu_usage_percent": process.cpu_percent(),
            "memory_usage_mb": memory_info.rss / 1024 / 1024,
            "uptime_seconds": time.time() - _server_start_time,
        }
    except Exception as e:
        logger.error(f"Failed to get basic metrics: {e}")
        return {}


async def _get_system_metrics() -> dict[str, Any]:
    """Get detailed system metrics."""
    try:
        import psutil

        process = psutil.Process()
        memory_info = process.memory_info()

        return {
            "cpu": {
                "usage_percent": process.cpu_percent(),
                "times": process.cpu_times()._asdict(),
            },
            "memory": {
                "rss_mb": memory_info.rss / 1024 / 1024,
                "vms_mb": memory_info.vms / 1024 / 1024,
                "percent": process.memory_percent(),
            },
            "file_descriptors": {
                "open": process.num_fds() if hasattr(process, "num_fds") else 0,
            },
            "threads": process.num_threads(),
        }
    except Exception as e:
        logger.error(f"Failed to get system metrics: {e}")
        return {}


async def _get_application_metrics() -> dict[str, Any]:
    """Get application-specific metrics."""
    try:
        # Get database metrics
        db_monitor = get_database_monitor()
        pool_status = db_monitor.get_pool_status()

        # Get cache metrics
        cache_monitor = get_cache_monitor()
        redis_info = {}
        if cache_monitor.redis_monitor:
            redis_info = await cache_monitor.redis_monitor.get_redis_info()

        return {
            "database": {
                "pool_status": pool_status,
            },
            "cache": {
                "redis_info": redis_info,
            },
            "monitoring": {
                "sentry_enabled": get_monitoring_service().sentry_enabled,
            },
        }
    except Exception as e:
        logger.error(f"Failed to get application metrics: {e}")
        return {}


async def _get_business_metrics() -> dict[str, Any]:
    """Get business-related metrics."""
    # This would typically query your database for business metrics
    # For now, return placeholder data
    return {
        "users": {
            "total_active": 0,
            "daily_active": 0,
            "monthly_active": 0,
        },
        "tools": {
            "total_executions": 0,
            "average_execution_time": 0,
        },
        "engagement": {
            "portfolio_reviews": 0,
            "watchlists_managed": 0,
        },
    }


async def _get_backtesting_metrics() -> dict[str, Any]:
    """Get backtesting-specific metrics summary."""
    try:
        # Get the backtesting metrics collector
        get_backtesting_metrics()

        # Return a summary of key backtesting metrics
        # In a real implementation, you might query the metrics registry
        # or maintain counters in the collector class
        return {
            "strategy_performance": {
                "total_backtests_run": 0,  # Would be populated from metrics
                "average_execution_time": 0.0,
                "successful_backtests": 0,
                "failed_backtests": 0,
            },
            "api_usage": {
                "total_api_calls": 0,
                "rate_limit_hits": 0,
                "average_response_time": 0.0,
                "error_rate": 0.0,
            },
            "resource_usage": {
                "peak_memory_usage_mb": 0.0,
                "average_computation_time": 0.0,
                "database_query_count": 0,
            },
            "anomalies": {
"total_anomalies_detected": 0, "critical_anomalies": 0, "warning_anomalies": 0, }, } except Exception as e: logger.error(f"Failed to get backtesting metrics: {e}") return {} async def _get_service_diagnostics() -> dict[str, Any]: """Get service dependency diagnostics.""" services = {} # Database diagnostics try: db_monitor = get_database_monitor() pool_status = db_monitor.get_pool_status() services["database"] = { "status": "healthy" if pool_status else "unknown", "pool_status": pool_status, "url_configured": bool(settings.database.url), } except Exception as e: services["database"] = { "status": "error", "error": str(e), } # Redis diagnostics try: cache_monitor = get_cache_monitor() if cache_monitor.redis_monitor: redis_info = await cache_monitor.redis_monitor.get_redis_info() services["redis"] = { "status": "healthy" if redis_info else "unknown", "info": redis_info, } else: services["redis"] = { "status": "not_configured", } except Exception as e: services["redis"] = { "status": "error", "error": str(e), } return services async def _get_performance_diagnostics() -> dict[str, Any]: """Get performance diagnostics.""" try: import gc import psutil process = psutil.Process() return { "garbage_collection": { "stats": gc.get_stats(), "counts": gc.get_count(), }, "process": { "create_time": process.create_time(), "num_threads": process.num_threads(), "connections": len(process.connections()) if hasattr(process, "connections") else 0, }, } except Exception as e: logger.error(f"Failed to get performance diagnostics: {e}") return {} def _get_configuration_diagnostics() -> dict[str, Any]: """Get configuration diagnostics.""" return { "environment": settings.environment, "features": { "auth_enabled": False, # Disabled for personal use "debug_mode": settings.api.debug, }, "database": { "url_configured": bool(settings.database.url), }, } # Health check dependencies for other endpoints async def require_healthy_database(): """Dependency that ensures database is healthy.""" try: db_monitor = get_database_monitor() pool_status = db_monitor.get_pool_status() if not pool_status or pool_status.get("pool_size", 0) == 0: raise HTTPException(status_code=503, detail="Database not available") except HTTPException: raise except Exception as e: raise HTTPException( status_code=503, detail=f"Database health check failed: {e}" ) async def require_healthy_redis(): """Dependency that ensures Redis is healthy.""" try: cache_monitor = get_cache_monitor() if cache_monitor.redis_monitor: redis_info = await cache_monitor.redis_monitor.get_redis_info() if not redis_info: raise HTTPException(status_code=503, detail="Redis not available") except HTTPException: raise except Exception as e: raise HTTPException(status_code=503, detail=f"Redis health check failed: {e}") @router.get("/alerts") async def get_active_alerts(): """ Get active alerts and anomalies detected by the monitoring system. 

    Returns current alerts for:
    - Strategy performance anomalies
    - API rate limiting issues
    - Resource usage threshold violations
    - Data quality problems
    """
    try:
        alerts = []
        timestamp = time.time()

        # Get the backtesting metrics collector to check for anomalies
        get_backtesting_metrics()

        # In a real implementation, you would query stored alert data
        # For now, we'll check current thresholds and return sample data

        # Example: Check current system metrics against thresholds
        import psutil

        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024

        # Check memory usage threshold
        if memory_mb > 1000:  # 1GB threshold
            alerts.append(
                {
                    "id": "memory_high_001",
                    "type": "resource_usage",
                    "severity": "warning" if memory_mb < 2000 else "critical",
                    "title": "High Memory Usage",
                    "description": f"Process memory usage is {memory_mb:.1f}MB",
                    "timestamp": timestamp,
                    "metric_value": memory_mb,
                    "threshold_value": 1000,
                    "status": "active",
                    "tags": ["memory", "system", "performance"],
                }
            )

        # Check database connection pool
        try:
            db_monitor = get_database_monitor()
            pool_status = db_monitor.get_pool_status()
            if (
                pool_status
                and pool_status.get("active_connections", 0)
                > pool_status.get("pool_size", 10) * 0.8
            ):
                alerts.append(
                    {
                        "id": "db_pool_high_001",
                        "type": "database_performance",
                        "severity": "warning",
                        "title": "High Database Connection Usage",
                        "description": "Database connection pool usage is above 80%",
                        "timestamp": timestamp,
                        "metric_value": pool_status.get("active_connections", 0),
                        "threshold_value": pool_status.get("pool_size", 10) * 0.8,
                        "status": "active",
                        "tags": ["database", "connections", "performance"],
                    }
                )
        except Exception:
            pass

        return {
            "alerts": alerts,
            "total_count": len(alerts),
            "severity_counts": {
                "critical": len([a for a in alerts if a["severity"] == "critical"]),
                "warning": len([a for a in alerts if a["severity"] == "warning"]),
                "info": len([a for a in alerts if a["severity"] == "info"]),
            },
            "timestamp": timestamp,
        }

    except Exception as e:
        logger.error(f"Failed to get alerts: {e}")
        raise HTTPException(status_code=500, detail="Failed to get alerts")


@router.get("/alerts/rules")
async def get_alert_rules():
    """
    Get configured alert rules and thresholds.

    Returns the current alert rule configuration including:
    - Performance thresholds
    - Anomaly detection settings
    - Alert severity levels
    - Notification settings
    """
    try:
        # Get the backtesting metrics collector
        get_backtesting_metrics()

        # Return the configured alert rules
        rules = {
            "performance_thresholds": {
                "sharpe_ratio": {
                    "warning_threshold": 0.5,
                    "critical_threshold": 0.0,
                    "comparison": "less_than",
                    "enabled": True,
                },
                "max_drawdown": {
                    "warning_threshold": 20.0,
                    "critical_threshold": 30.0,
                    "comparison": "greater_than",
                    "enabled": True,
                },
                "win_rate": {
                    "warning_threshold": 40.0,
                    "critical_threshold": 30.0,
                    "comparison": "less_than",
                    "enabled": True,
                },
                "execution_time": {
                    "warning_threshold": 60.0,
                    "critical_threshold": 120.0,
                    "comparison": "greater_than",
                    "enabled": True,
                },
            },
            "resource_thresholds": {
                "memory_usage": {
                    "warning_threshold": 1000,  # MB
                    "critical_threshold": 2000,  # MB
                    "comparison": "greater_than",
                    "enabled": True,
                },
                "cpu_usage": {
                    "warning_threshold": 80.0,  # %
                    "critical_threshold": 95.0,  # %
                    "comparison": "greater_than",
                    "enabled": True,
                },
                "disk_usage": {
                    "warning_threshold": 80.0,  # %
                    "critical_threshold": 95.0,  # %
                    "comparison": "greater_than",
                    "enabled": True,
                },
            },
            "api_thresholds": {
                "response_time": {
                    "warning_threshold": 30.0,  # seconds
                    "critical_threshold": 60.0,  # seconds
                    "comparison": "greater_than",
                    "enabled": True,
                },
                "error_rate": {
                    "warning_threshold": 5.0,  # %
                    "critical_threshold": 10.0,  # %
                    "comparison": "greater_than",
                    "enabled": True,
                },
                "rate_limit_usage": {
                    "warning_threshold": 80.0,  # %
                    "critical_threshold": 95.0,  # %
                    "comparison": "greater_than",
                    "enabled": True,
                },
            },
            "anomaly_detection": {
                "enabled": True,
                "sensitivity": "medium",
                "lookback_period_hours": 24,
                "minimum_data_points": 10,
            },
            "notification_settings": {
                "webhook_enabled": False,
                "email_enabled": False,
                "slack_enabled": False,
                "webhook_url": None,
            },
        }

        return {
            "rules": rules,
            "total_rules": sum(
                len(category)
                for category in rules.values()
                if isinstance(category, dict)
            ),
            "enabled_rules": sum(
                len(
                    [
                        rule
                        for rule in category.values()
                        if isinstance(rule, dict) and rule.get("enabled", False)
                    ]
                )
                for category in rules.values()
                if isinstance(category, dict)
            ),
            "timestamp": time.time(),
        }

    except Exception as e:
        logger.error(f"Failed to get alert rules: {e}")
        raise HTTPException(status_code=500, detail="Failed to get alert rules")
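
The router above is a standard FastAPI APIRouter, so it can be mounted under any prefix and probed like any other set of routes. The following is a minimal sketch, not the repository's actual wiring: the import path and the /monitoring prefix are assumptions, the /example route is purely hypothetical, and the database- and Redis-backed endpoints (/health/detailed, /status, /metrics) only return meaningful data once maverick_mcp's settings are configured. It also shows require_healthy_database used as an ordinary dependency to guard another route.

# Minimal usage sketch (assumptions noted above); requires fastapi and httpx.
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient

from maverick_mcp.routers.monitoring import (  # hypothetical module path
    require_healthy_database,
    router,
)

app = FastAPI()
app.include_router(router, prefix="/monitoring")  # prefix is an assumption


@app.get("/example", dependencies=[Depends(require_healthy_database)])
async def example_tool():
    # Fails fast with HTTP 503 if the database pool is unavailable.
    return {"ok": True}


client = TestClient(app)
print(client.get("/monitoring/health/liveness").json())  # {"status": "alive", ...}
print(client.get("/monitoring/health").json())  # uptime, version, environment
print(client.get("/monitoring/metrics").text[:200])  # Prometheus exposition text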
