Skip to main content
Glama
process.py125 kB
""" Process Registry Manager for Shannon MCP Server. This module provides system-wide process tracking and management with: - Process lifecycle tracking - PID management and validation - Resource monitoring - Cleanup routines - Cross-platform process management """ import asyncio import os import signal import psutil import json import uuid from pathlib import Path from typing import Optional, Dict, Any, List, Set from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum import structlog from .base import BaseManager, ManagerConfig, HealthStatus from ..utils.config import get_config from ..utils.errors import ( SystemError, ValidationError, ConfigurationError, handle_errors, error_context, ErrorRecovery ) from ..utils.notifications import emit, EventCategory, EventPriority, event_handler from ..utils.shutdown import track_request_lifetime, register_shutdown_handler, ShutdownPhase from ..utils.logging import get_logger logger = get_logger("shannon-mcp.process") class ProcessStatus(Enum): """Process lifecycle status.""" STARTING = "starting" RUNNING = "running" STOPPING = "stopping" STOPPED = "stopped" ORPHANED = "orphaned" FAILED = "failed" class ProcessType(Enum): """Type of process being tracked.""" SESSION = "session" # Claude Code session process SERVER = "server" # MCP server process WORKER = "worker" # Background worker process AGENT = "agent" # AI agent process HOOK = "hook" # Hook execution process UTILITY = "utility" # Utility/maintenance process @dataclass class ProcessPIDInfo: """Detailed PID information for tracking and validation.""" pid: int ppid: Optional[int] = None # Parent PID creation_time: Optional[datetime] = None command_line: str = "" executable_path: str = "" platform_info: Dict[str, Any] = field(default_factory=dict) @classmethod def from_psutil(cls, proc: psutil.Process) -> 'ProcessPIDInfo': """Create PID info from psutil process.""" try: pid_info = cls(pid=proc.pid) # Get process info safely try: pid_info.ppid = proc.ppid() except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: pid_info.creation_time = datetime.fromtimestamp(proc.create_time()) except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: pid_info.command_line = ' '.join(proc.cmdline()) except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: pid_info.executable_path = proc.exe() except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Platform-specific info try: pid_info.platform_info = { "username": proc.username(), "status": proc.status(), "nice": proc.nice(), "num_threads": proc.num_threads() } except (psutil.NoSuchProcess, psutil.AccessDenied): pass return pid_info except Exception: # Return minimal info if everything fails return cls(pid=proc.pid) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "pid": self.pid, "ppid": self.ppid, "creation_time": self.creation_time.isoformat() if self.creation_time else None, "command_line": self.command_line, "executable_path": self.executable_path, "platform_info": self.platform_info } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ProcessPIDInfo': """Create from dictionary.""" return cls( pid=data["pid"], ppid=data.get("ppid"), creation_time=datetime.fromisoformat(data["creation_time"]) if data.get("creation_time") else None, command_line=data.get("command_line", ""), executable_path=data.get("executable_path", ""), platform_info=data.get("platform_info", {}) ) @dataclass class PIDEvent: """PID lifecycle event for audit trail.""" id: str pid: int event_type: str # "created", "terminated", "orphaned", "reused", "collision", "validated" timestamp: datetime = field(default_factory=datetime.utcnow) process_id: Optional[str] = None details: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): """Generate event ID if not provided.""" if not self.id: self.id = f"pid_event_{uuid.uuid4().hex[:12]}" @dataclass class ProcessConstraints: """Resource and security constraints for process validation.""" max_memory_mb: Optional[int] = None max_cpu_percent: Optional[float] = None max_file_descriptors: Optional[int] = None max_connections: Optional[int] = None allowed_users: Optional[List[str]] = None allowed_groups: Optional[List[str]] = None allowed_working_dirs: Optional[List[str]] = None blocked_executables: Optional[List[str]] = None required_capabilities: Optional[List[str]] = None max_child_processes: Optional[int] = None max_uptime_hours: Optional[float] = None @dataclass class ProcessValidationResult: """Result of process validation.""" process_id: str pid: int is_valid: bool validation_time: datetime = field(default_factory=datetime.utcnow) # Validation category results integrity_valid: bool = True resource_valid: bool = True security_valid: bool = True lifecycle_valid: bool = True # Detailed findings warnings: List[str] = field(default_factory=list) errors: List[str] = field(default_factory=list) metrics: Dict[str, Any] = field(default_factory=dict) def add_error(self, category: str, message: str): """Add validation error.""" self.errors.append(f"[{category}] {message}") self.is_valid = False # Update category validity if category.lower() in ["integrity", "process"]: self.integrity_valid = False elif category.lower() in ["resource", "memory", "cpu"]: self.resource_valid = False elif category.lower() in ["security", "permission", "user"]: self.security_valid = False elif category.lower() in ["lifecycle", "state", "parent"]: self.lifecycle_valid = False def add_warning(self, category: str, message: str): """Add validation warning.""" self.warnings.append(f"[{category}] {message}") def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "process_id": self.process_id, "pid": self.pid, "is_valid": self.is_valid, "validation_time": self.validation_time.isoformat(), "integrity_valid": self.integrity_valid, "resource_valid": self.resource_valid, "security_valid": self.security_valid, "lifecycle_valid": self.lifecycle_valid, "warnings": self.warnings, "errors": self.errors, "metrics": self.metrics } def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "id": self.id, "pid": self.pid, "event_type": self.event_type, "timestamp": self.timestamp.isoformat(), "process_id": self.process_id, "details": self.details } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'PIDEvent': """Create from dictionary.""" return cls( id=data["id"], pid=data["pid"], event_type=data["event_type"], timestamp=datetime.fromisoformat(data["timestamp"]), process_id=data.get("process_id"), details=data.get("details", {}) ) @dataclass class ResourceLimits: """Resource usage limits and thresholds.""" max_cpu_percent: float = 90.0 max_memory_mb: float = 1024.0 # 1GB default max_file_descriptors: int = 1024 max_threads: int = 100 max_connections: int = 100 max_disk_io_mb_per_sec: float = 100.0 # Alert thresholds (percentage of max) cpu_alert_threshold: float = 0.8 # 80% of max memory_alert_threshold: float = 0.8 # 80% of max fd_alert_threshold: float = 0.8 # 80% of max def is_cpu_exceeded(self, cpu_percent: float) -> bool: """Check if CPU usage exceeds limit.""" return cpu_percent > self.max_cpu_percent def is_memory_exceeded(self, memory_mb: float) -> bool: """Check if memory usage exceeds limit.""" return memory_mb > self.max_memory_mb def is_fd_exceeded(self, fd_count: int) -> bool: """Check if file descriptor count exceeds limit.""" return fd_count > self.max_file_descriptors def should_alert_cpu(self, cpu_percent: float) -> bool: """Check if CPU usage should trigger alert.""" return cpu_percent > (self.max_cpu_percent * self.cpu_alert_threshold) def should_alert_memory(self, memory_mb: float) -> bool: """Check if memory usage should trigger alert.""" return memory_mb > (self.max_memory_mb * self.memory_alert_threshold) @dataclass class ProcessResourceHistory: """Historical resource usage tracking.""" cpu_history: List[float] = field(default_factory=list) memory_history: List[float] = field(default_factory=list) timestamp_history: List[datetime] = field(default_factory=list) max_history_size: int = 100 # Keep last 100 measurements def add_measurement(self, cpu_percent: float, memory_mb: float) -> None: """Add a resource measurement.""" now = datetime.utcnow() self.cpu_history.append(cpu_percent) self.memory_history.append(memory_mb) self.timestamp_history.append(now) # Trim history if too large if len(self.cpu_history) > self.max_history_size: self.cpu_history.pop(0) self.memory_history.pop(0) self.timestamp_history.pop(0) def get_cpu_trend(self) -> str: """Get CPU usage trend.""" if len(self.cpu_history) < 3: return "insufficient_data" recent = sum(self.cpu_history[-3:]) / 3 older = sum(self.cpu_history[-6:-3]) / 3 if len(self.cpu_history) >= 6 else recent if recent > older * 1.1: return "increasing" elif recent < older * 0.9: return "decreasing" else: return "stable" def get_average_cpu(self, last_n: int = 10) -> float: """Get average CPU usage over last N measurements.""" if not self.cpu_history: return 0.0 recent_measurements = self.cpu_history[-last_n:] return sum(recent_measurements) / len(recent_measurements) def get_peak_memory(self) -> float: """Get peak memory usage.""" return max(self.memory_history) if self.memory_history else 0.0 @dataclass class ProcessMetrics: """Process resource usage metrics with comprehensive monitoring.""" # Basic metrics cpu_percent: float = 0.0 memory_mb: float = 0.0 file_descriptors: int = 0 threads: int = 0 # Extended metrics memory_vms_mb: float = 0.0 # Virtual memory size memory_shared_mb: float = 0.0 # Shared memory network_connections: int = 0 disk_read_mb: float = 0.0 disk_write_mb: float = 0.0 disk_io_mb_per_sec: float = 0.0 open_files: int = 0 # Context switches and system calls ctx_switches_voluntary: int = 0 ctx_switches_involuntary: int = 0 # Timestamps start_time: datetime = field(default_factory=datetime.utcnow) last_updated: datetime = field(default_factory=datetime.utcnow) last_disk_io_time: Optional[datetime] = None # Historical tracking history: ProcessResourceHistory = field(default_factory=ProcessResourceHistory) # Resource limits limits: ResourceLimits = field(default_factory=ResourceLimits) # Alert tracking alert_count: int = 0 last_alert_time: Optional[datetime] = None def update_from_psutil(self, proc: psutil.Process) -> None: """Update metrics from psutil process with comprehensive data collection.""" try: now = datetime.utcnow() # Basic metrics self.cpu_percent = proc.cpu_percent() memory_info = proc.memory_info() self.memory_mb = memory_info.rss / 1024 / 1024 self.memory_vms_mb = memory_info.vms / 1024 / 1024 # Memory details (platform-dependent) try: memory_full_info = proc.memory_full_info() if hasattr(memory_full_info, 'shared'): self.memory_shared_mb = memory_full_info.shared / 1024 / 1024 except (AttributeError, psutil.AccessDenied): pass # File descriptors and threads self.file_descriptors = proc.num_fds() if hasattr(proc, 'num_fds') else 0 self.threads = proc.num_threads() # Network connections try: connections = proc.connections() self.network_connections = len(connections) except (psutil.AccessDenied, psutil.NoSuchProcess): pass # Disk I/O try: io_counters = proc.io_counters() if io_counters: # Calculate I/O rate if we have previous measurement if self.last_disk_io_time: time_delta = (now - self.last_disk_io_time).total_seconds() if time_delta > 0: read_delta = (io_counters.read_bytes / 1024 / 1024) - self.disk_read_mb write_delta = (io_counters.write_bytes / 1024 / 1024) - self.disk_write_mb self.disk_io_mb_per_sec = (read_delta + write_delta) / time_delta self.disk_read_mb = io_counters.read_bytes / 1024 / 1024 self.disk_write_mb = io_counters.write_bytes / 1024 / 1024 self.last_disk_io_time = now except (psutil.AccessDenied, psutil.NoSuchProcess): pass # Open files try: open_files = proc.open_files() self.open_files = len(open_files) except (psutil.AccessDenied, psutil.NoSuchProcess): pass # Context switches try: ctx_switches = proc.num_ctx_switches() if ctx_switches: self.ctx_switches_voluntary = ctx_switches.voluntary self.ctx_switches_involuntary = ctx_switches.involuntary except (psutil.AccessDenied, psutil.NoSuchProcess): pass # Update history self.history.add_measurement(self.cpu_percent, self.memory_mb) # Update timestamp self.last_updated = now except (psutil.NoSuchProcess, psutil.AccessDenied): pass def check_resource_limits(self) -> List[str]: """Check if any resource limits are exceeded. Returns: List of violated limit descriptions """ violations = [] if self.limits.is_cpu_exceeded(self.cpu_percent): violations.append(f"CPU usage {self.cpu_percent:.1f}% exceeds limit {self.limits.max_cpu_percent:.1f}%") if self.limits.is_memory_exceeded(self.memory_mb): violations.append(f"Memory usage {self.memory_mb:.1f}MB exceeds limit {self.limits.max_memory_mb:.1f}MB") if self.limits.is_fd_exceeded(self.file_descriptors): violations.append(f"File descriptors {self.file_descriptors} exceeds limit {self.limits.max_file_descriptors}") if self.threads > self.limits.max_threads: violations.append(f"Thread count {self.threads} exceeds limit {self.limits.max_threads}") if self.network_connections > self.limits.max_connections: violations.append(f"Network connections {self.network_connections} exceeds limit {self.limits.max_connections}") if self.disk_io_mb_per_sec > self.limits.max_disk_io_mb_per_sec: violations.append(f"Disk I/O {self.disk_io_mb_per_sec:.1f}MB/s exceeds limit {self.limits.max_disk_io_mb_per_sec:.1f}MB/s") return violations def should_trigger_alerts(self) -> List[str]: """Check if resource usage should trigger alerts. Returns: List of alert conditions """ alerts = [] if self.limits.should_alert_cpu(self.cpu_percent): alerts.append(f"CPU usage {self.cpu_percent:.1f}% approaching limit") if self.limits.should_alert_memory(self.memory_mb): alerts.append(f"Memory usage {self.memory_mb:.1f}MB approaching limit") if self.file_descriptors > (self.limits.max_file_descriptors * self.limits.fd_alert_threshold): alerts.append(f"File descriptor usage {self.file_descriptors} approaching limit") return alerts def get_resource_summary(self) -> Dict[str, Any]: """Get comprehensive resource usage summary.""" return { "cpu": { "current_percent": self.cpu_percent, "trend": self.history.get_cpu_trend(), "average_10min": self.history.get_average_cpu(), "limit": self.limits.max_cpu_percent }, "memory": { "rss_mb": self.memory_mb, "vms_mb": self.memory_vms_mb, "shared_mb": self.memory_shared_mb, "peak_mb": self.history.get_peak_memory(), "limit_mb": self.limits.max_memory_mb }, "io": { "file_descriptors": self.file_descriptors, "open_files": self.open_files, "network_connections": self.network_connections, "disk_read_mb": self.disk_read_mb, "disk_write_mb": self.disk_write_mb, "disk_io_rate_mb_per_sec": self.disk_io_mb_per_sec }, "system": { "threads": self.threads, "ctx_switches_voluntary": self.ctx_switches_voluntary, "ctx_switches_involuntary": self.ctx_switches_involuntary }, "alerts": { "alert_count": self.alert_count, "last_alert": self.last_alert_time.isoformat() if self.last_alert_time else None } } def to_dict(self) -> Dict[str, Any]: """Convert to dictionary with comprehensive metrics.""" return { "cpu_percent": self.cpu_percent, "memory_mb": self.memory_mb, "memory_vms_mb": self.memory_vms_mb, "memory_shared_mb": self.memory_shared_mb, "file_descriptors": self.file_descriptors, "threads": self.threads, "network_connections": self.network_connections, "disk_read_mb": self.disk_read_mb, "disk_write_mb": self.disk_write_mb, "disk_io_mb_per_sec": self.disk_io_mb_per_sec, "open_files": self.open_files, "ctx_switches_voluntary": self.ctx_switches_voluntary, "ctx_switches_involuntary": self.ctx_switches_involuntary, "start_time": self.start_time.isoformat(), "last_updated": self.last_updated.isoformat(), "resource_summary": self.get_resource_summary(), "violations": self.check_resource_limits(), "alert_triggers": self.should_trigger_alerts() } @dataclass class ProcessRecord: """Process registry record.""" process_id: str pid: int process_type: ProcessType parent_process_id: Optional[str] = None session_id: Optional[str] = None binary_path: Optional[str] = None command_line: Optional[str] = None start_time: datetime = field(default_factory=datetime.utcnow) last_heartbeat: Optional[datetime] = None status: ProcessStatus = ProcessStatus.STARTING metadata: Dict[str, Any] = field(default_factory=dict) metrics: ProcessMetrics = field(default_factory=ProcessMetrics) pid_info: Optional[ProcessPIDInfo] = None # Enhanced PID tracking created_at: datetime = field(default_factory=datetime.utcnow) updated_at: datetime = field(default_factory=datetime.utcnow) def __post_init__(self): """Initialize process ID if not provided.""" if not self.process_id: self.process_id = f"proc_{uuid.uuid4().hex[:12]}" def update_heartbeat(self) -> None: """Update the last heartbeat timestamp.""" self.last_heartbeat = datetime.utcnow() self.updated_at = self.last_heartbeat def is_alive(self) -> bool: """Check if process is still alive.""" try: # Check if PID exists and is accessible os.kill(self.pid, 0) return True except (OSError, ProcessLookupError): return False def get_psutil_process(self) -> Optional[psutil.Process]: """Get psutil process object.""" try: return psutil.Process(self.pid) except (psutil.NoSuchProcess, psutil.AccessDenied): return None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "process_id": self.process_id, "pid": self.pid, "process_type": self.process_type.value, "parent_process_id": self.parent_process_id, "session_id": self.session_id, "binary_path": self.binary_path, "command_line": self.command_line, "start_time": self.start_time.isoformat(), "last_heartbeat": self.last_heartbeat.isoformat() if self.last_heartbeat else None, "status": self.status.value, "metadata": self.metadata, "metrics": self.metrics.to_dict(), "pid_info": self.pid_info.to_dict() if self.pid_info else None, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat() } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ProcessRecord': """Create from dictionary.""" metrics_data = data.get("metrics", {}) metrics = ProcessMetrics( cpu_percent=metrics_data.get("cpu_percent", 0.0), memory_mb=metrics_data.get("memory_mb", 0.0), file_descriptors=metrics_data.get("file_descriptors", 0), threads=metrics_data.get("threads", 0), start_time=datetime.fromisoformat(metrics_data.get("start_time", datetime.utcnow().isoformat())), last_updated=datetime.fromisoformat(metrics_data.get("last_updated", datetime.utcnow().isoformat())) ) # Parse PID info if available pid_info = None if data.get("pid_info"): pid_info = ProcessPIDInfo.from_dict(data["pid_info"]) return cls( process_id=data["process_id"], pid=data["pid"], process_type=ProcessType(data["process_type"]), parent_process_id=data.get("parent_process_id"), session_id=data.get("session_id"), binary_path=data.get("binary_path"), command_line=data.get("command_line"), start_time=datetime.fromisoformat(data["start_time"]), last_heartbeat=datetime.fromisoformat(data["last_heartbeat"]) if data.get("last_heartbeat") else None, status=ProcessStatus(data["status"]), metadata=data.get("metadata", {}), metrics=metrics, pid_info=pid_info, created_at=datetime.fromisoformat(data["created_at"]), updated_at=datetime.fromisoformat(data["updated_at"]) ) class ProcessManager(BaseManager[ProcessRecord]): """Manages system-wide process registry.""" def __init__(self): """Initialize process manager.""" config = get_config() manager_config = ManagerConfig( name="process_manager", db_path=Path.home() / ".shannon-mcp" / "process_registry.db", custom_config={} ) super().__init__(manager_config) # In-memory process registry self._processes: Dict[str, ProcessRecord] = {} self._pid_to_process_id: Dict[int, str] = {} self._registry_lock = asyncio.Lock() # Enhanced PID tracking self._pid_events: List[PIDEvent] = [] # In-memory PID event buffer self._pid_audit_lock = asyncio.Lock() # PID file directory self._pid_dir = Path.home() / ".shannon-mcp" / "pids" self._pid_dir.mkdir(parents=True, exist_ok=True) # Cleanup scheduling self._last_cleanup = datetime.utcnow() self._cleanup_interval = 3600 # 1 hour between comprehensive cleanups # Monitoring settings self._monitoring_interval = 30.0 # seconds self._heartbeat_timeout = 300.0 # 5 minutes self._cleanup_age = 3600.0 # 1 hour for stopped processes # Register shutdown handler register_shutdown_handler( "process_manager", self._shutdown_processes, phase=ShutdownPhase.STOP_WORKERS, timeout=30.0 ) async def _initialize(self) -> None: """Initialize process manager.""" logger.info("initializing_process_manager") # Clean up stale PID files on startup await self._cleanup_stale_pid_files() # Load active processes from database await self._load_active_processes() # Validate loaded processes await self._validate_loaded_processes() async def _start(self) -> None: """Start process manager operations.""" # Start process monitoring self._tasks.append( asyncio.create_task(self._monitor_processes()) ) # Start resource monitoring self._tasks.append( asyncio.create_task(self._monitor_resources()) ) async def _stop(self) -> None: """Stop process manager operations.""" # Gracefully terminate tracked processes await self._shutdown_processes() async def _health_check(self) -> Dict[str, Any]: """Perform health check.""" total_processes = len(self._processes) running_processes = sum( 1 for p in self._processes.values() if p.status == ProcessStatus.RUNNING ) orphaned_processes = sum( 1 for p in self._processes.values() if p.status == ProcessStatus.ORPHANED ) # Check system resource usage system_cpu = psutil.cpu_percent() system_memory = psutil.virtual_memory().percent return { "total_processes": total_processes, "running_processes": running_processes, "orphaned_processes": orphaned_processes, "system_cpu_percent": system_cpu, "system_memory_percent": system_memory, "pid_files": len(list(self._pid_dir.glob("*.pid"))), "monitoring_active": not self._stop_event.is_set(), "pid_tracking_enabled": True, "pid_audit_events": await self._count_pid_events() } async def _create_schema(self) -> None: """Create database schema.""" await self.db.execute(""" CREATE TABLE IF NOT EXISTS process_registry ( process_id TEXT PRIMARY KEY, pid INTEGER NOT NULL, process_type TEXT NOT NULL, parent_process_id TEXT, session_id TEXT, binary_path TEXT, command_line TEXT, start_time TEXT NOT NULL, last_heartbeat TEXT, status TEXT NOT NULL, metadata TEXT, metrics TEXT, pid_info TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Create PID audit trail table await self.db.execute(""" CREATE TABLE IF NOT EXISTS pid_audit_trail ( id TEXT PRIMARY KEY, pid INTEGER NOT NULL, event_type TEXT NOT NULL, timestamp TEXT NOT NULL, process_id TEXT, details TEXT ) """) # Create indexes for performance await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_process_registry_pid ON process_registry(pid) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_process_registry_status ON process_registry(status) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_process_registry_type ON process_registry(process_type) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_process_registry_session ON process_registry(session_id) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_process_registry_heartbeat ON process_registry(last_heartbeat) """) # Create indexes for PID audit trail await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_pid_audit_pid ON pid_audit_trail(pid) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_pid_audit_timestamp ON pid_audit_trail(timestamp) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_pid_audit_event_type ON pid_audit_trail(event_type) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_pid_audit_process_id ON pid_audit_trail(process_id) """) # Create validation results table await self.db.execute(""" CREATE TABLE IF NOT EXISTS validation_results ( id TEXT PRIMARY KEY, process_id TEXT NOT NULL, pid INTEGER NOT NULL, is_valid BOOLEAN NOT NULL, validation_time TEXT NOT NULL, integrity_valid BOOLEAN NOT NULL, resource_valid BOOLEAN NOT NULL, security_valid BOOLEAN NOT NULL, lifecycle_valid BOOLEAN NOT NULL, warnings TEXT, errors TEXT, metrics TEXT, created_at TEXT NOT NULL, FOREIGN KEY (process_id) REFERENCES process_registry(process_id) ) """) # Create indexes for validation results await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_validation_results_process_id ON validation_results(process_id) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_validation_results_pid ON validation_results(pid) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_validation_results_validation_time ON validation_results(validation_time) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_validation_results_is_valid ON validation_results(is_valid) """) @track_request_lifetime async def register_process( self, pid: int, process_type: ProcessType, binary_path: Optional[str] = None, command_line: Optional[str] = None, session_id: Optional[str] = None, parent_process_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> ProcessRecord: """ Register a new process in the registry. Args: pid: Process ID process_type: Type of process binary_path: Path to binary command_line: Full command line session_id: Associated session ID parent_process_id: Parent process ID metadata: Additional metadata Returns: Process record Raises: ValidationError: If process validation fails SystemError: If registration fails """ async with self._registry_lock: # Validate PID if not self._validate_pid(pid): raise ValidationError("pid", pid, "Process not found or not accessible") # Enhanced PID collision detection if pid in self._pid_to_process_id: existing_process_id = self._pid_to_process_id[pid] existing_record = self._processes.get(existing_process_id) if existing_record and existing_record.is_alive(): # Check for PID reuse by comparing metadata if await self._detect_pid_reuse(pid): # PID has been reused, resolve collision await self._resolve_pid_collision(pid, existing_process_id) else: logger.warning( "process_already_registered", pid=pid, existing_process_id=existing_process_id ) return existing_record with error_context("process_manager", "register_process", pid=pid): # Get detailed PID information for tracking pid_info = await self._get_detailed_pid_info(pid) # Create process record process_record = ProcessRecord( process_id="", # Will be auto-generated pid=pid, process_type=process_type, parent_process_id=parent_process_id, session_id=session_id, binary_path=binary_path, command_line=command_line, metadata=metadata or {}, status=ProcessStatus.STARTING, pid_info=pid_info # Enhanced PID tracking ) # Update metrics from psutil psutil_proc = process_record.get_psutil_process() if psutil_proc: process_record.metrics.update_from_psutil(psutil_proc) process_record.status = ProcessStatus.RUNNING # Store in memory self._processes[process_record.process_id] = process_record self._pid_to_process_id[pid] = process_record.process_id # Log PID event await self._log_pid_event( pid, "created", process_id=process_record.process_id, details={ "process_type": process_type.value, "binary_path": binary_path or "unknown", "command_line": command_line or "unknown" } ) # Save to database await self._save_process(process_record) # Create PID file await self._create_pid_file(process_record) # Emit event await emit( "process_registered", EventCategory.SYSTEM, { "process_id": process_record.process_id, "pid": pid, "process_type": process_type.value }, priority=EventPriority.MEDIUM ) logger.info( "process_registered", process_id=process_record.process_id, pid=pid, process_type=process_type.value ) return process_record async def unregister_process(self, process_id: str) -> None: """ Unregister a process from the registry. Args: process_id: Process ID to unregister Raises: ValidationError: If process not found """ async with self._registry_lock: process_record = self._processes.get(process_id) if not process_record: raise ValidationError("process_id", process_id, "Process not found") with error_context("process_manager", "unregister_process", process_id=process_id): # Update status to stopped process_record.status = ProcessStatus.STOPPED process_record.updated_at = datetime.utcnow() # Remove from memory self._processes.pop(process_id, None) self._pid_to_process_id.pop(process_record.pid, None) # Update database await self._save_process(process_record) # Clean up PID file await self._remove_pid_file(process_record) # Emit event await emit( "process_unregistered", EventCategory.SYSTEM, { "process_id": process_id, "pid": process_record.pid } ) logger.info( "process_unregistered", process_id=process_id, pid=process_record.pid ) async def get_process(self, process_id: str) -> Optional[ProcessRecord]: """Get process by ID.""" return self._processes.get(process_id) async def get_process_by_pid(self, pid: int) -> Optional[ProcessRecord]: """Get process by PID.""" process_id = self._pid_to_process_id.get(pid) if process_id: return self._processes.get(process_id) return None async def list_processes( self, process_type: Optional[ProcessType] = None, status: Optional[ProcessStatus] = None, session_id: Optional[str] = None ) -> List[ProcessRecord]: """ List processes with optional filtering. Args: process_type: Filter by process type status: Filter by status session_id: Filter by session ID Returns: List of process records """ processes = list(self._processes.values()) if process_type: processes = [p for p in processes if p.process_type == process_type] if status: processes = [p for p in processes if p.status == status] if session_id: processes = [p for p in processes if p.session_id == session_id] # Sort by creation time, newest first processes.sort(key=lambda p: p.created_at, reverse=True) return processes async def update_heartbeat(self, process_id: str) -> None: """ Update process heartbeat. Args: process_id: Process ID Raises: ValidationError: If process not found """ process_record = self._processes.get(process_id) if not process_record: raise ValidationError("process_id", process_id, "Process not found") process_record.update_heartbeat() # Update database periodically (not every heartbeat for performance) now = datetime.utcnow() if not hasattr(process_record, '_last_db_update') or \ (now - getattr(process_record, '_last_db_update', now)).total_seconds() > 60: await self._save_process(process_record) setattr(process_record, '_last_db_update', now) async def terminate_process( self, process_id: str, force: bool = False, timeout: float = 10.0 ) -> None: """ Terminate a process gracefully or forcefully. Args: process_id: Process ID force: Force termination with SIGKILL timeout: Timeout for graceful termination Raises: ValidationError: If process not found SystemError: If termination fails """ process_record = self._processes.get(process_id) if not process_record: raise ValidationError("process_id", process_id, "Process not found") if not process_record.is_alive(): logger.warning( "process_already_dead", process_id=process_id, pid=process_record.pid ) process_record.status = ProcessStatus.STOPPED await self._save_process(process_record) return with error_context("process_manager", "terminate_process", process_id=process_id): process_record.status = ProcessStatus.STOPPING await self._save_process(process_record) try: if force: # Force kill if os.name == 'nt': os.kill(process_record.pid, signal.SIGTERM) else: os.killpg(os.getpgid(process_record.pid), signal.SIGKILL) else: # Graceful termination if os.name == 'nt': os.kill(process_record.pid, signal.SIGTERM) else: os.killpg(os.getpgid(process_record.pid), signal.SIGTERM) # Wait for graceful shutdown for _ in range(int(timeout * 10)): # Check every 100ms if not process_record.is_alive(): break await asyncio.sleep(0.1) else: # Force kill after timeout logger.warning( "graceful_termination_timeout", process_id=process_id, pid=process_record.pid ) if os.name == 'nt': os.kill(process_record.pid, signal.SIGTERM) else: os.killpg(os.getpgid(process_record.pid), signal.SIGKILL) process_record.status = ProcessStatus.STOPPED await self._save_process(process_record) # Emit event await emit( "process_terminated", EventCategory.SYSTEM, { "process_id": process_id, "pid": process_record.pid, "force": force } ) logger.info( "process_terminated", process_id=process_id, pid=process_record.pid, force=force ) except (OSError, ProcessLookupError) as e: logger.error( "process_termination_failed", process_id=process_id, pid=process_record.pid, error=str(e) ) raise SystemError(f"Failed to terminate process {process_record.pid}: {e}") from e # Helper methods def _validate_pid(self, pid: int) -> bool: """Validate if PID exists and is accessible.""" try: os.kill(pid, 0) return True except (OSError, ProcessLookupError): return False async def _save_process(self, process_record: ProcessRecord) -> None: """Save process record to database.""" await self.db.execute(""" INSERT OR REPLACE INTO process_registry (process_id, pid, process_type, parent_process_id, session_id, binary_path, command_line, start_time, last_heartbeat, status, metadata, metrics, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( process_record.process_id, process_record.pid, process_record.process_type.value, process_record.parent_process_id, process_record.session_id, process_record.binary_path, process_record.command_line, process_record.start_time.isoformat(), process_record.last_heartbeat.isoformat() if process_record.last_heartbeat else None, process_record.status.value, json.dumps(process_record.metadata), json.dumps(process_record.metrics.to_dict()), process_record.created_at.isoformat(), process_record.updated_at.isoformat() )) await self.db.commit() async def _load_active_processes(self) -> None: """Load active processes from database.""" async with self.db.execute(""" SELECT * FROM process_registry WHERE status IN ('starting', 'running', 'stopping') """) as cursor: rows = await cursor.fetchall() for row in rows: try: # Convert row to dict process_data = { "process_id": row[0], "pid": row[1], "process_type": row[2], "parent_process_id": row[3], "session_id": row[4], "binary_path": row[5], "command_line": row[6], "start_time": row[7], "last_heartbeat": row[8], "status": row[9], "metadata": json.loads(row[10]) if row[10] else {}, "metrics": json.loads(row[11]) if row[11] else {}, "created_at": row[12], "updated_at": row[13] } # Create process record process_record = ProcessRecord.from_dict(process_data) # Store in memory self._processes[process_record.process_id] = process_record self._pid_to_process_id[process_record.pid] = process_record.process_id except Exception as e: logger.error( "failed_to_load_process", process_id=row[0] if len(row) > 0 else "unknown", error=str(e) ) logger.info( "processes_loaded_from_database", count=len(self._processes) ) async def _validate_loaded_processes(self) -> None: """Validate loaded processes and mark orphaned ones.""" orphaned_count = 0 for process_record in list(self._processes.values()): if not process_record.is_alive(): logger.warning( "orphaned_process_detected", process_id=process_record.process_id, pid=process_record.pid ) process_record.status = ProcessStatus.ORPHANED await self._save_process(process_record) orphaned_count += 1 if orphaned_count > 0: logger.info( "orphaned_processes_detected", count=orphaned_count ) async def _create_pid_file(self, process_record: ProcessRecord) -> None: """Create PID file for process.""" pid_file = self._pid_dir / f"{process_record.process_id}.pid" try: with pid_file.open('w') as f: json.dump({ "process_id": process_record.process_id, "pid": process_record.pid, "process_type": process_record.process_type.value, "created_at": process_record.created_at.isoformat() }, f, indent=2) logger.debug( "pid_file_created", process_id=process_record.process_id, pid_file=str(pid_file) ) except Exception as e: logger.error( "pid_file_creation_failed", process_id=process_record.process_id, pid_file=str(pid_file), error=str(e) ) async def _remove_pid_file(self, process_record: ProcessRecord) -> None: """Remove PID file for process.""" pid_file = self._pid_dir / f"{process_record.process_id}.pid" try: if pid_file.exists(): pid_file.unlink() logger.debug( "pid_file_removed", process_id=process_record.process_id, pid_file=str(pid_file) ) except Exception as e: logger.error( "pid_file_removal_failed", process_id=process_record.process_id, pid_file=str(pid_file), error=str(e) ) async def _cleanup_stale_pid_files(self) -> None: """Clean up stale PID files on startup.""" if not self._pid_dir.exists(): return cleaned_count = 0 for pid_file in self._pid_dir.glob("*.pid"): try: with pid_file.open('r') as f: data = json.load(f) pid = data.get("pid") if pid and not self._validate_pid(pid): # Process is dead, remove stale PID file pid_file.unlink() cleaned_count += 1 logger.debug( "stale_pid_file_removed", pid_file=str(pid_file), pid=pid ) except Exception as e: # Invalid PID file, remove it try: pid_file.unlink() cleaned_count += 1 logger.debug( "invalid_pid_file_removed", pid_file=str(pid_file), error=str(e) ) except: pass if cleaned_count > 0: logger.info( "stale_pid_files_cleaned", count=cleaned_count ) # Monitoring methods async def _monitor_processes(self) -> None: """Monitor process health and status.""" while not self._stop_event.is_set(): try: await asyncio.sleep(self._monitoring_interval) if self._stop_event.is_set(): break now = datetime.utcnow() # Check for dead processes dead_processes = [] orphaned_processes = [] stale_processes = [] for process_id, process_record in list(self._processes.items()): # Check if process is still alive if not process_record.is_alive(): if process_record.status in (ProcessStatus.RUNNING, ProcessStatus.STARTING): # Process died unexpectedly process_record.status = ProcessStatus.ORPHANED orphaned_processes.append(process_record) elif process_record.status == ProcessStatus.STOPPING: # Process completed shutdown process_record.status = ProcessStatus.STOPPED dead_processes.append(process_record) # Check for stale heartbeat if (process_record.last_heartbeat and (now - process_record.last_heartbeat).total_seconds() > self._heartbeat_timeout): logger.warning( "process_heartbeat_stale", process_id=process_id, pid=process_record.pid, last_heartbeat=process_record.last_heartbeat.isoformat() ) # Check for old stopped processes if (process_record.status in (ProcessStatus.STOPPED, ProcessStatus.FAILED) and (now - process_record.updated_at).total_seconds() > self._cleanup_age): stale_processes.append(process_record) # Handle dead processes for process_record in dead_processes: logger.info( "process_completed_shutdown", process_id=process_record.process_id, pid=process_record.pid ) await self._save_process(process_record) # Handle orphaned processes for process_record in orphaned_processes: logger.warning( "process_orphaned", process_id=process_record.process_id, pid=process_record.pid ) await self._save_process(process_record) # Emit orphaned process event await emit( "process_orphaned", EventCategory.SYSTEM, { "process_id": process_record.process_id, "pid": process_record.pid, "process_type": process_record.process_type.value }, priority=EventPriority.HIGH ) # Clean up stale processes for process_record in stale_processes: logger.debug( "cleaning_stale_process", process_id=process_record.process_id, pid=process_record.pid ) self._processes.pop(process_record.process_id, None) self._pid_to_process_id.pop(process_record.pid, None) await self._remove_pid_file(process_record) # Run comprehensive cleanup periodically if (now - self._last_cleanup).total_seconds() >= self._cleanup_interval: logger.info("running_scheduled_cleanup") try: cleanup_stats = await self.run_comprehensive_cleanup() self._last_cleanup = now # Log cleanup summary if any(cleanup_stats.values()): logger.info( "scheduled_cleanup_completed", **cleanup_stats ) except Exception as cleanup_error: logger.error( "scheduled_cleanup_failed", error=str(cleanup_error) ) except asyncio.CancelledError: break except Exception as e: logger.error( "process_monitoring_error", error=str(e), exc_info=True ) async def _monitor_resources(self) -> None: """Monitor resource usage of tracked processes with comprehensive validation.""" # Configurable monitoring intervals config = get_config() monitoring_interval = getattr(config, 'process_monitoring_interval', 60.0) # Default 60 seconds resource_validation_interval = getattr(config, 'resource_validation_interval', 300.0) # Default 5 minutes last_validation_time = datetime.utcnow() logger.info( "resource_monitoring_started", monitoring_interval=monitoring_interval, validation_interval=resource_validation_interval ) while not self._stop_event.is_set(): try: await asyncio.sleep(monitoring_interval) if self._stop_event.is_set(): break now = datetime.utcnow() # Comprehensive resource monitoring await self._collect_comprehensive_metrics() # Periodic resource validation and enforcement if (now - last_validation_time).total_seconds() >= resource_validation_interval: await self._validate_and_enforce_resource_limits() last_validation_time = now # Log aggregate system statistics await self._log_system_resource_summary() except asyncio.CancelledError: break except Exception as e: logger.error( "resource_monitoring_error", error=str(e), exc_info=True ) async def _collect_comprehensive_metrics(self) -> None: """Collect comprehensive resource metrics for all tracked processes.""" collection_start = datetime.utcnow() successful_collections = 0 failed_collections = 0 # Process processes in batches for performance batch_size = 50 # Process up to 50 processes at once process_items = list(self._processes.items()) for i in range(0, len(process_items), batch_size): batch = process_items[i:i + batch_size] # Collect metrics for batch for process_id, process_record in batch: try: if process_record.status == ProcessStatus.RUNNING: psutil_proc = process_record.get_psutil_process() if psutil_proc: # Update comprehensive metrics process_record.metrics.update_from_psutil(psutil_proc) successful_collections += 1 # Check for immediate alerts (critical violations) violations = process_record.metrics.check_resource_limits() if violations: await self._handle_resource_violations(process_id, process_record, violations) # Check for alert thresholds alerts = process_record.metrics.should_trigger_alerts() if alerts: await self._handle_resource_alerts(process_id, process_record, alerts) else: # Process no longer accessible, mark as orphaned process_record.status = ProcessStatus.ORPHANED failed_collections += 1 except (psutil.NoSuchProcess, psutil.AccessDenied) as e: failed_collections += 1 logger.debug( "process_metrics_collection_failed", process_id=process_id, pid=process_record.pid, error=str(e) ) except Exception as e: failed_collections += 1 logger.error( "process_metrics_collection_error", process_id=process_id, pid=process_record.pid, error=str(e) ) # Yield control between batches to prevent blocking if not self._stop_event.is_set(): await asyncio.sleep(0.1) collection_duration = (datetime.utcnow() - collection_start).total_seconds() logger.debug( "resource_metrics_collection_completed", successful_collections=successful_collections, failed_collections=failed_collections, total_processes=len(self._processes), collection_duration_seconds=collection_duration ) async def _handle_resource_violations( self, process_id: str, process_record: ProcessRecord, violations: List[str] ) -> None: """Handle processes that exceed resource limits.""" logger.warning( "resource_limits_exceeded", process_id=process_id, pid=process_record.pid, violations=violations, metrics_summary=process_record.metrics.get_resource_summary() ) # Emit violation event await emit( "process_resource_violation", EventCategory.PROCESS, { "process_id": process_id, "pid": process_record.pid, "violations": violations, "metrics": process_record.metrics.get_resource_summary() }, priority=EventPriority.HIGH ) # Update audit trail await self._add_audit_event( process_id=process_id, event_type="resource_violation", details={ "violations": violations, "metrics_snapshot": process_record.metrics.get_resource_summary() } ) # Determine action based on violation severity critical_violations = [v for v in violations if "exceeds limit" in v] if critical_violations: # For critical violations, consider process termination await self._handle_critical_resource_violations(process_id, process_record, critical_violations) # Update process validation results await self._update_process_validation_result( process_id, "resource_limits", False, f"Resource violations: {'; '.join(violations)}" ) async def _handle_critical_resource_violations( self, process_id: str, process_record: ProcessRecord, violations: List[str] ) -> None: """Handle critical resource violations that may require termination.""" config = get_config() auto_terminate_violators = getattr(config, 'auto_terminate_resource_violators', False) if auto_terminate_violators: logger.error( "terminating_process_for_resource_violations", process_id=process_id, pid=process_record.pid, violations=violations ) try: # Attempt graceful termination first await self.terminate_process(process_id, force=False, timeout=10.0) # Emit termination event await emit( "process_terminated_resource_violation", EventCategory.PROCESS, { "process_id": process_id, "pid": process_record.pid, "violations": violations, "action": "terminated" }, priority=EventPriority.CRITICAL ) except Exception as e: logger.error( "process_termination_failed", process_id=process_id, pid=process_record.pid, error=str(e) ) else: logger.warning( "critical_resource_violation_detected", process_id=process_id, pid=process_record.pid, violations=violations, note="auto_termination_disabled" ) async def _handle_resource_alerts( self, process_id: str, process_record: ProcessRecord, alerts: List[str] ) -> None: """Handle resource usage alerts.""" # Throttle alerts to prevent spam now = datetime.utcnow() if (process_record.metrics.last_alert_time and (now - process_record.metrics.last_alert_time).total_seconds() < 300): # 5 minutes return process_record.metrics.alert_count += 1 process_record.metrics.last_alert_time = now logger.info( "resource_usage_alert", process_id=process_id, pid=process_record.pid, alerts=alerts, alert_count=process_record.metrics.alert_count ) # Emit alert event await emit( "process_resource_alert", EventCategory.PROCESS, { "process_id": process_id, "pid": process_record.pid, "alerts": alerts, "alert_count": process_record.metrics.alert_count, "metrics": process_record.metrics.get_resource_summary() }, priority=EventPriority.MEDIUM ) async def _validate_and_enforce_resource_limits(self) -> None: """Validate resource usage across all processes and enforce limits.""" validation_start = datetime.utcnow() # Collect system-wide resource statistics total_processes = len([p for p in self._processes.values() if p.status == ProcessStatus.RUNNING]) system_metrics = await self._collect_system_wide_metrics() # Check system-wide resource usage await self._validate_system_resource_limits(system_metrics) # Run comprehensive validation for processes that haven't been validated recently validation_tasks = [] for process_id, process_record in self._processes.items(): if process_record.status == ProcessStatus.RUNNING: # Check if process needs validation last_validation = await self._get_last_validation_time(process_id, "resource_comprehensive") if not last_validation or (validation_start - last_validation).total_seconds() > 1800: # 30 minutes validation_tasks.append( self._comprehensive_process_resource_validation(process_id, process_record) ) if validation_tasks: # Run validations in parallel, but limit concurrency semaphore = asyncio.Semaphore(10) # Max 10 concurrent validations async def bounded_validation(task): async with semaphore: return await task bounded_tasks = [bounded_validation(task) for task in validation_tasks] results = await asyncio.gather(*bounded_tasks, return_exceptions=True) # Log validation results successful_validations = sum(1 for r in results if r is True) failed_validations = sum(1 for r in results if isinstance(r, Exception)) logger.info( "resource_validation_completed", total_validations=len(validation_tasks), successful=successful_validations, failed=failed_validations, duration_seconds=(datetime.utcnow() - validation_start).total_seconds() ) async def _collect_system_wide_metrics(self) -> Dict[str, Any]: """Collect system-wide resource metrics.""" try: # CPU usage cpu_percent = psutil.cpu_percent(interval=1.0) cpu_count = psutil.cpu_count() # Memory usage memory = psutil.virtual_memory() swap = psutil.swap_memory() # Disk usage disk_usage = psutil.disk_usage('/') disk_io = psutil.disk_io_counters() # Network usage network_io = psutil.net_io_counters() # Process counts running_processes = len([p for p in self._processes.values() if p.status == ProcessStatus.RUNNING]) total_processes = len(self._processes) return { "cpu": { "percent": cpu_percent, "count": cpu_count, "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None }, "memory": { "total_mb": memory.total / 1024 / 1024, "available_mb": memory.available / 1024 / 1024, "used_mb": memory.used / 1024 / 1024, "percent": memory.percent, "swap_total_mb": swap.total / 1024 / 1024, "swap_used_mb": swap.used / 1024 / 1024, "swap_percent": swap.percent }, "disk": { "total_gb": disk_usage.total / 1024 / 1024 / 1024, "used_gb": disk_usage.used / 1024 / 1024 / 1024, "free_gb": disk_usage.free / 1024 / 1024 / 1024, "percent": (disk_usage.used / disk_usage.total) * 100, "read_mb": disk_io.read_bytes / 1024 / 1024 if disk_io else 0, "write_mb": disk_io.write_bytes / 1024 / 1024 if disk_io else 0 }, "network": { "bytes_sent_mb": network_io.bytes_sent / 1024 / 1024 if network_io else 0, "bytes_recv_mb": network_io.bytes_recv / 1024 / 1024 if network_io else 0, "packets_sent": network_io.packets_sent if network_io else 0, "packets_recv": network_io.packets_recv if network_io else 0 }, "processes": { "shannon_running": running_processes, "shannon_total": total_processes, "system_total": len(psutil.pids()) } } except Exception as e: logger.error( "system_metrics_collection_failed", error=str(e) ) return {} async def _validate_system_resource_limits(self, system_metrics: Dict[str, Any]) -> None: """Validate system-wide resource limits.""" config = get_config() system_limits = getattr(config, 'system_resource_limits', {}) violations = [] # Check system CPU usage max_system_cpu = system_limits.get('max_cpu_percent', 95.0) if system_metrics.get('cpu', {}).get('percent', 0) > max_system_cpu: violations.append(f"System CPU usage {system_metrics['cpu']['percent']:.1f}% exceeds limit {max_system_cpu:.1f}%") # Check system memory usage max_system_memory = system_limits.get('max_memory_percent', 90.0) memory_percent = system_metrics.get('memory', {}).get('percent', 0) if memory_percent > max_system_memory: violations.append(f"System memory usage {memory_percent:.1f}% exceeds limit {max_system_memory:.1f}%") # Check disk usage max_disk_usage = system_limits.get('max_disk_percent', 85.0) disk_percent = system_metrics.get('disk', {}).get('percent', 0) if disk_percent > max_disk_usage: violations.append(f"System disk usage {disk_percent:.1f}% exceeds limit {max_disk_usage:.1f}%") if violations: logger.warning( "system_resource_limits_exceeded", violations=violations, system_metrics=system_metrics ) # Emit system violation event await emit( "system_resource_violation", EventCategory.SYSTEM, { "violations": violations, "system_metrics": system_metrics }, priority=EventPriority.HIGH ) async def _comprehensive_process_resource_validation( self, process_id: str, process_record: ProcessRecord ) -> bool: """Perform comprehensive resource validation for a single process.""" try: validation_start = datetime.utcnow() # Get current metrics psutil_proc = process_record.get_psutil_process() if not psutil_proc: await self._update_process_validation_result( process_id, "resource_comprehensive", False, "Process not accessible for validation" ) return False # Update metrics process_record.metrics.update_from_psutil(psutil_proc) # Check all resource limits violations = process_record.metrics.check_resource_limits() alerts = process_record.metrics.should_trigger_alerts() # Analyze resource trends cpu_trend = process_record.metrics.history.get_cpu_trend() avg_cpu = process_record.metrics.history.get_average_cpu() peak_memory = process_record.metrics.history.get_peak_memory() # Determine validation result validation_passed = len(violations) == 0 # Create detailed validation result validation_details = { "violations": violations, "alerts": alerts, "trends": { "cpu_trend": cpu_trend, "average_cpu_10min": avg_cpu, "peak_memory_mb": peak_memory }, "metrics_snapshot": process_record.metrics.get_resource_summary(), "validation_duration_ms": (datetime.utcnow() - validation_start).total_seconds() * 1000 } # Update validation result await self._update_process_validation_result( process_id, "resource_comprehensive", validation_passed, f"Comprehensive resource validation: {len(violations)} violations, {len(alerts)} alerts", validation_details ) # Handle violations if any if violations: await self._handle_resource_violations(process_id, process_record, violations) return validation_passed except Exception as e: logger.error( "comprehensive_resource_validation_failed", process_id=process_id, error=str(e) ) await self._update_process_validation_result( process_id, "resource_comprehensive", False, f"Validation failed: {str(e)}" ) return False async def _log_system_resource_summary(self) -> None: """Log comprehensive system resource usage summary.""" try: # Collect process-level statistics running_processes = [p for p in self._processes.values() if p.status == ProcessStatus.RUNNING] if not running_processes: return total_cpu = sum(p.metrics.cpu_percent for p in running_processes) total_memory = sum(p.metrics.memory_mb for p in running_processes) total_file_descriptors = sum(p.metrics.file_descriptors for p in running_processes) total_threads = sum(p.metrics.threads for p in running_processes) total_connections = sum(p.metrics.network_connections for p in running_processes) # Calculate averages process_count = len(running_processes) avg_cpu = total_cpu / process_count if process_count > 0 else 0 avg_memory = total_memory / process_count if process_count > 0 else 0 # Count processes with violations and alerts processes_with_violations = sum(1 for p in running_processes if p.metrics.check_resource_limits()) processes_with_alerts = sum(1 for p in running_processes if p.metrics.should_trigger_alerts()) # Get system metrics system_metrics = await self._collect_system_wide_metrics() logger.info( "comprehensive_resource_summary", shannon_processes={ "count": process_count, "total_cpu_percent": total_cpu, "avg_cpu_percent": avg_cpu, "total_memory_mb": total_memory, "avg_memory_mb": avg_memory, "total_file_descriptors": total_file_descriptors, "total_threads": total_threads, "total_connections": total_connections, "processes_with_violations": processes_with_violations, "processes_with_alerts": processes_with_alerts }, system_resources=system_metrics ) except Exception as e: logger.error( "resource_summary_logging_failed", error=str(e) ) async def _shutdown_processes(self) -> None: """Shutdown all tracked processes.""" logger.info( "shutting_down_processes", count=len(self._processes) ) # Terminate all running processes terminate_tasks = [] for process_id, process_record in list(self._processes.items()): if process_record.status in (ProcessStatus.RUNNING, ProcessStatus.STARTING): terminate_tasks.append( self.terminate_process(process_id, force=False, timeout=5.0) ) if terminate_tasks: # Wait for graceful termination results = await asyncio.gather(*terminate_tasks, return_exceptions=True) # Log any failures for i, result in enumerate(results): if isinstance(result, Exception): logger.error( "process_shutdown_failed", task_index=i, error=str(result) ) # Enhanced PID Tracking Methods (Task 12.2) async def _get_detailed_pid_info(self, pid: int) -> Optional[ProcessPIDInfo]: """Get detailed PID information using psutil. Args: pid: Process ID to inspect Returns: Detailed PID information or None if process not found """ try: process = psutil.Process(pid) # Get basic process info info = ProcessPIDInfo( pid=pid, ppid=process.ppid(), creation_time=datetime.fromtimestamp(process.create_time()), command_line=" ".join(process.cmdline()), executable_path=process.exe() ) # Add platform-specific information platform_info = { "name": process.name(), "status": process.status(), "username": process.username(), "cpu_percent": process.cpu_percent(), "memory_percent": process.memory_percent(), "num_threads": process.num_threads(), "cwd": process.cwd() if hasattr(process, 'cwd') else None } # Add file descriptors count (Unix-like systems) try: platform_info["num_fds"] = process.num_fds() except (AttributeError, psutil.AccessDenied): pass # Add connections count try: platform_info["num_connections"] = len(process.connections()) except (psutil.AccessDenied, psutil.NoSuchProcess): pass info.platform_info = platform_info return info except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): return None except Exception as e: logger.warning( "failed_to_get_pid_info", pid=pid, error=str(e) ) return None async def _validate_pid_with_metadata(self, pid: int, expected_info: Optional[ProcessPIDInfo] = None) -> bool: """Validate PID with metadata comparison. Args: pid: Process ID to validate expected_info: Expected process information for comparison Returns: True if PID is valid and matches expected info """ current_info = await self._get_detailed_pid_info(pid) if not current_info: return False if not expected_info: return True # PID exists, no specific validation required # Compare critical fields for PID reuse detection if current_info.creation_time != expected_info.creation_time: await self._log_pid_event( pid, "reused", details={ "expected_creation": expected_info.creation_time.isoformat(), "actual_creation": current_info.creation_time.isoformat() } ) return False if current_info.ppid != expected_info.ppid: logger.warning( "pid_parent_changed", pid=pid, expected_ppid=expected_info.ppid, actual_ppid=current_info.ppid ) return True async def _detect_pid_reuse(self, pid: int) -> bool: """Detect if a PID has been reused by comparing with stored metadata. Args: pid: Process ID to check Returns: True if PID reuse is detected """ # Get stored PID info from database cursor = await self.db.execute( "SELECT pid_info FROM process_registry WHERE pid = ? ORDER BY created_at DESC LIMIT 1", (pid,) ) row = await cursor.fetchone() if not row or not row[0]: return False # No stored info to compare try: stored_info_dict = json.loads(row[0]) stored_info = ProcessPIDInfo( pid=stored_info_dict["pid"], ppid=stored_info_dict.get("ppid"), creation_time=datetime.fromisoformat(stored_info_dict["creation_time"]) if stored_info_dict.get("creation_time") else None, command_line=stored_info_dict.get("command_line", ""), executable_path=stored_info_dict.get("executable_path", ""), platform_info=stored_info_dict.get("platform_info", {}) ) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.error("failed_to_parse_stored_pid_info", pid=pid, error=str(e)) return False # Compare with current PID info current_info = await self._get_detailed_pid_info(pid) if not current_info: return True # PID no longer exists, likely reused # Check creation time - most reliable indicator of PID reuse if stored_info.creation_time and current_info.creation_time: if current_info.creation_time != stored_info.creation_time: await self._log_pid_event( pid, "reused", details={ "stored_creation": stored_info.creation_time.isoformat(), "current_creation": current_info.creation_time.isoformat(), "stored_command": stored_info.command_line, "current_command": current_info.command_line } ) return True return False async def _log_pid_event(self, pid: int, event_type: str, process_id: Optional[str] = None, details: Optional[Dict[str, Any]] = None) -> str: """Log a PID-related event to the audit trail. Args: pid: Process ID event_type: Event type (created, terminated, orphaned, reused, collision) process_id: Associated process record ID details: Additional event details Returns: Event ID """ event = PIDEvent( id=f"pid_event_{uuid.uuid4().hex[:12]}", pid=pid, event_type=event_type, process_id=process_id, details=details or {} ) await self.db.execute(""" INSERT INTO pid_audit_trail (id, pid, event_type, timestamp, process_id, details) VALUES (?, ?, ?, ?, ?, ?) """, ( event.id, event.pid, event.event_type, event.timestamp.isoformat(), event.process_id, json.dumps(event.details) )) await self.db.commit() logger.info( "pid_event_logged", event_id=event.id, pid=pid, event_type=event_type, process_id=process_id ) return event.id async def _resolve_pid_collision(self, pid: int, new_process_id: str) -> bool: """Resolve PID collision by updating existing records. Args: pid: Colliding PID new_process_id: New process record ID Returns: True if collision was resolved successfully """ try: # Find existing process record with this PID cursor = await self.db.execute( "SELECT process_id, status FROM process_registry WHERE pid = ? AND status IN ('running', 'starting')", (pid,) ) existing_records = await cursor.fetchall() if not existing_records: return True # No collision to resolve # Mark existing records as terminated due to PID reuse for record in existing_records: existing_id, existing_status = record await self.db.execute(""" UPDATE process_registry SET status = 'terminated', updated_at = ?, metadata = json_set(COALESCE(metadata, '{}'), '$.termination_reason', 'pid_reused') WHERE process_id = ? """, (datetime.utcnow().isoformat(), existing_id)) # Log the collision event await self._log_pid_event( pid, "collision", details={ "existing_process_id": existing_id, "new_process_id": new_process_id, "previous_status": existing_status } ) logger.warning( "pid_collision_resolved", pid=pid, existing_process_id=existing_id, new_process_id=new_process_id ) await self.db.commit() return True except Exception as e: logger.error( "failed_to_resolve_pid_collision", pid=pid, new_process_id=new_process_id, error=str(e), exc_info=True ) return False async def _count_pid_events(self) -> int: """Count total PID audit events.""" cursor = await self.db.execute("SELECT COUNT(*) FROM pid_audit_trail") row = await cursor.fetchone() return row[0] if row else 0 # Process Validation Methods async def validate_process_integrity( self, process_id: str, result: ProcessValidationResult ) -> None: """ Validate process integrity and existence. Checks: - Process still exists and matches PID - Command line and executable haven't changed - Creation time matches (detects PID reuse) - Parent process relationship is valid """ try: process_record = self._processes.get(process_id) if not process_record: result.add_error("integrity", f"Process record not found: {process_id}") return # Check if process still exists try: proc = psutil.Process(process_record.pid) # Verify PID hasn't been reused current_pid_info = await self._get_detailed_pid_info(process_record.pid) if current_pid_info and process_record.pid_info: if current_pid_info.creation_time != process_record.pid_info.creation_time: result.add_error("integrity", f"PID {process_record.pid} has been reused") await self._log_pid_event( process_record.pid, "reused", process_id, {"old_creation_time": process_record.pid_info.creation_time.isoformat() if process_record.pid_info.creation_time else None, "new_creation_time": current_pid_info.creation_time.isoformat() if current_pid_info.creation_time else None} ) return # Verify command line matches if (process_record.pid_info and process_record.pid_info.command_line and current_pid_info and current_pid_info.command_line): if process_record.pid_info.command_line != current_pid_info.command_line: result.add_warning("integrity", f"Command line mismatch for PID {process_record.pid}") # Verify executable path if (process_record.pid_info and process_record.pid_info.executable_path and current_pid_info and current_pid_info.executable_path): if process_record.pid_info.executable_path != current_pid_info.executable_path: result.add_warning("integrity", f"Executable path changed for PID {process_record.pid}") # Check parent process relationship if current_pid_info and current_pid_info.ppid: try: parent_proc = psutil.Process(current_pid_info.ppid) result.metrics["parent_pid"] = current_pid_info.ppid result.metrics["parent_status"] = parent_proc.status() except psutil.NoSuchProcess: result.add_warning("integrity", f"Parent process {current_pid_info.ppid} no longer exists") result.metrics["process_status"] = proc.status() result.metrics["process_threads"] = proc.num_threads() except psutil.NoSuchProcess: result.add_error("integrity", f"Process {process_record.pid} no longer exists") # Mark as orphaned process_record.status = ProcessStatus.ORPHANED await self._log_pid_event(process_record.pid, "orphaned", process_id) except Exception as e: result.add_error("integrity", f"Integrity validation failed: {e}") async def validate_resource_constraints( self, process_id: str, constraints: ProcessConstraints, result: ProcessValidationResult ) -> None: """ Validate process resource usage against constraints. Checks: - Memory usage within limits - CPU usage patterns - File descriptor usage - Network connections - Child process count """ try: process_record = self._processes.get(process_id) if not process_record: result.add_error("resource", f"Process record not found: {process_id}") return try: proc = psutil.Process(process_record.pid) # Memory validation if constraints.max_memory_mb: memory_mb = proc.memory_info().rss / 1024 / 1024 result.metrics["memory_mb"] = memory_mb if memory_mb > constraints.max_memory_mb: result.add_error("resource", f"Memory usage {memory_mb:.1f}MB exceeds limit {constraints.max_memory_mb}MB") # CPU validation if constraints.max_cpu_percent: cpu_percent = proc.cpu_percent(interval=1.0) result.metrics["cpu_percent"] = cpu_percent if cpu_percent > constraints.max_cpu_percent: result.add_error("resource", f"CPU usage {cpu_percent:.1f}% exceeds limit {constraints.max_cpu_percent}%") # File descriptor validation if constraints.max_file_descriptors and hasattr(proc, 'num_fds'): try: fd_count = proc.num_fds() result.metrics["file_descriptors"] = fd_count if fd_count > constraints.max_file_descriptors: result.add_error("resource", f"File descriptor count {fd_count} exceeds limit {constraints.max_file_descriptors}") except (psutil.AccessDenied, AttributeError): result.add_warning("resource", "Cannot access file descriptor count") # Network connections validation if constraints.max_connections: try: connections = proc.connections() conn_count = len(connections) result.metrics["connections"] = conn_count if conn_count > constraints.max_connections: result.add_error("resource", f"Connection count {conn_count} exceeds limit {constraints.max_connections}") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("resource", "Cannot access network connections") # Child process validation if constraints.max_child_processes: try: children = proc.children(recursive=True) child_count = len(children) result.metrics["child_processes"] = child_count if child_count > constraints.max_child_processes: result.add_error("resource", f"Child process count {child_count} exceeds limit {constraints.max_child_processes}") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("resource", "Cannot access child processes") # Uptime validation if constraints.max_uptime_hours: try: create_time = datetime.fromtimestamp(proc.create_time()) uptime_hours = (datetime.utcnow() - create_time).total_seconds() / 3600 result.metrics["uptime_hours"] = uptime_hours if uptime_hours > constraints.max_uptime_hours: result.add_warning("resource", f"Process uptime {uptime_hours:.1f}h exceeds recommended limit {constraints.max_uptime_hours}h") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("resource", "Cannot access process creation time") except psutil.NoSuchProcess: result.add_error("resource", f"Process {process_record.pid} no longer exists") except Exception as e: result.add_error("resource", f"Resource validation failed: {e}") async def validate_security_policies( self, process_id: str, constraints: ProcessConstraints, result: ProcessValidationResult ) -> None: """ Validate process security policies and permissions. Checks: - User/group permissions - Working directory restrictions - Executable path validation - Environment variable checks """ try: process_record = self._processes.get(process_id) if not process_record: result.add_error("security", f"Process record not found: {process_id}") return try: proc = psutil.Process(process_record.pid) # User validation if constraints.allowed_users: try: username = proc.username() result.metrics["username"] = username if username not in constraints.allowed_users: result.add_error("security", f"Process running as unauthorized user: {username}") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("security", "Cannot access process username") # Working directory validation if constraints.allowed_working_dirs: try: cwd = proc.cwd() result.metrics["working_dir"] = cwd allowed = any(cwd.startswith(allowed_dir) for allowed_dir in constraints.allowed_working_dirs) if not allowed: result.add_error("security", f"Process running in unauthorized directory: {cwd}") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("security", "Cannot access working directory") # Executable path validation if constraints.blocked_executables: try: exe_path = proc.exe() result.metrics["executable"] = exe_path for blocked_exe in constraints.blocked_executables: if blocked_exe in exe_path or exe_path.endswith(blocked_exe): result.add_error("security", f"Process using blocked executable: {exe_path}") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("security", "Cannot access executable path") # Environment variable validation try: environ = proc.environ() suspicious_vars = ["LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_INSERT_LIBRARIES"] for var in suspicious_vars: if var in environ: result.add_warning("security", f"Suspicious environment variable detected: {var}") result.metrics["env_var_count"] = len(environ) except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("security", "Cannot access environment variables") # Process capabilities (Linux only) if hasattr(proc, 'uids') and os.name == 'posix': try: uids = proc.uids() if uids.effective == 0: # Running as root if not constraints.allowed_users or 'root' not in constraints.allowed_users: result.add_warning("security", "Process running with root privileges") result.metrics["effective_uid"] = uids.effective result.metrics["real_uid"] = uids.real except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("security", "Cannot access process UIDs") except psutil.NoSuchProcess: result.add_error("security", f"Process {process_record.pid} no longer exists") except Exception as e: result.add_error("security", f"Security validation failed: {e}") async def validate_process_lifecycle( self, process_id: str, result: ProcessValidationResult ) -> None: """ Validate process lifecycle and state consistency. Checks: - Process state consistency - Zombie process detection - Parent-child relationships - Process tree integrity """ try: process_record = self._processes.get(process_id) if not process_record: result.add_error("lifecycle", f"Process record not found: {process_id}") return try: proc = psutil.Process(process_record.pid) # Process status validation proc_status = proc.status() result.metrics["psutil_status"] = proc_status result.metrics["registry_status"] = process_record.status.value # Check for zombie processes if proc_status == psutil.STATUS_ZOMBIE: result.add_error("lifecycle", f"Process {process_record.pid} is a zombie") # Check status consistency if (process_record.status == ProcessStatus.RUNNING and proc_status not in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING, psutil.STATUS_IDLE]): result.add_warning("lifecycle", f"Status mismatch: registry={process_record.status.value}, actual={proc_status}") # Parent process validation try: ppid = proc.ppid() if ppid > 0: parent_proc = psutil.Process(ppid) result.metrics["parent_status"] = parent_proc.status() # Check if parent is init (process was orphaned) if ppid == 1 and process_record.pid_info and process_record.pid_info.ppid != 1: result.add_warning("lifecycle", f"Process appears to be orphaned (parent is init)") except psutil.NoSuchProcess: result.add_warning("lifecycle", "Parent process no longer exists") # Child process validation try: children = proc.children() child_pids = [child.pid for child in children] result.metrics["child_pids"] = child_pids result.metrics["child_count"] = len(child_pids) # Check for excessive child processes if len(child_pids) > 50: # Arbitrary threshold result.add_warning("lifecycle", f"Process has {len(child_pids)} child processes") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("lifecycle", "Cannot access child processes") # Process creation time validation try: create_time = datetime.fromtimestamp(proc.create_time()) age_seconds = (datetime.utcnow() - create_time).total_seconds() result.metrics["age_seconds"] = age_seconds # Check for very old processes that might be stuck if age_seconds > 86400: # 24 hours result.add_warning("lifecycle", f"Process is very old ({age_seconds/3600:.1f} hours)") except (psutil.AccessDenied, psutil.NoSuchProcess): result.add_warning("lifecycle", "Cannot access process creation time") except psutil.NoSuchProcess: result.add_error("lifecycle", f"Process {process_record.pid} no longer exists") # Update status to orphaned process_record.status = ProcessStatus.ORPHANED await self._log_pid_event(process_record.pid, "orphaned", process_id) except Exception as e: result.add_error("lifecycle", f"Lifecycle validation failed: {e}") async def run_comprehensive_validation( self, process_id: str, constraints: Optional[ProcessConstraints] = None ) -> ProcessValidationResult: """ Run comprehensive process validation. Args: process_id: Process to validate constraints: Resource and security constraints Returns: Validation result with all checks """ process_record = self._processes.get(process_id) if not process_record: raise ValidationError("process_id", process_id, "Process not found") # Initialize validation result result = ProcessValidationResult( process_id=process_id, pid=process_record.pid, is_valid=True ) try: # Run all validation categories await self.validate_process_integrity(process_id, result) if constraints: await self.validate_resource_constraints(process_id, constraints, result) await self.validate_security_policies(process_id, constraints, result) await self.validate_process_lifecycle(process_id, result) # Log validation event await self._log_pid_event( process_record.pid, "validated", process_id, { "is_valid": result.is_valid, "error_count": len(result.errors), "warning_count": len(result.warnings) } ) # Store validation result in database await self._save_validation_result(result) logger.info( "process_validation_completed", process_id=process_id, pid=process_record.pid, is_valid=result.is_valid, errors=len(result.errors), warnings=len(result.warnings) ) return result except Exception as e: result.add_error("validation", f"Validation framework error: {e}") logger.error( "process_validation_failed", process_id=process_id, error=str(e), exc_info=True ) return result async def _save_validation_result(self, result: ProcessValidationResult) -> None: """Save validation result to database.""" try: await self.db.execute(""" INSERT INTO validation_results (process_id, pid, is_valid, validation_time, integrity_valid, resource_valid, security_valid, lifecycle_valid, warnings, errors, metrics) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( result.process_id, result.pid, result.is_valid, result.validation_time.isoformat(), result.integrity_valid, result.resource_valid, result.security_valid, result.lifecycle_valid, json.dumps(result.warnings), json.dumps(result.errors), json.dumps(result.metrics) )) await self.db.commit() except Exception as e: logger.error("failed_to_save_validation_result", error=str(e)) # Cleanup Operations async def cleanup_stale_processes(self, max_age_hours: int = 24) -> int: """ Clean up processes that haven't had a heartbeat in the specified time. Args: max_age_hours: Maximum age in hours for last heartbeat Returns: Number of processes cleaned up """ try: cutoff_time = (datetime.utcnow() - timedelta(hours=max_age_hours)).isoformat() # Get stale processes cursor = await self.db.execute(""" SELECT process_id, pid, last_heartbeat FROM process_registry WHERE last_heartbeat < ? OR last_heartbeat IS NULL """, (cutoff_time,)) stale_processes = await cursor.fetchall() cleanup_count = 0 for process_id, pid, last_heartbeat in stale_processes: # Check if process is actually dead try: if pid and psutil.pid_exists(pid): # Process still exists, update heartbeat await self.update_heartbeat(process_id) continue except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Remove stale process await self.db.execute(""" DELETE FROM process_registry WHERE process_id = ? """, (process_id,)) # Log cleanup await self._log_pid_event( pid or 0, PIDEventType.CLEANUP, process_id, {"reason": "stale_heartbeat", "last_heartbeat": last_heartbeat} ) cleanup_count += 1 logger.info( "cleaned_up_stale_process", process_id=process_id, pid=pid, last_heartbeat=last_heartbeat ) await self.db.commit() logger.info( "cleanup_stale_processes_completed", cleaned_up=cleanup_count, max_age_hours=max_age_hours ) return cleanup_count except Exception as e: logger.error("cleanup_stale_processes_failed", error=str(e)) return 0 async def cleanup_orphaned_pids(self) -> int: """ Clean up PID records where the actual process no longer exists. Returns: Number of orphaned PIDs cleaned up """ try: # Get all registered PIDs cursor = await self.db.execute(""" SELECT DISTINCT pid FROM process_registry WHERE pid IS NOT NULL """) registered_pids = [row[0] for row in await cursor.fetchall()] cleanup_count = 0 for pid in registered_pids: try: if not psutil.pid_exists(pid): # PID no longer exists, remove all records await self.db.execute(""" DELETE FROM process_registry WHERE pid = ? """, (pid,)) # Log cleanup await self._log_pid_event( pid, PIDEventType.CLEANUP, None, {"reason": "orphaned_pid"} ) cleanup_count += 1 logger.info("cleaned_up_orphaned_pid", pid=pid) except (psutil.NoSuchProcess, psutil.AccessDenied): # PID definitely doesn't exist await self.db.execute(""" DELETE FROM process_registry WHERE pid = ? """, (pid,)) cleanup_count += 1 logger.info("cleaned_up_orphaned_pid", pid=pid) await self.db.commit() logger.info( "cleanup_orphaned_pids_completed", cleaned_up=cleanup_count ) return cleanup_count except Exception as e: logger.error("cleanup_orphaned_pids_failed", error=str(e)) return 0 async def cleanup_old_validation_results(self, retention_days: int = 30) -> int: """ Clean up old validation results based on retention policy. Args: retention_days: Number of days to retain validation results Returns: Number of validation results cleaned up """ try: cutoff_time = (datetime.utcnow() - timedelta(days=retention_days)).isoformat() # Count old validation results cursor = await self.db.execute(""" SELECT COUNT(*) FROM validation_results WHERE validation_time < ? """, (cutoff_time,)) count_result = await cursor.fetchone() old_count = count_result[0] if count_result else 0 if old_count > 0: # Delete old validation results await self.db.execute(""" DELETE FROM validation_results WHERE validation_time < ? """, (cutoff_time,)) await self.db.commit() logger.info( "cleanup_old_validation_results_completed", cleaned_up=old_count, retention_days=retention_days ) return old_count except Exception as e: logger.error("cleanup_old_validation_results_failed", error=str(e)) return 0 async def cleanup_old_audit_trail(self, retention_days: int = 90) -> int: """ Clean up old audit trail entries. Args: retention_days: Number of days to retain audit trail Returns: Number of audit trail entries cleaned up """ try: cutoff_time = (datetime.utcnow() - timedelta(days=retention_days)).isoformat() # Count old audit entries cursor = await self.db.execute(""" SELECT COUNT(*) FROM pid_audit_trail WHERE timestamp < ? """, (cutoff_time,)) count_result = await cursor.fetchone() old_count = count_result[0] if count_result else 0 if old_count > 0: # Delete old audit entries await self.db.execute(""" DELETE FROM pid_audit_trail WHERE timestamp < ? """, (cutoff_time,)) await self.db.commit() logger.info( "cleanup_old_audit_trail_completed", cleaned_up=old_count, retention_days=retention_days ) return old_count except Exception as e: logger.error("cleanup_old_audit_trail_failed", error=str(e)) return 0 async def perform_database_maintenance(self) -> bool: """ Perform database maintenance operations. Returns: True if maintenance completed successfully """ try: # Get database statistics before maintenance cursor = await self.db.execute("PRAGMA page_count") page_count_before = (await cursor.fetchone())[0] cursor = await self.db.execute("PRAGMA freelist_count") freelist_count_before = (await cursor.fetchone())[0] # Vacuum database to reclaim space await self.db.execute("VACUUM") # Analyze tables to update statistics await self.db.execute("ANALYZE") # Get statistics after maintenance cursor = await self.db.execute("PRAGMA page_count") page_count_after = (await cursor.fetchone())[0] cursor = await self.db.execute("PRAGMA freelist_count") freelist_count_after = (await cursor.fetchone())[0] pages_reclaimed = page_count_before - page_count_after freelist_reduced = freelist_count_before - freelist_count_after logger.info( "database_maintenance_completed", pages_reclaimed=pages_reclaimed, freelist_reduced=freelist_reduced, page_count_before=page_count_before, page_count_after=page_count_after ) return True except Exception as e: logger.error("database_maintenance_failed", error=str(e)) return False async def run_comprehensive_cleanup( self, stale_process_hours: int = 24, validation_retention_days: int = 30, audit_retention_days: int = 90, perform_db_maintenance: bool = True ) -> Dict[str, int]: """ Run comprehensive cleanup operations. Args: stale_process_hours: Hours before considering process stale validation_retention_days: Days to retain validation results audit_retention_days: Days to retain audit trail perform_db_maintenance: Whether to perform database maintenance Returns: Dictionary with cleanup statistics """ cleanup_stats = { "stale_processes": 0, "orphaned_pids": 0, "old_validation_results": 0, "old_audit_entries": 0, "database_maintenance": False } start_time = datetime.utcnow() logger.info("starting_comprehensive_cleanup") try: # Cleanup stale processes cleanup_stats["stale_processes"] = await self.cleanup_stale_processes(stale_process_hours) # Cleanup orphaned PIDs cleanup_stats["orphaned_pids"] = await self.cleanup_orphaned_pids() # Cleanup old validation results cleanup_stats["old_validation_results"] = await self.cleanup_old_validation_results(validation_retention_days) # Cleanup old audit trail cleanup_stats["old_audit_entries"] = await self.cleanup_old_audit_trail(audit_retention_days) # Perform database maintenance if perform_db_maintenance: cleanup_stats["database_maintenance"] = await self.perform_database_maintenance() end_time = datetime.utcnow() duration = (end_time - start_time).total_seconds() logger.info( "comprehensive_cleanup_completed", duration_seconds=duration, **cleanup_stats ) # Emit cleanup event await emit( "process_cleanup_completed", EventCategory.SYSTEM, { "cleanup_stats": cleanup_stats, "duration_seconds": duration }, priority=EventPriority.LOW ) return cleanup_stats except Exception as e: logger.error("comprehensive_cleanup_failed", error=str(e)) return cleanup_stats # Export public API __all__ = [ 'ProcessManager', 'ProcessRecord', 'ProcessStatus', 'ProcessType', 'ProcessMetrics', 'ProcessPIDInfo', 'PIDEvent', 'ProcessConstraints', 'ProcessValidationResult', ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server