Skip to main content
Glama

Ultimate MCP Coding Platform

monitoring.py13.6 kB
"""Monitoring and health check components for Ultimate MCP.""" from __future__ import annotations import asyncio import logging import time from collections import defaultdict, deque from dataclasses import dataclass, field from typing import Any, Dict, List import psutil logger = logging.getLogger(__name__) @dataclass class SystemMetrics: """System performance metrics.""" cpu_percent: float memory_percent: float memory_used_mb: float memory_available_mb: float disk_usage_percent: float network_bytes_sent: int network_bytes_recv: int process_count: int load_average: List[float] = field(default_factory=list) @dataclass class ApplicationMetrics: """Application-specific metrics.""" total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 average_response_time: float = 0.0 active_connections: int = 0 # Execution metrics total_executions: int = 0 successful_executions: int = 0 failed_executions: int = 0 average_execution_time: float = 0.0 # Language breakdown executions_by_language: Dict[str, int] = field(default_factory=dict) # User metrics unique_users: int = 0 authenticated_requests: int = 0 public_requests: int = 0 class MetricsCollector: """Collects and aggregates application metrics.""" def __init__(self, max_history: int = 1000): self.max_history = max_history self.request_times = deque(maxlen=max_history) self.execution_times = deque(maxlen=max_history) self.request_counts = defaultdict(int) self.execution_counts = defaultdict(int) self.language_counts = defaultdict(int) self.user_sessions = set() self.start_time = time.time() async def record_request( self, method: str, path: str, status_code: int, duration: float, user_id: str | None = None, authenticated: bool = False, ) -> None: """Record HTTP request metrics.""" self.request_times.append(duration) self.request_counts["total"] += 1 if 200 <= status_code < 400: self.request_counts["successful"] += 1 else: self.request_counts["failed"] += 1 if user_id: self.user_sessions.add(user_id) if authenticated: self.request_counts["authenticated"] += 1 else: self.request_counts["public"] += 1 async def record_execution( self, language: str, duration: float, success: bool, user_id: str | None = None, ) -> None: """Record code execution metrics.""" self.execution_times.append(duration) self.execution_counts["total"] += 1 self.language_counts[language] += 1 if success: self.execution_counts["successful"] += 1 else: self.execution_counts["failed"] += 1 if user_id: self.user_sessions.add(user_id) def get_system_metrics(self) -> SystemMetrics: """Get current system metrics.""" try: # CPU and memory cpu_percent = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() # Disk usage (root partition) disk = psutil.disk_usage("/") # Network stats network = psutil.net_io_counters() # Process count process_count = len(psutil.pids()) # Load average (Unix-like systems) try: load_avg = list(psutil.getloadavg()) except AttributeError: load_avg = [] return SystemMetrics( cpu_percent=cpu_percent, memory_percent=memory.percent, memory_used_mb=memory.used / (1024 * 1024), memory_available_mb=memory.available / (1024 * 1024), disk_usage_percent=disk.percent, network_bytes_sent=network.bytes_sent, network_bytes_recv=network.bytes_recv, process_count=process_count, load_average=load_avg, ) except Exception as e: logger.error(f"Failed to collect system metrics: {e}") return SystemMetrics( cpu_percent=0.0, memory_percent=0.0, memory_used_mb=0.0, memory_available_mb=0.0, disk_usage_percent=0.0, network_bytes_sent=0, network_bytes_recv=0, process_count=0, ) def get_application_metrics(self) -> ApplicationMetrics: """Get current application metrics.""" avg_response_time = ( sum(self.request_times) / len(self.request_times) if self.request_times else 0.0 ) avg_execution_time = ( sum(self.execution_times) / len(self.execution_times) if self.execution_times else 0.0 ) return ApplicationMetrics( total_requests=self.request_counts["total"], successful_requests=self.request_counts["successful"], failed_requests=self.request_counts["failed"], average_response_time=avg_response_time, total_executions=self.execution_counts["total"], successful_executions=self.execution_counts["successful"], failed_executions=self.execution_counts["failed"], average_execution_time=avg_execution_time, executions_by_language=dict(self.language_counts), unique_users=len(self.user_sessions), authenticated_requests=self.request_counts["authenticated"], public_requests=self.request_counts["public"], ) async def get_metrics(self) -> Dict[str, Any]: """Get comprehensive metrics.""" system_metrics = self.get_system_metrics() app_metrics = self.get_application_metrics() uptime = time.time() - self.start_time return { "timestamp": time.time(), "uptime_seconds": uptime, "system": { "cpu_percent": system_metrics.cpu_percent, "memory_percent": system_metrics.memory_percent, "memory_used_mb": system_metrics.memory_used_mb, "memory_available_mb": system_metrics.memory_available_mb, "disk_usage_percent": system_metrics.disk_usage_percent, "network_bytes_sent": system_metrics.network_bytes_sent, "network_bytes_recv": system_metrics.network_bytes_recv, "process_count": system_metrics.process_count, "load_average": system_metrics.load_average, }, "application": { "requests": { "total": app_metrics.total_requests, "successful": app_metrics.successful_requests, "failed": app_metrics.failed_requests, "success_rate": ( app_metrics.successful_requests / app_metrics.total_requests if app_metrics.total_requests > 0 else 0.0 ), "average_response_time": app_metrics.average_response_time, "requests_per_second": ( app_metrics.total_requests / uptime if uptime > 0 else 0.0 ), "authenticated_requests": app_metrics.authenticated_requests, "public_requests": app_metrics.public_requests, }, "executions": { "total": app_metrics.total_executions, "successful": app_metrics.successful_executions, "failed": app_metrics.failed_executions, "success_rate": ( app_metrics.successful_executions / app_metrics.total_executions if app_metrics.total_executions > 0 else 0.0 ), "average_execution_time": app_metrics.average_execution_time, "by_language": app_metrics.executions_by_language, }, "users": { "unique_users": app_metrics.unique_users, "authenticated_requests": app_metrics.authenticated_requests, "public_requests": app_metrics.public_requests, }, }, } class HealthChecker: """Comprehensive health checking for all system components.""" def __init__(self, neo4j_client): self.neo4j_client = neo4j_client self.monitoring_task = None self.health_history = deque(maxlen=100) self.is_monitoring = False async def check_database_health(self) -> Dict[str, Any]: """Check Neo4j database health.""" try: start_time = time.time() is_healthy = await self.neo4j_client.health_check() response_time = time.time() - start_time return { "status": "healthy" if is_healthy else "unhealthy", "response_time": response_time, "error": None, } except Exception as e: return { "status": "unhealthy", "response_time": None, "error": str(e), } async def check_system_health(self) -> Dict[str, Any]: """Check system resource health.""" try: # Check CPU usage cpu_percent = psutil.cpu_percent(interval=1) cpu_healthy = cpu_percent < 90.0 # Check memory usage memory = psutil.virtual_memory() memory_healthy = memory.percent < 90.0 # Check disk usage disk = psutil.disk_usage("/") disk_healthy = disk.percent < 90.0 overall_healthy = cpu_healthy and memory_healthy and disk_healthy return { "status": "healthy" if overall_healthy else "degraded", "cpu": { "percent": cpu_percent, "healthy": cpu_healthy, }, "memory": { "percent": memory.percent, "healthy": memory_healthy, }, "disk": { "percent": disk.percent, "healthy": disk_healthy, }, } except Exception as e: return { "status": "unhealthy", "error": str(e), } async def get_health_status(self) -> Dict[str, Any]: """Get comprehensive health status.""" # Check all components database_health = await self.check_database_health() system_health = await self.check_system_health() # Determine overall status components_healthy = ( database_health["status"] == "healthy" and system_health["status"] in ["healthy", "degraded"] ) overall_status = "healthy" if components_healthy else "unhealthy" health_data = { "status": overall_status, "timestamp": time.time(), "components": { "database": database_health, "system": system_health, }, } # Store in history self.health_history.append(health_data) return health_data async def start_monitoring(self, interval: int = 30) -> None: """Start continuous health monitoring.""" if self.is_monitoring: return self.is_monitoring = True async def monitor(): while self.is_monitoring: try: health_status = await self.get_health_status() if health_status["status"] != "healthy": logger.warning( "Health check failed", status=health_status["status"], components=health_status["components"], ) await asyncio.sleep(interval) except Exception as e: logger.error(f"Health monitoring error: {e}") await asyncio.sleep(interval) self.monitoring_task = asyncio.create_task(monitor()) logger.info("Health monitoring started") async def stop_monitoring(self) -> None: """Stop health monitoring.""" self.is_monitoring = False if self.monitoring_task: self.monitoring_task.cancel() try: await self.monitoring_task except asyncio.CancelledError: pass logger.info("Health monitoring stopped") def get_health_history(self) -> List[Dict[str, Any]]: """Get recent health check history.""" return list(self.health_history) __all__ = [ "MetricsCollector", "HealthChecker", "SystemMetrics", "ApplicationMetrics" ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Senpai-Sama7/Ultimate_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server