
NetBox Read/Write MCP Server

monitoring.py (36.3 kB)
""" NetBox MCP Performance Monitoring and Metrics System. This module provides comprehensive performance monitoring, metrics collection, and health checking for the NetBox MCP server. It tracks operation times, cache performance, system resources, and provides dashboard functionality. """ import asyncio import json import logging import psutil import threading import time from collections import defaultdict, deque from contextlib import contextmanager from dataclasses import dataclass, asdict, field from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, Callable, Union import csv import io logger = logging.getLogger(__name__) @dataclass class OperationMetrics: """Metrics for a single operation execution.""" operation_name: str duration: float success: bool timestamp: datetime error_details: Optional[Dict[str, Any]] = None parameters: Optional[Dict[str, Any]] = None cache_hit: Optional[bool] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return asdict(self) @dataclass class CacheMetrics: """Cache performance metrics.""" hits: int misses: int hit_ratio: float cache_size_mb: float evictions: int timestamp: datetime @property def total_requests(self) -> int: """Total cache requests.""" return self.hits + self.misses @classmethod def calculate(cls, hits: int, misses: int, cache_size_bytes: int, evictions: int) -> 'CacheMetrics': """Calculate cache metrics from raw data.""" total_requests = hits + misses hit_ratio = hits / total_requests if total_requests > 0 else 0.0 cache_size_mb = cache_size_bytes / (1024 * 1024) return cls( hits=hits, misses=misses, hit_ratio=hit_ratio, cache_size_mb=cache_size_mb, evictions=evictions, timestamp=datetime.now() ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return asdict(self) @dataclass class SystemMetrics: """System resource metrics.""" cpu_usage: float memory_usage: float # MB memory_available: float # MB active_connections: int request_queue_size: int timestamp: datetime disk_usage: Optional[float] = None # MB network_io: Optional[Dict[str, float]] = None @property def memory_usage_percentage(self) -> float: """Calculate memory usage percentage.""" total_memory = self.memory_usage + self.memory_available return (self.memory_usage / total_memory) * 100 if total_memory > 0 else 0.0 @classmethod def collect(cls) -> 'SystemMetrics': """Collect current system metrics.""" try: cpu_usage = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() # Convert bytes to MB memory_usage = memory.used / (1024 * 1024) memory_available = memory.available / (1024 * 1024) # Get network connections count (approximation) try: active_connections = len(psutil.net_connections()) except (psutil.AccessDenied, psutil.NoSuchProcess): active_connections = 0 return cls( cpu_usage=cpu_usage, memory_usage=memory_usage, memory_available=memory_available, active_connections=active_connections, request_queue_size=0, # Would need framework integration timestamp=datetime.now() ) except Exception as e: logger.warning(f"Failed to collect system metrics: {e}") return cls( cpu_usage=0.0, memory_usage=0.0, memory_available=0.0, active_connections=0, request_queue_size=0, timestamp=datetime.now() ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return asdict(self) class PerformanceMonitor: """Main performance monitoring class.""" def __init__(self, max_history_size: int = 1000): """ Initialize performance monitor. 
Args: max_history_size: Maximum number of metrics to keep in memory """ self.max_history_size = max_history_size self.enabled = True self._lock = threading.RLock() # Metrics storage self._metrics_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history_size)) self._operation_stats: Dict[str, Dict[str, Any]] = defaultdict(dict) logger.info(f"Performance monitor initialized with max_history_size={max_history_size}") @contextmanager def time_operation(self, operation_name: str, parameters: Optional[Dict[str, Any]] = None): """ Context manager for timing operations. Args: operation_name: Name of the operation being timed parameters: Optional parameters passed to the operation Example: with monitor.time_operation("netbox_create_device", {"name": "server-01"}): # Operation code here pass """ if not self.enabled: yield return start_time = time.time() success = True error_details = None try: yield except Exception as e: success = False error_details = { "error_type": type(e).__name__, "message": str(e) } raise finally: duration = time.time() - start_time metrics = OperationMetrics( operation_name=operation_name, duration=duration, success=success, timestamp=datetime.now(), error_details=error_details, parameters=parameters ) self._record_operation_metrics(metrics) def _record_operation_metrics(self, metrics: OperationMetrics): """Record operation metrics.""" with self._lock: self._metrics_history[metrics.operation_name].append(metrics) self._update_operation_stats(metrics) def _update_operation_stats(self, metrics: OperationMetrics): """Update aggregated operation statistics.""" op_name = metrics.operation_name stats = self._operation_stats[op_name] # Initialize stats if first operation if not stats: stats.update({ "total_operations": 0, "successful_operations": 0, "total_duration": 0.0, "min_duration": float('inf'), "max_duration": 0.0, "last_execution": None }) # Update stats stats["total_operations"] += 1 stats["total_duration"] += metrics.duration stats["min_duration"] = min(stats["min_duration"], metrics.duration) stats["max_duration"] = max(stats["max_duration"], metrics.duration) stats["last_execution"] = metrics.timestamp if metrics.success: stats["successful_operations"] += 1 def get_operation_history(self, operation_name: str) -> List[OperationMetrics]: """Get operation history for a specific operation.""" with self._lock: return list(self._metrics_history[operation_name]) def get_operation_statistics(self, operation_name: str) -> Dict[str, Any]: """Get aggregated statistics for an operation.""" with self._lock: stats = self._operation_stats[operation_name].copy() if stats.get("total_operations", 0) > 0: stats["success_rate"] = stats["successful_operations"] / stats["total_operations"] stats["average_duration"] = stats["total_duration"] / stats["total_operations"] # Convert min_duration from inf if no operations if stats["min_duration"] == float('inf'): stats["min_duration"] = 0.0 else: stats.update({ "success_rate": 0.0, "average_duration": 0.0, "min_duration": 0.0, "max_duration": 0.0 }) return stats def get_all_operations_summary(self) -> Dict[str, Dict[str, Any]]: """Get summary statistics for all operations.""" summary = {} for operation_name in self._operation_stats: summary[operation_name] = self.get_operation_statistics(operation_name) return summary def record_cache_metrics(self, metrics: CacheMetrics): """Record cache performance metrics.""" if not self.enabled: return with self._lock: self._metrics_history["cache_metrics"].append(metrics) def 
    def record_system_metrics(self, metrics: SystemMetrics):
        """Record system performance metrics."""
        if not self.enabled:
            return
        with self._lock:
            self._metrics_history["system_metrics"].append(metrics)

    def get_latest_cache_metrics(self) -> Optional[CacheMetrics]:
        """Get the latest cache metrics."""
        with self._lock:
            cache_history = self._metrics_history["cache_metrics"]
            return cache_history[-1] if cache_history else None

    def get_latest_system_metrics(self) -> Optional[SystemMetrics]:
        """Get the latest system metrics."""
        with self._lock:
            system_history = self._metrics_history["system_metrics"]
            return system_history[-1] if system_history else None

    def clear_history(self):
        """Clear all metrics history."""
        with self._lock:
            self._metrics_history.clear()
            self._operation_stats.clear()
        logger.info("Performance monitor history cleared")


class MetricsCollector:
    """Automatic metrics collector."""

    def __init__(self, performance_monitor: Optional[PerformanceMonitor] = None):
        """
        Initialize metrics collector.

        Args:
            performance_monitor: Performance monitor instance
        """
        self.performance_monitor = performance_monitor or PerformanceMonitor()
        self._collection_interval = 60  # seconds
        self._collection_task: Optional[asyncio.Task] = None
        self._stop_event = asyncio.Event()
        self.is_running = False

        logger.info(f"Metrics collector initialized with interval={self._collection_interval}s")

    def start_collection(self):
        """Start automatic metrics collection."""
        if self.is_running:
            logger.warning("Metrics collection already running")
            return

        self.is_running = True
        self._stop_event.clear()

        # Start collection in background
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                self._collection_task = loop.create_task(self._collection_loop())
            else:
                # This should be started from an async context where the loop is already running,
                # for example in a FastAPI startup event or ASGI server startup.
                raise RuntimeError(
                    "MetricsCollector must be started from a running event loop. "
                    "Start the collector from an async context like a FastAPI startup event."
                )
        except RuntimeError as e:
            logger.error(f"Failed to start metrics collection: {e}")
            self.is_running = False
            raise

        logger.info("Metrics collection started")

    def stop_collection(self):
        """Stop automatic metrics collection."""
        if not self.is_running:
            return

        self.is_running = False
        self._stop_event.set()

        if self._collection_task:
            self._collection_task.cancel()

        logger.info("Metrics collection stopped")

    async def _collection_loop(self):
        """Background collection loop."""
        while not self._stop_event.is_set():
            try:
                self.collect_metrics()
                await asyncio.sleep(self._collection_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in metrics collection loop: {e}")
                await asyncio.sleep(5)  # Brief pause before retry

    def collect_metrics(self):
        """Collect current metrics."""
        start_time = time.time()

        try:
            # Collect system metrics
            system_metrics = SystemMetrics.collect()
            self.performance_monitor.record_system_metrics(system_metrics)

            # Collect cache metrics (would integrate with actual cache)
            # This is a placeholder - would get real cache stats
            cache_metrics = CacheMetrics(
                hits=0,
                misses=0,
                hit_ratio=0.0,
                cache_size_mb=0.0,
                evictions=0,
                timestamp=datetime.now()
            )
            self.performance_monitor.record_cache_metrics(cache_metrics)

            collection_duration = time.time() - start_time
            logger.debug(f"Metrics collection completed in {collection_duration:.3f}s")

        except Exception as e:
            logger.error(f"Failed to collect metrics: {e}")

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get summary of collected metrics."""
        summary = {
            "timestamp": datetime.now().isoformat(),
            "collection_duration": 0.0,
            "operation_count": len(self.performance_monitor._operation_stats),
            "system_metrics": None,
            "cache_metrics": None
        }

        # Add latest metrics
        latest_system = self.performance_monitor.get_latest_system_metrics()
        if latest_system:
            summary["system_metrics"] = latest_system.to_dict()

        latest_cache = self.performance_monitor.get_latest_cache_metrics()
        if latest_cache:
            summary["cache_metrics"] = latest_cache.to_dict()

        return summary

    def export_metrics(self) -> Dict[str, Any]:
        """Export all collected metrics."""
        export_data = {
            "export_timestamp": datetime.now().isoformat(),
            "system_metrics": [],
            "cache_metrics": [],
            "operation_metrics": {}
        }

        # Export system metrics
        system_history = self.performance_monitor._metrics_history["system_metrics"]
        export_data["system_metrics"] = [m.to_dict() for m in system_history]

        # Export cache metrics
        cache_history = self.performance_monitor._metrics_history["cache_metrics"]
        export_data["cache_metrics"] = [m.to_dict() for m in cache_history]

        # Export operation metrics
        for op_name, history in self.performance_monitor._metrics_history.items():
            if op_name not in ["system_metrics", "cache_metrics"]:
                export_data["operation_metrics"][op_name] = [m.to_dict() for m in history]

        return export_data

    def reset_metrics(self):
        """Reset all collected metrics."""
        self.performance_monitor.clear_history()
        logger.info("All metrics reset")
class HealthCheck:
    """System health check functionality."""

    def __init__(self, performance_monitor: Optional[PerformanceMonitor] = None):
        """
        Initialize health check.

        Args:
            performance_monitor: Performance monitor instance
        """
        self.performance_monitor = performance_monitor or PerformanceMonitor()
        self.netbox_client = None  # Will be set by dependency injection

        # Health check thresholds
        self._thresholds = {
            "cpu_usage_warning": 70.0,
            "cpu_usage_critical": 90.0,
            "memory_usage_warning": 80.0,
            "memory_usage_critical": 95.0,
            "cache_hit_ratio_warning": 0.7,
            "cache_hit_ratio_critical": 0.5,
            "response_time_warning": 2.0,  # seconds
            "response_time_critical": 5.0
        }

        # Custom health checks
        self._checks: Dict[str, Callable] = {}

        logger.info("Health check system initialized")

    def register_check(self, name: str, check_function: Callable) -> None:
        """
        Register a custom health check.

        Args:
            name: Name of the health check
            check_function: Function that returns health status dict
        """
        self._checks[name] = check_function
        logger.info(f"Registered custom health check: {name}")

    def get_health_status(self) -> Dict[str, Any]:
        """Get overall system health status."""
        health_status = {
            "overall_status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "checks": {}
        }

        # Standard health checks
        checks = [
            ("system_resources", self._check_system_resources),
            ("cache_performance", self._check_cache_performance),
            ("operation_performance", self._check_operation_performance),
            ("netbox_connectivity", self._check_netbox_connectivity)
        ]

        # Add custom checks
        for name, check_func in self._checks.items():
            checks.append((name, check_func))

        # Run all checks
        overall_status = "healthy"
        for check_name, check_func in checks:
            try:
                check_result = check_func()
                health_status["checks"][check_name] = check_result

                # Update overall status based on worst individual status
                check_status = check_result.get("status", "healthy")
                if check_status == "critical":
                    overall_status = "critical"
                elif check_status == "warning" and overall_status != "critical":
                    overall_status = "warning"

            except Exception as e:
                logger.error(f"Health check '{check_name}' failed: {e}")
                health_status["checks"][check_name] = {
                    "status": "critical",
                    "message": f"Health check failed: {e}",
                    "timestamp": datetime.now().isoformat()
                }
                overall_status = "critical"

        health_status["overall_status"] = overall_status
        return health_status

    def _check_system_resources(self) -> Dict[str, Any]:
        """Check system resource usage."""
        try:
            metrics = SystemMetrics.collect()
            status = "healthy"
            messages = []

            # Check CPU usage
            if metrics.cpu_usage >= self._thresholds["cpu_usage_critical"]:
                status = "critical"
                messages.append(f"CPU usage critical: {metrics.cpu_usage:.1f}%")
            elif metrics.cpu_usage >= self._thresholds["cpu_usage_warning"]:
                status = "warning"
                messages.append(f"CPU usage high: {metrics.cpu_usage:.1f}%")

            # Check memory usage
            memory_percentage = metrics.memory_usage_percentage
            if memory_percentage >= self._thresholds["memory_usage_critical"]:
                status = "critical"
                messages.append(f"Memory usage critical: {memory_percentage:.1f}%")
            elif memory_percentage >= self._thresholds["memory_usage_warning"]:
                if status != "critical":
                    status = "warning"
                messages.append(f"Memory usage high: {memory_percentage:.1f}%")

            return {
                "status": status,
                "message": "; ".join(messages) if messages else "System resources normal",
                "details": {
                    "cpu_usage": metrics.cpu_usage,
                    "memory_usage_percentage": memory_percentage,
                    "active_connections": metrics.active_connections
                },
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "status": "critical",
                "message": f"Failed to check system resources: {e}",
                "timestamp": datetime.now().isoformat()
            }

    def _check_cache_performance(self) -> Dict[str, Any]:
        """Check cache performance."""
        try:
            cache_metrics = self.performance_monitor.get_latest_cache_metrics()

            if not cache_metrics:
                return {
                    "status": "warning",
                    "message": "No cache metrics available",
                    "timestamp": datetime.now().isoformat()
                }

            status = "healthy"
            messages = []

            # Check hit ratio
            if cache_metrics.hit_ratio <= self._thresholds["cache_hit_ratio_critical"]:
                status = "critical"
                messages.append(f"Cache hit ratio critical: {cache_metrics.hit_ratio:.2%}")
            elif cache_metrics.hit_ratio <= self._thresholds["cache_hit_ratio_warning"]:
                status = "warning"
                messages.append(f"Cache hit ratio low: {cache_metrics.hit_ratio:.2%}")

            return {
                "status": status,
                "message": "; ".join(messages) if messages else "Cache performance normal",
                "details": {
                    "hit_ratio": cache_metrics.hit_ratio,
                    "cache_size_mb": cache_metrics.cache_size_mb,
                    "total_requests": cache_metrics.total_requests
                },
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "status": "critical",
                "message": f"Failed to check cache performance: {e}",
                "timestamp": datetime.now().isoformat()
            }

    def _check_operation_performance(self) -> Dict[str, Any]:
        """Check operation performance."""
        try:
            all_stats = self.performance_monitor.get_all_operations_summary()

            if not all_stats:
                return {
                    "status": "healthy",
                    "message": "No operation metrics available",
                    "timestamp": datetime.now().isoformat()
                }

            status = "healthy"
            messages = []
            slow_operations = []

            # Check for slow operations
            for op_name, stats in all_stats.items():
                avg_duration = stats.get("average_duration", 0)

                if avg_duration >= self._thresholds["response_time_critical"]:
                    status = "critical"
                    slow_operations.append(f"{op_name}: {avg_duration:.2f}s")
                elif avg_duration >= self._thresholds["response_time_warning"]:
                    if status != "critical":
                        status = "warning"
                    slow_operations.append(f"{op_name}: {avg_duration:.2f}s")

            if slow_operations:
                messages.append(f"Slow operations detected: {', '.join(slow_operations)}")

            return {
                "status": status,
                "message": "; ".join(messages) if messages else "Operation performance normal",
                "details": {
                    "total_operations": len(all_stats),
                    "slow_operations": len(slow_operations)
                },
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "status": "critical",
                "message": f"Failed to check operation performance: {e}",
                "timestamp": datetime.now().isoformat()
            }

    def _check_netbox_connectivity(self) -> Dict[str, Any]:
        """Check NetBox connectivity."""
        try:
            if not self.netbox_client:
                return {
                    "status": "warning",
                    "message": "NetBox client not configured",
                    "timestamp": datetime.now().isoformat()
                }

            # Perform an actual health check using the client
            start_time = time.time()
            try:
                # Try to fetch a small amount of data to test connectivity
                self.netbox_client.dcim.sites.filter(limit=1)
                response_time_ms = round((time.time() - start_time) * 1000, 2)

                return {
                    "status": "healthy",
                    "message": "NetBox connectivity normal",
                    "details": {
                        "connected": True,
                        "response_time_ms": response_time_ms,
                        "test_endpoint": "dcim.sites"
                    },
                    "timestamp": datetime.now().isoformat()
                }
            except Exception as conn_error:
                response_time_ms = round((time.time() - start_time) * 1000, 2)
                return {
                    "status": "critical",
                    "message": f"NetBox connectivity failed: {str(conn_error)}",
                    "details": {
                        "connected": False,
                        "response_time_ms": response_time_ms,
                        "error": str(conn_error)
                    },
                    "timestamp": datetime.now().isoformat()
                }

        except Exception as e:
            return {
                "status": "critical",
                "message": f"NetBox connectivity failed: {e}",
                "timestamp": datetime.now().isoformat()
            }
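
# --- Illustrative custom check sketch (not part of the original module) ---
# register_check() accepts any callable returning a status dict shaped like
# the built-in checks ("status", "message", "timestamp"). For example, a
# hypothetical disk-space check might look like:
#
#     def check_disk_space() -> Dict[str, Any]:
#         usage = psutil.disk_usage("/")
#         return {
#             "status": "warning" if usage.percent > 85 else "healthy",
#             "message": f"Disk usage at {usage.percent:.1f}%",
#             "timestamp": datetime.now().isoformat(),
#         }
#
#     health = HealthCheck()
#     health.register_check("disk_space", check_disk_space)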
class MetricsDashboard:
    """Metrics dashboard for visualization and reporting."""

    def __init__(self, metrics_collector: Optional[MetricsCollector] = None):
        """
        Initialize metrics dashboard.

        Args:
            metrics_collector: Metrics collector instance
        """
        self.metrics_collector = metrics_collector or MetricsCollector()
        self.health_check = HealthCheck(self.metrics_collector.performance_monitor)

        logger.info("Metrics dashboard initialized")

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get complete dashboard data."""
        dashboard_data = {
            "timestamp": datetime.now().isoformat(),
            "overview": self._get_overview(),
            "operation_metrics": self._get_operation_metrics(),
            "cache_metrics": self._get_cache_metrics(),
            "system_metrics": self._get_system_metrics(),
            "health_status": self.health_check.get_health_status()
        }
        return dashboard_data

    def _get_overview(self) -> Dict[str, Any]:
        """Get overview statistics."""
        monitor = self.metrics_collector.performance_monitor
        all_stats = monitor.get_all_operations_summary()

        # Calculate overview metrics
        total_operations = sum(stats.get("total_operations", 0) for stats in all_stats.values())
        total_successful = sum(stats.get("successful_operations", 0) for stats in all_stats.values())
        success_rate = total_successful / total_operations if total_operations > 0 else 0.0

        # Get average response time
        avg_durations = [stats.get("average_duration", 0) for stats in all_stats.values()
                         if stats.get("average_duration", 0) > 0]
        avg_response_time = sum(avg_durations) / len(avg_durations) if avg_durations else 0.0

        return {
            "total_operations": total_operations,
            "success_rate": success_rate,
            "average_response_time": avg_response_time,
            "active_operations": len(all_stats),
            "uptime": "N/A"  # Would track actual uptime
        }

    def _get_operation_metrics(self) -> List[Dict[str, Any]]:
        """Get operation metrics for dashboard."""
        monitor = self.metrics_collector.performance_monitor
        all_stats = monitor.get_all_operations_summary()

        operation_metrics = []
        for op_name, stats in all_stats.items():
            operation_metrics.append({
                "operation_name": op_name,
                "total_executions": stats.get("total_operations", 0),
                "success_rate": stats.get("success_rate", 0.0),
                "average_duration": stats.get("average_duration", 0.0),
                "min_duration": stats.get("min_duration", 0.0),
                "max_duration": stats.get("max_duration", 0.0),
                "last_execution": stats["last_execution"].isoformat() if stats.get("last_execution") else None
            })

        # Sort by total executions
        operation_metrics.sort(key=lambda x: x["total_executions"], reverse=True)
        return operation_metrics

    def _get_cache_metrics(self) -> Optional[Dict[str, Any]]:
        """Get cache metrics for dashboard."""
        cache_metrics = self.metrics_collector.performance_monitor.get_latest_cache_metrics()
        if cache_metrics:
            return cache_metrics.to_dict()
        return None

    def _get_system_metrics(self) -> Optional[Dict[str, Any]]:
        """Get system metrics for dashboard."""
        system_metrics = self.metrics_collector.performance_monitor.get_latest_system_metrics()
        if system_metrics:
            return system_metrics.to_dict()
        return None

    def get_time_series_data(self, metric_type: str, duration_minutes: int = 60) -> Dict[str, Any]:
        """
        Get time series data for charts.

        Args:
            metric_type: Type of metrics (system_metrics, cache_metrics, etc.)
            duration_minutes: Duration of data to retrieve

        Returns:
            Time series data for charting
        """
        monitor = self.metrics_collector.performance_monitor
        history = monitor._metrics_history.get(metric_type, [])

        # Filter by time range
        cutoff_time = datetime.now() - timedelta(minutes=duration_minutes)
        filtered_history = [m for m in history if m.timestamp >= cutoff_time]

        timestamps = [m.timestamp.isoformat() for m in filtered_history]
        data_points = [m.to_dict() for m in filtered_history]

        return {
            "timestamps": timestamps,
            "data_points": data_points,
            "metric_type": metric_type,
            "duration_minutes": duration_minutes
        }

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary for dashboard."""
        overview = self._get_overview()
        operation_metrics = self._get_operation_metrics()

        # Get most frequent operations
        most_frequent = operation_metrics[:5] if operation_metrics else []

        return {
            "total_operations": overview["total_operations"],
            "average_response_time": overview["average_response_time"],
            "success_rate": overview["success_rate"],
            "most_frequent_operations": most_frequent
        }

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Get active alerts based on health checks."""
        health_status = self.health_check.get_health_status()
        alerts = []

        for check_name, check_result in health_status["checks"].items():
            status = check_result.get("status", "healthy")
            if status in ["warning", "critical"]:
                alerts.append({
                    "check_name": check_name,
                    "severity": status,
                    "message": check_result.get("message", ""),
                    "timestamp": check_result.get("timestamp", datetime.now().isoformat())
                })

        # Sort by severity
        severity_order = {"critical": 0, "warning": 1}
        alerts.sort(key=lambda x: severity_order.get(x["severity"], 2))

        return alerts

    def export_data(self, format: str = "json") -> str:
        """
        Export dashboard data in specified format.

        Args:
            format: Export format ("json" or "csv")

        Returns:
            Exported data as string
        """
        dashboard_data = self.get_dashboard_data()

        if format.lower() == "json":
            return json.dumps(dashboard_data, indent=2, default=str)
        elif format.lower() == "csv":
            output = io.StringIO()
            writer = csv.writer(output)

            # Write operation metrics as CSV
            writer.writerow(["Operation", "Total Executions", "Success Rate",
                             "Avg Duration", "Min Duration", "Max Duration"])
            for metric in dashboard_data["operation_metrics"]:
                writer.writerow([
                    metric["operation_name"],
                    metric["total_executions"],
                    f"{metric['success_rate']:.2%}",
                    f"{metric['average_duration']:.3f}",
                    f"{metric['min_duration']:.3f}",
                    f"{metric['max_duration']:.3f}"
                ])

            return output.getvalue()
        else:
            raise ValueError(f"Unsupported export format: {format}")


# Global performance monitor instance
_global_monitor: Optional[PerformanceMonitor] = None
_monitor_lock = threading.Lock()


def get_performance_monitor() -> PerformanceMonitor:
    """Get global performance monitor instance (singleton)."""
    global _global_monitor
    if _global_monitor is None:
        with _monitor_lock:
            if _global_monitor is None:
                _global_monitor = PerformanceMonitor()
    return _global_monitor


# Decorator for monitoring function performance
def monitor_performance(operation_name: Optional[str] = None):
    """
    Decorator to monitor function performance.

    Args:
        operation_name: Custom operation name (defaults to function name)

    Example:
        @monitor_performance("netbox_create_device")
        def create_device(name: str) -> Dict[str, Any]:
            # Function implementation
            pass
    """
    def decorator(func):
        nonlocal operation_name
        if operation_name is None:
            operation_name = func.__name__

        if asyncio.iscoroutinefunction(func):
            async def async_wrapper(*args, **kwargs):
                monitor = get_performance_monitor()
                with monitor.time_operation(operation_name, kwargs):
                    return await func(*args, **kwargs)
            return async_wrapper
        else:
            def sync_wrapper(*args, **kwargs):
                monitor = get_performance_monitor()
                with monitor.time_operation(operation_name, kwargs):
                    return func(*args, **kwargs)
            return sync_wrapper

    return decorator
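

# --- Illustrative end-to-end sketch (not part of the original module) ---
# A minimal, self-contained demo of the decorator, the timing context manager,
# and the dashboard export. The operation names and the sleep-based workload
# are placeholders for illustration only.
if __name__ == "__main__":
    import random

    @monitor_performance("demo_lookup")
    def demo_lookup(name: str) -> str:
        # Simulate a NetBox-style lookup with a small random delay
        time.sleep(random.uniform(0.01, 0.05))
        return f"device:{name}"

    monitor = get_performance_monitor()

    for i in range(5):
        demo_lookup(f"server-{i:02d}")

    # Time an ad-hoc block directly with the context manager
    with monitor.time_operation("demo_bulk_sync", {"count": 3}):
        time.sleep(0.02)

    # Aggregated statistics for one operation
    print(json.dumps(monitor.get_operation_statistics("demo_lookup"), indent=2, default=str))

    # Dashboard export over the same monitor instance
    dashboard = MetricsDashboard(MetricsCollector(monitor))
    print(dashboard.export_data("csv"))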
