Skip to main content
Glama
validator.py17.1 kB
""" Process Validator for Process Registry. Validates process state and integrity. """ import os import asyncio import psutil from pathlib import Path from datetime import datetime, timezone, timedelta from typing import Optional, List, Dict, Any, Tuple from dataclasses import dataclass from enum import Enum from ..utils.logging import get_logger from ..utils.errors import ShannonError from .storage import RegistryStorage, ProcessEntry, ProcessStatus from .tracker import ProcessTracker, ProcessInfo logger = get_logger(__name__) class ValidationStatus(str, Enum): """Process validation status.""" VALID = "valid" STALE = "stale" ZOMBIE = "zombie" HIJACKED = "hijacked" # PID reused by different process MISSING = "missing" UNHEALTHY = "unhealthy" RESOURCE_EXCEEDED = "resource_exceeded" @dataclass class ValidationResult: """Result of process validation.""" pid: int host: str status: ValidationStatus process_status: Optional[ProcessStatus] reason: str details: Dict[str, Any] recommended_action: Optional[str] @property def is_valid(self) -> bool: """Check if process is valid.""" return self.status == ValidationStatus.VALID class ProcessValidator: """Validates process health and integrity.""" def __init__( self, storage: RegistryStorage, tracker: ProcessTracker ): """ Initialize validator. Args: storage: Registry storage instance tracker: Process tracker instance """ self.storage = storage self.tracker = tracker self.hostname = os.uname().nodename # Validation thresholds self.stale_threshold_seconds = 300 # 5 minutes self.cpu_threshold_percent = 90.0 self.memory_threshold_mb = 4096 # 4GB self.zombie_threshold_seconds = 600 # 10 minutes async def validate_process( self, entry: ProcessEntry ) -> ValidationResult: """ Validate a single process. Args: entry: Process entry to validate Returns: Validation result """ # Check if process exists if not psutil.pid_exists(entry.pid): return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.MISSING, process_status=ProcessStatus.STOPPED, reason="Process no longer exists", details={"last_seen": entry.last_seen.isoformat()}, recommended_action="remove_from_registry" ) try: # Get current process info process = psutil.Process(entry.pid) info = ProcessInfo.from_psutil(process) # Check if it's the same process (not PID reuse) if not self._is_same_process(entry, info): return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.HIJACKED, process_status=ProcessStatus.STOPPED, reason="PID has been reused by a different process", details={ "original_command": entry.command, "current_command": info.name, "original_start": entry.started_at.isoformat(), "current_start": info.create_time.isoformat() }, recommended_action="remove_from_registry" ) # Check if process is zombie if info.status == psutil.STATUS_ZOMBIE: zombie_duration = ( datetime.now(timezone.utc) - entry.last_seen ).total_seconds() if zombie_duration > self.zombie_threshold_seconds: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.ZOMBIE, process_status=ProcessStatus.ZOMBIE, reason=f"Process has been zombie for {zombie_duration:.0f}s", details={ "zombie_since": entry.last_seen.isoformat(), "parent_pid": process.ppid() }, recommended_action="kill_and_remove" ) # Check if process is stale (not updated recently) stale_duration = ( datetime.now(timezone.utc) - entry.last_seen ).total_seconds() if stale_duration > self.stale_threshold_seconds: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.STALE, process_status=entry.status, reason=f"Process not updated for {stale_duration:.0f}s", details={ "last_seen": entry.last_seen.isoformat(), "threshold_seconds": self.stale_threshold_seconds }, recommended_action="refresh_tracking" ) # Check resource usage resource_issues = [] if info.cpu_percent > self.cpu_threshold_percent: resource_issues.append( f"High CPU usage: {info.cpu_percent:.1f}%" ) memory_mb = info.memory_info.get('rss', 0) / (1024 * 1024) if memory_mb > self.memory_threshold_mb: resource_issues.append( f"High memory usage: {memory_mb:.0f}MB" ) if resource_issues: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.RESOURCE_EXCEEDED, process_status=ProcessStatus.BUSY, reason="; ".join(resource_issues), details={ "cpu_percent": info.cpu_percent, "memory_mb": memory_mb, "cpu_threshold": self.cpu_threshold_percent, "memory_threshold_mb": self.memory_threshold_mb }, recommended_action="monitor_closely" ) # Check process health indicators health_issues = [] # Check for excessive file handles if len(info.open_files) > 1000: health_issues.append( f"Excessive open files: {len(info.open_files)}" ) # Check for excessive connections if len(info.connections) > 100: health_issues.append( f"Excessive connections: {len(info.connections)}" ) # Check for excessive threads if info.num_threads > 100: health_issues.append( f"Excessive threads: {info.num_threads}" ) if health_issues: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.UNHEALTHY, process_status=entry.status, reason="; ".join(health_issues), details={ "open_files": len(info.open_files), "connections": len(info.connections), "threads": info.num_threads }, recommended_action="investigate_health" ) # Process is valid return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.VALID, process_status=self._determine_status(info), reason="Process is healthy and running normally", details={ "cpu_percent": info.cpu_percent, "memory_mb": memory_mb, "uptime_seconds": ( datetime.now(timezone.utc) - info.create_time ).total_seconds() }, recommended_action=None ) except psutil.NoSuchProcess: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.MISSING, process_status=ProcessStatus.STOPPED, reason="Process disappeared during validation", details={}, recommended_action="remove_from_registry" ) except psutil.AccessDenied: return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.VALID, process_status=entry.status, reason="Access denied to process (likely running)", details={"note": "Process exists but cannot access details"}, recommended_action=None ) except Exception as e: logger.error(f"Validation error for PID {entry.pid}: {e}") return ValidationResult( pid=entry.pid, host=entry.host, status=ValidationStatus.UNHEALTHY, process_status=entry.status, reason=f"Validation error: {str(e)}", details={"error": str(e)}, recommended_action="manual_check" ) async def validate_all_processes( self, fix_issues: bool = False ) -> Dict[str, List[ValidationResult]]: """ Validate all processes in the registry. Args: fix_issues: Whether to automatically fix issues Returns: Dict categorizing validation results by status """ # Get all processes for this host processes = await self.storage.get_all_processes(host=self.hostname) results = { status.value: [] for status in ValidationStatus } for entry in processes: result = await self.validate_process(entry) results[result.status.value].append(result) # Apply fixes if requested if fix_issues: await self._apply_fix(result) # Log summary total = len(processes) valid = len(results[ValidationStatus.VALID.value]) logger.info( f"Validated {total} processes: {valid} valid, " f"{total - valid} with issues" ) return results async def validate_session( self, session_id: str ) -> Tuple[bool, List[ValidationResult]]: """ Validate all processes in a session. Args: session_id: Session ID to validate Returns: Tuple of (all_valid, results) """ processes = await self.storage.get_session_processes(session_id) results = [] for entry in processes: result = await self.validate_process(entry) results.append(result) all_valid = all(r.is_valid for r in results) return all_valid, results async def check_port_conflicts(self) -> List[Dict[str, Any]]: """ Check for port conflicts between processes. Returns: List of conflict details """ conflicts = [] port_map: Dict[int, List[ProcessEntry]] = {} # Get all running processes processes = await self.storage.get_all_processes( status=ProcessStatus.RUNNING ) # Build port map for entry in processes: if entry.port: if entry.port not in port_map: port_map[entry.port] = [] port_map[entry.port].append(entry) # Find conflicts for port, entries in port_map.items(): if len(entries) > 1: conflicts.append({ 'port': port, 'processes': [ { 'pid': e.pid, 'host': e.host, 'session_id': e.session_id, 'command': e.command } for e in entries ] }) return conflicts async def find_orphaned_processes(self) -> List[ProcessInfo]: """ Find Claude processes not in the registry. Returns: List of orphaned process info """ # Get all Claude processes all_claude = await self.tracker.find_claude_processes() # Get registered PIDs registered = await self.storage.get_all_processes(host=self.hostname) registered_pids = {p.pid for p in registered} # Find orphans orphans = [ p for p in all_claude if p.pid not in registered_pids ] if orphans: logger.warning(f"Found {len(orphans)} orphaned Claude processes") return orphans def _is_same_process( self, entry: ProcessEntry, info: ProcessInfo ) -> bool: """Check if process info matches registry entry.""" # Compare creation times (within 1 second tolerance) time_diff = abs( (info.create_time - entry.started_at).total_seconds() ) if time_diff > 1: return False # Compare command names if entry.command.lower() not in info.name.lower(): # Check cmdline as fallback if entry.command not in ' '.join(info.cmdline): return False return True def _determine_status(self, info: ProcessInfo) -> ProcessStatus: """Determine process status from info.""" if info.status == psutil.STATUS_ZOMBIE: return ProcessStatus.ZOMBIE elif info.status in [psutil.STATUS_STOPPED, psutil.STATUS_DEAD]: return ProcessStatus.STOPPED elif info.cpu_percent > 50: return ProcessStatus.BUSY elif info.cpu_percent > 0: return ProcessStatus.RUNNING else: return ProcessStatus.IDLE async def _apply_fix(self, result: ValidationResult) -> None: """Apply recommended fix for validation result.""" if not result.recommended_action: return try: if result.recommended_action == "remove_from_registry": await self.storage.remove_process(result.pid, result.host) logger.info(f"Removed {result.status} process {result.pid}") elif result.recommended_action == "kill_and_remove": try: process = psutil.Process(result.pid) process.kill() await asyncio.sleep(1) # Wait for termination except (psutil.NoSuchProcess, psutil.AccessDenied): pass await self.storage.remove_process(result.pid, result.host) logger.info(f"Killed and removed zombie process {result.pid}") elif result.recommended_action == "refresh_tracking": # Re-track the process entry = await self.storage.get_process(result.pid, result.host) if entry: await self.tracker.track_process( result.pid, entry.session_id, entry.project_path, entry.metadata ) logger.info(f"Refreshed tracking for stale process {result.pid}") elif result.recommended_action == "monitor_closely": # Add monitoring flag to metadata await self.storage.update_process_status( result.pid, result.host, result.process_status or ProcessStatus.BUSY, {"monitoring": "high_resource_usage"} ) except Exception as e: logger.error(f"Failed to apply fix for {result.pid}: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server