
MaverickMCP

by wshobson
MIT License
health_monitor.py (13.9 kB)
""" Background health monitoring system. This module provides background tasks for continuous health monitoring, alerting, and automatic recovery actions for the backtesting system. """ import asyncio import logging import time from datetime import UTC, datetime, timedelta from typing import Any from maverick_mcp.config.settings import get_settings from maverick_mcp.monitoring.status_dashboard import get_status_dashboard from maverick_mcp.utils.circuit_breaker import get_circuit_breaker_manager logger = logging.getLogger(__name__) settings = get_settings() # Monitoring intervals (seconds) HEALTH_CHECK_INTERVAL = 30 CIRCUIT_BREAKER_CHECK_INTERVAL = 60 RESOURCE_CHECK_INTERVAL = 45 ALERT_CHECK_INTERVAL = 120 # Alert thresholds ALERT_THRESHOLDS = { "consecutive_failures": 5, "high_cpu_duration": 300, # 5 minutes "high_memory_duration": 300, # 5 minutes "circuit_breaker_open_duration": 180, # 3 minutes } class HealthMonitor: """Background health monitoring system.""" def __init__(self): self.running = False self.tasks = [] self.alerts_sent = {} self.start_time = time.time() self.health_history = [] self.dashboard = get_status_dashboard() self.circuit_breaker_manager = get_circuit_breaker_manager() async def start(self): """Start all background monitoring tasks.""" if self.running: logger.warning("Health monitor is already running") return self.running = True logger.info("Starting health monitoring system...") # Initialize circuit breakers self.circuit_breaker_manager.initialize() # Start monitoring tasks self.tasks = [ asyncio.create_task(self._health_check_loop()), asyncio.create_task(self._circuit_breaker_monitor_loop()), asyncio.create_task(self._resource_monitor_loop()), asyncio.create_task(self._alert_processor_loop()), ] logger.info(f"Started {len(self.tasks)} monitoring tasks") async def stop(self): """Stop all monitoring tasks.""" if not self.running: return self.running = False logger.info("Stopping health monitoring system...") # Cancel all tasks for task in self.tasks: task.cancel() # Wait for tasks to complete if self.tasks: await asyncio.gather(*self.tasks, return_exceptions=True) self.tasks.clear() logger.info("Health monitoring system stopped") async def _health_check_loop(self): """Background loop for general health checks.""" logger.info("Started health check monitoring loop") while self.running: try: await self._perform_health_check() await asyncio.sleep(HEALTH_CHECK_INTERVAL) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in health check loop: {e}") await asyncio.sleep(HEALTH_CHECK_INTERVAL) logger.info("Health check monitoring loop stopped") async def _circuit_breaker_monitor_loop(self): """Background loop for circuit breaker monitoring.""" logger.info("Started circuit breaker monitoring loop") while self.running: try: await self._check_circuit_breakers() await asyncio.sleep(CIRCUIT_BREAKER_CHECK_INTERVAL) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in circuit breaker monitoring loop: {e}") await asyncio.sleep(CIRCUIT_BREAKER_CHECK_INTERVAL) logger.info("Circuit breaker monitoring loop stopped") async def _resource_monitor_loop(self): """Background loop for resource monitoring.""" logger.info("Started resource monitoring loop") while self.running: try: await self._check_resource_usage() await asyncio.sleep(RESOURCE_CHECK_INTERVAL) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in resource monitoring loop: {e}") await asyncio.sleep(RESOURCE_CHECK_INTERVAL) 
logger.info("Resource monitoring loop stopped") async def _alert_processor_loop(self): """Background loop for alert processing.""" logger.info("Started alert processing loop") while self.running: try: await self._process_alerts() await asyncio.sleep(ALERT_CHECK_INTERVAL) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in alert processing loop: {e}") await asyncio.sleep(ALERT_CHECK_INTERVAL) logger.info("Alert processing loop stopped") async def _perform_health_check(self): """Perform comprehensive health check.""" try: from maverick_mcp.api.routers.health_enhanced import ( _get_detailed_health_status, ) health_status = await _get_detailed_health_status() # Log health status logger.debug(f"Health check: {health_status['status']}") # Record health data self._record_health_data(health_status) # Check for issues requiring attention await self._analyze_health_trends(health_status) except Exception as e: logger.error(f"Failed to perform health check: {e}") async def _check_circuit_breakers(self): """Monitor circuit breaker states and perform recovery actions.""" try: cb_status = self.circuit_breaker_manager.get_health_status() for name, status in cb_status.items(): state = status.get("state", "unknown") # Check for stuck open circuit breakers if state == "open": await self._handle_open_circuit_breaker(name, status) # Check for high failure rates metrics = status.get("metrics", {}) failure_rate = metrics.get("failure_rate", 0) if failure_rate > 0.5: # 50% failure rate await self._handle_high_failure_rate(name, failure_rate) except Exception as e: logger.error(f"Failed to check circuit breakers: {e}") async def _check_resource_usage(self): """Monitor system resource usage.""" try: from maverick_mcp.api.routers.health_enhanced import _get_resource_usage resource_usage = _get_resource_usage() # Check CPU usage if resource_usage.cpu_percent > 80: await self._handle_high_cpu_usage(resource_usage.cpu_percent) # Check memory usage if resource_usage.memory_percent > 85: await self._handle_high_memory_usage(resource_usage.memory_percent) # Check disk usage if resource_usage.disk_percent > 90: await self._handle_high_disk_usage(resource_usage.disk_percent) except Exception as e: logger.error(f"Failed to check resource usage: {e}") async def _process_alerts(self): """Process and manage alerts.""" try: dashboard_data = await self.dashboard.get_dashboard_data() alerts = dashboard_data.get("alerts", []) for alert in alerts: await self._handle_alert(alert) except Exception as e: logger.error(f"Failed to process alerts: {e}") def _record_health_data(self, health_status: dict[str, Any]): """Record health data for trend analysis.""" timestamp = datetime.now(UTC) health_record = { "timestamp": timestamp.isoformat(), "overall_status": health_status.get("status", "unknown"), "components_healthy": len( [ c for c in health_status.get("components", {}).values() if c.status == "healthy" ] ), "components_total": len(health_status.get("components", {})), "resource_usage": health_status.get("resource_usage", {}), } self.health_history.append(health_record) # Keep only last 24 hours of data cutoff_time = timestamp - timedelta(hours=24) self.health_history = [ record for record in self.health_history if datetime.fromisoformat(record["timestamp"].replace("Z", "+00:00")) > cutoff_time ] async def _analyze_health_trends(self, current_status: dict[str, Any]): """Analyze health trends and predict issues.""" if len(self.health_history) < 10: return # Not enough data for trend analysis # Analyze 
degradation trends recent_records = self.health_history[-10:] # Last 10 records unhealthy_trend = sum( 1 for record in recent_records if record["overall_status"] in ["degraded", "unhealthy"] ) if unhealthy_trend >= 7: # 70% of recent checks are problematic logger.warning( "Detected concerning health trend - system may need attention" ) await self._trigger_maintenance_alert() async def _handle_open_circuit_breaker(self, name: str, status: dict[str, Any]): """Handle circuit breaker that's been open too long.""" # Check if we've already alerted for this breaker recently alert_key = f"cb_open_{name}" last_alert = self.alerts_sent.get(alert_key) if last_alert and (time.time() - last_alert) < 300: # 5 minutes return logger.warning(f"Circuit breaker '{name}' has been open - investigating") # Record alert self.alerts_sent[alert_key] = time.time() # Could implement automatic recovery attempts here # For now, just log the issue async def _handle_high_failure_rate(self, name: str, failure_rate: float): """Handle high failure rate in circuit breaker.""" logger.warning(f"High failure rate detected for {name}: {failure_rate:.1%}") async def _handle_high_cpu_usage(self, cpu_percent: float): """Handle sustained high CPU usage.""" alert_key = "high_cpu" last_alert = self.alerts_sent.get(alert_key) if last_alert and (time.time() - last_alert) < 600: # 10 minutes return logger.warning(f"High CPU usage detected: {cpu_percent:.1f}%") self.alerts_sent[alert_key] = time.time() async def _handle_high_memory_usage(self, memory_percent: float): """Handle sustained high memory usage.""" alert_key = "high_memory" last_alert = self.alerts_sent.get(alert_key) if last_alert and (time.time() - last_alert) < 600: # 10 minutes return logger.warning(f"High memory usage detected: {memory_percent:.1f}%") self.alerts_sent[alert_key] = time.time() async def _handle_high_disk_usage(self, disk_percent: float): """Handle high disk usage.""" alert_key = "high_disk" last_alert = self.alerts_sent.get(alert_key) if last_alert and (time.time() - last_alert) < 1800: # 30 minutes return logger.error(f"Critical disk usage detected: {disk_percent:.1f}%") self.alerts_sent[alert_key] = time.time() async def _handle_alert(self, alert: dict[str, Any]): """Handle individual alert.""" severity = alert.get("severity", "info") # Log alert based on severity if severity == "critical": logger.error(f"Critical alert: {alert.get('title', 'Unknown')}") elif severity == "warning": logger.warning(f"Warning alert: {alert.get('title', 'Unknown')}") else: logger.info(f"Info alert: {alert.get('title', 'Unknown')}") async def _trigger_maintenance_alert(self): """Trigger alert for system maintenance needed.""" alert_key = "maintenance_needed" last_alert = self.alerts_sent.get(alert_key) if last_alert and (time.time() - last_alert) < 3600: # 1 hour return logger.error("System health trends indicate maintenance may be needed") self.alerts_sent[alert_key] = time.time() def get_monitoring_status(self) -> dict[str, Any]: """Get current monitoring system status.""" return { "running": self.running, "uptime_seconds": time.time() - self.start_time, "active_tasks": len([t for t in self.tasks if not t.done()]), "total_tasks": len(self.tasks), "health_records": len(self.health_history), "alerts_sent_count": len(self.alerts_sent), "last_health_check": max( [record["timestamp"] for record in self.health_history], default=None ), } # Global health monitor instance _health_monitor = HealthMonitor() def get_health_monitor() -> HealthMonitor: """Get the global health monitor 
instance.""" return _health_monitor async def start_health_monitoring(): """Start health monitoring system (convenience function).""" await _health_monitor.start() async def stop_health_monitoring(): """Stop health monitoring system (convenience function).""" await _health_monitor.stop() def get_monitoring_status() -> dict[str, Any]: """Get monitoring status (convenience function).""" return _health_monitor.get_monitoring_status()
