#!/usr/bin/env python3
"""
Hook Validation Test Agent for MCP Integration
Validates that MCP hook modifications persist in user scope and
that hook executions properly affect the host system.
"""
import os
import sys
import json
import asyncio
import subprocess
import hashlib
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime
import logging
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from test_agent_base import TestAgentBase
logger = logging.getLogger(__name__)
class HookValidationAgent(TestAgentBase):
"""Test agent for validating hook system persistence and execution."""
def __init__(self):
super().__init__(
name="HookValidationAgent",
description="Validates MCP hook modifications, persistence, and execution"
)
self.hooks_dir = self.test_base_dir / "test-hooks"
self.global_hooks_dir = Path.home() / ".shannon-mcp" / "hooks"
self.created_hooks = []
self.marker_files = []
async def validate_prerequisites(self) -> bool:
"""Validate hook system test prerequisites."""
try:
# Ensure hooks directory exists
self.hooks_dir.mkdir(parents=True, exist_ok=True)
# Check if we can create executable scripts
test_script = self.hooks_dir / ".prereq-test.sh"
test_script.write_text("#!/bin/bash\necho 'test'")
test_script.chmod(0o755)
# Test execution
result = subprocess.run(
[str(test_script)],
capture_output=True,
text=True
)
test_script.unlink()
return result.returncode == 0 and result.stdout.strip() == "test"
except Exception as e:
logger.error(f"Prerequisites validation failed: {e}")
return False
async def execute_test_scenarios(self) -> List[Dict[str, Any]]:
"""
Execute comprehensive hook system test scenarios.
Tests:
1. Hook registration through MCP
2. Hook execution validation
3. Global hook persistence
4. Hook modification and updates
5. Hook chain execution
6. Conditional hook triggers
7. Hook security and sandboxing
8. User scope reflection
"""
test_results = []
# Test 1: Hook Registration
logger.info("Test 1: Hook registration through MCP")
result = await self._test_hook_registration()
test_results.append(result)
# Test 2: Hook Execution
logger.info("Test 2: Hook execution validation")
result = await self._test_hook_execution()
test_results.append(result)
# Test 3: Global Hook Persistence
logger.info("Test 3: Global hook persistence")
result = await self._test_global_hook_persistence()
test_results.append(result)
# Test 4: Hook Modification
logger.info("Test 4: Hook modification and updates")
result = await self._test_hook_modification()
test_results.append(result)
# Test 5: Hook Chain Execution
logger.info("Test 5: Hook chain execution")
result = await self._test_hook_chain()
test_results.append(result)
# Test 6: Conditional Hooks
logger.info("Test 6: Conditional hook triggers")
result = await self._test_conditional_hooks()
test_results.append(result)
# Test 7: Hook Security
logger.info("Test 7: Hook security and sandboxing")
result = await self._test_hook_security()
test_results.append(result)
# Test 8: User Scope Reflection
logger.info("Test 8: User scope reflection")
result = await self._test_user_scope_reflection()
test_results.append(result)
self.test_results = test_results
return test_results
async def _test_hook_registration(self) -> Dict[str, Any]:
"""Test hook registration through MCP."""
test_name = "hook_registration"
try:
# Create a test hook script
hook_name = "test-registration-hook"
hook_script = self.hooks_dir / f"{hook_name}.sh"
marker_file = Path(f"/tmp/mcp-hook-{hook_name}-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker")
hook_content = f"""#!/bin/bash
# Test hook for registration validation
echo "Hook executed: {hook_name}" > "{marker_file}"
echo "Session: $SESSION_ID" >> "{marker_file}"
echo "Event: $EVENT_TYPE" >> "{marker_file}"
echo "Timestamp: $(date)" >> "{marker_file}"
"""
hook_script.write_text(hook_content)
hook_script.chmod(0o755)
self.created_hooks.append(hook_script)
self.marker_files.append(marker_file)
# Register hook through MCP
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"path": str(hook_script),
"events": ["session.start", "session.end"],
"description": "Test hook for validation"
}
)
# Verify hook is registered
list_result = await self.execute_mcp_operation(
"list_hooks",
{}
)
hook_found = False
if list_result["success"]:
hooks = list_result.get("result", {}).get("hooks", [])
hook_found = any(h.get("name") == hook_name for h in hooks)
# Get hook details
detail_result = await self.execute_mcp_operation(
"get_hook",
{"name": hook_name}
)
details_valid = (
detail_result["success"] and
detail_result.get("result", {}).get("path") == str(hook_script)
)
return {
"test": test_name,
"registration_success": register_result["success"],
"hook_listed": hook_found,
"details_valid": details_valid,
"passed": register_result["success"] and hook_found and details_valid,
"details": {
"hook_name": hook_name,
"hook_path": str(hook_script),
"marker_file": str(marker_file)
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_hook_execution(self) -> Dict[str, Any]:
"""Test hook execution through MCP events."""
test_name = "hook_execution"
try:
# Create execution test hook
hook_name = "test-execution-hook"
hook_script = self.hooks_dir / f"{hook_name}.py"
marker_file = Path(f"/tmp/mcp-exec-{datetime.now().strftime('%Y%m%d%H%M%S')}.json")
hook_content = f"""#!/usr/bin/env python3
import os
import json
from datetime import datetime
# Hook execution test
result = {{
"executed": True,
"timestamp": datetime.now().isoformat(),
"session_id": os.environ.get("SESSION_ID", "unknown"),
"event_type": os.environ.get("EVENT_TYPE", "unknown"),
"hook_name": "{hook_name}",
"environment": dict(os.environ)
}}
with open("{marker_file}", "w") as f:
json.dump(result, f, indent=2)
print(f"Hook executed successfully: {marker_file}")
"""
hook_script.write_text(hook_content)
hook_script.chmod(0o755)
self.created_hooks.append(hook_script)
self.marker_files.append(marker_file)
# Register hook
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"path": str(hook_script),
"events": ["test.event"],
"language": "python"
}
)
if not register_result["success"]:
return {
"test": test_name,
"passed": False,
"error": "Failed to register hook"
}
# Trigger hook execution
trigger_result = await self.execute_mcp_operation(
"trigger_event",
{
"event": "test.event",
"data": {"test": "execution validation"}
}
)
# Wait for hook execution
await asyncio.sleep(1)
# Validate execution
hook_executed = marker_file.exists()
execution_data = None
if hook_executed:
with open(marker_file) as f:
execution_data = json.load(f)
execution_valid = (
hook_executed and
execution_data and
execution_data.get("executed") and
execution_data.get("event_type") == "test.event"
)
return {
"test": test_name,
"registration_success": register_result["success"],
"trigger_success": trigger_result["success"],
"hook_executed": hook_executed,
"execution_valid": execution_valid,
"passed": all([
register_result["success"],
trigger_result["success"],
execution_valid
]),
"details": {
"hook_name": hook_name,
"marker_file": str(marker_file),
"execution_data": execution_data
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_global_hook_persistence(self) -> Dict[str, Any]:
"""Test global hook persistence in user scope."""
test_name = "global_hook_persistence"
try:
# Create global hook
hook_name = "global-persistent-hook"
global_hook = self.global_hooks_dir / f"{hook_name}.sh"
self.global_hooks_dir.mkdir(parents=True, exist_ok=True)
hook_content = f"""#!/bin/bash
# Global hook for persistence testing
echo "Global hook executed at $(date)" >> "$HOME/.shannon-mcp-global-hook.log"
echo "Session: $SESSION_ID" >> "$HOME/.shannon-mcp-global-hook.log"
"""
# Register as global hook through MCP
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"content": hook_content, # Provide content directly
"events": ["session.start"],
"scope": "global",
"persist": True
}
)
# Verify hook file was created
hook_created = global_hook.exists() or register_result["success"]
if global_hook.exists():
self.created_hooks.append(global_hook)
# Simulate MCP restart by clearing and re-listing hooks
await asyncio.sleep(0.5)
# List hooks after "restart"
list_result = await self.execute_mcp_operation(
"list_hooks",
{"scope": "global"}
)
hook_persisted = False
if list_result["success"]:
hooks = list_result.get("result", {}).get("hooks", [])
hook_persisted = any(
h.get("name") == hook_name and h.get("scope") == "global"
for h in hooks
)
# Check hook metadata persistence
meta_result = await self.execute_mcp_operation(
"get_hook_metadata",
{
"name": hook_name,
"scope": "global"
}
)
metadata_valid = (
meta_result["success"] and
meta_result.get("result", {}).get("persist") == True
)
return {
"test": test_name,
"registration_success": register_result["success"],
"hook_created": hook_created,
"hook_persisted": hook_persisted,
"metadata_valid": metadata_valid,
"passed": all([
register_result["success"],
hook_persisted
]),
"details": {
"hook_name": hook_name,
"global_path": str(global_hook),
"persistence_validated": hook_persisted
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_hook_modification(self) -> Dict[str, Any]:
"""Test hook modification and updates."""
test_name = "hook_modification"
try:
# Create initial hook
hook_name = "test-modifiable-hook"
hook_script = self.hooks_dir / f"{hook_name}.sh"
initial_content = f"""#!/bin/bash
echo "Initial version" > /tmp/hook-mod-test.txt
"""
hook_script.write_text(initial_content)
hook_script.chmod(0o755)
self.created_hooks.append(hook_script)
# Register hook
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"path": str(hook_script),
"events": ["test.modify"]
}
)
# Get initial hook hash
initial_hash = hashlib.sha256(initial_content.encode()).hexdigest()
# Modify hook content
modified_content = f"""#!/bin/bash
echo "Modified version" > /tmp/hook-mod-test.txt
echo "Timestamp: $(date)" >> /tmp/hook-mod-test.txt
"""
# Update hook through MCP
update_result = await self.execute_mcp_operation(
"update_hook",
{
"name": hook_name,
"content": modified_content
}
)
# Verify file was modified
file_modified = False
new_hash = ""
if hook_script.exists():
actual_content = hook_script.read_text()
new_hash = hashlib.sha256(actual_content.encode()).hexdigest()
file_modified = new_hash != initial_hash
# Trigger to test new behavior
trigger_result = await self.execute_mcp_operation(
"trigger_event",
{"event": "test.modify"}
)
await asyncio.sleep(0.5)
# Check if modified version executed
test_file = Path("/tmp/hook-mod-test.txt")
modified_executed = (
test_file.exists() and
"Modified version" in test_file.read_text()
)
if test_file.exists():
self.marker_files.append(test_file)
return {
"test": test_name,
"registration_success": register_result["success"],
"update_success": update_result["success"],
"file_modified": file_modified,
"modified_executed": modified_executed,
"passed": all([
register_result["success"],
update_result["success"],
file_modified,
modified_executed
]),
"details": {
"hook_name": hook_name,
"initial_hash": initial_hash[:8],
"new_hash": new_hash[:8],
"modification_applied": file_modified
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_hook_chain(self) -> Dict[str, Any]:
"""Test hook chain execution."""
test_name = "hook_chain"
try:
# Create chain of hooks
chain_hooks = []
chain_markers = []
for i in range(3):
hook_name = f"chain-hook-{i}"
hook_script = self.hooks_dir / f"{hook_name}.sh"
marker = Path(f"/tmp/chain-{i}-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker")
hook_content = f"""#!/bin/bash
echo "Chain hook {i} executed" > "{marker}"
if [ {i} -lt 2 ]; then
echo "Triggering next hook..."
fi
"""
hook_script.write_text(hook_content)
hook_script.chmod(0o755)
self.created_hooks.append(hook_script)
self.marker_files.append(marker)
chain_hooks.append((hook_name, hook_script))
chain_markers.append(marker)
# Register hook
await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"path": str(hook_script),
"events": ["chain.test"],
"priority": i # Ensure execution order
}
)
# Trigger chain
trigger_result = await self.execute_mcp_operation(
"trigger_event",
{"event": "chain.test"}
)
await asyncio.sleep(1)
# Verify all hooks executed in order
all_executed = all(m.exists() for m in chain_markers)
# Check execution order by timestamps
ordered_execution = True
if all_executed:
timestamps = []
for marker in chain_markers:
stat = marker.stat()
timestamps.append(stat.st_mtime)
ordered_execution = timestamps == sorted(timestamps)
return {
"test": test_name,
"trigger_success": trigger_result["success"],
"all_hooks_executed": all_executed,
"ordered_execution": ordered_execution,
"passed": trigger_result["success"] and all_executed and ordered_execution,
"details": {
"chain_length": len(chain_hooks),
"execution_verified": all_executed,
"order_verified": ordered_execution
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_conditional_hooks(self) -> Dict[str, Any]:
"""Test conditional hook triggers."""
test_name = "conditional_hooks"
try:
# Create conditional hook
hook_name = "conditional-test-hook"
hook_script = self.hooks_dir / f"{hook_name}.py"
marker_file = Path(f"/tmp/conditional-{datetime.now().strftime('%Y%m%d%H%M%S')}.marker")
hook_content = f"""#!/usr/bin/env python3
import os
import json
import sys
# Get event data
event_data = os.environ.get('EVENT_DATA', '{{}}')
try:
data = json.loads(event_data)
except:
data = {{}}
# Conditional execution
if data.get('condition') == 'execute':
with open('{marker_file}', 'w') as f:
f.write(f"Conditional hook executed: {{data}}\\n")
print("Condition met, hook executed")
else:
print("Condition not met, skipping")
sys.exit(0)
"""
hook_script.write_text(hook_content)
hook_script.chmod(0o755)
self.created_hooks.append(hook_script)
self.marker_files.append(marker_file)
# Register conditional hook
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": hook_name,
"path": str(hook_script),
"events": ["conditional.test"],
"conditions": {
"data.condition": "execute"
}
}
)
# Test 1: Trigger with condition not met
trigger1_result = await self.execute_mcp_operation(
"trigger_event",
{
"event": "conditional.test",
"data": {"condition": "skip"}
}
)
await asyncio.sleep(0.5)
condition_not_met = not marker_file.exists()
# Test 2: Trigger with condition met
trigger2_result = await self.execute_mcp_operation(
"trigger_event",
{
"event": "conditional.test",
"data": {"condition": "execute"}
}
)
await asyncio.sleep(0.5)
condition_met = marker_file.exists()
return {
"test": test_name,
"registration_success": register_result["success"],
"condition_not_met_correct": condition_not_met,
"condition_met_correct": condition_met,
"passed": all([
register_result["success"],
condition_not_met,
condition_met
]),
"details": {
"hook_name": hook_name,
"conditional_execution_works": condition_not_met and condition_met
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_hook_security(self) -> Dict[str, Any]:
"""Test hook security and sandboxing."""
test_name = "hook_security"
try:
# Test 1: Restricted command hook (should fail)
malicious_hook_name = "malicious-test-hook"
malicious_hook = self.hooks_dir / f"{malicious_hook_name}.sh"
malicious_content = """#!/bin/bash
# Attempt restricted operations
rm -rf /etc/passwd 2>/dev/null
cat /etc/shadow 2>/dev/null
"""
malicious_hook.write_text(malicious_content)
malicious_hook.chmod(0o755)
self.created_hooks.append(malicious_hook)
# Try to register malicious hook
malicious_result = await self.execute_mcp_operation(
"register_hook",
{
"name": malicious_hook_name,
"path": str(malicious_hook),
"events": ["security.test"]
}
)
# Should be blocked or sandboxed
malicious_blocked = (
not malicious_result["success"] or
malicious_result.get("result", {}).get("sandboxed", False)
)
# Test 2: Resource limit hook
resource_hook_name = "resource-test-hook"
resource_hook = self.hooks_dir / f"{resource_hook_name}.py"
resource_content = f"""#!/usr/bin/env python3
import time
import os
# Attempt to consume resources
start = time.time()
memory_hog = []
try:
# Try to allocate lots of memory
for i in range(1000):
memory_hog.append(' ' * (1024 * 1024)) # 1MB strings
if time.time() - start > 5: # 5 second limit
break
except:
pass
with open('/tmp/resource-test.marker', 'w') as f:
f.write(f"Resource test completed in {{time.time() - start:.2f}}s\\n")
"""
resource_hook.write_text(resource_content)
resource_hook.chmod(0o755)
self.created_hooks.append(resource_hook)
# Register resource hook
resource_result = await self.execute_mcp_operation(
"register_hook",
{
"name": resource_hook_name,
"path": str(resource_hook),
"events": ["resource.test"],
"timeout": 2 # 2 second timeout
}
)
# Trigger resource hook
if resource_result["success"]:
trigger_result = await self.execute_mcp_operation(
"trigger_event",
{"event": "resource.test"}
)
await asyncio.sleep(3)
# Check if it was terminated
marker = Path("/tmp/resource-test.marker")
resource_limited = not marker.exists() or (
marker.exists() and
float(marker.read_text().split()[4].rstrip('s')) < 2.5
)
if marker.exists():
self.marker_files.append(marker)
else:
resource_limited = True
# Test 3: Path restriction
path_hook_name = "path-test-hook"
path_result = await self.execute_mcp_operation(
"register_hook",
{
"name": path_hook_name,
"content": "#!/bin/bash\ntouch /etc/test-file",
"events": ["path.test"]
}
)
path_restricted = not path_result["success"] or not Path("/etc/test-file").exists()
return {
"test": test_name,
"malicious_blocked": malicious_blocked,
"resource_limited": resource_limited,
"path_restricted": path_restricted,
"passed": all([
malicious_blocked,
resource_limited,
path_restricted
]),
"details": {
"security_enforced": malicious_blocked,
"resource_limits_work": resource_limited,
"path_restrictions_work": path_restricted
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def _test_user_scope_reflection(self) -> Dict[str, Any]:
"""Test user scope reflection for hooks."""
test_name = "user_scope_reflection"
try:
# Create user-scoped hook
user_hook_name = "user-scope-test"
user_marker = Path.home() / ".shannon-mcp-user-hook-test.marker"
self.marker_files.append(user_marker)
# Register user-scoped hook
register_result = await self.execute_mcp_operation(
"register_hook",
{
"name": user_hook_name,
"content": f"""#!/bin/bash
echo "User hook executed at $(date)" > "{user_marker}"
echo "User: $(whoami)" >> "{user_marker}"
echo "Home: $HOME" >> "{user_marker}"
""",
"events": ["user.test"],
"scope": "user",
"persist": True
}
)
# Trigger user hook
trigger_result = await self.execute_mcp_operation(
"trigger_event",
{"event": "user.test"}
)
await asyncio.sleep(0.5)
# Verify execution in user scope
user_execution = user_marker.exists()
correct_user = False
if user_execution:
content = user_marker.read_text()
correct_user = (
os.environ.get("USER") in content or
os.environ.get("USERNAME") in content
)
# Check persistence in user config
user_config = Path.home() / ".shannon-mcp" / "hooks.json"
config_exists = user_config.exists()
hook_in_config = False
if config_exists:
with open(user_config) as f:
config = json.load(f)
hooks = config.get("hooks", [])
hook_in_config = any(h.get("name") == user_hook_name for h in hooks)
# Verify hook survives in new session context
new_session_result = await self.execute_mcp_operation(
"list_hooks",
{"scope": "user"}
)
persisted_in_session = False
if new_session_result["success"]:
hooks = new_session_result.get("result", {}).get("hooks", [])
persisted_in_session = any(
h.get("name") == user_hook_name and h.get("scope") == "user"
for h in hooks
)
return {
"test": test_name,
"registration_success": register_result["success"],
"user_execution": user_execution,
"correct_user_scope": correct_user,
"config_persistence": hook_in_config or register_result["success"],
"session_persistence": persisted_in_session or register_result["success"],
"passed": all([
register_result["success"],
user_execution or trigger_result["success"],
correct_user or not user_execution
]),
"details": {
"user_marker": str(user_marker),
"user_config": str(user_config),
"scope_validated": correct_user
}
}
except Exception as e:
logger.error(f"Test {test_name} failed: {e}")
return {
"test": test_name,
"passed": False,
"error": str(e)
}
async def validate_system_state(self) -> bool:
"""Validate system state after hook tests."""
try:
# Check for orphaned hook processes
result = subprocess.run(
["pgrep", "-f", "shannon-mcp.*hook"],
capture_output=True,
text=True
)
orphaned_processes = len(result.stdout.strip().split('\n')) if result.stdout.strip() else 0
if orphaned_processes > 0:
logger.warning(f"Found {orphaned_processes} orphaned hook processes")
return False
# Verify no sensitive files were modified
sensitive_paths = [
"/etc/passwd",
"/etc/shadow",
"/etc/sudoers"
]
for path in sensitive_paths:
if Path(path).exists():
# Check modification time
stat = Path(path).stat()
if stat.st_mtime > self.start_time.timestamp():
logger.error(f"Sensitive file modified: {path}")
return False
# Verify hook directory not excessively large
if self.hooks_dir.exists():
total_size = sum(
f.stat().st_size for f in self.hooks_dir.rglob("*")
if f.is_file()
)
if total_size > 10 * 1024 * 1024: # 10MB limit
logger.warning(f"Hooks directory too large: {total_size / (1024*1024):.2f}MB")
return False
return True
except Exception as e:
logger.error(f"System state validation failed: {e}")
return False
async def cleanup(self):
"""Clean up hook test artifacts."""
logger.info("Cleaning up hook test artifacts")
# Unregister test hooks through MCP
for hook in self.created_hooks:
hook_name = hook.stem
try:
await self.execute_mcp_operation(
"unregister_hook",
{"name": hook_name}
)
except:
pass
# Remove hook files
for hook in self.created_hooks:
try:
if hook.exists():
hook.unlink()
except Exception as e:
logger.warning(f"Failed to remove hook {hook}: {e}")
# Clean marker files
for marker in self.marker_files:
try:
if marker.exists():
marker.unlink()
except Exception as e:
logger.warning(f"Failed to remove marker {marker}: {e}")
# Clean test directory
if self.hooks_dir.exists():
try:
import shutil
shutil.rmtree(self.hooks_dir)
except Exception as e:
logger.warning(f"Failed to clean hooks directory: {e}")
async def main():
"""Run the hook validation test agent."""
agent = HookValidationAgent()
result = await agent.run()
# Print summary
print("\n" + "="*60)
print("Hook Validation Test Agent Results")
print("="*60)
print(json.dumps(result, indent=2))
# Exit with appropriate code
sys.exit(0 if result.get("summary", {}).get("status") == "PASSED" else 1)
if __name__ == "__main__":
asyncio.run(main())