Skip to main content
Glama

Katamari MCP Server

by ciphernaut
observability.py (25.7 kB)
"""
Monitoring and Observability System for Katamari MCP

Provides comprehensive monitoring, metrics collection, and observability features
for production deployments and development insights.
"""

import asyncio
import json
import logging
import operator
import threading
import time
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable, Union

import psutil

logger = logging.getLogger(__name__)


class MetricType(Enum):
    """Types of metrics that can be collected."""

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    TIMER = "timer"


class AlertSeverity(Enum):
    """Alert severity levels."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class MetricPoint:
    """A single metric data point."""

    timestamp: datetime
    value: Union[int, float]
    labels: Dict[str, str] = field(default_factory=dict)


@dataclass
class Metric:
    """A metric with its data points.

    ``data_points`` is a bounded deque (1000 entries by default), so the
    oldest points are dropped automatically once the limit is reached.
    """

    name: str
    metric_type: MetricType
    description: str
    data_points: deque = field(default_factory=lambda: deque(maxlen=1000))
    unit: str = ""

    def add_point(self, value: Union[int, float], labels: Optional[Dict[str, str]] = None):
        """Add a data point to the metric, timestamped with the current local time."""
        point = MetricPoint(
            timestamp=datetime.now(),
            value=value,
            labels=labels or {}
        )
        self.data_points.append(point)

    def get_latest(self) -> Optional[MetricPoint]:
        """Get the latest data point, or ``None`` when nothing was recorded yet."""
        return self.data_points[-1] if self.data_points else None

    def get_average(self, minutes: int = 5) -> Optional[float]:
        """Get average value over the last N minutes.

        Returns ``None`` when no point falls inside the window.
        """
        cutoff = datetime.now() - timedelta(minutes=minutes)
        recent_points = [p.value for p in self.data_points if p.timestamp >= cutoff]
        return sum(recent_points) / len(recent_points) if recent_points else None


@dataclass
class Alert:
    """An alert condition.

    ``active`` tracks whether the alert is currently firing.  It is kept
    separately from ``last_triggered`` / ``last_resolved`` because those
    timestamps are historical and are never cleared, so they cannot tell a
    currently-firing alert apart from one that fired and was resolved earlier.
    """

    alert_id: str
    name: str
    description: str
    severity: AlertSeverity
    condition: str  # Metric expression, e.g. "system_cpu_percent > 80"
    threshold: float
    enabled: bool = True
    triggered_count: int = 0
    last_triggered: Optional[datetime] = None
    resolved_count: int = 0
    last_resolved: Optional[datetime] = None
    active: bool = False  # True between a trigger and the following resolve


@dataclass
class HealthCheck:
    """A health check definition."""

    name: str
    check_func: Callable[[], Any]
    interval: float = 60.0  # seconds
    timeout: float = 10.0
    enabled: bool = True
    last_check: Optional[datetime] = None
    last_status: Optional[bool] = None
    failure_count: int = 0


class MetricsCollector:
    """Collects and manages metrics.

    Args:
        max_samples: Maximum number of raw samples retained per histogram and
            per timer.  Older samples are discarded first.  ``None`` disables
            the limit (unbounded growth).
    """

    def __init__(self, max_samples: Optional[int] = 10000):
        self.max_samples = max_samples
        self.metrics: Dict[str, Metric] = {}
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = defaultdict(float)
        self.histograms: Dict[str, List[float]] = defaultdict(list)
        self.timers: Dict[str, List[float]] = defaultdict(list)

    def _register(self, name: str, metric_type: MetricType, description: str, unit: str) -> str:
        """Create the metric if it does not exist yet and return its name."""
        if name not in self.metrics:
            self.metrics[name] = Metric(
                name=name,
                metric_type=metric_type,
                description=description,
                unit=unit
            )
        return name

    def _append_sample(self, samples: List[float], value: float) -> None:
        """Append ``value`` and drop the oldest samples beyond ``max_samples``."""
        samples.append(value)
        limit = self.max_samples
        if limit is not None and len(samples) > limit:
            # Trim in place so the container stays a plain list for callers.
            del samples[:len(samples) - limit]

    def counter(self, name: str, description: str = "") -> str:
        """Create or get a counter metric."""
        return self._register(name, MetricType.COUNTER, description, "count")

    def gauge(self, name: str, description: str = "") -> str:
        """Create or get a gauge metric."""
        return self._register(name, MetricType.GAUGE, description, "value")

    def histogram(self, name: str, description: str = "") -> str:
        """Create or get a histogram metric."""
        return self._register(name, MetricType.HISTOGRAM, description, "value")

    def timer(self, name: str, description: str = "") -> str:
        """Create or get a timer metric."""
        return self._register(name, MetricType.TIMER, description, "seconds")

    def increment(self, name: str, value: int = 1, labels: Optional[Dict[str, str]] = None):
        """Increment a counter metric and record the new running total."""
        self.counters[name] += value
        if name in self.metrics:
            self.metrics[name].add_point(self.counters[name], labels)

    def set(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Set a gauge metric value."""
        self.gauges[name] = value
        if name in self.metrics:
            self.metrics[name].add_point(value, labels)

    def observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Observe a histogram metric value."""
        self._append_sample(self.histograms[name], value)
        if name in self.metrics:
            self.metrics[name].add_point(value, labels)

    def record_time(self, name: str, duration: float, labels: Optional[Dict[str, str]] = None):
        """Record a timer metric duration."""
        self._append_sample(self.timers[name], duration)
        if name in self.metrics:
            self.metrics[name].add_point(duration, labels)

    def get_metric(self, name: str) -> Optional[Metric]:
        """Get a metric by name, or ``None`` if it was never registered."""
        return self.metrics.get(name)

    def get_all_metrics(self) -> Dict[str, Metric]:
        """Get all metrics as a shallow copy of the registry."""
        return self.metrics.copy()


class SystemMonitor:
    """Monitors system resources and performance."""

    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics = metrics_collector
        self.monitoring = False
        self.monitor_task: Optional[asyncio.Task] = None
        self.interval = 5.0  # seconds

        # Create system metrics
        self.cpu_metric = self.metrics.gauge("system_cpu_percent", "CPU usage percentage")
        self.memory_metric = self.metrics.gauge("system_memory_percent", "Memory usage percentage")
        self.disk_metric = self.metrics.gauge("system_disk_percent", "Disk usage percentage")
        self.processes_metric = self.metrics.gauge("system_processes", "Number of running processes")
        self.uptime_metric = self.metrics.counter("system_uptime_seconds", "System uptime in seconds")

        # Start time for uptime calculation
        self.start_time = time.time()

    async def start_monitoring(self):
        """Start system monitoring (no-op when already running)."""
        if self.monitoring:
            return

        self.monitoring = True
        self.monitor_task = asyncio.create_task(self._monitor_loop())
        logger.info("System monitoring started")

    async def stop_monitoring(self):
        """Stop system monitoring and wait for the background task to finish."""
        self.monitoring = False
        if self.monitor_task:
            self.monitor_task.cancel()
            try:
                await self.monitor_task
            except asyncio.CancelledError:
                pass
            self.monitor_task = None
        logger.info("System monitoring stopped")

    async def _monitor_loop(self):
        """Main monitoring loop."""
        while self.monitoring:
            try:
                await self._collect_system_metrics()
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in system monitoring: {e}")
                await asyncio.sleep(self.interval)

    async def _collect_system_metrics(self):
        """Collect system metrics.

        Failures are logged and swallowed so one bad sample never stops the
        monitoring loop; metrics recorded before the failure are kept.
        """
        try:
            # cpu_percent(interval=1) sleeps for a full second, so it is run in
            # the default executor to keep the event loop responsive.
            loop = asyncio.get_running_loop()
            cpu_percent = await loop.run_in_executor(
                None, lambda: psutil.cpu_percent(interval=1)
            )
            self.metrics.set(self.cpu_metric, cpu_percent)

            # Memory usage
            memory = psutil.virtual_memory()
            self.metrics.set(self.memory_metric, memory.percent)

            # Disk usage
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            self.metrics.set(self.disk_metric, disk_percent)

            # Process count
            process_count = len(psutil.pids())
            self.metrics.set(self.processes_metric, process_count)

            # Uptime (seconds since this monitor was constructed)
            uptime = time.time() - self.start_time
            self.metrics.set(self.uptime_metric, uptime)

        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise stop_monitoring() stalls.
            raise
        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")


class AlertManager:
    """Manages alerts and notifications.

    Args:
        metrics_collector: Source of the metrics that alert conditions refer to.
        max_history: Maximum number of alert events kept in ``alert_history``;
            the oldest events are dropped first.  ``None`` disables the limit.
    """

    # Two-character operators come first so that ">=" is not mistaken for ">".
    _COMPARATORS = (
        (">=", operator.ge),
        ("<=", operator.le),
        ("==", operator.eq),
        ("!=", operator.ne),
        (">", operator.gt),
        ("<", operator.lt),
    )

    _LOG_LEVELS = {
        AlertSeverity.CRITICAL: logging.CRITICAL,
        AlertSeverity.ERROR: logging.ERROR,
        AlertSeverity.WARNING: logging.WARNING,
    }

    def __init__(self, metrics_collector: MetricsCollector, max_history: Optional[int] = 1000):
        self.metrics = metrics_collector
        self.alerts: Dict[str, Alert] = {}
        self.alert_history: List[Dict[str, Any]] = []
        self.max_history = max_history
        self.check_interval = 30.0  # seconds
        self.checking = False
        self.check_task: Optional[asyncio.Task] = None

        # Setup default alerts
        self._setup_default_alerts()

    def _setup_default_alerts(self):
        """Setup default alert conditions."""
        # High CPU usage alert
        self.create_alert(
            name="high_cpu_usage",
            description="CPU usage is above 80%",
            severity=AlertSeverity.WARNING,
            condition="system_cpu_percent > 80",
            threshold=80.0
        )

        # High memory usage alert
        self.create_alert(
            name="high_memory_usage",
            description="Memory usage is above 85%",
            severity=AlertSeverity.WARNING,
            condition="system_memory_percent > 85",
            threshold=85.0
        )

        # Critical memory usage alert
        self.create_alert(
            name="critical_memory_usage",
            description="Memory usage is above 95%",
            severity=AlertSeverity.CRITICAL,
            condition="system_memory_percent > 95",
            threshold=95.0
        )

    def create_alert(
        self,
        name: str,
        description: str,
        severity: AlertSeverity,
        condition: str,
        threshold: float
    ) -> str:
        """Create a new alert.

        Args:
            name: Human-readable alert name.
            description: Text included in log messages and alert events.
            severity: Severity used to pick the log level when triggered.
            condition: Expression of the form ``"<metric> <op> <number>"``
                where ``<op>`` is one of ``>``, ``<``, ``>=``, ``<=``, ``==``,
                ``!=``.  This string is what gets evaluated.
            threshold: Stored on the alert for reference; evaluation uses the
                number embedded in ``condition``.

        Returns:
            The generated alert id (a UUID4 string).
        """
        alert_id = str(uuid.uuid4())

        alert = Alert(
            alert_id=alert_id,
            name=name,
            description=description,
            severity=severity,
            condition=condition,
            threshold=threshold
        )

        self.alerts[alert_id] = alert
        logger.info(f"Created alert: {name}")
        return alert_id

    async def start_monitoring(self):
        """Start alert monitoring (no-op when already running)."""
        if self.checking:
            return

        self.checking = True
        self.check_task = asyncio.create_task(self._check_loop())
        logger.info("Alert monitoring started")

    async def stop_monitoring(self):
        """Stop alert monitoring and wait for the background task to finish."""
        self.checking = False
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass
            self.check_task = None
        logger.info("Alert monitoring stopped")

    async def _check_loop(self):
        """Main alert checking loop."""
        while self.checking:
            try:
                await self._check_alerts()
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in alert checking: {e}")
                await asyncio.sleep(self.check_interval)

    async def _check_alerts(self):
        """Check all alert conditions and fire trigger/resolve transitions.

        Transitions are edge-based on ``alert.active``: an alert is triggered
        once when its condition becomes true and resolved once when it becomes
        false again, after which it may trigger again.
        """
        # Snapshot so alerts created while checking do not break iteration.
        for alert in list(self.alerts.values()):
            if not alert.enabled:
                continue

            try:
                triggered = await self._evaluate_condition(alert.condition)

                if triggered and not alert.active:
                    await self._trigger_alert(alert)
                elif alert.active and not triggered:
                    await self._resolve_alert(alert)

            except Exception as e:
                logger.error(f"Error checking alert {alert.name}: {e}")

    async def _evaluate_condition(self, condition: str) -> bool:
        """Evaluate an alert condition against the latest metric value.

        Supports ``"<metric> <op> <number>"`` with ``>``, ``<``, ``>=``, ``<=``,
        ``==`` and ``!=``.

        Returns:
            ``True`` when the comparison holds.  ``False`` when it does not,
            when the expression has no supported operator or a non-numeric
            threshold (the latter is logged), when the metric is unknown, or
            when the metric has no data yet.
        """
        for symbol, compare in self._COMPARATORS:
            if symbol in condition:
                break
        else:
            return False

        metric_name, _, threshold_text = condition.partition(symbol)
        try:
            threshold = float(threshold_text.strip())
        except ValueError as e:
            logger.error(f"Error evaluating condition '{condition}': {e}")
            return False

        metric = self.metrics.get_metric(metric_name.strip())
        latest = metric.get_latest() if metric is not None else None
        if latest is None:
            return False

        try:
            return bool(compare(latest.value, threshold))
        except TypeError as e:
            logger.error(f"Error evaluating condition '{condition}': {e}")
            return False

    def _record_event(self, alert: Alert, timestamp: datetime, event_type: str) -> None:
        """Append an alert event to the history, dropping the oldest on overflow."""
        self.alert_history.append({
            "alert_id": alert.alert_id,
            "name": alert.name,
            "description": alert.description,
            "severity": alert.severity.value,
            "timestamp": timestamp.isoformat(),
            "type": event_type
        })
        if self.max_history is not None:
            overflow = len(self.alert_history) - self.max_history
            if overflow > 0:
                # In-place trim keeps alert_history a sliceable list.
                del self.alert_history[:overflow]

    async def _trigger_alert(self, alert: Alert):
        """Trigger an alert: mark it active, record the event and log it."""
        alert.triggered_count += 1
        alert.last_triggered = datetime.now()
        alert.active = True

        self._record_event(alert, alert.last_triggered, "triggered")

        # Log at a level matching the severity (INFO for anything unmapped).
        log_msg = f"ALERT TRIGGERED: {alert.name} - {alert.description}"
        logger.log(self._LOG_LEVELS.get(alert.severity, logging.INFO), log_msg)

    async def _resolve_alert(self, alert: Alert):
        """Resolve an alert: mark it inactive, record the event and log it."""
        alert.resolved_count += 1
        alert.last_resolved = datetime.now()
        alert.active = False

        self._record_event(alert, alert.last_resolved, "resolved")

        logger.info(f"ALERT RESOLVED: {alert.name} - {alert.description}")


class HealthChecker:
    """Manages health checks."""

    def __init__(self):
        self.health_checks: Dict[str, HealthCheck] = {}
        self.checking = False
        self.check_task: Optional[asyncio.Task] = None
        # How often the loop wakes up to look for due checks (seconds).
        self.loop_interval = 30.0

        # Setup default health checks
        self._setup_default_checks()

    def _setup_default_checks(self):
        """Setup default health checks."""
        # System health check
        self.add_health_check(
            name="system_resources",
            check_func=self._check_system_resources,
            interval=60.0
        )

        # Disk space check
        self.add_health_check(
            name="disk_space",
            check_func=self._check_disk_space,
            interval=120.0
        )

    def add_health_check(
        self,
        name: str,
        check_func: Callable[[], Any],
        interval: float = 60.0,
        timeout: float = 10.0
    ) -> str:
        """Add a health check, replacing any existing check with the same name.

        Args:
            name: Unique check name.
            check_func: Zero-argument callable or coroutine function whose
                truthy result means "healthy".  Plain (synchronous) callables
                are executed in the event loop's default executor, i.e. in a
                worker thread.
            interval: Minimum number of seconds between two runs.
            timeout: Seconds to wait for a result before marking the check failed.

        Returns:
            The check name.
        """
        health_check = HealthCheck(
            name=name,
            check_func=check_func,
            interval=interval,
            timeout=timeout
        )

        self.health_checks[name] = health_check
        logger.info(f"Added health check: {name}")
        return name

    async def start_monitoring(self):
        """Start health check monitoring (no-op when already running)."""
        if self.checking:
            return

        self.checking = True
        self.check_task = asyncio.create_task(self._check_loop())
        logger.info("Health check monitoring started")

    async def stop_monitoring(self):
        """Stop health check monitoring and wait for the background task to finish."""
        self.checking = False
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass
            self.check_task = None
        logger.info("Health check monitoring stopped")

    async def _check_loop(self):
        """Main health checking loop."""
        while self.checking:
            try:
                await self._run_health_checks()
                await asyncio.sleep(self.loop_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health checking: {e}")
                await asyncio.sleep(self.loop_interval)

    async def _run_health_checks(self):
        """Run every enabled health check whose interval has elapsed."""
        current_time = datetime.now()

        for check in list(self.health_checks.values()):
            if not check.enabled:
                continue

            # Skip checks that are not due yet.
            if (check.last_check is not None and
                    (current_time - check.last_check).total_seconds() < check.interval):
                continue

            # Record the attempt up-front so that timeouts and errors also
            # honour the interval and show up in the status report.
            check.last_check = current_time

            try:
                # Run the check with timeout
                result = await asyncio.wait_for(
                    self._run_check(check),
                    timeout=check.timeout
                )
            except asyncio.CancelledError:
                raise
            except asyncio.TimeoutError:
                check.failure_count += 1
                check.last_status = False
                logger.error(f"Health check timeout: {check.name}")
            except Exception as e:
                check.failure_count += 1
                check.last_status = False
                logger.error(f"Health check error: {check.name} - {e}")
            else:
                check.last_status = result
                if result:
                    check.failure_count = 0
                else:
                    check.failure_count += 1
                    logger.warning(f"Health check failed: {check.name} (failure #{check.failure_count})")

    async def _run_check(self, check: HealthCheck) -> bool:
        """Run a single health check.

        Coroutine functions are awaited directly.  Synchronous functions run in
        the default executor so they cannot block the event loop and so the
        caller's timeout can actually fire.

        Returns:
            The truthiness of the check's result, or ``False`` if it raised.
        """
        try:
            if asyncio.iscoroutinefunction(check.check_func):
                result = await check.check_func()
            else:
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(None, check.check_func)

            return bool(result)

        except asyncio.CancelledError:
            # Propagate cancellation (including the one issued by wait_for).
            raise
        except Exception as e:
            logger.error(f"Health check {check.name} failed: {e}")
            return False

    def _check_system_resources(self) -> bool:
        """Check system resources."""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()

            # Consider healthy if CPU < 90% and memory < 90%
            return cpu_percent < 90 and memory.percent < 90
        except Exception:
            return False

    def _check_disk_space(self) -> bool:
        """Check disk space."""
        try:
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100

            # Consider healthy if disk usage < 95%
            return disk_percent < 95
        except Exception:
            return False

    def get_health_status(self) -> Dict[str, Any]:
        """Get overall health status.

        Only enabled checks count towards ``healthy_checks`` / ``total_checks``
        and the overall verdict, because disabled checks are never executed.
        The system is healthy when at least one check is enabled and every
        enabled check last reported ``True``.
        """
        enabled_checks = [check for check in self.health_checks.values() if check.enabled]
        healthy_checks = sum(1 for check in enabled_checks if check.last_status is True)
        total_checks = len(enabled_checks)

        overall_healthy = healthy_checks == total_checks and total_checks > 0

        return {
            "healthy": overall_healthy,
            "healthy_checks": healthy_checks,
            "total_checks": total_checks,
            "checks": {
                name: {
                    "healthy": check.last_status,
                    "enabled": check.enabled,
                    "last_check": check.last_check.isoformat() if check.last_check else None,
                    "failure_count": check.failure_count
                }
                for name, check in self.health_checks.items()
            }
        }


class ObservabilitySystem:
    """Main observability system that coordinates all monitoring components."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.metrics_collector = MetricsCollector()
        self.system_monitor = SystemMonitor(self.metrics_collector)
        self.alert_manager = AlertManager(self.metrics_collector)
        self.health_checker = HealthChecker()

        self.running = False
        self.start_time = datetime.now()

    async def start(self):
        """Start the observability system (no-op when already running)."""
        if self.running:
            return

        self.running = True

        # Start all components
        await self.system_monitor.start_monitoring()
        await self.alert_manager.start_monitoring()
        await self.health_checker.start_monitoring()

        logger.info("Observability system started")

    async def stop(self):
        """Stop the observability system (no-op when not running)."""
        if not self.running:
            return

        self.running = False

        # Stop all components
        await self.system_monitor.stop_monitoring()
        await self.alert_manager.stop_monitoring()
        await self.health_checker.stop_monitoring()

        logger.info("Observability system stopped")

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get a summary of all metrics."""
        metrics = self.metrics_collector.get_all_metrics()

        summary = {
            "total_metrics": len(metrics),
            "metrics": {}
        }

        for name, metric in metrics.items():
            latest = metric.get_latest()
            average_5m = metric.get_average(5)

            summary["metrics"][name] = {
                "type": metric.metric_type.value,
                "description": metric.description,
                "unit": metric.unit,
                "latest_value": latest.value if latest else None,
                "latest_timestamp": latest.timestamp.isoformat() if latest else None,
                "average_5m": average_5m,
                "data_points_count": len(metric.data_points)
            }

        return summary

    def get_alerts_summary(self) -> Dict[str, Any]:
        """Get a summary of alerts.

        ``active_alerts`` counts alerts that are firing right now, including
        ones that were resolved in the past and have triggered again.
        """
        alerts = self.alert_manager.alerts
        active_count = sum(1 for alert in alerts.values() if alert.active)

        return {
            "total_alerts": len(alerts),
            "active_alerts": active_count,
            "recent_alerts": self.alert_manager.alert_history[-10:],  # Last 10 alert events
            "alerts": {
                alert.alert_id: {
                    "name": alert.name,
                    "description": alert.description,
                    "severity": alert.severity.value,
                    "enabled": alert.enabled,
                    "active": alert.active,
                    "triggered_count": alert.triggered_count,
                    "last_triggered": alert.last_triggered.isoformat() if alert.last_triggered else None,
                    "resolved_count": alert.resolved_count,
                    "last_resolved": alert.last_resolved.isoformat() if alert.last_resolved else None
                }
                for alert in alerts.values()
            }
        }

    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status."""
        uptime = datetime.now() - self.start_time

        return {
            "uptime_seconds": uptime.total_seconds(),
            "uptime_human": str(uptime),
            "observability_running": self.running,
            "health": self.health_checker.get_health_status(),
            "metrics": self.get_metrics_summary(),
            "alerts": self.get_alerts_summary()
        }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.