# observability.py (25.7 kB)
"""
Monitoring and Observability System for Katamari MCP
Provides comprehensive monitoring, metrics collection, and observability
features for production deployments and development insights.
"""
import asyncio
import json
import logging
import time
import psutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable, Union
from dataclasses import dataclass, field, asdict
from enum import Enum
from collections import defaultdict, deque
import threading
import uuid
# Module-level logger used by the monitoring components defined below.
logger = logging.getLogger(__name__)
class MetricType(Enum):
    """Kind of measurement a metric represents.

    The member's string value is what metric summaries report as ``type``.
    """

    COUNTER = "counter"      # running total, advanced by increments
    GAUGE = "gauge"          # latest point-in-time value, overwritten on set
    HISTOGRAM = "histogram"  # individually observed sample values
    TIMER = "timer"          # recorded durations
class AlertSeverity(Enum):
    """How serious an alert is, from least to most severe.

    When an alert fires, the severity selects the matching logger level
    (info / warning / error / critical).
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class MetricPoint:
    """One timestamped sample belonging to a metric."""

    # When the sample was recorded (Metric.add_point stamps it with datetime.now()).
    timestamp: datetime
    # The sampled number.
    value: Union[int, float]
    # Optional string tags; each instance gets its own fresh, empty dict by default.
    labels: Dict[str, str] = field(default_factory=lambda: {})
@dataclass
class Metric:
    """A named metric together with a bounded history of its data points."""

    name: str
    metric_type: MetricType
    description: str
    # Points in append order; only the newest 1000 are retained.
    data_points: deque = field(default_factory=lambda: deque(maxlen=1000))
    unit: str = ""

    def add_point(self, value: Union[int, float], labels: Optional[Dict[str, str]] = None):
        """Append *value* as a new point stamped with the current local time.

        Args:
            value: The sampled number.
            labels: Optional tags; ``None`` (or any falsy value) becomes ``{}``.
        """
        self.data_points.append(
            MetricPoint(timestamp=datetime.now(), value=value, labels=labels or {})
        )

    def get_latest(self) -> Optional[MetricPoint]:
        """Return the most recently appended point, or ``None`` when empty."""
        if not self.data_points:
            return None
        return self.data_points[-1]

    def get_average(self, minutes: int = 5) -> Optional[float]:
        """Return the mean value of points stamped within the last *minutes*.

        Returns:
            The arithmetic mean, or ``None`` if no point falls in the window.
        """
        window_start = datetime.now() - timedelta(minutes=minutes)
        total = 0
        count = 0
        for point in self.data_points:
            if point.timestamp >= window_start:
                total += point.value
                count += 1
        return total / count if count else None
@dataclass
class Alert:
    """Definition of one threshold alert plus its trigger/resolve bookkeeping."""

    # Unique key (AlertManager.create_alert assigns a uuid4 string).
    alert_id: str
    name: str
    description: str
    severity: AlertSeverity
    # Comparison text such as "system_cpu_percent > 80".
    condition: str
    # NOTE(review): AlertManager's evaluator parses the threshold out of
    # `condition` and does not read this field.
    threshold: float
    enabled: bool = field(default=True)
    # Transition counters and the timestamps of the latest transitions.
    triggered_count: int = field(default=0)
    last_triggered: Optional[datetime] = field(default=None)
    resolved_count: int = field(default=0)
    last_resolved: Optional[datetime] = field(default=None)
@dataclass
class HealthCheck:
    """A registered health probe and the outcome of its most recent run."""

    name: str
    # Zero-argument callable; a truthy result counts as healthy.
    check_func: Callable[[], Any]
    interval: float = field(default=60.0)  # seconds between runs
    timeout: float = field(default=10.0)  # seconds allowed per run
    enabled: bool = field(default=True)
    # Run state maintained by HealthChecker.
    last_check: Optional[datetime] = field(default=None)
    last_status: Optional[bool] = field(default=None)
    # Failures since the last healthy result (reset to 0 on success).
    failure_count: int = field(default=0)
class MetricsCollector:
    """Registry of named metrics plus raw per-name value tables.

    ``counter``/``gauge``/``histogram``/``timer`` register a :class:`Metric`
    under a name (the first registration wins) and return that name. The
    update methods (``increment``/``set``/``observe``/``record_time``) always
    update the matching raw table and, when the name has been registered, also
    append a data point to that metric's history.
    """

    def __init__(self, max_samples: Optional[int] = 1000):
        """Create an empty collector.

        Args:
            max_samples: Maximum number of raw values kept per name in
                ``histograms`` and ``timers``; the oldest values are dropped
                first. ``None`` disables the cap (values accumulate forever).

        Raises:
            ValueError: If ``max_samples`` is negative.
        """
        if max_samples is not None and max_samples < 0:
            raise ValueError("max_samples must be non-negative or None")
        self.max_samples = max_samples
        self.metrics: Dict[str, Metric] = {}
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = defaultdict(float)
        self.histograms: Dict[str, List[float]] = defaultdict(list)
        self.timers: Dict[str, List[float]] = defaultdict(list)

    def _register(self, name: str, metric_type: MetricType, description: str, unit: str) -> str:
        """Create metric *name* with the given type/unit if absent; return *name*."""
        if name not in self.metrics:
            self.metrics[name] = Metric(
                name=name,
                metric_type=metric_type,
                description=description,
                unit=unit,
            )
        return name

    def _append_sample(self, samples: List[float], value: float) -> None:
        """Append *value* to *samples*, dropping the oldest entries beyond ``max_samples``."""
        samples.append(value)
        if self.max_samples is not None:
            overflow = len(samples) - self.max_samples
            if overflow > 0:
                # Delete in place so any caller holding this list sees the trim.
                del samples[:overflow]

    def counter(self, name: str, description: str = "") -> str:
        """Create or get a counter metric (unit ``"count"``)."""
        return self._register(name, MetricType.COUNTER, description, "count")

    def gauge(self, name: str, description: str = "") -> str:
        """Create or get a gauge metric (unit ``"value"``)."""
        return self._register(name, MetricType.GAUGE, description, "value")

    def histogram(self, name: str, description: str = "") -> str:
        """Create or get a histogram metric (unit ``"value"``)."""
        return self._register(name, MetricType.HISTOGRAM, description, "value")

    def timer(self, name: str, description: str = "") -> str:
        """Create or get a timer metric (unit ``"seconds"``)."""
        return self._register(name, MetricType.TIMER, description, "seconds")

    def increment(self, name: str, value: int = 1, labels: Optional[Dict[str, str]] = None):
        """Add *value* to counter *name*; the new running total is recorded as a point."""
        self.counters[name] += value
        metric = self.metrics.get(name)
        if metric is not None:
            metric.add_point(self.counters[name], labels)

    def set(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Set gauge *name* to *value* and record it as a point."""
        self.gauges[name] = value
        metric = self.metrics.get(name)
        if metric is not None:
            metric.add_point(value, labels)

    def observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a histogram sample for *name* (raw table is capped by ``max_samples``)."""
        self._append_sample(self.histograms[name], value)
        metric = self.metrics.get(name)
        if metric is not None:
            metric.add_point(value, labels)

    def record_time(self, name: str, duration: float, labels: Optional[Dict[str, str]] = None):
        """Record a timer duration for *name* (raw table is capped by ``max_samples``)."""
        self._append_sample(self.timers[name], duration)
        metric = self.metrics.get(name)
        if metric is not None:
            metric.add_point(duration, labels)

    def get_metric(self, name: str) -> Optional[Metric]:
        """Return the registered metric called *name*, or ``None``."""
        return self.metrics.get(name)

    def get_all_metrics(self) -> Dict[str, Metric]:
        """Return a shallow copy of the name -> metric registry."""
        return self.metrics.copy()
class SystemMonitor:
    """Periodically sample host resource usage into a MetricsCollector.

    Registers gauges for CPU, memory, disk and process count plus an uptime
    metric, then, once :meth:`start_monitoring` is awaited, refreshes them
    every ``interval`` seconds from a background asyncio task.
    """

    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics = metrics_collector
        self.monitoring = False
        self.monitor_task: Optional[asyncio.Task] = None
        self.interval = 5.0  # seconds between samples

        # Register the system metrics; each registration returns the metric name.
        self.cpu_metric = self.metrics.gauge("system_cpu_percent", "CPU usage percentage")
        self.memory_metric = self.metrics.gauge("system_memory_percent", "Memory usage percentage")
        self.disk_metric = self.metrics.gauge("system_disk_percent", "Disk usage percentage")
        self.processes_metric = self.metrics.gauge("system_processes", "Number of running processes")
        self.uptime_metric = self.metrics.counter("system_uptime_seconds", "System uptime in seconds")

        # Origin for the uptime metric. NOTE(review): this measures seconds since
        # this monitor was constructed, not host uptime as the description says.
        self.start_time = time.time()

    async def start_monitoring(self):
        """Start the background sampling task (no-op if already running)."""
        if self.monitoring:
            return
        self.monitoring = True
        self.monitor_task = asyncio.create_task(self._monitor_loop())
        logger.info("System monitoring started")

    async def stop_monitoring(self):
        """Cancel the background sampling task and wait for it to finish."""
        self.monitoring = False
        if self.monitor_task:
            self.monitor_task.cancel()
            try:
                await self.monitor_task
            except asyncio.CancelledError:
                pass
            self.monitor_task = None
        logger.info("System monitoring stopped")

    async def _monitor_loop(self):
        """Collect metrics every ``interval`` seconds until stopped or cancelled."""
        while self.monitoring:
            try:
                await self._collect_system_metrics()
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in system monitoring: {e}")
                await asyncio.sleep(self.interval)

    async def _collect_system_metrics(self):
        """Take one sample of every system metric; failures are logged, not raised."""
        try:
            # cpu_percent(interval=1) sleeps for its 1 s sampling window; running it
            # in the default executor keeps the event loop responsive meanwhile.
            loop = asyncio.get_running_loop()
            cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
            self.metrics.set(self.cpu_metric, cpu_percent)

            memory = psutil.virtual_memory()
            self.metrics.set(self.memory_metric, memory.percent)

            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            self.metrics.set(self.disk_metric, disk_percent)

            process_count = len(psutil.pids())
            self.metrics.set(self.processes_metric, process_count)

            # NOTE: the uptime metric is registered as a counter but written with
            # set(), so the collector keeps its raw value in the gauge table.
            uptime = time.time() - self.start_time
            self.metrics.set(self.uptime_metric, uptime)
        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")
class AlertManager:
    """Evaluate threshold alerts against collected metrics and record transitions.

    An alert is *active* from the check where its condition first holds until
    the check where it stops holding; each transition is logged and appended
    to ``alert_history`` (bounded by ``max_history``).
    """

    # Operators understood by _evaluate_condition. Two-character operators are
    # listed first so that ">=" is not parsed as ">" followed by "=...".
    _COMPARATORS = (
        (">=", lambda value, threshold: value >= threshold),
        ("<=", lambda value, threshold: value <= threshold),
        ("==", lambda value, threshold: value == threshold),
        ("!=", lambda value, threshold: value != threshold),
        (">", lambda value, threshold: value > threshold),
        ("<", lambda value, threshold: value < threshold),
    )

    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics = metrics_collector
        self.alerts: Dict[str, Alert] = {}
        self.alert_history: List[Dict[str, Any]] = []
        # Maximum retained alert events (oldest dropped first); None = unbounded.
        self.max_history: Optional[int] = 1000
        self.check_interval = 30.0  # seconds
        self.checking = False
        self.check_task: Optional[asyncio.Task] = None
        # Setup default alerts
        self._setup_default_alerts()

    def _setup_default_alerts(self):
        """Register the built-in CPU and memory alerts."""
        self.create_alert(
            name="high_cpu_usage",
            description="CPU usage is above 80%",
            severity=AlertSeverity.WARNING,
            condition="system_cpu_percent > 80",
            threshold=80.0
        )
        self.create_alert(
            name="high_memory_usage",
            description="Memory usage is above 85%",
            severity=AlertSeverity.WARNING,
            condition="system_memory_percent > 85",
            threshold=85.0
        )
        self.create_alert(
            name="critical_memory_usage",
            description="Memory usage is above 95%",
            severity=AlertSeverity.CRITICAL,
            condition="system_memory_percent > 95",
            threshold=95.0
        )

    def create_alert(
        self,
        name: str,
        description: str,
        severity: AlertSeverity,
        condition: str,
        threshold: float
    ) -> str:
        """Register a new alert and return its generated id.

        Args:
            condition: Text of the form ``"<metric_name> <op> <number>"`` where
                ``<op>`` is one of ``>``, ``<``, ``>=``, ``<=``, ``==``, ``!=``.
            threshold: Stored on the alert; evaluation uses the number in
                ``condition``.
        """
        alert_id = str(uuid.uuid4())
        alert = Alert(
            alert_id=alert_id,
            name=name,
            description=description,
            severity=severity,
            condition=condition,
            threshold=threshold
        )
        self.alerts[alert_id] = alert
        logger.info(f"Created alert: {name}")
        return alert_id

    async def start_monitoring(self):
        """Start the background alert-checking task (no-op if already running)."""
        if self.checking:
            return
        self.checking = True
        self.check_task = asyncio.create_task(self._check_loop())
        logger.info("Alert monitoring started")

    async def stop_monitoring(self):
        """Cancel the background alert-checking task and wait for it to finish."""
        self.checking = False
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass
            self.check_task = None
        logger.info("Alert monitoring stopped")

    async def _check_loop(self):
        """Evaluate all alerts every ``check_interval`` seconds until stopped."""
        while self.checking:
            try:
                await self._check_alerts()
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in alert checking: {e}")
                await asyncio.sleep(self.check_interval)

    @staticmethod
    def _is_active(alert: Alert) -> bool:
        """Return True if *alert* has fired and has not been resolved since."""
        if alert.last_triggered is None:
            return False
        return alert.last_resolved is None or alert.last_resolved < alert.last_triggered

    async def _check_alerts(self):
        """Evaluate each enabled alert and fire/resolve it on state changes."""
        # Iterate a snapshot: create_alert() may run while we are suspended in an await.
        for alert in list(self.alerts.values()):
            if not alert.enabled:
                continue
            try:
                triggered = await self._evaluate_condition(alert.condition)
                active = self._is_active(alert)
                if triggered and not active:
                    await self._trigger_alert(alert)
                elif not triggered and active:
                    await self._resolve_alert(alert)
            except Exception as e:
                logger.error(f"Error checking alert {alert.name}: {e}")

    async def _evaluate_condition(self, condition: str) -> bool:
        """Return whether *condition* holds for the latest value of its metric.

        Unparseable conditions, unknown metrics and metrics without data all
        evaluate to False.
        """
        try:
            for symbol, compare in self._COMPARATORS:
                if symbol in condition:
                    metric_name, _, threshold_str = condition.partition(symbol)
                    break
            else:
                return False
            threshold = float(threshold_str.strip())
            metric = self.metrics.get_metric(metric_name.strip())
            if metric is None:
                return False
            latest = metric.get_latest()
            if latest is None:
                return False
            return compare(latest.value, threshold)
        except Exception as e:
            logger.error(f"Error evaluating condition '{condition}': {e}")
            return False

    def _record_event(self, alert: Alert, timestamp: datetime, event_type: str) -> None:
        """Append a transition event to ``alert_history`` and enforce ``max_history``."""
        self.alert_history.append({
            "alert_id": alert.alert_id,
            "name": alert.name,
            "description": alert.description,
            "severity": alert.severity.value,
            "timestamp": timestamp.isoformat(),
            "type": event_type,
        })
        if self.max_history is not None:
            overflow = len(self.alert_history) - self.max_history
            if overflow > 0:
                # Trim in place so the history stays the same list object.
                del self.alert_history[:overflow]

    async def _trigger_alert(self, alert: Alert):
        """Mark *alert* as fired, record a "triggered" event and log at its severity."""
        alert.triggered_count += 1
        alert.last_triggered = datetime.now()
        self._record_event(alert, alert.last_triggered, "triggered")
        log_msg = f"ALERT TRIGGERED: {alert.name} - {alert.description}"
        if alert.severity == AlertSeverity.CRITICAL:
            logger.critical(log_msg)
        elif alert.severity == AlertSeverity.ERROR:
            logger.error(log_msg)
        elif alert.severity == AlertSeverity.WARNING:
            logger.warning(log_msg)
        else:
            logger.info(log_msg)

    async def _resolve_alert(self, alert: Alert):
        """Mark *alert* as resolved and record a "resolved" event."""
        alert.resolved_count += 1
        alert.last_resolved = datetime.now()
        self._record_event(alert, alert.last_resolved, "resolved")
        logger.info(f"ALERT RESOLVED: {alert.name} - {alert.description}")
class HealthChecker:
    """Registry of health checks plus a background loop that runs them when due."""

    def __init__(self):
        self.health_checks: Dict[str, HealthCheck] = {}
        self.checking = False
        self.check_task: Optional[asyncio.Task] = None
        # Seconds the background loop sleeps between passes; each check is
        # additionally rate-limited by its own ``interval``.
        self.check_interval = 30.0
        # Setup default health checks
        self._setup_default_checks()

    def _setup_default_checks(self):
        """Register the built-in system-resource and disk-space checks."""
        self.add_health_check(
            name="system_resources",
            check_func=self._check_system_resources,
            interval=60.0
        )
        self.add_health_check(
            name="disk_space",
            check_func=self._check_disk_space,
            interval=120.0
        )

    def add_health_check(
        self,
        name: str,
        check_func: Callable[[], Any],
        interval: float = 60.0,
        timeout: float = 10.0
    ) -> str:
        """Register (or replace) the check called *name* and return *name*.

        Args:
            check_func: Zero-argument callable, sync or async; a truthy result
                means healthy. Sync callables are run in the default executor.
            interval: Minimum seconds between runs of this check.
            timeout: Seconds to wait for a result before counting a failure.
        """
        health_check = HealthCheck(
            name=name,
            check_func=check_func,
            interval=interval,
            timeout=timeout
        )
        self.health_checks[name] = health_check
        logger.info(f"Added health check: {name}")
        return name

    async def start_monitoring(self):
        """Start the background health-check task (no-op if already running)."""
        if self.checking:
            return
        self.checking = True
        self.check_task = asyncio.create_task(self._check_loop())
        logger.info("Health check monitoring started")

    async def stop_monitoring(self):
        """Cancel the background health-check task and wait for it to finish."""
        self.checking = False
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass
            self.check_task = None
        logger.info("Health check monitoring stopped")

    async def _check_loop(self):
        """Run due checks every ``check_interval`` seconds until stopped."""
        while self.checking:
            try:
                await self._run_health_checks()
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health checking: {e}")
                await asyncio.sleep(self.check_interval)

    async def _run_health_checks(self):
        """Run every enabled check whose interval has elapsed and record the outcome."""
        current_time = datetime.now()
        # Iterate a snapshot: add_health_check() may run while we await a check.
        for check in list(self.health_checks.values()):
            if not check.enabled:
                continue
            if (check.last_check is not None and
                    (current_time - check.last_check).total_seconds() < check.interval):
                continue
            try:
                result = await asyncio.wait_for(
                    self._run_check(check),
                    timeout=check.timeout
                )
            except asyncio.TimeoutError:
                # Stamp every outcome so a hanging check waits its interval too.
                check.last_check = current_time
                check.failure_count += 1
                check.last_status = False
                logger.error(f"Health check timeout: {check.name}")
                continue
            except Exception as e:
                check.last_check = current_time
                check.failure_count += 1
                check.last_status = False
                logger.error(f"Health check error: {check.name} - {e}")
                continue
            check.last_check = current_time
            check.last_status = result
            if result:
                check.failure_count = 0
            else:
                check.failure_count += 1
                logger.warning(f"Health check failed: {check.name} (failure #{check.failure_count})")

    async def _run_check(self, check: HealthCheck) -> bool:
        """Run one check and return its truthiness; errors count as unhealthy.

        Sync callables run in the default executor so they neither block the
        event loop nor defeat the caller's timeout. On timeout the worker thread
        keeps running until the callable returns on its own.
        """
        try:
            if asyncio.iscoroutinefunction(check.check_func):
                result = await check.check_func()
            else:
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(None, check.check_func)
            # A callable that is not itself a coroutine function (e.g. an object
            # with an async __call__) may still hand back something awaitable.
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                result = await result
            return bool(result)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Health check {check.name} failed: {e}")
            return False

    def _check_system_resources(self) -> bool:
        """Healthy when CPU (1 s sample) and memory usage are both below 90%."""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            return cpu_percent < 90 and memory.percent < 90
        except Exception:
            return False

    def _check_disk_space(self) -> bool:
        """Healthy when usage of the '/' filesystem is below 95%."""
        try:
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            return disk_percent < 95
        except Exception:
            return False

    def get_health_status(self) -> Dict[str, Any]:
        """Summarize all checks; overall healthy only if every check last passed."""
        healthy_checks = sum(1 for check in self.health_checks.values()
                             if check.last_status is True)
        total_checks = len(self.health_checks)
        overall_healthy = healthy_checks == total_checks and total_checks > 0
        return {
            "healthy": overall_healthy,
            "healthy_checks": healthy_checks,
            "total_checks": total_checks,
            "checks": {
                name: {
                    "healthy": check.last_status,
                    "last_check": check.last_check.isoformat() if check.last_check else None,
                    "failure_count": check.failure_count
                }
                for name, check in self.health_checks.items()
            }
        }
class ObservabilitySystem:
    """Facade that wires together metrics, system monitoring, alerts and health checks."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create all components; nothing runs until :meth:`start` is awaited.

        Args:
            config: Optional settings mapping, stored as ``self.config``
                (not otherwise read by this class).
        """
        self.config = config or {}
        self.metrics_collector = MetricsCollector()
        self.system_monitor = SystemMonitor(self.metrics_collector)
        self.alert_manager = AlertManager(self.metrics_collector)
        self.health_checker = HealthChecker()
        self.running = False
        # Construction time; origin of the uptime reported by get_system_status().
        self.start_time = datetime.now()

    async def start(self):
        """Start all background components (no-op if already running)."""
        if self.running:
            return
        self.running = True
        await self.system_monitor.start_monitoring()
        await self.alert_manager.start_monitoring()
        await self.health_checker.start_monitoring()
        logger.info("Observability system started")

    async def stop(self):
        """Stop all background components (no-op if not running)."""
        if not self.running:
            return
        self.running = False
        await self.system_monitor.stop_monitoring()
        await self.alert_manager.stop_monitoring()
        await self.health_checker.stop_monitoring()
        logger.info("Observability system stopped")

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return type, latest value, 5-minute average and size for every metric."""
        metrics = self.metrics_collector.get_all_metrics()
        summary = {
            "total_metrics": len(metrics),
            "metrics": {}
        }
        for name, metric in metrics.items():
            latest = metric.get_latest()
            average_5m = metric.get_average(5)
            summary["metrics"][name] = {
                "type": metric.metric_type.value,
                "description": metric.description,
                "unit": metric.unit,
                "latest_value": latest.value if latest else None,
                "latest_timestamp": latest.timestamp.isoformat() if latest else None,
                "average_5m": average_5m,
                "data_points_count": len(metric.data_points)
            }
        return summary

    @staticmethod
    def _alert_is_active(alert: Any) -> bool:
        """Return True if *alert* has fired and has not been resolved since."""
        if alert.last_triggered is None:
            return False
        # Compare timestamps so an alert that re-fires after a resolution counts
        # as active again (a non-None last_resolved alone is not enough).
        return alert.last_resolved is None or alert.last_resolved < alert.last_triggered

    def get_alerts_summary(self) -> Dict[str, Any]:
        """Return alert counts, the last 10 alert events and per-alert state."""
        alerts = list(self.alert_manager.alerts.values())
        active_count = sum(1 for alert in alerts if self._alert_is_active(alert))
        return {
            "total_alerts": len(alerts),
            "active_alerts": active_count,
            "recent_alerts": self.alert_manager.alert_history[-10:],  # Last 10 alert events
            "alerts": {
                alert.alert_id: {
                    "name": alert.name,
                    "description": alert.description,
                    "severity": alert.severity.value,
                    "enabled": alert.enabled,
                    "triggered_count": alert.triggered_count,
                    "last_triggered": alert.last_triggered.isoformat() if alert.last_triggered else None,
                    "resolved_count": alert.resolved_count,
                    "last_resolved": alert.last_resolved.isoformat() if alert.last_resolved else None
                }
                for alert in alerts
            }
        }

    def get_system_status(self) -> Dict[str, Any]:
        """Return uptime, running flag and the health, metrics and alert summaries."""
        uptime = datetime.now() - self.start_time
        return {
            "uptime_seconds": uptime.total_seconds(),
            "uptime_human": str(uptime),
            "observability_running": self.running,
            "health": self.health_checker.get_health_status(),
            "metrics": self.get_metrics_summary(),
            "alerts": self.get_alerts_summary()
        }