test_hooks_commands.py
""" Functional tests for hooks framework and command system. """ import pytest import asyncio import json import time import tempfile from pathlib import Path from shannon_mcp.hooks.manager import HookManager from shannon_mcp.hooks.registry import HookRegistry from shannon_mcp.commands.parser import CommandParser from shannon_mcp.commands.executor import CommandExecutor from shannon_mcp.commands.registry import CommandRegistry from shannon_mcp.managers.session import SessionManager from shannon_mcp.managers.binary import BinaryManager class TestHooksSystem: """Test hooks framework with real events.""" @pytest.fixture async def hooks_setup(self): """Set up hooks system.""" registry = HookRegistry() manager = HookManager(registry) # Set up session manager for hook integration binary_manager = BinaryManager() binaries = await binary_manager.discover_binaries() if not binaries: pytest.skip("No Claude Code binary found") session_manager = SessionManager(binary_manager=binary_manager) # Connect hooks to session events session_manager.hook_manager = manager yield { "registry": registry, "manager": manager, "session_manager": session_manager } @pytest.mark.asyncio async def test_session_hooks(self, hooks_setup): """Test hooks triggered by session events.""" setup = hooks_setup manager = setup["manager"] session_manager = setup["session_manager"] # Track hook executions hook_calls = [] # Register session lifecycle hooks async def on_session_start(event, data): hook_calls.append(("start", data.get("session_id"))) print(f"\n[HOOK] Session started: {data.get('session_id')}") return {"status": "handled"} async def on_session_end(event, data): hook_calls.append(("end", data.get("session_id"))) print(f"\n[HOOK] Session ended: {data.get('session_id')}") return {"status": "handled"} async def on_prompt_execute(event, data): hook_calls.append(("prompt", data.get("prompt", "")[:50])) print(f"\n[HOOK] Prompt: {data.get('prompt', '')[:50]}...") return {"status": "handled"} # Register hooks await manager.register_hook({ "name": "session_start_logger", "event": "session.start", "handler": on_session_start, "priority": 10 }) await manager.register_hook({ "name": "session_end_logger", "event": "session.end", "handler": on_session_end, "priority": 10 }) await manager.register_hook({ "name": "prompt_logger", "event": "prompt.execute", "handler": on_prompt_execute, "priority": 5 }) # Create and use session session = await session_manager.create_session("hook-test") # Trigger start hook await manager.trigger_event("session.start", {"session_id": session.id}) await session_manager.start_session(session.id) # Execute prompt (trigger prompt hook) await manager.trigger_event("prompt.execute", { "session_id": session.id, "prompt": "What is 2+2?" 
}) result = await session_manager.execute_prompt(session.id, "What is 2+2?") # Close session (trigger end hook) await session_manager.close_session(session.id) await manager.trigger_event("session.end", {"session_id": session.id}) # Verify hooks were called print(f"\nHook calls: {hook_calls}") assert ("start", "hook-test") in hook_calls assert ("end", "hook-test") in hook_calls assert any(call[0] == "prompt" for call in hook_calls) @pytest.mark.asyncio async def test_conditional_hooks(self, hooks_setup): """Test hooks with conditions.""" setup = hooks_setup manager = setup["manager"] # Track executions executions = [] # Hook that only runs for specific models async def model_specific_hook(event, data): executions.append(f"model:{data.get('model')}") return {"handled": True} # Hook that only runs for long prompts async def long_prompt_hook(event, data): prompt_len = len(data.get("prompt", "")) executions.append(f"long_prompt:{prompt_len}") return {"handled": True} # Register conditional hooks await manager.register_hook({ "name": "opus_only", "event": "model.select", "handler": model_specific_hook, "conditions": { "model": "claude-3-opus-20240229" } }) await manager.register_hook({ "name": "long_prompts", "event": "prompt.validate", "handler": long_prompt_hook, "conditions": { "min_prompt_length": 100 } }) # Test with different conditions test_events = [ ("model.select", {"model": "claude-3-opus-20240229"}), # Should trigger ("model.select", {"model": "claude-3-sonnet-20240229"}), # Should not trigger ("prompt.validate", {"prompt": "short"}), # Should not trigger ("prompt.validate", {"prompt": "x" * 150}) # Should trigger ] for event_name, event_data in test_events: await manager.trigger_event(event_name, event_data) print(f"\nConditional executions: {executions}") assert "model:claude-3-opus-20240229" in executions assert "model:claude-3-sonnet-20240229" not in executions assert any("long_prompt:150" in ex for ex in executions) assert not any("long_prompt:5" in ex for ex in executions) @pytest.mark.asyncio async def test_hook_chains(self, hooks_setup): """Test chained hook execution.""" setup = hooks_setup manager = setup["manager"] # Create chain of data transformations async def step1_uppercase(event, data): data["text"] = data.get("text", "").upper() print(f"\n[Chain 1] Uppercase: {data['text']}") return {"continue": True, "data": data} async def step2_reverse(event, data): data["text"] = data.get("text", "")[::-1] print(f"\n[Chain 2] Reverse: {data['text']}") return {"continue": True, "data": data} async def step3_add_prefix(event, data): data["text"] = f"PROCESSED: {data.get('text', '')}" print(f"\n[Chain 3] Final: {data['text']}") return {"final": True, "result": data["text"]} # Register hooks in order await manager.register_hook({ "name": "transform_1", "event": "text.process", "handler": step1_uppercase, "priority": 1 }) await manager.register_hook({ "name": "transform_2", "event": "text.process", "handler": step2_reverse, "priority": 2 }) await manager.register_hook({ "name": "transform_3", "event": "text.process", "handler": step3_add_prefix, "priority": 3 }) # Process text through chain results = await manager.trigger_event("text.process", {"text": "hello world"}) # Get final result final_result = None for result in results: if result.get("final"): final_result = result.get("result") print(f"\nFinal result: {final_result}") assert final_result == "PROCESSED: DLROW OLLEH" @pytest.mark.asyncio async def test_hook_error_handling(self, hooks_setup): """Test hook error recovery.""" 
setup = hooks_setup manager = setup["manager"] # Hooks with different error behaviors async def failing_hook(event, data): raise Exception("Intentional failure") async def recovery_hook(event, data): print(f"\n[Recovery] Handling after failure") return {"recovered": True} # Register hooks await manager.register_hook({ "name": "will_fail", "event": "error.test", "handler": failing_hook, "priority": 1, "error_handler": "continue" # Continue to next hook }) await manager.register_hook({ "name": "will_recover", "event": "error.test", "handler": recovery_hook, "priority": 2 }) # Trigger event results = await manager.trigger_event("error.test", {"test": True}) # Should have error and recovery assert any(r.get("error") for r in results) assert any(r.get("recovered") for r in results) class TestCommandSystem: """Test command system functionality.""" @pytest.fixture async def command_setup(self): """Set up command system.""" parser = CommandParser() registry = CommandRegistry() executor = CommandExecutor(registry) # Set up session manager binary_manager = BinaryManager() binaries = await binary_manager.discover_binaries() if not binaries: pytest.skip("No Claude Code binary found") session_manager = SessionManager(binary_manager=binary_manager) yield { "parser": parser, "registry": registry, "executor": executor, "session_manager": session_manager } @pytest.mark.asyncio async def test_command_registration_execution(self, command_setup): """Test registering and executing commands.""" setup = command_setup registry = setup["registry"] executor = setup["executor"] # Track command executions executions = [] # Register test commands async def hello_command(args): name = args.get("name", "World") greeting = f"Hello, {name}!" executions.append(("hello", greeting)) return {"message": greeting} async def calc_command(args): op = args.get("operation", "add") a = args.get("a", 0) b = args.get("b", 0) if op == "add": result = a + b elif op == "multiply": result = a * b else: result = 0 executions.append(("calc", result)) return {"result": result, "operation": op} # Register commands await registry.register({ "name": "hello", "description": "Greet someone", "handler": hello_command, "args": [ {"name": "name", "type": "string", "default": "World"} ] }) await registry.register({ "name": "calc", "description": "Perform calculations", "handler": calc_command, "args": [ {"name": "operation", "type": "string", "choices": ["add", "multiply"]}, {"name": "a", "type": "number", "required": True}, {"name": "b", "type": "number", "required": True} ] }) # Execute commands test_commands = [ ("/hello", {}), ("/hello --name Alice", {"name": "Alice"}), ("/calc --operation add --a 5 --b 3", {"operation": "add", "a": 5, "b": 3}), ("/calc --operation multiply --a 4 --b 7", {"operation": "multiply", "a": 4, "b": 7}) ] for cmd_str, expected_args in test_commands: parsed = setup["parser"].parse(cmd_str) result = await executor.execute(parsed["command"], parsed.get("args", {})) print(f"\nCommand: {cmd_str}") print(f"Result: {result}") # Verify executions assert ("hello", "Hello, World!") in executions assert ("hello", "Hello, Alice!") in executions assert ("calc", 8) in executions # 5 + 3 assert ("calc", 28) in executions # 4 * 7 @pytest.mark.asyncio async def test_command_validation(self, command_setup): """Test command argument validation.""" setup = command_setup registry = setup["registry"] executor = setup["executor"] parser = setup["parser"] # Register command with strict validation async def validated_command(args): return 
{"received": args} await registry.register({ "name": "strict", "handler": validated_command, "args": [ { "name": "age", "type": "integer", "min": 0, "max": 150, "required": True }, { "name": "email", "type": "string", "pattern": r"^[^@]+@[^@]+\.[^@]+$", "required": True }, { "name": "role", "type": "string", "choices": ["admin", "user", "guest"], "default": "user" } ] }) # Test valid command valid_cmd = "/strict --age 25 --email test@example.com --role admin" parsed = parser.parse(valid_cmd) result = await executor.execute(parsed["command"], parsed["args"]) assert result["received"]["age"] == 25 assert result["received"]["email"] == "test@example.com" assert result["received"]["role"] == "admin" # Test invalid commands invalid_commands = [ "/strict --age 200 --email test@example.com", # Age out of range "/strict --age 25 --email invalid-email", # Invalid email "/strict --age 25 --email test@example.com --role superuser", # Invalid role "/strict --email test@example.com", # Missing required age ] for invalid_cmd in invalid_commands: parsed = parser.parse(invalid_cmd) with pytest.raises(Exception) as exc_info: await executor.execute(parsed["command"], parsed.get("args", {})) print(f"\nValidation error for '{invalid_cmd}': {exc_info.value}") assert "validation" in str(exc_info.value).lower() or "invalid" in str(exc_info.value).lower() @pytest.mark.asyncio async def test_command_aliases(self, command_setup): """Test command aliases.""" setup = command_setup registry = setup["registry"] executor = setup["executor"] parser = setup["parser"] # Register command with aliases async def status_command(args): verbose = args.get("verbose", False) return { "status": "running", "details": "All systems operational" if verbose else None } await registry.register({ "name": "status", "aliases": ["st", "stat", "info"], "handler": status_command, "args": [ {"name": "verbose", "type": "boolean", "short": "v"} ] }) # Test all aliases aliases_to_test = ["/status", "/st", "/stat", "/info"] for alias in aliases_to_test: parsed = parser.parse(f"{alias} --verbose") result = await executor.execute_by_alias( parsed["command"], parsed.get("args", {}) ) print(f"\nAlias '{alias}' result: {result}") assert result["status"] == "running" assert result["details"] is not None @pytest.mark.asyncio async def test_command_pipeline(self, command_setup): """Test command pipelines.""" setup = command_setup registry = setup["registry"] executor = setup["executor"] # Register pipeline commands async def generate_data(args): count = args.get("count", 5) return {"data": list(range(1, count + 1))} async def transform_data(args): data = args.get("input", []) operation = args.get("op", "double") if operation == "double": result = [x * 2 for x in data] elif operation == "square": result = [x * x for x in data] else: result = data return {"data": result} async def summarize_data(args): data = args.get("input", []) return { "count": len(data), "sum": sum(data), "average": sum(data) / len(data) if data else 0 } # Register commands for cmd_def in [ ("generate", generate_data), ("transform", transform_data), ("summarize", summarize_data) ]: await registry.register({ "name": cmd_def[0], "handler": cmd_def[1], "supports_pipeline": True }) # Execute pipeline pipeline = [ {"command": "generate", "args": {"count": 10}}, {"command": "transform", "args": {"op": "square"}}, {"command": "summarize", "args": {}} ] # Run pipeline current_output = None for i, step in enumerate(pipeline): # Pass previous output as input if current_output and "data" in 
current_output: step["args"]["input"] = current_output["data"] current_output = await executor.execute( step["command"], step["args"] ) print(f"\nPipeline step {i+1} ({step['command']}):") print(f" Output: {current_output}") # Verify pipeline result # Generated 1-10, squared them, then summarized assert current_output["count"] == 10 assert current_output["sum"] == sum(x*x for x in range(1, 11)) # 385 assert current_output["average"] == 38.5
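The hooks tests above pin down a contract for HookManager without showing its implementation: hooks run in ascending priority order, conditions gate execution (including a derived min_prompt_length check), chained hooks mutate a shared data dict, and error_handler="continue" records a failure and moves on. A minimal sketch of a manager satisfying that contract is shown below; MiniHookManager and its internals are illustrative assumptions, not the actual shannon_mcp.hooks code.

# Illustrative sketch only: a minimal hook manager satisfying the contract
# exercised by TestHooksSystem. The real shannon_mcp HookManager may differ.

class MiniHookManager:
    def __init__(self):
        self._hooks = {}  # event name -> hook definitions, sorted by priority

    async def register_hook(self, hook):
        hooks = self._hooks.setdefault(hook["event"], [])
        hooks.append(hook)
        hooks.sort(key=lambda h: h.get("priority", 0))

    def _matches(self, hook, data):
        # A hook runs only if all of its conditions hold for this event.
        for key, expected in hook.get("conditions", {}).items():
            if key == "min_prompt_length":
                # Derived condition: gate on prompt length, not equality.
                if len(data.get("prompt", "")) < expected:
                    return False
            elif data.get(key) != expected:
                return False
        return True

    async def trigger_event(self, event, data):
        # Run matching hooks in priority order, collecting their results.
        # Every handler receives the same dict, so chained hooks can
        # transform `data` incrementally (as test_hook_chains relies on).
        results = []
        for hook in self._hooks.get(event, []):
            if not self._matches(hook, data):
                continue
            try:
                results.append(await hook["handler"](event, data))
            except Exception as exc:
                # "continue" error handling records the failure and moves on,
                # which is what test_hook_error_handling asserts.
                results.append({"error": str(exc)})
                if hook.get("error_handler") != "continue":
                    raise
        return results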
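Similarly, the command tests assume a parser that turns a slash-command string into a command name plus typed arguments. A rough sketch of that parsing step follows, with the hypothetical parse_command standing in for shannon_mcp.commands.parser.CommandParser.parse; the real parser presumably also validates against the registered arg schema (types, choices, min/max, patterns), which is what test_command_validation exercises.

# Illustrative sketch only: slash-command parsing as the tests assume it:
#   "/calc --operation add --a 5 --b 3"
#     -> {"command": "calc", "args": {"operation": "add", "a": 5, "b": 3}}
import shlex


def parse_command(text: str) -> dict:
    tokens = shlex.split(text)
    command = tokens[0].lstrip("/")
    args = {}
    i = 1
    while i < len(tokens):
        if tokens[i].startswith("--"):
            key = tokens[i][2:]
            if i + 1 < len(tokens) and not tokens[i + 1].startswith("--"):
                value = tokens[i + 1]
                # Coerce integer literals so "--a 5" arrives as int(5).
                if value.lstrip("-").isdigit():
                    value = int(value)
                args[key] = value
                i += 2
            else:
                # A flag with no value (e.g. "--verbose") is boolean.
                args[key] = True
                i += 1
        else:
            i += 1
    return {"command": command, "args": args}


# Usage matching the test expectations above:
assert parse_command("/calc --a 5 --b 3")["args"] == {"a": 5, "b": 3}
assert parse_command("/status --verbose")["args"] == {"verbose": True}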
