benchmark_registry.py
""" Performance benchmarks for Process Registry. """ import pytest import asyncio import time import psutil from typing import List, Dict, Any import statistics import random from shannon_mcp.registry.storage import RegistryStorage, ProcessStatus from shannon_mcp.registry.tracker import ProcessTracker from shannon_mcp.registry.monitor import ResourceMonitor from tests.fixtures.registry_fixtures import RegistryFixtures from tests.utils.performance import PerformanceTimer, PerformanceMonitor class BenchmarkRegistryStorage: """Benchmark registry storage performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_registry_write_performance(self, benchmark, temp_dir): """Benchmark registry write performance.""" storage = RegistryStorage(temp_dir / "registry.db") await storage.initialize() # Test different batch sizes batch_sizes = [1, 10, 100, 1000] results = {} for batch_size in batch_sizes: # Generate process entries entries = [] for i in range(batch_size): entry = RegistryFixtures.create_process_entry( pid=50000 + i, session_id=f"bench-session-{i}", status=ProcessStatus.RUNNING ) entries.append(entry) # Benchmark writes write_times = [] for run in range(5): # Clear database await storage.clear_all() start = time.perf_counter() for entry in entries: await storage.register_process( entry["pid"], entry["session_id"], entry["project_path"], entry["command"], entry["args"], entry["env"] ) duration = time.perf_counter() - start write_times.append(duration) avg_time = statistics.mean(write_times) throughput = batch_size / avg_time results[f"batch_{batch_size}"] = { "avg_time": avg_time, "processes_per_second": throughput, "latency_per_process_ms": (avg_time / batch_size) * 1000 } await storage.close() # Performance assertions assert results["batch_1"]["processes_per_second"] > 500 assert results["batch_100"]["processes_per_second"] > 1000 assert results["batch_1000"]["processes_per_second"] > 500 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_registry_query_performance(self, benchmark, temp_dir): """Benchmark registry query performance.""" storage = RegistryStorage(temp_dir / "registry.db") await storage.initialize() # Pre-populate with processes process_count = 10000 session_count = 100 for i in range(process_count): await storage.register_process( pid=10000 + i, session_id=f"session-{i % session_count}", project_path=f"/project/{i % 10}", command="claude", args=["--session", f"session-{i % session_count}"], env={"CLAUDE_API_KEY": "test"} ) # Benchmark different queries queries = [ ("get_by_pid", lambda: storage.get_process(15000)), ("get_by_session", lambda: storage.get_processes_by_session(f"session-50")), ("get_by_status", lambda: storage.get_processes_by_status(ProcessStatus.RUNNING)), ("get_all_active", lambda: storage.get_active_processes()), ("count_total", lambda: storage.count_processes()) ] results = {} for query_name, query_func in queries: query_times = [] for _ in range(20): start = time.perf_counter() result = await query_func() duration = time.perf_counter() - start query_times.append(duration) avg_time = statistics.mean(query_times) p95_time = statistics.quantiles(query_times, n=20)[18] results[query_name] = { "avg_time_ms": avg_time * 1000, "p95_time_ms": p95_time * 1000, "queries_per_second": 1 / avg_time } await storage.close() # Query performance assertions assert results["get_by_pid"]["avg_time_ms"] < 5 assert results["get_by_session"]["avg_time_ms"] < 10 assert results["count_total"]["avg_time_ms"] < 2 return results class 
BenchmarkProcessTracking: """Benchmark process tracking performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_process_validation_performance(self, benchmark): """Benchmark process validation performance.""" tracker = ProcessTracker() monitor = PerformanceMonitor() # Get real system processes all_pids = psutil.pids() test_pids = random.sample(all_pids, min(100, len(all_pids))) # Benchmark validation validation_times = [] for pid in test_pids: with PerformanceTimer(f"validate_{pid}") as timer: try: info = await tracker.get_process_info(pid) is_valid = info is not None except: is_valid = False validation_times.append(timer.metrics.duration_seconds) monitor.add_measurement(timer.metrics) avg_time = statistics.mean(validation_times) results = { "processes_checked": len(test_pids), "avg_validation_time_ms": avg_time * 1000, "validations_per_second": 1 / avg_time, "total_duration": sum(validation_times) } # Validation should be fast assert results["avg_validation_time_ms"] < 10 assert results["validations_per_second"] > 100 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_batch_tracking_performance(self, benchmark): """Benchmark batch process tracking.""" tracker = ProcessTracker() # Create mock process data process_counts = [10, 50, 100, 500] results = {} for count in process_counts: # Generate PIDs pids = list(range(10000, 10000 + count)) # Benchmark batch tracking start = time.perf_counter() # Track all processes tasks = [] for pid in pids: tasks.append(tracker.track_process( pid, f"session-{pid % 10}", {"test": True} )) await asyncio.gather(*tasks) duration = time.perf_counter() - start results[f"{count}_processes"] = { "duration": duration, "processes_per_second": count / duration, "avg_time_per_process_ms": (duration / count) * 1000 } # Should scale well assert results["100_processes"]["processes_per_second"] > 100 assert results["500_processes"]["processes_per_second"] > 200 return results class BenchmarkResourceMonitoring: """Benchmark resource monitoring performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_resource_collection_performance(self, benchmark): """Benchmark resource collection performance.""" monitor = ResourceMonitor() # Get some real PIDs all_pids = psutil.pids() test_pids = random.sample(all_pids, min(50, len(all_pids))) # Benchmark resource collection collection_times = [] for _ in range(10): start = time.perf_counter() stats_list = [] for pid in test_pids: stats = await monitor.get_process_stats(pid) if stats: stats_list.append(stats) duration = time.perf_counter() - start collection_times.append(duration) avg_time = statistics.mean(collection_times) results = { "pids_monitored": len(test_pids), "avg_collection_time": avg_time, "avg_time_per_pid_ms": (avg_time / len(test_pids)) * 1000, "collections_per_second": 1 / avg_time } # Resource collection should be efficient assert results["avg_time_per_pid_ms"] < 5 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_alert_detection_performance(self, benchmark): """Benchmark alert detection performance.""" monitor = ResourceMonitor() # Create test data with various resource levels test_cases = [] for i in range(1000): stats = { "pid": 10000 + i, "cpu_percent": random.uniform(0, 100), "memory_mb": random.uniform(50, 2000), "disk_read_mb": random.uniform(0, 100), "disk_write_mb": random.uniform(0, 50), "open_files": random.randint(10, 1000) } test_cases.append(stats) # Benchmark alert detection start = time.perf_counter() alerts = [] for stats 
in test_cases: alert = await monitor.check_thresholds( stats["pid"], stats, cpu_threshold=80.0, memory_threshold_mb=1500.0, disk_io_threshold_mb=75.0 ) if alert: alerts.append(alert) duration = time.perf_counter() - start results = { "total_checks": len(test_cases), "alerts_generated": len(alerts), "duration": duration, "checks_per_second": len(test_cases) / duration, "avg_check_time_ms": (duration / len(test_cases)) * 1000 } # Alert detection should be fast assert results["checks_per_second"] > 10000 assert results["avg_check_time_ms"] < 0.1 return results class BenchmarkRegistryCleanup: """Benchmark registry cleanup performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_cleanup_performance(self, benchmark, temp_dir): """Benchmark cleanup operation performance.""" storage = RegistryStorage(temp_dir / "registry.db") await storage.initialize() # Create mix of active and stale processes total_processes = 1000 stale_percentage = 0.3 for i in range(total_processes): is_stale = i < (total_processes * stale_percentage) entry = RegistryFixtures.create_process_entry( pid=20000 + i, session_id=f"cleanup-session-{i}", status=ProcessStatus.RUNNING ) # Register process await storage.register_process( entry["pid"], entry["session_id"], entry["project_path"], entry["command"], entry["args"], entry["env"] ) # Mark some as stale if is_stale: await storage.update_process_status( entry["pid"], ProcessStatus.STALE ) # Benchmark cleanup start = time.perf_counter() # Get stale processes stale_processes = await storage.get_stale_processes( threshold_minutes=30 ) # Clean them up cleaned_count = 0 for process in stale_processes: success = await storage.remove_process(process.pid) if success: cleaned_count += 1 duration = time.perf_counter() - start results = { "total_processes": total_processes, "stale_processes": len(stale_processes), "cleaned_processes": cleaned_count, "duration": duration, "cleanup_rate": cleaned_count / duration if duration > 0 else 0 } await storage.close() # Cleanup should be efficient assert results["cleanup_rate"] > 100 # >100 processes/second return results class BenchmarkCrossSessionMessaging: """Benchmark cross-session messaging performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_message_routing_performance(self, benchmark, temp_dir): """Benchmark message routing between sessions.""" storage = RegistryStorage(temp_dir / "registry.db") await storage.initialize() # Create sessions session_count = 100 for i in range(session_count): await storage.register_process( pid=30000 + i, session_id=f"msg-session-{i}", project_path="/test", command="claude", args=[], env={} ) # Benchmark message sending message_counts = [10, 100, 1000] results = {} for count in message_counts: messages = [] for i in range(count): from_session = f"msg-session-{i % session_count}" to_session = f"msg-session-{(i + 1) % session_count}" message = RegistryFixtures.create_cross_session_message( 30000 + (i % session_count), to_session ) messages.append((from_session, to_session, message)) # Send messages start = time.perf_counter() for from_session, to_session, message in messages: await storage.send_message( from_session, to_session, message["message"] ) send_duration = time.perf_counter() - start # Receive messages start = time.perf_counter() received_count = 0 for i in range(session_count): session_id = f"msg-session-{i}" messages = await storage.get_messages(session_id) received_count += len(messages) receive_duration = time.perf_counter() - start results[f"{count}_messages"] = 
{ "send_duration": send_duration, "receive_duration": receive_duration, "send_rate": count / send_duration, "receive_rate": received_count / receive_duration, "total_duration": send_duration + receive_duration } await storage.close() # Messaging should be fast assert results["100_messages"]["send_rate"] > 1000 assert results["1000_messages"]["send_rate"] > 5000 return results
