benchmark_commands.py
""" Performance benchmarks for Command System. """ import pytest import asyncio import time import json from typing import List, Dict, Any, Optional import statistics import random from unittest.mock import AsyncMock, Mock from shannon_mcp.commands.parser import CommandParser from shannon_mcp.commands.registry import CommandRegistry from shannon_mcp.commands.executor import CommandExecutor from tests.fixtures.command_fixtures import CommandFixtures from tests.utils.performance import PerformanceTimer, PerformanceMonitor class BenchmarkCommandParsing: """Benchmark command parsing performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_parse_performance(self, benchmark): """Benchmark parsing various command formats.""" parser = CommandParser() # Test different command types command_types = [ ("simple", "/help"), ("with_args", "/session create --model opus --temperature 0.7"), ("complex_args", "/agent create --name test --capabilities streaming,sessions --priority high --config '{\"key\": \"value\"}'"), ("nested", "/checkpoint create --state '{\"files\": {\"main.py\": {\"content\": \"print()\"}}, \"metadata\": {\"version\": 1}}'"), ("long_args", f"/analyze --data '{json.dumps({f\"key_{i}\": f\"value_{i}\" for i in range(100)})}'") ] results = {} for cmd_type, command_str in command_types: parse_times = [] for _ in range(1000): start = time.perf_counter() parsed = parser.parse(command_str) duration = time.perf_counter() - start parse_times.append(duration) avg_time = statistics.mean(parse_times) p95_time = statistics.quantiles(parse_times, n=20)[18] results[cmd_type] = { "command_length": len(command_str), "avg_parse_time_us": avg_time * 1_000_000, "p95_parse_time_us": p95_time * 1_000_000, "parses_per_second": 1 / avg_time } # Parsing should be very fast assert all(r["avg_parse_time_us"] < 100 for r in results.values()) assert results["simple"]["parses_per_second"] > 50000 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_validation_performance(self, benchmark): """Benchmark command validation performance.""" parser = CommandParser() registry = CommandRegistry() # Register commands with various validation rules commands = [ CommandFixtures.create_command( name="simple", args=[] ), CommandFixtures.create_command( name="validated", args=[ {"name": "model", "type": "string", "choices": ["opus", "sonnet", "haiku"]}, {"name": "temperature", "type": "float", "min": 0.0, "max": 1.0}, {"name": "max_tokens", "type": "int", "min": 1, "max": 100000} ] ), CommandFixtures.create_command( name="complex", args=[ {"name": "config", "type": "json", "schema": {"type": "object"}}, {"name": "tags", "type": "list", "item_type": "string"}, {"name": "options", "type": "dict", "key_type": "string", "value_type": "any"} ] ) ] for cmd in commands: registry.register(cmd) # Test validation scenarios test_cases = [ ("valid_simple", "/simple"), ("valid_args", "/validated --model opus --temperature 0.7 --max_tokens 4096"), ("invalid_choice", "/validated --model gpt4 --temperature 0.5"), ("invalid_range", "/validated --model opus --temperature 1.5"), ("valid_complex", "/complex --config '{\"key\": \"value\"}' --tags tag1,tag2 --options key1=val1,key2=val2") ] results = {} for case_name, command_str in test_cases: validation_times = [] for _ in range(500): parsed = parser.parse(command_str) start = time.perf_counter() is_valid = registry.validate(parsed) duration = time.perf_counter() - start validation_times.append(duration) avg_time = statistics.mean(validation_times) 
            results[case_name] = {
                "avg_validation_time_us": avg_time * 1_000_000,
                "validations_per_second": 1 / avg_time
            }

        # Validation should be fast
        assert all(r["avg_validation_time_us"] < 50 for r in results.values())

        return results


class BenchmarkCommandExecution:
    """Benchmark command execution performance."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_execution_performance(self, benchmark):
        """Benchmark executing commands with various complexities."""
        executor = CommandExecutor()
        registry = CommandRegistry()

        # Register commands with different execution times
        execution_profiles = [
            ("instant", 0),      # No delay
            ("fast", 0.001),     # 1ms
            ("moderate", 0.01),  # 10ms
            ("slow", 0.1),       # 100ms
            ("async_io", None)   # Simulates async I/O
        ]

        for profile_name, delay in execution_profiles:
            # Bind the loop variables as defaults so each handler keeps its own
            # name and delay instead of closing over the last loop iteration.
            async def handler(args, exec_delay=delay, name=profile_name):
                if exec_delay is None:
                    # Simulate async I/O
                    await asyncio.sleep(0.001)
                    await asyncio.gather(*[
                        asyncio.sleep(0.001)
                        for _ in range(5)
                    ])
                elif exec_delay > 0:
                    await asyncio.sleep(exec_delay)

                return {
                    "status": "success",
                    "result": f"Executed {name}",
                    "args": args
                }

            command = CommandFixtures.create_command(
                name=profile_name,
                handler=handler
            )
            registry.register(command)

        results = {}

        for profile_name, expected_delay in execution_profiles:
            exec_times = []

            for i in range(50):
                start = time.perf_counter()
                result = await executor.execute(
                    profile_name,
                    {"test_arg": f"value_{i}"}
                )
                duration = time.perf_counter() - start
                exec_times.append(duration)

            avg_time = statistics.mean(exec_times)
            # async_io (None) simulates ~6ms of I/O; use an explicit None check so
            # the 0-delay "instant" profile is not treated as falsy.
            baseline = expected_delay if expected_delay is not None else 0.006
            overhead = avg_time - baseline

            results[profile_name] = {
                "avg_execution_time_ms": avg_time * 1000,
                "overhead_ms": overhead * 1000,
                "executions_per_second": 1 / avg_time
            }

        # Execution overhead should be minimal
        assert results["instant"]["overhead_ms"] < 1
        assert results["fast"]["overhead_ms"] < 2

        return results

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_concurrent_execution_performance(self, benchmark):
        """Benchmark concurrent command execution."""
        executor = CommandExecutor()
        registry = CommandRegistry()

        # Register a command that can handle concurrency
        async def concurrent_handler(args):
            await asyncio.sleep(random.uniform(0.01, 0.05))  # 10-50ms
            return {"id": args.get("id"), "result": "completed"}

        command = CommandFixtures.create_command(
            name="concurrent_test",
            handler=concurrent_handler
        )
        registry.register(command)

        # Test different concurrency levels
        concurrency_levels = [1, 10, 50, 100, 200]
        results = {}

        for level in concurrency_levels:
            start = time.perf_counter()

            # Execute commands concurrently
            tasks = []
            for i in range(level):
                task = executor.execute(
                    "concurrent_test",
                    {"id": i}
                )
                tasks.append(task)

            results_list = await asyncio.gather(*tasks)
            duration = time.perf_counter() - start

            results[f"concurrency_{level}"] = {
                "commands": level,
                "duration": duration,
                "commands_per_second": level / duration,
                "avg_time_per_command_ms": (duration / level) * 1000
            }

        # Should handle concurrency well
        assert results["concurrency_50"]["commands_per_second"] > 100
        assert results["concurrency_100"]["commands_per_second"] > 150

        return results


