monitor.py (23.7 kB)
""" Resource Monitor for Process Registry. Monitors system and process resource usage. """ import asyncio import psutil from datetime import datetime, timezone, timedelta from typing import Optional, List, Dict, Any, Callable from dataclasses import dataclass, field from enum import Enum from collections import deque import statistics from ..utils.logging import get_logger from ..utils.errors import ShannonError from .storage import RegistryStorage, ProcessEntry, ProcessStatus from .tracker import ProcessTracker logger = get_logger(__name__) class ResourceType(str, Enum): """Types of resources to monitor.""" CPU = "cpu" MEMORY = "memory" DISK_IO = "disk_io" NETWORK_IO = "network_io" FILE_HANDLES = "file_handles" THREADS = "threads" class AlertSeverity(str, Enum): """Alert severity levels.""" INFO = "info" WARNING = "warning" CRITICAL = "critical" EMERGENCY = "emergency" @dataclass class ResourceStats: """Resource usage statistics.""" timestamp: datetime resource_type: ResourceType # Current values current_value: float current_percent: Optional[float] = None # Historical stats avg_1min: Optional[float] = None avg_5min: Optional[float] = None avg_15min: Optional[float] = None # Peaks peak_value: Optional[float] = None peak_time: Optional[datetime] = None # Additional metrics metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "timestamp": self.timestamp.isoformat(), "resource_type": self.resource_type.value, "current_value": self.current_value, "current_percent": self.current_percent, "averages": { "1min": self.avg_1min, "5min": self.avg_5min, "15min": self.avg_15min }, "peak": { "value": self.peak_value, "time": self.peak_time.isoformat() if self.peak_time else None }, "metadata": self.metadata } @dataclass class ResourceAlert: """Resource usage alert.""" timestamp: datetime severity: AlertSeverity resource_type: ResourceType process_id: Optional[int] session_id: Optional[str] message: str current_value: float threshold_value: float # Alert context duration_seconds: Optional[float] = None previous_alerts: int = 0 def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "timestamp": self.timestamp.isoformat(), "severity": self.severity.value, "resource_type": self.resource_type.value, "process_id": self.process_id, "session_id": self.session_id, "message": self.message, "current_value": self.current_value, "threshold_value": self.threshold_value, "duration_seconds": self.duration_seconds, "previous_alerts": self.previous_alerts } class ResourceMonitor: """Monitors resource usage for processes and system.""" def __init__( self, storage: RegistryStorage, tracker: ProcessTracker ): """ Initialize resource monitor. 

class ResourceMonitor:
    """Monitors resource usage for processes and system."""

    def __init__(
        self,
        storage: RegistryStorage,
        tracker: ProcessTracker
    ):
        """
        Initialize resource monitor.

        Args:
            storage: Registry storage instance
            tracker: Process tracker instance
        """
        self.storage = storage
        self.tracker = tracker

        # Monitoring configuration
        self.sample_interval_seconds = 5
        self.history_size = 180  # 15 minutes at 5s intervals

        # Resource thresholds (CPU in percent, memory in MB,
        # file handles and threads as counts)
        self.thresholds = {
            ResourceType.CPU: {
                AlertSeverity.WARNING: 70.0,
                AlertSeverity.CRITICAL: 90.0,
                AlertSeverity.EMERGENCY: 95.0
            },
            ResourceType.MEMORY: {
                AlertSeverity.WARNING: 2048,    # 2GB
                AlertSeverity.CRITICAL: 4096,   # 4GB
                AlertSeverity.EMERGENCY: 8192   # 8GB
            },
            ResourceType.FILE_HANDLES: {
                AlertSeverity.WARNING: 500,
                AlertSeverity.CRITICAL: 1000,
                AlertSeverity.EMERGENCY: 2000
            },
            ResourceType.THREADS: {
                AlertSeverity.WARNING: 50,
                AlertSeverity.CRITICAL: 100,
                AlertSeverity.EMERGENCY: 200
            }
        }

        # Historical data
        self._system_history: Dict[ResourceType, deque] = {
            res_type: deque(maxlen=self.history_size)
            for res_type in ResourceType
        }
        self._process_history: Dict[int, Dict[ResourceType, deque]] = {}

        # Alert tracking
        self._active_alerts: Dict[str, ResourceAlert] = {}
        self._alert_callbacks: List[Callable[[ResourceAlert], None]] = []

        # Monitoring task
        self._monitoring_task: Optional[asyncio.Task] = None
        self._stop_event = asyncio.Event()

    async def start_monitoring(self) -> None:
        """Start resource monitoring."""
        if self._monitoring_task and not self._monitoring_task.done():
            logger.warning("Resource monitoring already running")
            return

        self._stop_event.clear()
        self._monitoring_task = asyncio.create_task(self._monitoring_loop())
        logger.info(f"Started resource monitoring with {self.sample_interval_seconds}s interval")

    async def stop_monitoring(self) -> None:
        """Stop resource monitoring."""
        if not self._monitoring_task:
            return

        self._stop_event.set()

        try:
            await asyncio.wait_for(self._monitoring_task, timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning("Monitoring task didn't stop gracefully, cancelling")
            self._monitoring_task.cancel()

        logger.info("Stopped resource monitoring")

    def add_alert_callback(
        self,
        callback: Callable[[ResourceAlert], None]
    ) -> None:
        """
        Add alert callback.

        Args:
            callback: Function to call on alerts
        """
        self._alert_callbacks.append(callback)
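
    # Usage sketch (not part of the original file): callbacks are invoked
    # synchronously from check_alerts(), so they should be fast and non-blocking.
    #
    #     def log_critical(alert: ResourceAlert) -> None:
    #         if alert.severity in (AlertSeverity.CRITICAL, AlertSeverity.EMERGENCY):
    #             logger.warning(alert.message)
    #
    #     monitor.add_alert_callback(log_critical)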

    async def get_system_stats(self) -> Dict[ResourceType, ResourceStats]:
        """
        Get current system resource statistics.

        Returns:
            Dict of resource stats by type
        """
        stats = {}
        now = datetime.now(timezone.utc)

        # CPU stats (note: interval=1 blocks for one second while psutil
        # samples, which stalls the event loop; tolerable at a 5s cadence)
        cpu_percent = psutil.cpu_percent(interval=1)
        stats[ResourceType.CPU] = ResourceStats(
            timestamp=now,
            resource_type=ResourceType.CPU,
            current_value=cpu_percent,
            current_percent=cpu_percent,
            metadata={
                "cpu_count": psutil.cpu_count(),
                "cpu_freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None
            }
        )

        # Memory stats
        memory = psutil.virtual_memory()
        stats[ResourceType.MEMORY] = ResourceStats(
            timestamp=now,
            resource_type=ResourceType.MEMORY,
            current_value=memory.used / (1024 * 1024),  # MB
            current_percent=memory.percent,
            metadata={
                "total_mb": memory.total / (1024 * 1024),
                "available_mb": memory.available / (1024 * 1024),
                "swap_percent": psutil.swap_memory().percent
            }
        )

        # Disk I/O stats
        disk_io = psutil.disk_io_counters()
        if disk_io:
            stats[ResourceType.DISK_IO] = ResourceStats(
                timestamp=now,
                resource_type=ResourceType.DISK_IO,
                current_value=(disk_io.read_bytes + disk_io.write_bytes) / (1024 * 1024),
                metadata={
                    "read_mb": disk_io.read_bytes / (1024 * 1024),
                    "write_mb": disk_io.write_bytes / (1024 * 1024),
                    "read_count": disk_io.read_count,
                    "write_count": disk_io.write_count
                }
            )

        # Network I/O stats
        net_io = psutil.net_io_counters()
        if net_io:
            stats[ResourceType.NETWORK_IO] = ResourceStats(
                timestamp=now,
                resource_type=ResourceType.NETWORK_IO,
                current_value=(net_io.bytes_sent + net_io.bytes_recv) / (1024 * 1024),
                metadata={
                    "sent_mb": net_io.bytes_sent / (1024 * 1024),
                    "recv_mb": net_io.bytes_recv / (1024 * 1024),
                    "packets_sent": net_io.packets_sent,
                    "packets_recv": net_io.packets_recv
                }
            )

        # Add historical averages
        for res_type, stat in stats.items():
            self._add_historical_stats(stat, self._system_history[res_type])

        return stats
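
    # Usage sketch (not part of the original file): reading current system load.
    #
    #     stats = await monitor.get_system_stats()
    #     print(stats[ResourceType.CPU].current_percent)
    #     print(stats[ResourceType.MEMORY].to_dict())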

    async def get_process_stats(
        self,
        pid: int
    ) -> Optional[Dict[ResourceType, ResourceStats]]:
        """
        Get resource statistics for a process.

        Args:
            pid: Process ID

        Returns:
            Dict of resource stats by type, or None if process not found
        """
        try:
            process = psutil.Process(pid)
            stats = {}
            now = datetime.now(timezone.utc)

            with process.oneshot():
                # CPU stats (note: the first cpu_percent() call for a process
                # returns 0.0; later calls compare against the previous sample)
                cpu_percent = process.cpu_percent()
                stats[ResourceType.CPU] = ResourceStats(
                    timestamp=now,
                    resource_type=ResourceType.CPU,
                    current_value=cpu_percent,
                    current_percent=cpu_percent,
                    metadata={
                        "cpu_num": process.cpu_num() if hasattr(process, 'cpu_num') else None
                    }
                )

                # Memory stats
                memory = process.memory_info()
                memory_percent = process.memory_percent()
                stats[ResourceType.MEMORY] = ResourceStats(
                    timestamp=now,
                    resource_type=ResourceType.MEMORY,
                    current_value=memory.rss / (1024 * 1024),  # MB
                    current_percent=memory_percent,
                    metadata={
                        "vms_mb": memory.vms / (1024 * 1024),
                        "shared_mb": getattr(memory, 'shared', 0) / (1024 * 1024)
                    }
                )

                # File handles
                try:
                    open_files = len(process.open_files())
                    stats[ResourceType.FILE_HANDLES] = ResourceStats(
                        timestamp=now,
                        resource_type=ResourceType.FILE_HANDLES,
                        current_value=open_files
                    )
                except (psutil.AccessDenied, AttributeError):
                    pass

                # Threads
                stats[ResourceType.THREADS] = ResourceStats(
                    timestamp=now,
                    resource_type=ResourceType.THREADS,
                    current_value=process.num_threads()
                )

                # Disk I/O (if available)
                try:
                    io_counters = process.io_counters()
                    stats[ResourceType.DISK_IO] = ResourceStats(
                        timestamp=now,
                        resource_type=ResourceType.DISK_IO,
                        current_value=(io_counters.read_bytes + io_counters.write_bytes) / (1024 * 1024),
                        metadata={
                            "read_mb": io_counters.read_bytes / (1024 * 1024),
                            "write_mb": io_counters.write_bytes / (1024 * 1024)
                        }
                    )
                except (psutil.AccessDenied, AttributeError):
                    pass

            # Add historical averages
            if pid in self._process_history:
                for res_type, stat in stats.items():
                    if res_type in self._process_history[pid]:
                        self._add_historical_stats(
                            stat,
                            self._process_history[pid][res_type]
                        )

            return stats

        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return None

    async def get_session_stats(
        self,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Get aggregated resource statistics for a session.

        Args:
            session_id: Session ID

        Returns:
            Aggregated statistics
        """
        processes = await self.storage.get_session_processes(session_id)

        total_stats = {
            "session_id": session_id,
            "process_count": len(processes),
            "total_cpu_percent": 0.0,
            "total_memory_mb": 0.0,
            "total_threads": 0,
            "total_file_handles": 0,
            "processes": []
        }

        for entry in processes:
            stats = await self.get_process_stats(entry.pid)
            if stats:
                # Aggregate totals
                total_stats["total_cpu_percent"] += stats[ResourceType.CPU].current_value
                total_stats["total_memory_mb"] += stats[ResourceType.MEMORY].current_value

                if ResourceType.THREADS in stats:
                    total_stats["total_threads"] += stats[ResourceType.THREADS].current_value

                if ResourceType.FILE_HANDLES in stats:
                    total_stats["total_file_handles"] += stats[ResourceType.FILE_HANDLES].current_value

                # Add process info
                total_stats["processes"].append({
                    "pid": entry.pid,
                    "command": entry.command,
                    "cpu_percent": stats[ResourceType.CPU].current_value,
                    "memory_mb": stats[ResourceType.MEMORY].current_value
                })

        return total_stats

    async def check_alerts(self) -> List[ResourceAlert]:
        """
        Check for resource alerts.

        Returns:
            List of new alerts
        """
        new_alerts = []

        # Check system alerts
        system_stats = await self.get_system_stats()
        for res_type, stats in system_stats.items():
            if res_type in self.thresholds:
                alert = self._check_threshold(
                    stats,
                    None,
                    None,
                    self.thresholds[res_type]
                )
                if alert:
                    new_alerts.append(alert)

        # Check process alerts
        processes = await self.storage.get_all_processes(
            status=ProcessStatus.RUNNING,
            host=self.tracker.hostname
        )

        for entry in processes:
            process_stats = await self.get_process_stats(entry.pid)
            if process_stats:
                for res_type, stats in process_stats.items():
                    if res_type in self.thresholds:
                        alert = self._check_threshold(
                            stats,
                            entry.pid,
                            entry.session_id,
                            self.thresholds[res_type]
                        )
                        if alert:
                            new_alerts.append(alert)

        # Fire callbacks for new alerts
        for alert in new_alerts:
            for callback in self._alert_callbacks:
                try:
                    callback(alert)
                except Exception as e:
                    logger.error(f"Alert callback error: {e}")

        return new_alerts

    def _check_threshold(
        self,
        stats: ResourceStats,
        pid: Optional[int],
        session_id: Optional[str],
        thresholds: Dict[AlertSeverity, float]
    ) -> Optional[ResourceAlert]:
        """Check if stats exceed thresholds."""
        # Find highest severity threshold exceeded
        triggered_severity = None
        threshold_value = None

        for severity in [
            AlertSeverity.EMERGENCY,
            AlertSeverity.CRITICAL,
            AlertSeverity.WARNING
        ]:
            if severity in thresholds:
                if stats.current_value >= thresholds[severity]:
                    triggered_severity = severity
                    threshold_value = thresholds[severity]
                    break

        alert_key = f"{stats.resource_type}_{pid or 'system'}"

        if not triggered_severity:
            # Clear any existing alert
            if alert_key in self._active_alerts:
                del self._active_alerts[alert_key]
            return None

        # Check if this is a new or escalated alert
        existing_alert = self._active_alerts.get(alert_key)

        if existing_alert and existing_alert.severity == triggered_severity:
            # Same severity, update duration
            existing_alert.duration_seconds = (
                datetime.now(timezone.utc) - existing_alert.timestamp
            ).total_seconds()
            return None  # Not a new alert

        # Create new alert
        if pid:
            message = (
                f"Process {pid} {stats.resource_type.value} usage "
                f"({stats.current_value:.1f}) exceeds {triggered_severity.value} "
                f"threshold ({threshold_value})"
            )
        else:
            message = (
                f"System {stats.resource_type.value} usage "
                f"({stats.current_value:.1f}) exceeds {triggered_severity.value} "
                f"threshold ({threshold_value})"
            )

        alert = ResourceAlert(
            timestamp=datetime.now(timezone.utc),
            severity=triggered_severity,
            resource_type=stats.resource_type,
            process_id=pid,
            session_id=session_id,
            message=message,
            current_value=stats.current_value,
            threshold_value=threshold_value,
            previous_alerts=existing_alert.previous_alerts + 1 if existing_alert else 0
        )

        self._active_alerts[alert_key] = alert
        return alert

    def _add_historical_stats(
        self,
        stats: ResourceStats,
        history: deque
    ) -> None:
        """Add historical statistics to stats object."""
        if not history:
            return

        now = datetime.now(timezone.utc)

        # Filter samples by time window
        samples_1min = []
        samples_5min = []
        samples_15min = []

        for sample in history:
            age = (now - sample['timestamp']).total_seconds()
            if age <= 60:
                samples_1min.append(sample['value'])
            if age <= 300:
                samples_5min.append(sample['value'])
            if age <= 900:
                samples_15min.append(sample['value'])

        # Calculate averages
        if samples_1min:
            stats.avg_1min = statistics.mean(samples_1min)
        if samples_5min:
            stats.avg_5min = statistics.mean(samples_5min)
        if samples_15min:
            stats.avg_15min = statistics.mean(samples_15min)

        # Find peak
        peak_sample = max(history, key=lambda x: x['value'])
        stats.peak_value = peak_sample['value']
        stats.peak_time = peak_sample['timestamp']
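
    # Configuration sketch (not part of the original file): thresholds are plain
    # dicts keyed by severity, so a caller could tighten them for a smaller host:
    #
    #     monitor.thresholds[ResourceType.MEMORY] = {
    #         AlertSeverity.WARNING: 512,      # MB
    #         AlertSeverity.CRITICAL: 1024,
    #         AlertSeverity.EMERGENCY: 2048,
    #     }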

    async def _monitoring_loop(self) -> None:
        """Background monitoring loop."""
        while not self._stop_event.is_set():
            try:
                # Collect system stats
                system_stats = await self.get_system_stats()

                # Store history
                for res_type, stats in system_stats.items():
                    self._system_history[res_type].append({
                        'timestamp': stats.timestamp,
                        'value': stats.current_value
                    })

                # Collect process stats
                processes = await self.storage.get_all_processes(
                    status=ProcessStatus.RUNNING,
                    host=self.tracker.hostname
                )

                for entry in processes:
                    process_stats = await self.get_process_stats(entry.pid)
                    if process_stats:
                        # Initialize history if needed
                        if entry.pid not in self._process_history:
                            self._process_history[entry.pid] = {
                                res_type: deque(maxlen=self.history_size)
                                for res_type in ResourceType
                            }

                        # Store history
                        for res_type, stats in process_stats.items():
                            self._process_history[entry.pid][res_type].append({
                                'timestamp': stats.timestamp,
                                'value': stats.current_value
                            })

                        # Update storage with latest resource usage
                        # (disk I/O stats may be absent on restricted platforms)
                        disk_stats = process_stats.get(ResourceType.DISK_IO)
                        await self.storage.update_process_resources(
                            pid=entry.pid,
                            host=entry.host,
                            cpu_percent=process_stats[ResourceType.CPU].current_value,
                            memory_mb=process_stats[ResourceType.MEMORY].current_value,
                            disk_read_mb=disk_stats.metadata.get('read_mb') if disk_stats else None,
                            disk_write_mb=disk_stats.metadata.get('write_mb') if disk_stats else None
                        )

                # Clean up old process history
                current_pids = {p.pid for p in processes}
                dead_pids = set(self._process_history.keys()) - current_pids
                for pid in dead_pids:
                    del self._process_history[pid]

                # Check for alerts
                await self.check_alerts()

                # Wait for next iteration
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=self.sample_interval_seconds
                )

            except asyncio.TimeoutError:
                # Expected timeout - continue loop
                continue
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")
                await asyncio.sleep(5)
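
Usage sketch (a minimal example, not from the repository): the monitor is driven entirely through start_monitoring/stop_monitoring, so wiring it into an asyncio application could look roughly like the following. The no-argument RegistryStorage() and ProcessTracker(storage) constructors are assumptions; the real signatures live in storage.py and tracker.py.

import asyncio

async def main() -> None:
    # Hypothetical constructors; check storage.py/tracker.py for the real ones.
    storage = RegistryStorage()
    tracker = ProcessTracker(storage)
    monitor = ResourceMonitor(storage, tracker)

    # Print every new alert as it fires.
    monitor.add_alert_callback(lambda alert: print(alert.to_dict()))

    await monitor.start_monitoring()
    try:
        await asyncio.sleep(60)  # let the 5s sampling loop build up history
    finally:
        await monitor.stop_monitoring()

asyncio.run(main())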
