Skip to main content
Glama

Security MCP Server

by nordeim
health.py (43.7 kB)
"""
Health monitoring system for MCP server.
Production-ready implementation with all critical fixes applied.

All critical fixes applied:
- Health check result staleness validation
- HealthCheckPriority enum instead of magic numbers
- Enhanced overlap prevention
- Proper cleanup methods
- Comprehensive type hints
- Better configuration handling

Features:
- Priority-based health checks (Critical, Important, Informational)
- Automatic staleness detection
- Overlap prevention for concurrent checks
- Configurable thresholds and intervals
- Comprehensive health history
- Custom health check support
- Tool availability monitoring

Usage:
    from mcp_server.health import HealthCheckManager, HealthCheckPriority

    # Create health manager
    manager = HealthCheckManager(config=config)

    # Add custom check
    async def check_database():
        # Check database connectivity
        return HealthStatus.HEALTHY

    manager.register_check(
        "database",
        check_database,
        priority=HealthCheckPriority.CRITICAL
    )

    # Start monitoring
    await manager.start_monitoring()

    # Get health status
    health = await manager.run_health_checks()

    # Cleanup
    await manager.stop_monitoring()
"""

import asyncio
import logging
import os
import time
from datetime import datetime, timedelta
from enum import Enum, IntEnum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable, Awaitable, Union, Protocol
from collections import deque

try:
    import psutil
    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False
    psutil = None

log = logging.getLogger(__name__)