class BenchmarkCommandChaining:
    """Benchmark command chaining and pipelines."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_pipeline_performance(self, benchmark):
        """Benchmark command pipeline execution."""
        executor = CommandExecutor()
        registry = CommandRegistry()

        # Register pipeline commands
        def _as_list(value):
            # Aggregate stages emit scalars; normalise so downstream stages can iterate.
            return value if isinstance(value, list) else [value]

        async def transform_handler(args):
            data = _as_list(args.get("input", []))
            transform = args.get("transform", "upper")

            if transform == "upper":
                result = [str(item).upper() for item in data]
            elif transform == "double":
                result = [item * 2 for item in data if isinstance(item, (int, float))]
            else:
                result = data

            return {"output": result}

        async def filter_handler(args):
            data = _as_list(args.get("input", []))
            condition = args.get("condition", "all")

            if condition == "even":
                result = [item for item in data if isinstance(item, int) and item % 2 == 0]
            elif condition == "long":
                result = [item for item in data if len(str(item)) > 3]
            else:
                result = data

            return {"output": result}

        async def aggregate_handler(args):
            data = _as_list(args.get("input", []))
            operation = args.get("operation", "sum")

            if operation == "sum":
                result = sum(item for item in data if isinstance(item, (int, float)))
            elif operation == "count":
                result = len(data)
            else:
                result = data

            return {"output": result}

        commands = [
            CommandFixtures.create_command(name="transform", handler=transform_handler),
            CommandFixtures.create_command(name="filter", handler=filter_handler),
            CommandFixtures.create_command(name="aggregate", handler=aggregate_handler)
        ]

        for cmd in commands:
            registry.register(cmd)

        # Test different pipeline lengths
        pipeline_configs = [
            ("short", 2),     # 2 commands
            ("medium", 5),    # 5 commands
            ("long", 10),     # 10 commands
            ("complex", 20)   # 20 commands
        ]

        results = {}

        for config_name, pipeline_length in pipeline_configs:
            # Generate test data
            test_data = list(range(1000))

            # Create pipeline
            pipeline = []
            for i in range(pipeline_length):
                if i % 3 == 0:
                    pipeline.append(("transform", {"transform": "double"}))
                elif i % 3 == 1:
                    pipeline.append(("filter", {"condition": "even"}))
                else:
                    pipeline.append(("aggregate", {"operation": "count"}))

            # Execute pipeline
            pipeline_times = []

            for _ in range(20):
                start = time.perf_counter()

                current_data = test_data
                for cmd_name, cmd_args in pipeline:
                    result = await executor.execute(
                        cmd_name,
                        {"input": current_data, **cmd_args}
                    )
                    current_data = result.get("output", [])

                duration = time.perf_counter() - start
                pipeline_times.append(duration)

            avg_time = statistics.mean(pipeline_times)

            results[config_name] = {
                "pipeline_length": pipeline_length,
                "avg_execution_time": avg_time,
                "avg_time_per_stage_ms": (avg_time / pipeline_length) * 1000,
                "pipelines_per_second": 1 / avg_time
            }

        # Pipeline execution should scale linearly
        assert results["medium"]["avg_time_per_stage_ms"] < 10
        assert results["long"]["avg_time_per_stage_ms"] < 15

        return results


class BenchmarkCommandAutocomplete:
    """Benchmark command autocomplete performance."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_autocomplete_performance(self, benchmark):
        """Benchmark command autocomplete suggestions."""
        registry = CommandRegistry()

        # Register many commands
        command_count = 1000

        # Create commands with various prefixes
        prefixes = ["session", "agent", "checkpoint", "hook", "transport", "analytics", "registry"]

        for i in range(command_count):
            prefix = prefixes[i % len(prefixes)]
            command = CommandFixtures.create_command(
                name=f"{prefix}_{i}",
                aliases=[f"{prefix[:3]}{i}", f"{prefix}{i}"],
                args=[
                    {"name": "arg1", "type": "string"},
                    {"name": "arg2", "type": "int"},
                    {"name": f"{prefix}_specific", "type": "bool"}
                ]
            )
            registry.register(command)

        # Test autocomplete scenarios
        test_queries = [
            ("empty", ""),
            ("single_char", "s"),
            ("prefix", "sess"),
            ("full_prefix", "session"),
            ("partial_cmd", "session_1"),
            ("with_args", "session_100 --ar"),
            ("deep_match", "checkpoint_5")
        ]

        results = {}

        for query_name, query in test_queries:
            autocomplete_times = []

            for _ in range(100):
                start = time.perf_counter()
                suggestions = registry.autocomplete(query)
                duration = time.perf_counter() - start
                autocomplete_times.append(duration)

            avg_time = statistics.mean(autocomplete_times)

            results[query_name] = {
                "query": query,
                "suggestions_count": len(suggestions),
                "avg_time_ms": avg_time * 1000,
                "queries_per_second": 1 / avg_time
            }

        # Autocomplete should be responsive
        assert all(r["avg_time_ms"] < 10 for r in results.values())
        assert results["prefix"]["queries_per_second"] > 1000

        return results


