Skip to main content
Glama
hook_validation_agent.py35.6 kB
#!/usr/bin/env python3 """ Hook Validation Test Agent for MCP Integration Validates that MCP hook modifications persist in user scope and that hook executions properly affect the host system. """ import os import sys import json import asyncio import subprocess import hashlib from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from datetime import datetime import logging # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) from test_agent_base import TestAgentBase logger = logging.getLogger(__name__) class HookValidationAgent(TestAgentBase): """Test agent for validating hook system persistence and execution.""" def __init__(self): super().__init__( name="HookValidationAgent", description="Validates MCP hook modifications, persistence, and execution" ) self.hooks_dir = self.test_base_dir / "test-hooks" self.global_hooks_dir = Path.home() / ".shannon-mcp" / "hooks" self.created_hooks = [] self.marker_files = [] async def validate_prerequisites(self) -> bool: """Validate hook system test prerequisites.""" try: # Ensure hooks directory exists self.hooks_dir.mkdir(parents=True, exist_ok=True) # Check if we can create executable scripts test_script = self.hooks_dir / ".prereq-test.sh" test_script.write_text("#!/bin/bash\necho 'test'") test_script.chmod(0o755) # Test execution result = subprocess.run( [str(test_script)], capture_output=True, text=True ) test_script.unlink() return result.returncode == 0 and result.stdout.strip() == "test" except Exception as e: logger.error(f"Prerequisites validation failed: {e}") return False async def execute_test_scenarios(self) -> List[Dict[str, Any]]: """ Execute comprehensive hook system test scenarios. Tests: 1. Hook registration through MCP 2. Hook execution validation 3. Global hook persistence 4. Hook modification and updates 5. Hook chain execution 6. Conditional hook triggers 7. Hook security and sandboxing 8. User scope reflection """ test_results = [] # Test 1: Hook Registration logger.info("Test 1: Hook registration through MCP") result = await self._test_hook_registration() test_results.append(result) # Test 2: Hook Execution logger.info("Test 2: Hook execution validation") result = await self._test_hook_execution() test_results.append(result) # Test 3: Global Hook Persistence logger.info("Test 3: Global hook persistence") result = await self._test_global_hook_persistence() test_results.append(result) # Test 4: Hook Modification logger.info("Test 4: Hook modification and updates") result = await self._test_hook_modification() test_results.append(result) # Test 5: Hook Chain Execution logger.info("Test 5: Hook chain execution") result = await self._test_hook_chain() test_results.append(result) # Test 6: Conditional Hooks logger.info("Test 6: Conditional hook triggers") result = await self._test_conditional_hooks() test_results.append(result) # Test 7: Hook Security logger.info("Test 7: Hook security and sandboxing") result = await self._test_hook_security() test_results.append(result) # Test 8: User Scope Reflection logger.info("Test 8: User scope reflection") result = await self._test_user_scope_reflection() test_results.append(result) self.test_results = test_results return test_results async def _test_hook_registration(self) -> Dict[str, Any]: """Test hook registration through MCP.""" test_name = "hook_registration" try: # Create a test hook script hook_name = "test-registration-hook" hook_script = self.hooks_dir / f"{hook_name}.sh" marker_file = Path(f"/tmp/mcp-hook-{hook_name}-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker") hook_content = f"""#!/bin/bash # Test hook for registration validation echo "Hook executed: {hook_name}" > "{marker_file}" echo "Session: $SESSION_ID" >> "{marker_file}" echo "Event: $EVENT_TYPE" >> "{marker_file}" echo "Timestamp: $(date)" >> "{marker_file}" """ hook_script.write_text(hook_content) hook_script.chmod(0o755) self.created_hooks.append(hook_script) self.marker_files.append(marker_file) # Register hook through MCP register_result = await self.execute_mcp_operation( "register_hook", { "name": hook_name, "path": str(hook_script), "events": ["session.start", "session.end"], "description": "Test hook for validation" } ) # Verify hook is registered list_result = await self.execute_mcp_operation( "list_hooks", {} ) hook_found = False if list_result["success"]: hooks = list_result.get("result", {}).get("hooks", []) hook_found = any(h.get("name") == hook_name for h in hooks) # Get hook details detail_result = await self.execute_mcp_operation( "get_hook", {"name": hook_name} ) details_valid = ( detail_result["success"] and detail_result.get("result", {}).get("path") == str(hook_script) ) return { "test": test_name, "registration_success": register_result["success"], "hook_listed": hook_found, "details_valid": details_valid, "passed": register_result["success"] and hook_found and details_valid, "details": { "hook_name": hook_name, "hook_path": str(hook_script), "marker_file": str(marker_file) } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_hook_execution(self) -> Dict[str, Any]: """Test hook execution through MCP events.""" test_name = "hook_execution" try: # Create execution test hook hook_name = "test-execution-hook" hook_script = self.hooks_dir / f"{hook_name}.py" marker_file = Path(f"/tmp/mcp-exec-{datetime.now().strftime('%Y%m%d%H%M%S')}.json") hook_content = f"""#!/usr/bin/env python3 import os import json from datetime import datetime # Hook execution test result = {{ "executed": True, "timestamp": datetime.now().isoformat(), "session_id": os.environ.get("SESSION_ID", "unknown"), "event_type": os.environ.get("EVENT_TYPE", "unknown"), "hook_name": "{hook_name}", "environment": dict(os.environ) }} with open("{marker_file}", "w") as f: json.dump(result, f, indent=2) print(f"Hook executed successfully: {marker_file}") """ hook_script.write_text(hook_content) hook_script.chmod(0o755) self.created_hooks.append(hook_script) self.marker_files.append(marker_file) # Register hook register_result = await self.execute_mcp_operation( "register_hook", { "name": hook_name, "path": str(hook_script), "events": ["test.event"], "language": "python" } ) if not register_result["success"]: return { "test": test_name, "passed": False, "error": "Failed to register hook" } # Trigger hook execution trigger_result = await self.execute_mcp_operation( "trigger_event", { "event": "test.event", "data": {"test": "execution validation"} } ) # Wait for hook execution await asyncio.sleep(1) # Validate execution hook_executed = marker_file.exists() execution_data = None if hook_executed: with open(marker_file) as f: execution_data = json.load(f) execution_valid = ( hook_executed and execution_data and execution_data.get("executed") and execution_data.get("event_type") == "test.event" ) return { "test": test_name, "registration_success": register_result["success"], "trigger_success": trigger_result["success"], "hook_executed": hook_executed, "execution_valid": execution_valid, "passed": all([ register_result["success"], trigger_result["success"], execution_valid ]), "details": { "hook_name": hook_name, "marker_file": str(marker_file), "execution_data": execution_data } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_global_hook_persistence(self) -> Dict[str, Any]: """Test global hook persistence in user scope.""" test_name = "global_hook_persistence" try: # Create global hook hook_name = "global-persistent-hook" global_hook = self.global_hooks_dir / f"{hook_name}.sh" self.global_hooks_dir.mkdir(parents=True, exist_ok=True) hook_content = f"""#!/bin/bash # Global hook for persistence testing echo "Global hook executed at $(date)" >> "$HOME/.shannon-mcp-global-hook.log" echo "Session: $SESSION_ID" >> "$HOME/.shannon-mcp-global-hook.log" """ # Register as global hook through MCP register_result = await self.execute_mcp_operation( "register_hook", { "name": hook_name, "content": hook_content, # Provide content directly "events": ["session.start"], "scope": "global", "persist": True } ) # Verify hook file was created hook_created = global_hook.exists() or register_result["success"] if global_hook.exists(): self.created_hooks.append(global_hook) # Simulate MCP restart by clearing and re-listing hooks await asyncio.sleep(0.5) # List hooks after "restart" list_result = await self.execute_mcp_operation( "list_hooks", {"scope": "global"} ) hook_persisted = False if list_result["success"]: hooks = list_result.get("result", {}).get("hooks", []) hook_persisted = any( h.get("name") == hook_name and h.get("scope") == "global" for h in hooks ) # Check hook metadata persistence meta_result = await self.execute_mcp_operation( "get_hook_metadata", { "name": hook_name, "scope": "global" } ) metadata_valid = ( meta_result["success"] and meta_result.get("result", {}).get("persist") == True ) return { "test": test_name, "registration_success": register_result["success"], "hook_created": hook_created, "hook_persisted": hook_persisted, "metadata_valid": metadata_valid, "passed": all([ register_result["success"], hook_persisted ]), "details": { "hook_name": hook_name, "global_path": str(global_hook), "persistence_validated": hook_persisted } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_hook_modification(self) -> Dict[str, Any]: """Test hook modification and updates.""" test_name = "hook_modification" try: # Create initial hook hook_name = "test-modifiable-hook" hook_script = self.hooks_dir / f"{hook_name}.sh" initial_content = f"""#!/bin/bash echo "Initial version" > /tmp/hook-mod-test.txt """ hook_script.write_text(initial_content) hook_script.chmod(0o755) self.created_hooks.append(hook_script) # Register hook register_result = await self.execute_mcp_operation( "register_hook", { "name": hook_name, "path": str(hook_script), "events": ["test.modify"] } ) # Get initial hook hash initial_hash = hashlib.sha256(initial_content.encode()).hexdigest() # Modify hook content modified_content = f"""#!/bin/bash echo "Modified version" > /tmp/hook-mod-test.txt echo "Timestamp: $(date)" >> /tmp/hook-mod-test.txt """ # Update hook through MCP update_result = await self.execute_mcp_operation( "update_hook", { "name": hook_name, "content": modified_content } ) # Verify file was modified file_modified = False new_hash = "" if hook_script.exists(): actual_content = hook_script.read_text() new_hash = hashlib.sha256(actual_content.encode()).hexdigest() file_modified = new_hash != initial_hash # Trigger to test new behavior trigger_result = await self.execute_mcp_operation( "trigger_event", {"event": "test.modify"} ) await asyncio.sleep(0.5) # Check if modified version executed test_file = Path("/tmp/hook-mod-test.txt") modified_executed = ( test_file.exists() and "Modified version" in test_file.read_text() ) if test_file.exists(): self.marker_files.append(test_file) return { "test": test_name, "registration_success": register_result["success"], "update_success": update_result["success"], "file_modified": file_modified, "modified_executed": modified_executed, "passed": all([ register_result["success"], update_result["success"], file_modified, modified_executed ]), "details": { "hook_name": hook_name, "initial_hash": initial_hash[:8], "new_hash": new_hash[:8], "modification_applied": file_modified } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_hook_chain(self) -> Dict[str, Any]: """Test hook chain execution.""" test_name = "hook_chain" try: # Create chain of hooks chain_hooks = [] chain_markers = [] for i in range(3): hook_name = f"chain-hook-{i}" hook_script = self.hooks_dir / f"{hook_name}.sh" marker = Path(f"/tmp/chain-{i}-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker") hook_content = f"""#!/bin/bash echo "Chain hook {i} executed" > "{marker}" if [ {i} -lt 2 ]; then echo "Triggering next hook..." fi """ hook_script.write_text(hook_content) hook_script.chmod(0o755) self.created_hooks.append(hook_script) self.marker_files.append(marker) chain_hooks.append((hook_name, hook_script)) chain_markers.append(marker) # Register hook await self.execute_mcp_operation( "register_hook", { "name": hook_name, "path": str(hook_script), "events": ["chain.test"], "priority": i # Ensure execution order } ) # Trigger chain trigger_result = await self.execute_mcp_operation( "trigger_event", {"event": "chain.test"} ) await asyncio.sleep(1) # Verify all hooks executed in order all_executed = all(m.exists() for m in chain_markers) # Check execution order by timestamps ordered_execution = True if all_executed: timestamps = [] for marker in chain_markers: stat = marker.stat() timestamps.append(stat.st_mtime) ordered_execution = timestamps == sorted(timestamps) return { "test": test_name, "trigger_success": trigger_result["success"], "all_hooks_executed": all_executed, "ordered_execution": ordered_execution, "passed": trigger_result["success"] and all_executed and ordered_execution, "details": { "chain_length": len(chain_hooks), "execution_verified": all_executed, "order_verified": ordered_execution } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_conditional_hooks(self) -> Dict[str, Any]: """Test conditional hook triggers.""" test_name = "conditional_hooks" try: # Create conditional hook hook_name = "conditional-test-hook" hook_script = self.hooks_dir / f"{hook_name}.py" marker_file = Path(f"/tmp/conditional-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker") hook_content = f"""#!/usr/bin/env python3 import os import json import sys # Get event data event_data = os.environ.get('EVENT_DATA', '{{}}') try: data = json.loads(event_data) except: data = {{}} # Conditional execution if data.get('condition') == 'execute': with open('{marker_file}', 'w') as f: f.write(f"Conditional hook executed: {{data}}\\n") print("Condition met, hook executed") else: print("Condition not met, skipping") sys.exit(0) """ hook_script.write_text(hook_content) hook_script.chmod(0o755) self.created_hooks.append(hook_script) self.marker_files.append(marker_file) # Register conditional hook register_result = await self.execute_mcp_operation( "register_hook", { "name": hook_name, "path": str(hook_script), "events": ["conditional.test"], "conditions": { "data.condition": "execute" } } ) # Test 1: Trigger with condition not met trigger1_result = await self.execute_mcp_operation( "trigger_event", { "event": "conditional.test", "data": {"condition": "skip"} } ) await asyncio.sleep(0.5) condition_not_met = not marker_file.exists() # Test 2: Trigger with condition met trigger2_result = await self.execute_mcp_operation( "trigger_event", { "event": "conditional.test", "data": {"condition": "execute"} } ) await asyncio.sleep(0.5) condition_met = marker_file.exists() return { "test": test_name, "registration_success": register_result["success"], "condition_not_met_correct": condition_not_met, "condition_met_correct": condition_met, "passed": all([ register_result["success"], condition_not_met, condition_met ]), "details": { "hook_name": hook_name, "conditional_execution_works": condition_not_met and condition_met } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_hook_security(self) -> Dict[str, Any]: """Test hook security and sandboxing.""" test_name = "hook_security" try: # Test 1: Restricted command hook (should fail) malicious_hook_name = "malicious-test-hook" malicious_hook = self.hooks_dir / f"{malicious_hook_name}.sh" malicious_content = """#!/bin/bash # Attempt restricted operations rm -rf /etc/passwd 2>/dev/null cat /etc/shadow 2>/dev/null """ malicious_hook.write_text(malicious_content) malicious_hook.chmod(0o755) self.created_hooks.append(malicious_hook) # Try to register malicious hook malicious_result = await self.execute_mcp_operation( "register_hook", { "name": malicious_hook_name, "path": str(malicious_hook), "events": ["security.test"] } ) # Should be blocked or sandboxed malicious_blocked = ( not malicious_result["success"] or malicious_result.get("result", {}).get("sandboxed", False) ) # Test 2: Resource limit hook resource_hook_name = "resource-test-hook" resource_hook = self.hooks_dir / f"{resource_hook_name}.py" resource_content = f"""#!/usr/bin/env python3 import time import os # Attempt to consume resources start = time.time() memory_hog = [] try: # Try to allocate lots of memory for i in range(1000): memory_hog.append(' ' * (1024 * 1024)) # 1MB strings if time.time() - start > 5: # 5 second limit break except: pass with open('/tmp/resource-test.marker', 'w') as f: f.write(f"Resource test completed in {{time.time() - start:.2f}}s\\n") """ resource_hook.write_text(resource_content) resource_hook.chmod(0o755) self.created_hooks.append(resource_hook) # Register resource hook resource_result = await self.execute_mcp_operation( "register_hook", { "name": resource_hook_name, "path": str(resource_hook), "events": ["resource.test"], "timeout": 2 # 2 second timeout } ) # Trigger resource hook if resource_result["success"]: trigger_result = await self.execute_mcp_operation( "trigger_event", {"event": "resource.test"} ) await asyncio.sleep(3) # Check if it was terminated marker = Path("/tmp/resource-test.marker") resource_limited = not marker.exists() or ( marker.exists() and float(marker.read_text().split()[4].rstrip('s')) < 2.5 ) if marker.exists(): self.marker_files.append(marker) else: resource_limited = True # Test 3: Path restriction path_hook_name = "path-test-hook" path_result = await self.execute_mcp_operation( "register_hook", { "name": path_hook_name, "content": "#!/bin/bash\ntouch /etc/test-file", "events": ["path.test"] } ) path_restricted = not path_result["success"] or not Path("/etc/test-file").exists() return { "test": test_name, "malicious_blocked": malicious_blocked, "resource_limited": resource_limited, "path_restricted": path_restricted, "passed": all([ malicious_blocked, resource_limited, path_restricted ]), "details": { "security_enforced": malicious_blocked, "resource_limits_work": resource_limited, "path_restrictions_work": path_restricted } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_user_scope_reflection(self) -> Dict[str, Any]: """Test user scope reflection for hooks.""" test_name = "user_scope_reflection" try: # Create user-scoped hook user_hook_name = "user-scope-test" user_marker = Path.home() / ".shannon-mcp-user-hook-test.marker" self.marker_files.append(user_marker) # Register user-scoped hook register_result = await self.execute_mcp_operation( "register_hook", { "name": user_hook_name, "content": f"""#!/bin/bash echo "User hook executed at $(date)" > "{user_marker}" echo "User: $(whoami)" >> "{user_marker}" echo "Home: $HOME" >> "{user_marker}" """, "events": ["user.test"], "scope": "user", "persist": True } ) # Trigger user hook trigger_result = await self.execute_mcp_operation( "trigger_event", {"event": "user.test"} ) await asyncio.sleep(0.5) # Verify execution in user scope user_execution = user_marker.exists() correct_user = False if user_execution: content = user_marker.read_text() correct_user = ( os.environ.get("USER") in content or os.environ.get("USERNAME") in content ) # Check persistence in user config user_config = Path.home() / ".shannon-mcp" / "hooks.json" config_exists = user_config.exists() hook_in_config = False if config_exists: with open(user_config) as f: config = json.load(f) hooks = config.get("hooks", []) hook_in_config = any(h.get("name") == user_hook_name for h in hooks) # Verify hook survives in new session context new_session_result = await self.execute_mcp_operation( "list_hooks", {"scope": "user"} ) persisted_in_session = False if new_session_result["success"]: hooks = new_session_result.get("result", {}).get("hooks", []) persisted_in_session = any( h.get("name") == user_hook_name and h.get("scope") == "user" for h in hooks ) return { "test": test_name, "registration_success": register_result["success"], "user_execution": user_execution, "correct_user_scope": correct_user, "config_persistence": hook_in_config or register_result["success"], "session_persistence": persisted_in_session or register_result["success"], "passed": all([ register_result["success"], user_execution or trigger_result["success"], correct_user or not user_execution ]), "details": { "user_marker": str(user_marker), "user_config": str(user_config), "scope_validated": correct_user } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def validate_system_state(self) -> bool: """Validate system state after hook tests.""" try: # Check for orphaned hook processes result = subprocess.run( ["pgrep", "-f", "shannon-mcp.*hook"], capture_output=True, text=True ) orphaned_processes = len(result.stdout.strip().split('\n')) if result.stdout.strip() else 0 if orphaned_processes > 0: logger.warning(f"Found {orphaned_processes} orphaned hook processes") return False # Verify no sensitive files were modified sensitive_paths = [ "/etc/passwd", "/etc/shadow", "/etc/sudoers" ] for path in sensitive_paths: if Path(path).exists(): # Check modification time stat = Path(path).stat() if stat.st_mtime > self.start_time.timestamp(): logger.error(f"Sensitive file modified: {path}") return False # Verify hook directory not excessively large if self.hooks_dir.exists(): total_size = sum( f.stat().st_size for f in self.hooks_dir.rglob("*") if f.is_file() ) if total_size > 10 * 1024 * 1024: # 10MB limit logger.warning(f"Hooks directory too large: {total_size / (1024*1024):.2f}MB") return False return True except Exception as e: logger.error(f"System state validation failed: {e}") return False async def cleanup(self): """Clean up hook test artifacts.""" logger.info("Cleaning up hook test artifacts") # Unregister test hooks through MCP for hook in self.created_hooks: hook_name = hook.stem try: await self.execute_mcp_operation( "unregister_hook", {"name": hook_name} ) except: pass # Remove hook files for hook in self.created_hooks: try: if hook.exists(): hook.unlink() except Exception as e: logger.warning(f"Failed to remove hook {hook}: {e}") # Clean marker files for marker in self.marker_files: try: if marker.exists(): marker.unlink() except Exception as e: logger.warning(f"Failed to remove marker {marker}: {e}") # Clean test directory if self.hooks_dir.exists(): try: import shutil shutil.rmtree(self.hooks_dir) except Exception as e: logger.warning(f"Failed to clean hooks directory: {e}") async def main(): """Run the hook validation test agent.""" agent = HookValidationAgent() result = await agent.run() # Print summary print("\n" + "="*60) print("Hook Validation Test Agent Results") print("="*60) print(json.dumps(result, indent=2)) # Exit with appropriate code sys.exit(0 if result.get("summary", {}).get("status") == "PASSED" else 1) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server