Skip to main content
Glama

Security MCP Server

by nordeim
metrics.py30.2 kB
""" Metrics collection system for MCP server. Production-ready implementation with thread safety, memory management, and comprehensive observability. Features: - Thread-safe singleton pattern with proper cleanup - Safe Prometheus integration without private attribute access - Memory-efficient percentile calculations - Comprehensive edge case handling (NaN, Inf, negative values) - Multiple export formats (Prometheus, JSON, CSV) - Metric retention policies - Testing utilities Usage: # Basic usage from mcp_server.metrics import MetricsManager manager = MetricsManager.get() manager.record_tool_execution( tool_name="NmapTool", success=True, execution_time=2.5 ) # Get statistics stats = manager.get_all_stats() # Export for Prometheus metrics_text = manager.get_prometheus_metrics() # Cleanup await manager.cleanup() Testing: # Reset singleton for testing MetricsManager.reset_for_testing() # Create test instance manager = MetricsManager(max_tools=10) """ import time import logging import threading import math import json import csv import io import bisect from typing import Dict, Any, Optional, Set, List, Tuple from datetime import datetime, timedelta from dataclasses import dataclass, field, asdict from collections import deque from enum import Enum log = logging.getLogger(__name__) class MetricExportFormat(Enum): """Supported metric export formats.""" PROMETHEUS = "prometheus" JSON = "json" CSV = "csv" @dataclass class MetricSnapshot: """Immutable snapshot of metrics for comparison.""" timestamp: datetime system_stats: Dict[str, Any] tool_stats: Dict[str, Dict[str, Any]] def compare(self, other: 'MetricSnapshot') -> Dict[str, Any]: """Compare two snapshots and return differences.""" if not isinstance(other, MetricSnapshot): raise TypeError("Can only compare with another MetricSnapshot") return { "time_delta": (self.timestamp - other.timestamp).total_seconds(), "system_changes": self._diff_dicts(self.system_stats, other.system_stats), "tool_changes": { tool: self._diff_dicts(self.tool_stats.get(tool, {}), other.tool_stats.get(tool, {})) for tool in set(self.tool_stats.keys()) | set(other.tool_stats.keys()) } } @staticmethod def _diff_dicts(new: Dict, old: Dict) -> Dict: """Calculate difference between two dictionaries.""" diff = {} for key in set(new.keys()) | set(old.keys()): new_val = new.get(key, 0) old_val = old.get(key, 0) if isinstance(new_val, (int, float)) and isinstance(old_val, (int, float)): diff[key] = new_val - old_val elif new_val != old_val: diff[key] = {"old": old_val, "new": new_val} return diff class PrometheusRegistry: """Enhanced singleton registry with safe metric detection.""" _instance: Optional['PrometheusRegistry'] = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def initialize(self): """Initialize Prometheus metrics once with safe detection.""" if self._initialized: return with self._lock: if self._initialized: return try: from prometheus_client import Counter, Histogram, Gauge, REGISTRY, generate_latest self.registry = REGISTRY self.generate_latest = generate_latest # Safe metric existence check using collect() existing_metrics = self._get_existing_metrics_safe() # Create or reuse metrics self.execution_counter = self._get_or_create_counter( 'mcp_tool_execution_total', 'Total tool executions', ['tool', 'status', 'error_type'], existing_metrics ) self.execution_histogram = self._get_or_create_histogram( 'mcp_tool_execution_seconds', 'Tool execution time in seconds', ['tool'], existing_metrics ) self.active_gauge = self._get_or_create_gauge( 'mcp_tool_active', 'Currently active tool executions', ['tool'], existing_metrics ) self.error_counter = self._get_or_create_counter( 'mcp_tool_errors_total', 'Total tool errors', ['tool', 'error_type'], existing_metrics ) self._initialized = True self.available = True log.info("prometheus.initialized successfully metrics_count=%d", len(existing_metrics)) except ImportError: self.available = False self.generate_latest = None log.info("prometheus.not_available hint='pip install prometheus-client'") except Exception as e: self.available = False self.generate_latest = None log.error("prometheus.initialization_failed error=%s", str(e)) def _get_existing_metrics_safe(self) -> Set[str]: """Safely get existing metric names using public API.""" existing = set() try: # Use public collect() method instead of private attributes for family in self.registry.collect(): existing.add(family.name) except Exception as e: log.debug("metric_detection_failed error=%s", str(e)) return existing def _get_or_create_counter(self, name: str, doc: str, labels: List[str], existing: Set[str]): """Get existing counter or create new one.""" if name in existing: return self._find_collector_by_name(name) from prometheus_client import Counter return Counter(name, doc, labels, registry=self.registry) def _get_or_create_histogram(self, name: str, doc: str, labels: List[str], existing: Set[str]): """Get existing histogram or create new one.""" if name in existing: return self._find_collector_by_name(name) from prometheus_client import Histogram return Histogram(name, doc, labels, registry=self.registry) def _get_or_create_gauge(self, name: str, doc: str, labels: List[str], existing: Set[str]): """Get existing gauge or create new one.""" if name in existing: return self._find_collector_by_name(name) from prometheus_client import Gauge return Gauge(name, doc, labels, registry=self.registry) def _find_collector_by_name(self, name: str): """Find collector by name using public API.""" try: for family in self.registry.collect(): if family.name == name: # Return the collector associated with this family # This is a fallback; if metric exists, it should work return None # Let Prometheus handle it except Exception: pass return None # Initialize global registry _prometheus_registry = PrometheusRegistry() _prometheus_registry.initialize() def sanitize_metric_value(value: float, name: str = "value") -> float: """Sanitize metric value handling NaN, Inf, and negative values.""" if value is None: log.warning("metrics.null_value name=%s defaulting_to_zero", name) return 0.0 try: value = float(value) except (TypeError, ValueError) as e: log.warning("metrics.invalid_value name=%s value=%s error=%s defaulting_to_zero", name, value, str(e)) return 0.0 if math.isnan(value): log.warning("metrics.nan_value name=%s defaulting_to_zero", name) return 0.0 if math.isinf(value): log.warning("metrics.infinite_value name=%s defaulting_to_zero", name) return 0.0 # Execution times should be non-negative if value < 0: log.warning("metrics.negative_value name=%s value=%f using_absolute", name, value) return abs(value) return value @dataclass class ToolExecutionMetrics: """Thread-safe tool execution metrics with comprehensive edge case handling.""" tool_name: str _lock: threading.RLock = field(default_factory=threading.RLock, init=False, repr=False) execution_count: int = 0 success_count: int = 0 failure_count: int = 0 timeout_count: int = 0 error_count: int = 0 total_execution_time: float = 0.0 min_execution_time: float = float('inf') max_execution_time: float = 0.0 last_execution_time: Optional[datetime] = None recent_executions: deque = field(default_factory=lambda: deque(maxlen=100)) _sorted_times: List[float] = field(default_factory=list, init=False, repr=False) _needs_sort: bool = field(default=False, init=False, repr=False) def record_execution(self, success: bool, execution_time: float, timed_out: bool = False, error_type: Optional[str] = None): """Thread-safe execution recording with validation.""" with self._lock: # Sanitize execution_time execution_time = sanitize_metric_value(execution_time, "execution_time") self.execution_count += 1 self.total_execution_time += execution_time # Handle min/max with infinity edge case if self.min_execution_time == float('inf') or execution_time < self.min_execution_time: self.min_execution_time = execution_time if execution_time > self.max_execution_time: self.max_execution_time = execution_time self.last_execution_time = datetime.now() if success: self.success_count += 1 else: self.failure_count += 1 if error_type: self.error_count += 1 if timed_out: self.timeout_count += 1 self.recent_executions.append({ "timestamp": datetime.now(), "success": success, "execution_time": execution_time, "timed_out": timed_out, "error_type": error_type }) # Add to sorted times for percentile calculation bisect.insort(self._sorted_times, execution_time) if len(self._sorted_times) > 1000: # Keep last 1000 for percentiles self._sorted_times = self._sorted_times[-1000:] def _calculate_percentile(self, percentile: float) -> float: """Calculate percentile efficiently using sorted list.""" if not self._sorted_times: return 0.0 if percentile < 0 or percentile > 100: log.warning("metrics.invalid_percentile value=%f clamping", percentile) percentile = max(0.0, min(100.0, percentile)) index = int(len(self._sorted_times) * (percentile / 100.0)) index = max(0, min(index, len(self._sorted_times) - 1)) return self._sorted_times[index] def get_stats(self) -> Dict[str, Any]: """Get thread-safe statistics snapshot with proper edge case handling.""" with self._lock: if self.execution_count == 0: return { "tool_name": self.tool_name, "execution_count": 0, "success_rate": 0.0, "average_execution_time": 0.0, "min_execution_time": 0.0, "max_execution_time": 0.0, "p50_execution_time": 0.0, "p95_execution_time": 0.0, "p99_execution_time": 0.0, } # Use sorted times for accurate percentiles p50 = self._calculate_percentile(50) p95 = self._calculate_percentile(95) p99 = self._calculate_percentile(99) avg_execution_time = self.total_execution_time / self.execution_count success_rate = (self.success_count / self.execution_count) * 100 min_time = 0.0 if self.min_execution_time == float('inf') else self.min_execution_time return { "tool_name": self.tool_name, "execution_count": self.execution_count, "success_count": self.success_count, "failure_count": self.failure_count, "error_count": self.error_count, "timeout_count": self.timeout_count, "success_rate": round(success_rate, 2), "average_execution_time": round(avg_execution_time, 4), "min_execution_time": round(min_time, 4), "max_execution_time": round(self.max_execution_time, 4), "p50_execution_time": round(p50, 4), "p95_execution_time": round(p95, 4), "p99_execution_time": round(p99, 4), "last_execution_time": self.last_execution_time.isoformat() if self.last_execution_time else None, "recent_failure_rate": self._calculate_recent_failure_rate(), } def _calculate_recent_failure_rate(self) -> float: """Calculate failure rate from recent executions.""" if not self.recent_executions: return 0.0 recent_failures = sum( 1 for e in self.recent_executions if not e["success"] ) return round((recent_failures / len(self.recent_executions)) * 100, 2) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return self.get_stats() class SystemMetrics: """System-wide metrics tracking with thread safety.""" def __init__(self): self.start_time = datetime.now() self.request_count = 0 self.error_count = 0 self.active_connections = 0 self._lock = threading.Lock() def increment_request_count(self): """Thread-safe request count increment.""" with self._lock: self.request_count += 1 def increment_error_count(self): """Thread-safe error count increment.""" with self._lock: self.error_count += 1 def increment_active_connections(self): """Thread-safe active connections increment.""" with self._lock: self.active_connections += 1 def decrement_active_connections(self): """Thread-safe active connections decrement.""" with self._lock: self.active_connections = max(0, self.active_connections - 1) def get_uptime(self) -> float: """Get system uptime in seconds.""" return (datetime.now() - self.start_time).total_seconds() def get_stats(self) -> Dict[str, Any]: """Get system statistics.""" with self._lock: uptime = self.get_uptime() error_rate = (self.error_count / self.request_count * 100) if self.request_count > 0 else 0 return { "uptime_seconds": round(uptime, 2), "request_count": self.request_count, "error_count": self.error_count, "error_rate": round(error_rate, 2), "active_connections": self.active_connections, "start_time": self.start_time.isoformat() } def reset(self): """Reset counters (for testing).""" with self._lock: self.request_count = 0 self.error_count = 0 self.active_connections = 0 class ToolMetrics: """Per-tool metrics wrapper with Prometheus integration.""" def __init__(self, tool_name: str): self.tool_name = tool_name self.metrics = ToolExecutionMetrics(tool_name) self._active_count = 0 self._lock = threading.Lock() self._prom_available = _prometheus_registry.available def record_execution(self, success: bool, execution_time: float, timed_out: bool = False, error_type: Optional[str] = None): """Record execution with Prometheus metrics.""" # Validate and sanitize inputs execution_time = sanitize_metric_value(execution_time, f"{self.tool_name}.execution_time") self.metrics.record_execution(success, execution_time, timed_out, error_type) # Record Prometheus metrics (cached availability check) if self._prom_available: self._record_prometheus_metrics(success, execution_time, error_type) def _record_prometheus_metrics(self, success: bool, execution_time: float, error_type: Optional[str]): """Record Prometheus metrics with error handling.""" try: status = 'success' if success else 'failure' error_type = error_type or 'none' if _prometheus_registry.execution_counter: _prometheus_registry.execution_counter.labels( tool=self.tool_name, status=status, error_type=error_type ).inc() if _prometheus_registry.execution_histogram: _prometheus_registry.execution_histogram.labels( tool=self.tool_name ).observe(execution_time) if not success and _prometheus_registry.error_counter: _prometheus_registry.error_counter.labels( tool=self.tool_name, error_type=error_type ).inc() except Exception as e: log.debug("prometheus.record_failed tool=%s error=%s", self.tool_name, str(e)) # Disable Prometheus for this instance if it keeps failing self._prom_available = False def increment_active(self): """Increment active execution count.""" with self._lock: self._active_count += 1 if self._prom_available and _prometheus_registry.active_gauge: try: _prometheus_registry.active_gauge.labels(tool=self.tool_name).inc() except Exception: self._prom_available = False def decrement_active(self): """Decrement active execution count.""" with self._lock: self._active_count = max(0, self._active_count - 1) if self._prom_available and _prometheus_registry.active_gauge: try: _prometheus_registry.active_gauge.labels(tool=self.tool_name).dec() except Exception: self._prom_available = False def get_active_count(self) -> int: """Get current active execution count.""" with self._lock: return self._active_count def get_stats(self) -> Dict[str, Any]: """Get tool statistics.""" stats = self.metrics.get_stats() stats['active_executions'] = self.get_active_count() return stats class MetricsManager: """ Enhanced metrics manager with memory management and comprehensive features. Singleton pattern with thread safety and testing support. """ _instance: Optional['MetricsManager'] = None _lock = threading.Lock() def __new__(cls, max_tools: int = 1000): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, max_tools: int = 1000): if self._initialized: return with self._lock: if self._initialized: return self.tool_metrics: Dict[str, ToolMetrics] = {} self.system_metrics = SystemMetrics() self.max_tools = max(10, min(max_tools, 10000)) # Clamp between 10 and 10000 self._metrics_lock = threading.Lock() self._last_cleanup = time.time() self._cleanup_interval = 3600 # 1 hour self.start_time = datetime.now() self._snapshots: deque = deque(maxlen=100) self._initialized = True log.info("metrics_manager.initialized max_tools=%d", self.max_tools) @classmethod def get(cls, max_tools: int = 1000) -> 'MetricsManager': """Get singleton instance.""" if cls._instance is None: cls._instance = cls(max_tools) return cls._instance @classmethod def reset_for_testing(cls): """Reset singleton instance (for testing only).""" with cls._lock: if cls._instance: cls._instance._initialized = False cls._instance = None log.debug("metrics_manager.reset_for_testing") def reset(self): """Reset all metrics (for testing).""" with self._metrics_lock: self.tool_metrics.clear() self.system_metrics.reset() self._last_cleanup = time.time() self._snapshots.clear() log.debug("metrics_manager.reset") def get_tool_metrics(self, tool_name: str) -> ToolMetrics: """Get or create tool metrics with cleanup.""" with self._metrics_lock: # Periodic cleanup if time.time() - self._last_cleanup > self._cleanup_interval: self._cleanup_old_metrics() if tool_name not in self.tool_metrics: # Evict if at capacity if len(self.tool_metrics) >= self.max_tools: self._evict_oldest_metrics() self.tool_metrics[tool_name] = ToolMetrics(tool_name) log.debug("metrics.tool_created name=%s total_tools=%d", tool_name, len(self.tool_metrics)) return self.tool_metrics[tool_name] def record_tool_execution(self, tool_name: str, success: bool = True, execution_time: float = 0.0, status: Optional[str] = None, timed_out: bool = False, error_type: Optional[str] = None, duration_seconds: Optional[float] = None): """ Record tool execution metrics with multiple parameter formats for compatibility. Args: tool_name: Name of the tool success: Whether execution was successful execution_time: Execution time in seconds status: Status string ('success' or 'failure') timed_out: Whether execution timed out error_type: Type of error if failed duration_seconds: Alternative to execution_time """ # Handle different parameter names for compatibility if duration_seconds is not None: execution_time = duration_seconds # Determine success from status if provided if status is not None: success = (status == 'success') tool_metrics = self.get_tool_metrics(tool_name) tool_metrics.record_execution(success, execution_time, timed_out, error_type) self.system_metrics.increment_request_count() if not success: self.system_metrics.increment_error_count() def get_tool_stats(self, tool_name: str) -> Dict[str, Any]: """Get statistics for a specific tool.""" if tool_name in self.tool_metrics: return self.tool_metrics[tool_name].get_stats() return { "tool_name": tool_name, "execution_count": 0, "message": "No metrics available for this tool" } def get_system_stats(self) -> Dict[str, Any]: """Get system-wide statistics.""" return self.system_metrics.get_stats() def _cleanup_old_metrics(self): """Remove metrics for tools not used recently.""" cutoff_time = datetime.now() - timedelta(hours=24) to_remove = [] for name, metrics in self.tool_metrics.items(): last_time = metrics.metrics.last_execution_time if last_time and last_time < cutoff_time: to_remove.append(name) for name in to_remove: del self.tool_metrics[name] if to_remove: log.info("metrics.cleanup removed=%d tools remaining=%d", len(to_remove), len(self.tool_metrics)) self._last_cleanup = time.time() def _evict_oldest_metrics(self): """Evict least recently used metrics.""" if not self.tool_metrics: return oldest_name = None oldest_time = datetime.now() for name, metrics in self.tool_metrics.items(): last_time = metrics.metrics.last_execution_time if last_time and last_time < oldest_time: oldest_time = last_time oldest_name = name if oldest_name: del self.tool_metrics[oldest_name] log.info("metrics.evicted tool=%s remaining=%d", oldest_name, len(self.tool_metrics)) def get_all_stats(self) -> Dict[str, Any]: """Get all metrics statistics.""" return { "system": self.system_metrics.get_stats(), "tools": {name: metrics.get_stats() for name, metrics in self.tool_metrics.items()}, "prometheus_available": _prometheus_registry.available, "collection_start_time": self.start_time.isoformat(), "total_tools_tracked": len(self.tool_metrics), "max_tools": self.max_tools } def get_prometheus_metrics(self) -> Optional[str]: """Get Prometheus metrics in text format.""" if _prometheus_registry.available and _prometheus_registry.generate_latest: try: return _prometheus_registry.generate_latest(_prometheus_registry.registry).decode('utf-8') except Exception as e: log.error("prometheus.generate_metrics_error error=%s", str(e)) return None return None def create_snapshot(self) -> MetricSnapshot: """Create immutable snapshot of current metrics.""" snapshot = MetricSnapshot( timestamp=datetime.now(), system_stats=self.get_system_stats(), tool_stats={name: metrics.get_stats() for name, metrics in self.tool_metrics.items()} ) self._snapshots.append(snapshot) return snapshot def get_snapshots(self, count: int = 10) -> List[MetricSnapshot]: """Get recent snapshots.""" count = max(1, min(count, 100)) return list(self._snapshots)[-count:] def export_json(self, pretty: bool = True) -> str: """Export metrics as JSON.""" data = self.get_all_stats() if pretty: return json.dumps(data, indent=2, default=str) return json.dumps(data, default=str) def export_csv(self) -> str: """Export tool metrics as CSV.""" output = io.StringIO() writer = csv.writer(output) # Write header writer.writerow([ 'tool_name', 'execution_count', 'success_count', 'failure_count', 'success_rate', 'avg_time', 'min_time', 'max_time', 'p50', 'p95', 'p99', 'timeout_count', 'error_count' ]) # Write data for name, metrics in self.tool_metrics.items(): stats = metrics.get_stats() writer.writerow([ stats['tool_name'], stats['execution_count'], stats['success_count'], stats['failure_count'], stats['success_rate'], stats['average_execution_time'], stats['min_execution_time'], stats['max_execution_time'], stats['p50_execution_time'], stats['p95_execution_time'], stats['p99_execution_time'], stats['timeout_count'], stats['error_count'] ]) return output.getvalue() async def cleanup(self): """Async cleanup method for proper resource management.""" log.info("metrics_manager.cleanup_started") # Clear all metrics with self._metrics_lock: self.tool_metrics.clear() self._snapshots.clear() log.info("metrics_manager.cleanup_completed") class PrometheusMetrics: """Legacy compatibility class for backward compatibility.""" def __init__(self): self.registry = _prometheus_registry.registry if _prometheus_registry.available else None def get_metrics(self) -> Optional[str]: """Get Prometheus metrics.""" manager = MetricsManager.get() return manager.get_prometheus_metrics()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server