class BenchmarkCommandHistory:
    """Benchmark command history operations."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_history_performance(self, benchmark, temp_dir):
        """Benchmark command history storage and retrieval."""
        from shannon_mcp.commands.history import CommandHistory

        history = CommandHistory(temp_dir / "command_history.db")
        await history.initialize()

        # Test different history sizes
        history_sizes = [100, 1000, 10000]
        results = {}

        for size in history_sizes:
            # Add commands to history
            add_times = []

            for i in range(size):
                command = CommandFixtures.create_command_entry(
                    command=f"/test_command_{i % 10} --arg value_{i}",
                    timestamp=time.time() - (size - i),
                    result={"status": "success", "data": f"result_{i}"}
                )

                start = time.perf_counter()
                await history.add(command)
                duration = time.perf_counter() - start
                add_times.append(duration)

            # Search history
            search_patterns = ["test_command", "arg value_5", "success"]
            search_times = []

            for pattern in search_patterns:
                start = time.perf_counter()
                results_list = await history.search(pattern)
                duration = time.perf_counter() - start
                search_times.append(duration)

            # Get recent history
            recent_times = []

            for _ in range(20):
                start = time.perf_counter()
                recent = await history.get_recent(100)
                duration = time.perf_counter() - start
                recent_times.append(duration)

            results[f"size_{size}"] = {
                "history_size": size,
                "avg_add_time_ms": statistics.mean(add_times) * 1000,
                "avg_search_time_ms": statistics.mean(search_times) * 1000,
                "avg_recent_time_ms": statistics.mean(recent_times) * 1000,
                "adds_per_second": 1 / statistics.mean(add_times)
            }

            # Clear for next test
            await history.clear()

        # History operations should be fast
        assert results["size_1000"]["avg_add_time_ms"] < 5
        assert results["size_10000"]["avg_search_time_ms"] < 50

        await history.close()

        return results


class BenchmarkCommandAliases:
    """Benchmark command alias resolution."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_alias_resolution_performance(self, benchmark):
        """Benchmark resolving command aliases."""
        registry = CommandRegistry()

        # Register commands with many aliases
        command_count = 500
        aliases_per_command = 10

        for i in range(command_count):
            aliases = [f"alias_{i}_{j}" for j in range(aliases_per_command)]
            command = CommandFixtures.create_command(
                name=f"command_{i}",
                aliases=aliases
            )
            registry.register(command)

        # Test alias resolution
        resolution_times = []

        # Test resolving various aliases
        test_count = 1000

        for _ in range(test_count):
            # Pick random alias
            cmd_idx = random.randint(0, command_count - 1)
            alias_idx = random.randint(0, aliases_per_command - 1)
            alias = f"alias_{cmd_idx}_{alias_idx}"

            start = time.perf_counter()
            resolved = registry.resolve_alias(alias)
            duration = time.perf_counter() - start
            resolution_times.append(duration)

        avg_time = statistics.mean(resolution_times)
        p95_time = statistics.quantiles(resolution_times, n=20)[18]

        results = {
            "total_commands": command_count,
            "total_aliases": command_count * aliases_per_command,
            "avg_resolution_time_us": avg_time * 1_000_000,
            "p95_resolution_time_us": p95_time * 1_000_000,
            "resolutions_per_second": 1 / avg_time
        }

        # Alias resolution should be very fast
        assert results["avg_resolution_time_us"] < 10
        assert results["resolutions_per_second"] > 100000

        return results