class HealthStatus(Enum):
    """Health status levels."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


class HealthCheckPriority(IntEnum):
    """
    Health check priority levels with clear semantics.

    Priority determines impact on overall health status:
    - CRITICAL (0): Failure causes UNHEALTHY status
    - IMPORTANT (1): Failure causes DEGRADED status
    - INFORMATIONAL (2): Failure only logged, minimal impact
    """
    CRITICAL = 0       # System-critical checks (CPU, memory, disk)
    IMPORTANT = 1      # Important but non-critical (process health, circuit breakers)
    INFORMATIONAL = 2  # Nice-to-have checks (dependencies, tool availability)


def _clamp_priority(priority: Union[int, HealthCheckPriority]) -> HealthCheckPriority:
    """Clamp *priority* into the valid range and return it as an enum member.

    Plain integers are accepted so that callers passing ``1`` instead of
    ``HealthCheckPriority.IMPORTANT`` still end up with a value that has
    ``.name`` / ``.value`` (both are used when serializing and logging).
    """
    lowest = int(HealthCheckPriority.CRITICAL)
    highest = int(HealthCheckPriority.INFORMATIONAL)
    return HealthCheckPriority(max(lowest, min(int(priority), highest)))


def _coerce_float(value: Any, default: float, key: str) -> float:
    """Convert a config value to float, falling back to *default* when invalid."""
    try:
        return float(value)
    except (TypeError, ValueError):
        log.warning("health_check_manager.invalid_config key=%s value=%r using_default=%s",
                    key, value, default)
        return default


@dataclass
class HealthCheckResult:
    """Result of a health check with comprehensive metadata."""
    name: str
    status: HealthStatus
    message: str
    priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL
    timestamp: datetime = field(default_factory=datetime.now)
    duration: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "priority": self.priority.value,
            "priority_name": self.priority.name,
            "timestamp": self.timestamp.isoformat(),
            "duration": round(self.duration, 3),
            "metadata": self.metadata
        }


@dataclass
class SystemHealth:
    """Overall system health status with staleness tracking."""
    overall_status: HealthStatus
    checks: Dict[str, HealthCheckResult]
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_stale(self, max_age_seconds: float = 60.0) -> bool:
        """
        Check if health data is stale.

        Args:
            max_age_seconds: Maximum age before considered stale

        Returns:
            True if stale, False otherwise
        """
        return self.get_age_seconds() > max_age_seconds

    def get_age_seconds(self) -> float:
        """Get age of health check in seconds."""
        return (datetime.now() - self.timestamp).total_seconds()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "overall_status": self.overall_status.value,
            "timestamp": self.timestamp.isoformat(),
            "age_seconds": round(self.get_age_seconds(), 2),
            "is_stale": self.is_stale(),
            "checks": {name: result.to_dict() for name, result in self.checks.items()},
            "metadata": self.metadata
        }


class HealthCheckProtocol(Protocol):
    """Protocol for health check callables."""

    async def __call__(self) -> HealthStatus:
        """Execute health check and return status."""
        ...


class HealthCheck:
    """Base class for health checks with timeout and error handling."""

    def __init__(self, name: str,
                 priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL,
                 timeout: float = 10.0):
        """
        Args:
            name: Unique name of the check (used as the registry key).
            priority: Priority level; out-of-range values are clamped and
                plain integers are converted to ``HealthCheckPriority``.
            timeout: Per-check timeout in seconds (minimum 1.0).
        """
        self.name = name
        self.priority = _clamp_priority(priority)
        self.timeout = max(1.0, timeout)

    async def check(self) -> HealthCheckResult:
        """Execute the health check with timeout and error handling.

        Returns:
            The result produced by ``_execute_check`` with ``duration`` and
            ``priority`` filled in, or an UNHEALTHY result when the check
            times out or raises.
        """
        # Monotonic clock: durations must not be affected by wall-clock jumps.
        start_time = time.monotonic()
        try:
            result = await asyncio.wait_for(self._execute_check(), timeout=self.timeout)
            result.duration = time.monotonic() - start_time
            result.priority = self.priority
            return result
        except asyncio.TimeoutError:
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Health check timed out after {self.timeout}s",
                priority=self.priority,
                duration=self.timeout,
                metadata={"error": "timeout"}
            )
        except Exception as e:
            duration = time.monotonic() - start_time
            log.error("health_check.failed name=%s error=%s duration=%.2f",
                      self.name, str(e), duration, exc_info=True)
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Health check failed: {str(e)}",
                priority=self.priority,
                duration=duration,
                metadata={"error": str(e), "error_type": type(e).__name__}
            )

    async def _execute_check(self) -> HealthCheckResult:
        """Override this method to implement specific health check logic."""
        raise NotImplementedError


class SystemResourceHealthCheck(HealthCheck):
    """Check system resources (CPU, memory, disk) with proper thresholds."""

    def __init__(self, name: str = "system_resources",
                 cpu_threshold: float = 80.0,
                 memory_threshold: float = 80.0,
                 disk_threshold: float = 80.0,
                 priority: HealthCheckPriority = HealthCheckPriority.CRITICAL):
        """Thresholds are percentages and are clamped to the range 0-100."""
        super().__init__(name, priority)
        self.cpu_threshold = max(0.0, min(100.0, cpu_threshold))
        self.memory_threshold = max(0.0, min(100.0, memory_threshold))
        self.disk_threshold = max(0.0, min(100.0, disk_threshold))

    async def _execute_check(self) -> HealthCheckResult:
        """Check system resources with detailed reporting."""
        if not PSUTIL_AVAILABLE:
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.DEGRADED,
                message="psutil not available for system resource monitoring",
                priority=self.priority,
                metadata={"psutil_available": False, "hint": "pip install psutil"}
            )

        try:
            # cpu_percent(interval=1) sleeps for the whole interval, so it is
            # run in the default executor to keep the event loop responsive.
            loop = asyncio.get_running_loop()
            cpu_percent = await loop.run_in_executor(
                None, lambda: psutil.cpu_percent(interval=1)
            )

            # Memory check
            memory = psutil.virtual_memory()
            memory_percent = memory.percent

            # Disk check (with error handling)
            disk_check_failed = False
            try:
                disk = psutil.disk_usage('/')
                disk_percent = disk.percent
            except Exception as disk_error:
                log.warning("health_check.disk_usage_failed error=%s", str(disk_error))
                disk_percent = 0.0
                disk_check_failed = True

            # Determine status based on thresholds
            status = HealthStatus.HEALTHY
            messages = []
            warnings = []

            if cpu_percent > self.cpu_threshold:
                status = HealthStatus.UNHEALTHY
                messages.append(f"CPU usage critical: {cpu_percent:.1f}%")
            elif cpu_percent > self.cpu_threshold * 0.8:
                warnings.append(f"CPU usage elevated: {cpu_percent:.1f}%")

            if memory_percent > self.memory_threshold:
                if status == HealthStatus.HEALTHY:
                    status = HealthStatus.DEGRADED
                messages.append(f"Memory usage high: {memory_percent:.1f}%")
            elif memory_percent > self.memory_threshold * 0.8:
                warnings.append(f"Memory usage elevated: {memory_percent:.1f}%")

            if disk_check_failed:
                # Surface the failure instead of silently reporting 0% usage.
                warnings.append("Disk usage unavailable")
            elif disk_percent > self.disk_threshold:
                if status == HealthStatus.HEALTHY:
                    status = HealthStatus.DEGRADED
                messages.append(f"Disk usage high: {disk_percent:.1f}%")
            elif disk_percent > self.disk_threshold * 0.8:
                warnings.append(f"Disk usage elevated: {disk_percent:.1f}%")

            # Construct message
            if messages:
                message = ", ".join(messages)
            elif warnings:
                message = "System resources acceptable with warnings: " + ", ".join(warnings)
            else:
                message = "System resources healthy"

            return HealthCheckResult(
                name=self.name,
                status=status,
                message=message,
                priority=self.priority,
                metadata={
                    "cpu_percent": round(cpu_percent, 2),
                    "memory_percent": round(memory_percent, 2),
                    "disk_percent": round(disk_percent, 2),
                    "disk_check_failed": disk_check_failed,
                    "cpu_threshold": self.cpu_threshold,
                    "memory_threshold": self.memory_threshold,
                    "disk_threshold": self.disk_threshold,
                    "warnings": warnings,
                    "psutil_available": True
                }
            )

        except Exception as e:
            log.error("health_check.system_resources_failed error=%s", str(e), exc_info=True)
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Failed to check system resources: {str(e)}",
                priority=self.priority,
                metadata={"psutil_available": PSUTIL_AVAILABLE, "error": str(e)}
            )


class ToolAvailabilityHealthCheck(HealthCheck):
    """Check availability of MCP tools."""

    def __init__(self, tool_registry, name: str = "tool_availability",
                 priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL):
        """
        Args:
            tool_registry: Object expected to expose ``get_enabled_tools()``
                returning a mapping of tool name to tool object.
            name: Name of the check.
            priority: Priority level of the check.
        """
        super().__init__(name, priority)
        self.tool_registry = tool_registry

    async def _execute_check(self) -> HealthCheckResult:
        """Check tool availability with detailed reporting."""
        try:
            if not hasattr(self.tool_registry, 'get_enabled_tools'):
                return HealthCheckResult(
                    name=self.name,
                    status=HealthStatus.UNHEALTHY,
                    message="Tool registry does not support get_enabled_tools method",
                    priority=self.priority,
                    metadata={"registry_type": type(self.tool_registry).__name__}
                )

            tools = self.tool_registry.get_enabled_tools()
            unavailable_tools = []
            available_count = 0

            for tool_name, tool in tools.items():
                try:
                    if not hasattr(tool, '_resolve_command'):
                        unavailable_tools.append({
                            "name": tool_name,
                            "reason": "missing _resolve_command method"
                        })
                    elif not tool._resolve_command():
                        unavailable_tools.append({
                            "name": tool_name,
                            "reason": "command not found in PATH"
                        })
                    else:
                        available_count += 1
                except Exception as tool_error:
                    unavailable_tools.append({
                        "name": tool_name,
                        "reason": f"error: {str(tool_error)}"
                    })

            total_tools = len(tools)

            # Determine status
            if unavailable_tools:
                if available_count == 0:
                    status = HealthStatus.UNHEALTHY
                    message = f"All {total_tools} tools unavailable"
                elif available_count < total_tools / 2:
                    status = HealthStatus.DEGRADED
                    message = f"Majority of tools unavailable: {len(unavailable_tools)}/{total_tools}"
                else:
                    status = HealthStatus.DEGRADED
                    message = f"Some tools unavailable: {len(unavailable_tools)}/{total_tools}"
            else:
                status = HealthStatus.HEALTHY
                message = f"All {total_tools} tools available"

            return HealthCheckResult(
                name=self.name,
                status=status,
                message=message,
                priority=self.priority,
                metadata={
                    "total_tools": total_tools,
                    "available_tools": available_count,
                    "unavailable_tools": len(unavailable_tools),
                    "unavailable_details": unavailable_tools
                }
            )

        except Exception as e:
            log.error("health_check.tool_availability_failed error=%s", str(e), exc_info=True)
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Failed to check tool availability: {str(e)}",
                priority=self.priority,
                metadata={
                    "registry_type": type(self.tool_registry).__name__ if self.tool_registry else None,
                    "error": str(e)
                }
            )


class ProcessHealthCheck(HealthCheck):
    """Check if the process is running properly."""

    def __init__(self, name: str = "process_health",
                 priority: HealthCheckPriority = HealthCheckPriority.IMPORTANT):
        super().__init__(name, priority)
        # Cached psutil handle: cpu_percent() measures since the previous call
        # on the SAME Process object, so a fresh object always reports 0.0.
        self._process = None

    async def _execute_check(self) -> HealthCheckResult:
        """Check process health with detailed metrics."""
        if not PSUTIL_AVAILABLE:
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.DEGRADED,
                message="psutil not available for process health monitoring",
                priority=self.priority,
                metadata={"psutil_available": False, "hint": "pip install psutil"}
            )

        try:
            # Re-create the handle if there is none yet or the PID changed
            # (e.g. this object was inherited across a fork).
            if self._process is None or self._process.pid != os.getpid():
                self._process = psutil.Process()
            process = self._process

            if not process.is_running():
                return HealthCheckResult(
                    name=self.name,
                    status=HealthStatus.UNHEALTHY,
                    message="Process is not running",
                    priority=self.priority,
                    metadata={"pid": process.pid}
                )

            # Get process metrics
            create_time = datetime.fromtimestamp(process.create_time())
            age = datetime.now() - create_time

            memory_info = process.memory_info()
            memory_mb = memory_info.rss / 1024 / 1024

            cpu_percent = process.cpu_percent()

            # Get thread count
            num_threads = process.num_threads()

            # Get file descriptors (Unix only)
            try:
                num_fds = process.num_fds()
            except (AttributeError, NotImplementedError):
                num_fds = None

            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.HEALTHY,
                message="Process is running normally",
                priority=self.priority,
                metadata={
                    "pid": process.pid,
                    "age_seconds": round(age.total_seconds(), 2),
                    "memory_mb": round(memory_mb, 2),
                    "cpu_percent": round(cpu_percent, 2),
                    "num_threads": num_threads,
                    "num_fds": num_fds,
                    "create_time": create_time.isoformat(),
                    "psutil_available": True
                }
            )

        except Exception as e:
            log.error("health_check.process_health_failed error=%s", str(e), exc_info=True)
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Failed to check process health: {str(e)}",
                priority=self.priority,
                metadata={"psutil_available": PSUTIL_AVAILABLE, "error": str(e)}
            )


class DependencyHealthCheck(HealthCheck):
    """Check external dependencies availability."""

    def __init__(self, dependencies: List[str], name: str = "dependencies",
                 priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL):
        """
        Args:
            dependencies: Importable module names to probe.
            name: Name of the check.
            priority: Priority level of the check.
        """
        super().__init__(name, priority)
        self.dependencies = dependencies or []

    async def _execute_check(self) -> HealthCheckResult:
        """Check dependency availability."""
        try:
            import importlib

            missing_deps = []
            available_deps = []

            for dep in self.dependencies:
                try:
                    importlib.import_module(dep)
                    available_deps.append(dep)
                except ImportError:
                    missing_deps.append({"name": dep, "reason": "not installed"})
                except Exception as dep_error:
                    missing_deps.append({
                        "name": dep,
                        "reason": f"error: {str(dep_error)}"
                    })

            total_deps = len(self.dependencies)

            # Determine status
            if missing_deps:
                if available_deps:
                    status = HealthStatus.DEGRADED
                    message = f"Some dependencies missing: {len(missing_deps)}/{total_deps}"
                else:
                    status = HealthStatus.UNHEALTHY
                    message = f"All {total_deps} dependencies missing"
            else:
                status = HealthStatus.HEALTHY
                message = f"All {total_deps} dependencies available"

            return HealthCheckResult(
                name=self.name,
                status=status,
                message=message,
                priority=self.priority,
                metadata={
                    "total_dependencies": total_deps,
                    "available_dependencies": available_deps,
                    "missing_dependencies": missing_deps
                }
            )

        except Exception as e:
            log.error("health_check.dependency_failed error=%s", str(e), exc_info=True)
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Failed to check dependencies: {str(e)}",
                priority=self.priority,
                metadata={"dependencies": self.dependencies, "error": str(e)}
            )


class CustomHealthCheck(HealthCheck):
    """Custom health check with user-provided function."""

    def __init__(self, name: str, check_func: Callable[[], Awaitable[HealthStatus]],
                 priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL,
                 timeout: float = 10.0):
        """
        Args:
            name: Name of the check.
            check_func: Zero-argument coroutine function returning a HealthStatus.
            priority: Priority level of the check.
            timeout: Per-check timeout in seconds.
        """
        super().__init__(name, priority, timeout)
        self.check_func = check_func

    async def _execute_check(self) -> HealthCheckResult:
        """Execute custom health check function.

        A return value that is not a ``HealthStatus`` is reported as
        UNHEALTHY rather than stored, because downstream serialization
        reads ``status.value``.
        """
        try:
            status = await self.check_func()
        except Exception as e:
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Custom check failed: {str(e)}",
                priority=self.priority,
                metadata={"error": str(e)}
            )

        if not isinstance(status, HealthStatus):
            return HealthCheckResult(
                name=self.name,
                status=HealthStatus.UNHEALTHY,
                message=f"Custom check returned invalid status: {status!r}",
                priority=self.priority,
                metadata={
                    "error": "invalid_return_type",
                    "returned_type": type(status).__name__
                }
            )

        message = f"{self.name} check {'passed' if status == HealthStatus.HEALTHY else 'failed'}"
        return HealthCheckResult(
            name=self.name,
            status=status,
            message=message,
            priority=self.priority
        )


class HealthCheckManager:
    """
    Manager for health checks with enhanced features and all critical fixes.

    Features:
    - Priority-based health checks
    - Staleness detection
    - Overlap prevention
    - Automatic monitoring
    - Health history tracking
    - Comprehensive cleanup
    """

    def __init__(self, config: Optional[Union[dict, object]] = None,
                 checks: Optional[List[HealthCheck]] = None,
                 check_timeout_seconds: float = 10.0,
                 max_staleness_seconds: float = 60.0):
        """
        Args:
            config: Dict or object configuration (see ``_normalize_config``).
            checks: Checks to register up front; these take precedence over
                the built-in default checks of the same name.
            check_timeout_seconds: Stored (minimum 1.0). NOTE(review): the
                timeout actually applied in ``run_health_checks`` comes from
                ``config['timeout']``, not from this value.
            max_staleness_seconds: Maximum age of a cached result (minimum 10.0).
        """
        self._raw_config = config
        self.config = self._normalize_config(config)

        self.health_checks: Dict[str, HealthCheck] = {}
        self.check_priorities: Dict[str, HealthCheckPriority] = {}
        self.last_health_check: Optional[SystemHealth] = None

        self.check_interval = self.config.get('check_interval', 30.0)
        self.check_timeout_seconds = max(1.0, check_timeout_seconds)
        self.max_staleness_seconds = max(10.0, max_staleness_seconds)

        self._monitor_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()
        self._check_in_progress = False
        self._check_lock = asyncio.Lock()

        self.check_history: deque = deque(maxlen=100)

        # Register provided checks
        if checks:
            for check in checks:
                self.add_health_check(check, check.priority)

        self._initialize_default_checks()

    def _normalize_config(self, cfg: Union[dict, object, None]) -> dict:
        """Simplified and robust config normalization.

        Accepts a dict (flat keys and/or a nested ``health`` dict, the nested
        section winning) or an object exposing a ``health`` attribute.
        Numeric values that cannot be converted fall back to the defaults.

        Returns:
            A dict containing every default key with validated values.
        """
        defaults = {
            'check_interval': 30.0,
            'cpu_threshold': 80.0,
            'memory_threshold': 80.0,
            'disk_threshold': 80.0,
            'dependencies': [],
            'timeout': 10.0,
        }

        if cfg is None:
            return defaults

        result = defaults.copy()

        # Handle dict config
        if isinstance(cfg, dict):
            # Direct keys
            for key in defaults:
                if key in cfg:
                    result[key] = cfg[key]

            # Nested health section
            if 'health' in cfg and isinstance(cfg['health'], dict):
                health = cfg['health']
                for key in defaults:
                    if key in health:
                        result[key] = health[key]

        # Handle object config
        elif hasattr(cfg, 'health'):
            health = getattr(cfg, 'health', None)
            if health:
                for attr in defaults:
                    if hasattr(health, attr):
                        value = getattr(health, attr, None)
                        if value is not None:
                            result[attr] = value

        # Validate and clamp values (invalid numbers fall back to defaults)
        for key in ('check_interval', 'cpu_threshold', 'memory_threshold',
                    'disk_threshold', 'timeout'):
            result[key] = _coerce_float(result[key], defaults[key], key)

        result['check_interval'] = max(5.0, result['check_interval'])
        result['cpu_threshold'] = max(0.0, min(100.0, result['cpu_threshold']))
        result['memory_threshold'] = max(0.0, min(100.0, result['memory_threshold']))
        result['disk_threshold'] = max(0.0, min(100.0, result['disk_threshold']))
        result['timeout'] = max(1.0, result['timeout'])

        if not isinstance(result['dependencies'], list):
            result['dependencies'] = []

        return result

    def _initialize_default_checks(self):
        """Initialize default health checks with proper priorities.

        A default is skipped when a check with the same name is already
        registered, so checks passed to the constructor are not overwritten.
        """
        try:
            # System resources check (CRITICAL)
            if "system_resources" not in self.health_checks:
                self.add_health_check(
                    SystemResourceHealthCheck(
                        cpu_threshold=self.config['cpu_threshold'],
                        memory_threshold=self.config['memory_threshold'],
                        disk_threshold=self.config['disk_threshold'],
                        priority=HealthCheckPriority.CRITICAL
                    ),
                    priority=HealthCheckPriority.CRITICAL
                )

            # Process health (IMPORTANT)
            if "process_health" not in self.health_checks:
                self.add_health_check(
                    ProcessHealthCheck(priority=HealthCheckPriority.IMPORTANT),
                    priority=HealthCheckPriority.IMPORTANT
                )

            # Dependencies (INFORMATIONAL)
            if self.config['dependencies'] and "dependencies" not in self.health_checks:
                self.add_health_check(
                    DependencyHealthCheck(
                        self.config['dependencies'],
                        priority=HealthCheckPriority.INFORMATIONAL
                    ),
                    priority=HealthCheckPriority.INFORMATIONAL
                )

            log.info("health_check_manager.initialized checks=%d interval=%.1f",
                     len(self.health_checks), self.check_interval)

        except Exception as e:
            log.error("health_check_manager.initialization_failed error=%s", str(e), exc_info=True)

    def add_health_check(self, health_check: HealthCheck,
                         priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL):
        """Add a health check with priority level.

        An existing check with the same name is replaced. The priority given
        here is the one used when computing the overall status.
        """
        if not health_check or not health_check.name:
            log.warning("health_check.invalid_check skipped")
            return

        priority = _clamp_priority(priority)
        self.health_checks[health_check.name] = health_check
        self.check_priorities[health_check.name] = priority

        log.info("health_check.added name=%s priority=%s", health_check.name, priority.name)

    def remove_health_check(self, name: str):
        """Remove a health check."""
        if name in self.health_checks:
            del self.health_checks[name]
            if name in self.check_priorities:
                del self.check_priorities[name]
            log.info("health_check.removed name=%s", name)

    def register_check(self, name: str, check_func: Callable[[], Awaitable[HealthStatus]],
                       priority: HealthCheckPriority = HealthCheckPriority.INFORMATIONAL,
                       timeout: float = 10.0):
        """Register a custom health check function."""
        health_check = CustomHealthCheck(name, check_func, priority, timeout)
        self.add_health_check(health_check, priority)

    async def run_checks(self) -> SystemHealth:
        """Alias for run_health_checks for compatibility."""
        return await self.run_health_checks()

    async def run_health_checks(self) -> SystemHealth:
        """
        Run health checks with staleness validation and overlap prevention.

        Returns:
            SystemHealth with current health status
        """
        # Check if already in progress
        if self._check_in_progress:
            log.warning("health_checks.already_running")

            # Return cached result if fresh enough
            if self.last_health_check and not self.last_health_check.is_stale(self.max_staleness_seconds):
                log.debug("health_checks.returning_cached age=%.2fs",
                          self.last_health_check.get_age_seconds())
                return self.last_health_check

            # Wait briefly for current check to complete
            try:
                await asyncio.wait_for(self._check_lock.acquire(), timeout=2.0)
                self._check_lock.release()

                # Return latest result after wait
                if self.last_health_check:
                    return self.last_health_check
            except asyncio.TimeoutError:
                pass

            # Return degraded status if no recent data
            return SystemHealth(
                overall_status=HealthStatus.DEGRADED,
                checks={},
                metadata={
                    "message": "Health check in progress, no recent data available",
                    "check_in_progress": True
                }
            )

        # Acquire lock for this check
        async with self._check_lock:
            self._check_in_progress = True

            try:
                if not self.health_checks:
                    return SystemHealth(
                        overall_status=HealthStatus.HEALTHY,
                        checks={},
                        metadata={"message": "No health checks configured"}
                    )

                check_results = {}
                tasks = []
                timeout = self.config.get('timeout', 10.0)

                # Create tasks for all checks
                for name, health_check in self.health_checks.items():
                    # Cap each check's own timeout at the configured timeout.
                    if hasattr(health_check, 'timeout'):
                        health_check.timeout = min(health_check.timeout, timeout)

                    task = asyncio.create_task(
                        self._run_single_check(name, health_check),
                        name=f"health_check_{name}"
                    )
                    tasks.append((name, task))

                # Wait for all checks with overall timeout
                try:
                    done, pending = await asyncio.wait(
                        [task for _, task in tasks],
                        timeout=timeout + 2.0,
                        return_when=asyncio.ALL_COMPLETED
                    )

                    # Cancel any pending tasks
                    for task in pending:
                        task.cancel()
                        log.warning("health_check.timeout task=%s", task.get_name())

                except Exception as e:
                    log.error("health_check.wait_failed error=%s", str(e), exc_info=True)

                # Collect results
                for name, task in tasks:
                    try:
                        if task.done() and not task.cancelled():
                            result = task.result()
                        else:
                            result = HealthCheckResult(
                                name=name,
                                status=HealthStatus.UNHEALTHY,
                                message="Health check timed out or was cancelled",
                                priority=self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL)
                            )

                        check_results[name] = result

                    except Exception as e:
                        log.error("health_check.result_failed name=%s error=%s", name, str(e))
                        check_results[name] = HealthCheckResult(
                            name=name,
                            status=HealthStatus.UNHEALTHY,
                            message=f"Health check failed: {str(e)}",
                            priority=self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL)
                        )

                # Calculate overall status
                overall_status = self._calculate_overall_status(check_results)

                # Create system health
                system_health = SystemHealth(
                    overall_status=overall_status,
                    checks=check_results,
                    metadata=self._generate_health_metadata(check_results)
                )

                # Update history
                self.check_history.append({
                    "timestamp": system_health.timestamp.isoformat(),
                    "status": overall_status.value,
                    "check_count": len(check_results)
                })

                # Cache result
                self.last_health_check = system_health

                log.info(
                    "health_check.completed overall=%s checks=%d duration=%.2f age=%.2f",
                    overall_status.value,
                    len(check_results),
                    sum(r.duration for r in check_results.values()),
                    system_health.get_age_seconds()
                )

                return system_health

            finally:
                self._check_in_progress = False

    async def _run_single_check(self, name: str, health_check: HealthCheck) -> HealthCheckResult:
        """Run a single health check with error handling."""
        try:
            return await health_check.check()
        except Exception as e:
            log.error("health_check.execution_failed name=%s error=%s", name, str(e), exc_info=True)
            return HealthCheckResult(
                name=name,
                status=HealthStatus.UNHEALTHY,
                message=f"Check failed: {str(e)}",
                priority=self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL)
            )

    def _calculate_overall_status(self, check_results: Dict[str, HealthCheckResult]) -> HealthStatus:
        """Calculate overall status with priority weighting.

        Rules, evaluated in order:
        1. Any CRITICAL check UNHEALTHY        -> UNHEALTHY
        2. Any IMPORTANT check UNHEALTHY       -> DEGRADED
        3. Any check (any priority) DEGRADED   -> DEGRADED
        4. All INFORMATIONAL checks UNHEALTHY  -> DEGRADED
        5. Otherwise                           -> HEALTHY
        """
        def results_with(priority: HealthCheckPriority) -> List[HealthCheckResult]:
            # Priority is taken from the manager's registry, not from the result.
            return [
                result for name, result in check_results.items()
                if self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL) == priority
            ]

        # Critical checks (priority 0)
        if any(r.status == HealthStatus.UNHEALTHY
               for r in results_with(HealthCheckPriority.CRITICAL)):
            return HealthStatus.UNHEALTHY

        # Important checks (priority 1)
        if any(r.status == HealthStatus.UNHEALTHY
               for r in results_with(HealthCheckPriority.IMPORTANT)):
            return HealthStatus.DEGRADED

        # Any degraded status
        if any(r.status == HealthStatus.DEGRADED for r in check_results.values()):
            return HealthStatus.DEGRADED

        # Informational checks (priority 2) - only degrade if all are unhealthy
        info_checks = results_with(HealthCheckPriority.INFORMATIONAL)
        if info_checks and all(r.status == HealthStatus.UNHEALTHY for r in info_checks):
            return HealthStatus.DEGRADED

        return HealthStatus.HEALTHY

    def _generate_health_metadata(self, check_results: Dict[str, HealthCheckResult]) -> Dict[str, Any]:
        """Generate comprehensive health metadata."""
        priority_breakdown = {
            "critical": {"total": 0, "healthy": 0, "degraded": 0, "unhealthy": 0},
            "important": {"total": 0, "healthy": 0, "degraded": 0, "unhealthy": 0},
            "informational": {"total": 0, "healthy": 0, "degraded": 0, "unhealthy": 0}
        }

        for name, result in check_results.items():
            priority = self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL)
            priority_name = priority.name.lower()

            priority_breakdown[priority_name]["total"] += 1

            if result.status == HealthStatus.HEALTHY:
                priority_breakdown[priority_name]["healthy"] += 1
            elif result.status == HealthStatus.DEGRADED:
                priority_breakdown[priority_name]["degraded"] += 1
            else:
                priority_breakdown[priority_name]["unhealthy"] += 1

        return {
            "total_checks": len(check_results),
            "healthy_checks": sum(1 for r in check_results.values() if r.status == HealthStatus.HEALTHY),
            "degraded_checks": sum(1 for r in check_results.values() if r.status == HealthStatus.DEGRADED),
            "unhealthy_checks": sum(1 for r in check_results.values() if r.status == HealthStatus.UNHEALTHY),
            "average_duration": round(
                sum(r.duration for r in check_results.values()) / len(check_results), 3
            ) if check_results else 0.0,
            "priority_breakdown": priority_breakdown,
            "check_priorities": {name: p.name for name, p in self.check_priorities.items()}
        }

    async def start_monitoring(self):
        """Start health monitoring with overlap prevention."""
        if self._monitor_task and not self._monitor_task.done():
            log.warning("health_monitor.already_running")
            return

        self._shutdown_event.clear()
        self._monitor_task = asyncio.create_task(
            self._monitor_loop(),
            name="health_monitor"
        )
        log.info("health_monitor.started interval=%.1f", self.check_interval)

    async def _monitor_loop(self):
        """Health monitoring loop with proper overlap prevention."""
        try:
            while not self._shutdown_event.is_set():
                try:
                    # Run checks with timeout slightly less than interval
                    await asyncio.wait_for(
                        self.run_health_checks(),
                        timeout=self.check_interval * 0.9
                    )
                except asyncio.TimeoutError:
                    log.warning("health_monitor.check_timeout interval=%.1f", self.check_interval)
                except Exception as e:
                    log.error("health_monitor.check_failed error=%s", str(e), exc_info=True)

                # Wait for next interval or shutdown
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=self.check_interval
                    )
                except asyncio.TimeoutError:
                    continue  # Normal timeout, continue loop

        except asyncio.CancelledError:
            log.info("health_monitor.cancelled")
            raise
        finally:
            log.info("health_monitor.stopped")

    async def stop_monitoring(self):
        """Stop health monitoring gracefully with proper cleanup."""
        log.info("health_monitor.stopping")
        self._shutdown_event.set()

        if self._monitor_task and not self._monitor_task.done():
            try:
                await asyncio.wait_for(self._monitor_task, timeout=5.0)
            except asyncio.TimeoutError:
                log.warning("health_monitor.stop_timeout cancelling_task")
                self._monitor_task.cancel()
                try:
                    await self._monitor_task
                except asyncio.CancelledError:
                    pass

        log.info("health_monitor.stopped")

    async def get_overall_health(self) -> HealthStatus:
        """Get current overall health status."""
        if self.last_health_check and not self.last_health_check.is_stale(self.max_staleness_seconds):
            return self.last_health_check.overall_status

        system_health = await self.run_health_checks()
        return system_health.overall_status

    async def get_all_check_results(self) -> Dict[str, Any]:
        """Get all health check results with detailed information."""
        if not self.last_health_check or self.last_health_check.is_stale(self.max_staleness_seconds):
            await self.run_health_checks()

        if self.last_health_check:
            return {
                name: result.to_dict()
                for name, result in self.last_health_check.checks.items()
            }

        return {}

    def get_health_summary(self) -> Dict[str, Any]:
        """Get a summary of health status with comprehensive details.

        When no check has run yet the summary reports ``"unknown"`` under both
        ``"status"`` (legacy key) and ``"overall_status"`` (the key used once
        data is available), so consumers can rely on ``"overall_status"``.
        """
        if not self.last_health_check:
            return {
                "status": "unknown",
                "overall_status": "unknown",
                "message": "No health check data available",
                "timestamp": datetime.now().isoformat()
            }

        return {
            "overall_status": self.last_health_check.overall_status.value,
            "timestamp": self.last_health_check.timestamp.isoformat(),
            "age_seconds": round(self.last_health_check.get_age_seconds(), 2),
            "is_stale": self.last_health_check.is_stale(self.max_staleness_seconds),
            "checks": {
                name: {
                    "status": result.status.value,
                    "message": result.message,
                    "duration": round(result.duration, 3),
                    "priority": self.check_priorities.get(name, HealthCheckPriority.INFORMATIONAL).name
                }
                for name, result in self.last_health_check.checks.items()
            },
            "metadata": self.last_health_check.metadata,
            "history": list(self.check_history)[-10:]
        }

    async def cleanup(self):
        """Clean up resources and stop monitoring."""
        log.info("health_check_manager.cleanup_started")

        # Stop monitoring
        await self.stop_monitoring()

        # Clear caches
        self.last_health_check = None
        self.check_history.clear()

        log.info("health_check_manager.cleanup_completed")

    async def __aenter__(self):
        """Start health monitoring when used as context manager."""
        await self.start_monitoring()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop health monitoring and cleanup when exiting context."""
        await self.cleanup()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.