#!/usr/bin/env python3
"""
Comprehensive tests for Phase 2 Trigger System implementation.
Run these tests to verify all trigger types work correctly.
"""
import asyncio
import pytest
import time
from unittest.mock import Mock, AsyncMock
from pentest_mcp_server.trigger_system import TriggerSystem, OutputBuffer
from pentest_mcp_server.ssh_manager import SSHManager
from pentest_mcp_server.tmux_manager import TmuxManager
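# These tests exercise the trigger system entirely against mocked SSH/tmux
# managers, so no live host or tmux server is required.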
class TestTriggerSystem:
"""Test cases for the enhanced trigger system."""
@pytest.fixture
def ssh_manager(self):
"""Create mock SSH manager."""
ssh_manager = Mock(spec=SSHManager)
ssh_manager.run_command = AsyncMock()
return ssh_manager
@pytest.fixture
def tmux_manager(self):
"""Create mock Tmux manager."""
tmux_manager = Mock(spec=TmuxManager)
tmux_manager.capture_pane = AsyncMock()
tmux_manager.execute_command = AsyncMock()
return tmux_manager
@pytest.fixture
def trigger_system(self, ssh_manager, tmux_manager):
"""Create TriggerSystem instance."""
return TriggerSystem(ssh_manager, tmux_manager)
def test_prompt_detection(self, trigger_system):
"""Test prompt detection with various shell prompts."""
test_cases = [
("root@kali:~# ", True),
("kali@localhost:$ ", True),
("msf6 > ", True),
("(meterpreter) > ", True),
("mysql> ", True),
(">>> ", True),
("In [1]: ", True),
("regular text output", False),
("[sudo] password for kali:", True),
]
for output, expected in test_cases:
result = trigger_system._check_prompt_trigger(output)
assert result["matched"] == expected, f"Failed for: {output}"
print("Prompt detection works for all shell types")
def test_regex_trigger(self, trigger_system):
"""Test regex pattern matching."""
trigger = {"type": "regex", "pattern": "Nmap scan report", "name": "scan_started"}
# Test match in new output
result = trigger_system._check_regex_trigger(
trigger,
"Starting scan...\nNmap scan report for 192.168.1.1",
"Full output"
)
assert result["matched"] == True
assert result["trigger_name"] == "scan_started"
# Test no match
result = trigger_system._check_regex_trigger(
trigger,
"Starting scan...",
"Full output"
)
assert result["matched"] == False
print("Regex trigger detection works")
def test_timeout_trigger(self, trigger_system):
"""Test timeout trigger logic."""
start_time = time.time() - 10 # 10 seconds ago
# Test timeout reached
trigger = {"type": "timeout", "timeout_seconds": 5}
result = trigger_system._check_timeout_trigger(start_time, trigger)
assert result["matched"] == True
assert result["elapsed_seconds"] >= 10
# Test timeout not reached
start_time = time.time() - 2 # 2 seconds ago
trigger = {"type": "timeout", "timeout_seconds": 5}
result = trigger_system._check_timeout_trigger(start_time, trigger)
assert result["matched"] == False
print("Timeout trigger works")
@pytest.mark.asyncio
async def test_file_exists_trigger(self, trigger_system, tmux_manager):
"""Test file existence checking."""
trigger = {"type": "file_exists", "path": "/tmp/test.txt", "name": "file_ready"}
# Mock file exists
tmux_manager.execute_command.return_value = {"status": "sent"}
tmux_manager.capture_pane.return_value = {
"status": "success",
"output": "EXISTS\n1234" # File exists with size 1234
}
result = await trigger_system._check_file_exists_trigger(trigger, "test_session")
assert result["matched"] == True
assert result["file_path"] == "/tmp/test.txt"
assert result["file_size"] == 1234
# Mock file doesn't exist
tmux_manager.capture_pane.return_value = {
"status": "success",
"output": "NOTFOUND"
}
result = await trigger_system._check_file_exists_trigger(trigger, "test_session")
assert result["matched"] == False
print("File exists trigger works")
def test_error_detection(self, trigger_system):
"""Test automatic error detection in output."""
error_cases = [
("Error: connection failed", True),
("Command not found: nmap", True),
("Permission denied", True),
("Connection refused", True),
("No route to host", True),
("Segmentation fault", True),
("Normal output here", False),
("This is fine", False),
]
for output, should_detect_error in error_cases:
result = trigger_system._detect_error_in_output(output)
assert result["error_detected"] == should_detect_error, f"Failed for: {output}"
print("Error detection works for common error patterns")
def test_output_buffer(self):
"""Test efficient output buffer management."""
buffer = OutputBuffer()
# First call - should return all output as new
new, full = buffer.get_new_output("line1\nline2\nline3")
assert new == "line1\nline2\nline3"
assert full == "line1\nline2\nline3"
# Second call with additional lines - should return only new lines
new, full = buffer.get_new_output("line1\nline2\nline3\nline4\nline5")
assert new == "line4\nline5"
assert full == "line1\nline2\nline3\nline4\nline5"
# Third call with same output - should return empty
new, full = buffer.get_new_output("line1\nline2\nline3\nline4\nline5")
assert new == ""
assert full == "line1\nline2\nline3\nline4\nline5"
print("Output buffer management works efficiently")
def test_trigger_validation(self, trigger_system):
"""Test trigger configuration validation."""
# Valid triggers
valid_triggers = [
[{"type": "prompt"}],
[{"type": "regex", "pattern": "test"}],
[{"type": "timeout", "timeout_seconds": 30}],
[{"type": "file_exists", "path": "/tmp/test"}],
]
for triggers in valid_triggers:
is_valid, error = trigger_system.validate_triggers(triggers)
assert is_valid == True, f"Should be valid: {triggers}"
# Invalid triggers
invalid_cases = [
([{"type": "regex"}], "pattern"),
([{"type": "timeout"}], "timeout_seconds"),
([{"type": "file_exists"}], "path"),
([{"type": "unknown_type"}], "unknown"),
]
for triggers, expected_error in invalid_cases:
is_valid, error = trigger_system.validate_triggers(triggers)
assert is_valid == False, f"Should be invalid: {triggers}"
assert expected_error in error.lower()
print("Trigger validation works correctly")
class TestRealWorldScenarios:
"""Test real-world pentesting scenarios with triggers."""
@pytest.fixture
def mock_managers(self):
"""Set up mock managers for integration tests."""
ssh_manager = Mock(spec=SSHManager)
tmux_manager = Mock(spec=TmuxManager)
# Mock successful command execution
tmux_manager.execute_command.return_value = {"status": "sent"}
return ssh_manager, tmux_manager
@pytest.mark.asyncio
async def test_nmap_scan_scenario(self, mock_managers):
"""Test nmap scan with completion trigger."""
ssh_manager, tmux_manager = mock_managers
# Simulate nmap output progression
nmap_outputs = [
"Starting Nmap 7.80...",
"Nmap scan report for 192.168.1.1",
"PORT STATE SERVICE",
"22/tcp open ssh",
"80/tcp open http",
"Nmap done: 1 IP address (1 host up) scanned in 2.34 seconds",
"root@kali:~# " # Final prompt
]
current_output = []
def capture_pane_side_effect(*args, **kwargs):
# Simulate output growing over time
if len(current_output) < len(nmap_outputs):
current_output.append(nmap_outputs[len(current_output)])
return {"status": "success", "output": "\n".join(current_output)}
tmux_manager.capture_pane.side_effect = capture_pane_side_effect
trigger_system = TriggerSystem(ssh_manager, tmux_manager)
# Start monitoring for nmap completion
start_time = time.time()
result = await trigger_system.monitor_session_with_triggers(
session_id="nmap_test",
triggers=[
{"type": "regex", "pattern": "Nmap done", "name": "scan_complete"},
{"type": "timeout", "timeout_seconds": 30}
],
max_timeout=10, # Short timeout for test
poll_interval=0.1 # Fast polling for test
)
assert result["status"] == "trigger_matched"
assert result["trigger"]["trigger_name"] == "scan_complete"
assert "Nmap done" in result["output"]
print("Nmap scan scenario works with completion trigger")
@pytest.mark.asyncio
async def test_metasploit_interactive(self, mock_managers):
"""Test metasploit interactive session with prompt detection."""
ssh_manager, tmux_manager = mock_managers
# Simulate metasploit startup
msf_outputs = [
"Starting Metasploit Framework...",
"msf6 > " # Metasploit prompt
]
current_output = []
def capture_pane_side_effect(*args, **kwargs):
if len(current_output) < len(msf_outputs):
current_output.append(msf_outputs[len(current_output)])
return {"status": "success", "output": "\n".join(current_output)}
tmux_manager.capture_pane.side_effect = capture_pane_side_effect
trigger_system = TriggerSystem(ssh_manager, tmux_manager)
result = await trigger_system.monitor_session_with_triggers(
session_id="msf_test",
triggers=[
{"type": "regex", "pattern": "msf6 >", "name": "msf_ready"},
{"type": "timeout", "timeout_seconds": 10}
],
max_timeout=5,
poll_interval=0.1
)
assert result["status"] == "trigger_matched"
assert result["trigger"]["trigger_name"] == "msf_ready"
assert "msf6 >" in result["output"]
print("Metasploit interactive session works with prompt detection")
async def run_all_tests():
"""Run all trigger system tests."""
print("Running Phase 2 Trigger System Tests...\n")
    # Run unit tests. pytest fixtures are not resolved when test methods are
    # called directly, so build the mocks once and pass them in explicitly.
    test_system = TestTriggerSystem()
    ssh_manager = Mock(spec=SSHManager)
    tmux_manager = Mock(spec=TmuxManager)
    trigger_system = TriggerSystem(ssh_manager, tmux_manager)
    test_system.test_prompt_detection(trigger_system)
    test_system.test_regex_trigger(trigger_system)
    test_system.test_timeout_trigger(trigger_system)
    await test_system.test_file_exists_trigger(trigger_system, tmux_manager)
    test_system.test_error_detection(trigger_system)
    test_system.test_output_buffer()
    test_system.test_trigger_validation(trigger_system)
    # Run integration tests, recreating the mock_managers fixture by hand.
    test_scenarios = TestRealWorldScenarios()
    scenario_tmux = Mock(spec=TmuxManager)
    scenario_tmux.execute_command.return_value = {"status": "sent"}
    managers = (Mock(spec=SSHManager), scenario_tmux)
    await test_scenarios.test_nmap_scan_scenario(managers)
    await test_scenarios.test_metasploit_interactive(managers)
print("\nAll Phase 2 tests passed! Trigger system is ready.")
if __name__ == "__main__":
asyncio.run(run_all_tests())