Skip to main content
Glama
registry_fixtures.py (10.8 kB)
"""
Process Registry test fixtures.
"""

import json
import random
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil  # noqa: F401  (not referenced below; kept from the original import list)

from shannon_mcp.registry.storage import ProcessStatus


class RegistryFixtures:
    """Fixtures for Process Registry testing.

    Every factory is a ``@staticmethod`` that returns plain ``dict``/``list``
    data (or, for :meth:`create_mock_psutil_process`, a small stand-in
    object). Values that are not passed in explicitly are drawn from
    ``random``/``uuid``, so two calls normally return different data.
    """

    @staticmethod
    def create_mock_process_info(
        pid: Optional[int] = None,
        name: str = "claude",
        status: str = "running"
    ) -> Dict[str, Any]:
        """Create mock process information.

        Args:
            pid: Process id to report. ``None`` picks a random id in
                ``[1000, 99999]``. An explicit ``0`` is kept as-is.
            name: Process name; also used as the first ``cmdline`` element.
            status: Status string stored under ``"status"``.

        Returns:
            A dict with the keys ``pid``, ``name``, ``cmdline``,
            ``create_time`` (POSIX timestamp), ``status``, ``username``,
            ``cpu_percent``, ``memory_info`` (``rss``/``vms`` in bytes),
            ``num_threads``, ``open_files`` and ``connections``.
        """
        # ``is None`` rather than truthiness: 0 is falsy, and a caller asking
        # for pid 0 should not silently receive a random pid instead.
        if pid is None:
            pid = random.randint(1000, 99999)

        now = datetime.now(timezone.utc)
        # The process "started" between 10 seconds and one hour ago.
        create_time = now - timedelta(seconds=random.randint(10, 3600))

        return {
            "pid": pid,
            "name": name,
            "cmdline": [name, "--session", f"session_{uuid.uuid4().hex[:12]}"],
            "create_time": create_time.timestamp(),
            "status": status,
            "username": f"user_{random.randint(1000, 9999)}",
            "cpu_percent": round(random.uniform(0, 50), 2),
            "memory_info": {
                "rss": random.randint(50, 500) * 1024 * 1024,  # RSS in bytes
                "vms": random.randint(100, 1000) * 1024 * 1024,  # VMS in bytes
            },
            "num_threads": random.randint(1, 20),
            "open_files": random.randint(5, 50),
            "connections": random.randint(0, 10)
        }

    @staticmethod
    def create_process_entry(
        pid: Optional[int] = None,
        session_id: Optional[str] = None,
        status: ProcessStatus = ProcessStatus.RUNNING,
        project_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Create a process registry entry.

        Args:
            pid: Process id. ``None`` picks a random id in ``[1000, 99999]``.
                An explicit ``0`` is kept as-is.
            session_id: Session identifier. A falsy value generates
                ``session_<12 hex chars>``.
            status: Registry status; its ``.value`` is stored in the entry.
            project_path: Project directory. A falsy value generates
                ``/home/user/project_<6 hex chars>``.

        Returns:
            The entry dict. ``started_at`` and ``last_seen`` hold the same
            UTC ISO-8601 timestamp. ``port`` is a random int for RUNNING
            entries and ``None`` otherwise. Resource metrics (``cpu_percent``,
            ``memory_mb``, ``disk_read_mb``, ``disk_write_mb``,
            ``open_files``, ``num_threads``) are added for RUNNING entries
            only.
        """
        # See create_mock_process_info: pid 0 is a value, not "missing".
        if pid is None:
            pid = random.randint(1000, 99999)

        if not session_id:
            session_id = f"session_{uuid.uuid4().hex[:12]}"

        if not project_path:
            project_path = f"/home/user/project_{uuid.uuid4().hex[:6]}"

        now = datetime.now(timezone.utc)
        is_running = status == ProcessStatus.RUNNING

        entry = {
            "pid": pid,
            "session_id": session_id,
            "project_path": project_path,
            "command": "claude",
            "args": ["--session", session_id, "--model", "claude-3-opus"],
            "env": {
                "CLAUDE_API_KEY": "test-key",
                "CLAUDE_SESSION_ID": session_id,
                "PATH": "/usr/local/bin:/usr/bin:/bin"
            },
            "status": status.value,
            "started_at": now.isoformat(),
            "last_seen": now.isoformat(),
            "host": f"host-{uuid.uuid4().hex[:6]}",
            "port": random.randint(30000, 40000) if is_running else None,
            "user": f"user_{random.randint(1000, 9999)}",
            "metadata": {
                "version": "1.0.0",
                "platform": "linux"
            }
        }

        # Add resource metrics for running processes
        if is_running:
            entry.update({
                "cpu_percent": round(random.uniform(0, 50), 2),
                "memory_mb": round(random.uniform(50, 500), 2),
                "disk_read_mb": round(random.uniform(0, 100), 2),
                "disk_write_mb": round(random.uniform(0, 50), 2),
                "open_files": random.randint(5, 50),
                "num_threads": random.randint(1, 20)
            })

        return entry

    @staticmethod
    def create_zombie_process() -> Dict[str, Any]:
        """Create a zombie process entry.

        Returns:
            An entry built by :meth:`create_process_entry` with
            ``ProcessStatus.ZOMBIE``, plus zeroed CPU/memory/disk metrics and
            a ``zombie_since`` UTC ISO-8601 timestamp.
        """
        entry = RegistryFixtures.create_process_entry(
            status=ProcessStatus.ZOMBIE
        )

        # Zombies have minimal resource usage
        entry.update({
            "cpu_percent": 0.0,
            "memory_mb": 0.0,
            "disk_read_mb": 0.0,
            "disk_write_mb": 0.0,
            "zombie_since": datetime.now(timezone.utc).isoformat()
        })

        return entry

    @staticmethod
    def create_stale_process() -> Dict[str, Any]:
        """Create a stale process entry.

        Returns:
            A RUNNING entry whose ``last_seen`` lies 30 minutes in the past,
            with ``stale_detected`` set to the current UTC time.
            ``started_at`` is moved back to the same instant as
            ``last_seen`` so the entry never claims to have been seen before
            it started.
        """
        entry = RegistryFixtures.create_process_entry(
            status=ProcessStatus.RUNNING
        )

        now = datetime.now(timezone.utc)
        stale_time = now - timedelta(minutes=30)

        # create_process_entry stamps started_at with "now". Leaving it there
        # while back-dating last_seen would yield last_seen < started_at, so
        # both are back-dated together (equal, as in a fresh entry).
        entry["started_at"] = stale_time.isoformat()
        # Make it stale by setting last_seen to old timestamp
        entry["last_seen"] = stale_time.isoformat()
        entry["stale_detected"] = now.isoformat()

        return entry

    @staticmethod
    def create_registry_database(
        db_path: Path,
        process_count: int = 10
    ) -> List[Dict[str, Any]]:
        """Create a mock registry database.

        Builds ``process_count`` entries in mixed states and writes them,
        wrapped in a small envelope, to ``db_path`` as indented JSON. Missing
        parent directories are created.

        The state of entry ``i`` is chosen by the first matching rule:
        ``i % 5 == 0`` zombie, ``i % 4 == 0`` stale, ``i % 3 == 0``
        terminated, otherwise running.

        Args:
            db_path: File to write. An existing file is overwritten.
            process_count: Number of entries to generate.

        Returns:
            The list of generated entries, in generation order.
        """
        db_path.parent.mkdir(parents=True, exist_ok=True)

        processes = []

        # Create various process states
        for i in range(process_count):
            if i % 5 == 0:
                # Create zombie
                process = RegistryFixtures.create_zombie_process()
            elif i % 4 == 0:
                # Create stale
                process = RegistryFixtures.create_stale_process()
            elif i % 3 == 0:
                # Create terminated
                process = RegistryFixtures.create_process_entry(
                    status=ProcessStatus.TERMINATED
                )
            else:
                # Create running
                process = RegistryFixtures.create_process_entry(
                    status=ProcessStatus.RUNNING
                )

            processes.append(process)

        # Save to file
        db_content = {
            "version": "1.0.0",
            "processes": processes,
            "metadata": {
                "created_at": datetime.now(timezone.utc).isoformat(),
                "total_processes": len(processes),
                # Sorted so the written file does not depend on set
                # iteration order.
                "hosts": sorted({p["host"] for p in processes})
            }
        }

        db_path.write_text(json.dumps(db_content, indent=2))

        return processes

    @staticmethod
    def create_resource_alert(
        pid: int,
        alert_type: str = "cpu"
    ) -> Dict[str, Any]:
        """Create a resource alert.

        Args:
            pid: Process id the alert refers to.
            alert_type: One of ``"cpu"``, ``"memory"``, ``"disk"`` or
                ``"files"``. Any other value falls back to the ``"cpu"``
                alert.

        Returns:
            A dict with ``pid``, a UTC ISO-8601 ``timestamp``, the selected
            ``alert`` payload and a ``severity`` of ``"warning"`` while the
            value is below 120% of the threshold, ``"critical"`` otherwise.
        """
        alerts = {
            "cpu": {
                "type": "high_cpu_usage",
                "metric": "cpu_percent",
                "value": 95.5,
                "threshold": 80.0,
                "message": "CPU usage exceeds 80% threshold"
            },
            "memory": {
                "type": "high_memory_usage",
                "metric": "memory_percent",
                "value": 88.2,
                "threshold": 75.0,
                "message": "Memory usage exceeds 75% threshold"
            },
            "disk": {
                "type": "high_disk_io",
                "metric": "disk_write_mb_per_sec",
                "value": 125.0,
                "threshold": 100.0,
                "message": "Disk write rate exceeds 100 MB/s"
            },
            "files": {
                "type": "too_many_open_files",
                "metric": "open_files",
                "value": 950,
                "threshold": 900,
                "message": "Open files approaching system limit"
            }
        }

        alert = alerts.get(alert_type, alerts["cpu"])
        is_warning = alert["value"] < alert["threshold"] * 1.2

        return {
            "pid": pid,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "alert": alert,
            "severity": "warning" if is_warning else "critical"
        }

    @staticmethod
    def create_cleanup_report(
        cleaned_pids: List[int],
        failed_pids: List[int]
    ) -> Dict[str, Any]:
        """Create a cleanup operation report.

        Args:
            cleaned_pids: Pids that were cleaned up successfully.
            failed_pids: Pids whose cleanup failed.

        Returns:
            A report dict containing a ``summary`` with the counts, the two
            pid lists as passed in (not copied), and a random
            ``duration_seconds`` between 0.5 and 5.0.
        """
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "operation": "cleanup",
            "summary": {
                "total_processed": len(cleaned_pids) + len(failed_pids),
                "successfully_cleaned": len(cleaned_pids),
                "failed": len(failed_pids)
            },
            "cleaned_pids": cleaned_pids,
            "failed_pids": failed_pids,
            "duration_seconds": round(random.uniform(0.5, 5.0), 2)
        }

    @staticmethod
    def create_cross_session_message(
        from_pid: int,
        to_session: str,
        message_type: str = "notification"
    ) -> Dict[str, Any]:
        """Create a cross-session message.

        Args:
            from_pid: Pid of the sending process.
            to_session: Identifier of the receiving session.
            message_type: One of ``"notification"``, ``"command"`` or
                ``"data"``. Any other value falls back to
                ``"notification"``.

        Returns:
            A message envelope with a fresh UUID ``id``, a UTC ISO-8601
            ``timestamp``, the selected ``message`` payload,
            ``delivered=False`` and ``ttl_seconds=300``.
        """
        message_types = {
            "notification": {
                "type": "notification",
                "content": "Task completed successfully",
                "priority": "info"
            },
            "command": {
                "type": "command",
                "content": "refresh_agents",
                "priority": "high"
            },
            "data": {
                "type": "data",
                "content": {"key": "value", "items": [1, 2, 3]},
                "priority": "normal"
            }
        }

        msg = message_types.get(message_type, message_types["notification"])

        return {
            "id": str(uuid.uuid4()),
            "from_pid": from_pid,
            "to_session": to_session,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": msg,
            "delivered": False,
            "ttl_seconds": 300
        }

    @staticmethod
    def create_mock_psutil_process(info: Dict[str, Any]):
        """Create a mock psutil.Process object.

        Args:
            info: Process data as produced by
                :meth:`create_mock_process_info`. The mock reads the keys
                ``pid`` (expected to be an int), ``name``, ``cmdline``,
                ``create_time``, ``status``, ``username``, ``cpu_percent``,
                ``memory_info`` and ``num_threads`` lazily, on access.

        Returns:
            An object exposing ``pid`` plus the methods ``name()``,
            ``cmdline()``, ``create_time()``, ``status()``, ``username()``,
            ``cpu_percent()``, ``memory_info()``, ``num_threads()`` and
            ``is_running()``.
        """

        class _CallablePid(int):
            """An int that also answers ``pid()``.

            ``psutil.Process.pid`` is an attribute, not a method, so the mock
            exposes it as a property. Earlier versions of this fixture made
            ``pid`` a method; being callable keeps ``proc.pid()`` working for
            code written against that form.
            """

            def __call__(self) -> int:
                return int(self)

        class MemInfo:
            """Holder for the ``rss``/``vms`` fields of ``memory_info()``."""

            def __init__(self, data):
                self.rss = data["rss"]
                self.vms = data["vms"]

        class MockProcess:
            """Dict-backed stand-in for ``psutil.Process``."""

            def __init__(self, data):
                self._data = data

            @property
            def pid(self):
                return _CallablePid(self._data["pid"])

            def name(self):
                return self._data["name"]

            def cmdline(self):
                return self._data["cmdline"]

            def create_time(self):
                return self._data["create_time"]

            def status(self):
                return self._data["status"]

            def username(self):
                return self._data["username"]

            def cpu_percent(self, interval=None):
                # ``interval`` is accepted for signature compatibility and
                # ignored; the stored value is returned immediately.
                return self._data["cpu_percent"]

            def memory_info(self):
                return MemInfo(self._data["memory_info"])

            def num_threads(self):
                return self._data["num_threads"]

            def is_running(self):
                return self._data["status"] == "running"

        return MockProcess(info)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.