
MCP Git Server

by MementoRC
server_metrics.py (17.4 kB)
""" Server metrics and monitoring service for MCP Git server. This module provides comprehensive metrics collection and monitoring capabilities for the MCP Git server, including performance tracking, usage statistics, and health monitoring. """ import asyncio import json import logging from collections import defaultdict, deque from dataclasses import dataclass, field from datetime import datetime, timedelta from threading import Lock from typing import Any from mcp_server_git.protocols.debugging_protocol import ( DebuggableComponent, ) @dataclass class ComponentState: """Component state information for debugging.""" name: str status: str timestamp: datetime metadata: dict[str, Any] = field(default_factory=dict) details: dict[str, Any] = field(default_factory=dict) @dataclass class ValidationResult: """Component validation result.""" is_valid: bool issues: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) component_name: str = "" @dataclass class MetricPoint: """A single metric data point.""" name: str value: float timestamp: datetime labels: dict[str, str] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: """Convert to dictionary representation.""" return { "name": self.name, "value": self.value, "timestamp": self.timestamp.isoformat(), "labels": self.labels, } @dataclass class PerformanceMetrics: """Performance tracking metrics.""" operation_count: int = 0 total_duration: float = 0.0 min_duration: float = float("inf") max_duration: float = 0.0 error_count: int = 0 last_operation_time: datetime | None = None @property def average_duration(self) -> float: """Calculate average operation duration.""" return ( self.total_duration / self.operation_count if self.operation_count > 0 else 0.0 ) @property def success_rate(self) -> float: """Calculate operation success rate.""" return ( (self.operation_count - self.error_count) / self.operation_count if self.operation_count > 0 else 1.0 ) def to_dict(self) -> dict[str, Any]: """Convert to dictionary representation.""" return { "operation_count": self.operation_count, "total_duration": self.total_duration, "min_duration": self.min_duration if self.min_duration != float("inf") else 0.0, "max_duration": self.max_duration, "average_duration": self.average_duration, "error_count": self.error_count, "success_rate": self.success_rate, "last_operation_time": ( self.last_operation_time.isoformat() if self.last_operation_time else None ), } @dataclass class SystemHealthMetrics: """System health monitoring metrics.""" cpu_usage: float = 0.0 memory_usage: float = 0.0 disk_usage: float = 0.0 active_connections: int = 0 request_queue_size: int = 0 last_health_check: datetime | None = None def to_dict(self) -> dict[str, Any]: """Convert to dictionary representation.""" return { "cpu_usage": self.cpu_usage, "memory_usage": self.memory_usage, "disk_usage": self.disk_usage, "active_connections": self.active_connections, "request_queue_size": self.request_queue_size, "last_health_check": ( self.last_health_check.isoformat() if self.last_health_check else None ), } class MetricsService(DebuggableComponent): """ Comprehensive metrics collection and monitoring service. Provides performance tracking, usage statistics, and health monitoring for the MCP Git server with thread-safe operations and configurable retention policies. """ def __init__( self, max_metric_history: int = 10000, health_check_interval: float = 60.0, enable_system_metrics: bool = True, ): """ Initialize the metrics service. 
Args: max_metric_history: Maximum number of metric points to retain health_check_interval: Interval for system health checks in seconds enable_system_metrics: Whether to collect system-level metrics """ self._max_metric_history = max_metric_history self._health_check_interval = health_check_interval self._enable_system_metrics = enable_system_metrics # Thread-safe metric storage self._metrics_lock = Lock() self._metric_points: deque[MetricPoint] = deque(maxlen=max_metric_history) self._performance_metrics: dict[str, PerformanceMetrics] = defaultdict( PerformanceMetrics ) self._system_health = SystemHealthMetrics() # Service state self._is_running = False self._health_check_task: asyncio.Task | None = None self._start_time = datetime.now() # Logger self._logger = logging.getLogger(__name__) async def start(self) -> None: """Start the metrics service.""" if self._is_running: self._logger.warning("Metrics service already running") return self._is_running = True self._start_time = datetime.now() # Start health monitoring if system metrics enabled if self._enable_system_metrics: self._health_check_task = asyncio.create_task(self._health_check_loop()) self._logger.info("Metrics service started") async def stop(self) -> None: """Stop the metrics service.""" if not self._is_running: return self._is_running = False # Cancel health check task if self._health_check_task: self._health_check_task.cancel() try: await self._health_check_task except asyncio.CancelledError: pass self._logger.info("Metrics service stopped") def record_metric( self, name: str, value: float, labels: dict[str, str] | None = None, ) -> None: """ Record a custom metric point. Args: name: Metric name value: Metric value labels: Optional metric labels """ metric_point = MetricPoint( name=name, value=value, timestamp=datetime.now(), labels=labels or {}, ) with self._metrics_lock: self._metric_points.append(metric_point) def record_operation( self, operation_name: str, duration: float, success: bool = True, ) -> None: """ Record an operation performance metric. 
Args: operation_name: Name of the operation duration: Operation duration in seconds success: Whether the operation succeeded """ with self._metrics_lock: metrics = self._performance_metrics[operation_name] metrics.operation_count += 1 metrics.total_duration += duration metrics.min_duration = min(metrics.min_duration, duration) metrics.max_duration = max(metrics.max_duration, duration) metrics.last_operation_time = datetime.now() if not success: metrics.error_count += 1 # Record as metric point self.record_metric( f"operation.{operation_name}.duration", duration, {"success": str(success)}, ) def get_metrics_summary(self) -> dict[str, Any]: """Get a summary of all metrics.""" with self._metrics_lock: # Performance metrics summary perf_summary = {} for operation, metrics in self._performance_metrics.items(): perf_summary[operation] = metrics.to_dict() # Recent metric points (last 100) recent_metrics = [] for metric in list(self._metric_points)[-100:]: recent_metrics.append(metric.to_dict()) return { "service_uptime": (datetime.now() - self._start_time).total_seconds(), "total_metric_points": len(self._metric_points), "performance_metrics": perf_summary, "system_health": self._system_health.to_dict(), "recent_metrics": recent_metrics, "is_running": self._is_running, } def get_operation_metrics(self, operation_name: str) -> dict[str, Any] | None: """Get metrics for a specific operation.""" with self._metrics_lock: if operation_name in self._performance_metrics: return self._performance_metrics[operation_name].to_dict() return None def clear_metrics(self) -> None: """Clear all stored metrics.""" with self._metrics_lock: self._metric_points.clear() self._performance_metrics.clear() self._system_health = SystemHealthMetrics() self._logger.info("All metrics cleared") async def _health_check_loop(self) -> None: """Background health monitoring loop.""" while self._is_running: try: await self._collect_system_health() await asyncio.sleep(self._health_check_interval) except asyncio.CancelledError: break except Exception as e: self._logger.error(f"Health check failed: {e}") await asyncio.sleep(self._health_check_interval) async def _collect_system_health(self) -> None: """Collect system health metrics.""" try: import psutil # CPU and memory usage self._system_health.cpu_usage = psutil.cpu_percent(interval=1) self._system_health.memory_usage = psutil.virtual_memory().percent self._system_health.disk_usage = psutil.disk_usage("/").percent self._system_health.last_health_check = datetime.now() # Record as metric points self.record_metric("system.cpu_usage", self._system_health.cpu_usage) self.record_metric("system.memory_usage", self._system_health.memory_usage) self.record_metric("system.disk_usage", self._system_health.disk_usage) except ImportError: # psutil not available, use basic metrics self._system_health.last_health_check = datetime.now() except Exception as e: self._logger.error(f"Failed to collect system health: {e}") # DebuggableComponent implementation def get_component_state(self) -> ComponentState: """Get the current component state.""" with self._metrics_lock: status = "HEALTHY" if self._is_running else "STOPPED" return ComponentState( name="MetricsService", status=status, timestamp=datetime.now(), metadata={ "is_running": self._is_running, "uptime_seconds": ( datetime.now() - self._start_time ).total_seconds(), "total_metrics": len(self._metric_points), "tracked_operations": len(self._performance_metrics), "system_metrics_enabled": self._enable_system_metrics, }, details={ 
"performance_metrics": { op: metrics.to_dict() for op, metrics in self._performance_metrics.items() }, "system_health": self._system_health.to_dict(), }, ) def validate_component(self) -> ValidationResult: """Validate component state and configuration.""" issues = [] # Check if service is running if not self._is_running: issues.append("Metrics service is not running") # Check metric collection health with self._metrics_lock: if len(self._metric_points) >= self._max_metric_history * 0.9: issues.append("Metric storage approaching capacity") # Check system health if enabled if self._enable_system_metrics: if self._system_health.last_health_check: time_since_check = ( datetime.now() - self._system_health.last_health_check ) if time_since_check > timedelta( seconds=self._health_check_interval * 2 ): issues.append("System health checks are stale") return ValidationResult( is_valid=len(issues) == 0, issues=issues, warnings=[], component_name="MetricsService", ) def get_debug_info(self, debug_level: str = "INFO") -> dict[str, Any]: """Get debug information about the metrics service.""" debug_info = { "component_type": "MetricsService", "is_running": self._is_running, "configuration": { "max_metric_history": self._max_metric_history, "health_check_interval": self._health_check_interval, "enable_system_metrics": self._enable_system_metrics, }, } if debug_level in ["DEBUG", "detailed"]: debug_info.update(self.get_metrics_summary()) return debug_info def inspect_state(self, path: str | None = None) -> dict[str, Any]: """Inspect specific parts of the component state.""" full_state = { "service_config": { "max_metric_history": self._max_metric_history, "health_check_interval": self._health_check_interval, "enable_system_metrics": self._enable_system_metrics, }, "runtime_state": { "is_running": self._is_running, "start_time": self._start_time.isoformat(), "uptime_seconds": (datetime.now() - self._start_time).total_seconds(), }, "metrics_data": self.get_metrics_summary(), } if path is None: return full_state # Simple path navigation (e.g., "service_config.max_metric_history") parts = path.split(".") current = full_state for part in parts: if isinstance(current, dict) and part in current: current = current[part] else: return {} return {path: current} if not isinstance(current, dict) else current def get_component_dependencies(self) -> list[str]: """Get list of component dependencies.""" # MetricsService has minimal dependencies dependencies = [] if self._enable_system_metrics: dependencies.append("psutil") # Optional system dependency return dependencies def export_state_json(self) -> str: """Export component state as JSON for external analysis.""" state = { "component_type": "MetricsService", "timestamp": datetime.now().isoformat(), "state": self.inspect_state(), "health": self.health_check(), } return json.dumps(state, indent=2, default=str) def health_check(self) -> dict[str, bool | str | int | float]: """Perform a health check on the component.""" current_time = datetime.now() uptime = (current_time - self._start_time).total_seconds() # Check for any health issues healthy = True status = "healthy" error_count = 0 last_error = None if not self._is_running: healthy = False status = "stopped" # Check metric storage capacity with self._metrics_lock: storage_usage = len(self._metric_points) / self._max_metric_history if storage_usage > 0.9: healthy = False status = "storage_full" error_count += 1 last_error = f"Metric storage {storage_usage:.1%} full" # Check health check staleness if 
self._enable_system_metrics and self._system_health.last_health_check: time_since_check = current_time - self._system_health.last_health_check if time_since_check > timedelta(seconds=self._health_check_interval * 2): healthy = False status = "stale_health_checks" error_count += 1 last_error = ( f"Health checks stale by {time_since_check.total_seconds():.1f}s" ) return { "healthy": healthy, "status": status, "uptime": uptime, "last_error": last_error, "error_count": error_count, "storage_usage": storage_usage if "storage_usage" in locals() else 0.0, "system_metrics_enabled": self._enable_system_metrics, }
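For context, here is a minimal usage sketch (not part of the source file above) showing one plausible way to drive the service: start it, time an operation, record the result, and read back the summary. The import path mcp_server_git.services.server_metrics is an assumption; only the package name mcp_server_git is confirmed by the module's own imports, and the operation name "git.status" is illustrative.

import asyncio
import time

# Assumed module path; the file is server_metrics.py inside the
# mcp_server_git package, but its exact location is not shown above.
from mcp_server_git.services.server_metrics import MetricsService


async def main() -> None:
    # System metrics disabled so the sketch runs without psutil installed.
    service = MetricsService(max_metric_history=1000, enable_system_metrics=False)
    await service.start()

    start = time.perf_counter()
    try:
        pass  # ... perform some git operation here ...
    finally:
        # Updates count, min/max/average duration, and success rate.
        service.record_operation("git.status", time.perf_counter() - start)

    summary = service.get_metrics_summary()
    print(summary["performance_metrics"]["git.status"]["average_duration"])

    await service.stop()


asyncio.run(main())

Note the locking design: record_operation updates the per-operation aggregates while holding the internal threading.Lock, then releases it before calling record_metric, which acquires the same non-reentrant lock itself. Keeping that call outside the with block is what avoids a self-deadlock.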
