#!/usr/bin/env python3
"""
Basic Network Scanning Example
Demonstrates:
- Session creation
- Command execution with triggers
- Output parsing
- Session cleanup
Usage:
python basic_scan.py <target>
Example:
python basic_scan.py 192.168.1.0/24
"""
import asyncio
import sys
from pentest_mcp_server import PentestMCPServer
def _print_output_tail(output):
    """Print the non-blank lines among the last 20 lines of *output*, framed by rules."""
    print("[*] Scan output (last 20 lines):")
    print("-" * 60)
    for line in output.split('\n')[-20:]:
        if line.strip():
            print(line)
    print("-" * 60 + "\n")


def _print_hosts(hosts):
    """Print a per-host summary, listing at most the first 10 ports of each host.

    Each host is read as a mapping with optional ``address`` and ``ports``
    keys; each port as a mapping with ``port``, ``protocol``, ``service`` and
    ``state`` keys (presumably the parser's schema -- verify against the
    server's nmap parser). Missing fields are shown as ``unknown`` so that one
    incomplete record does not abort the whole listing.
    """
    print(f"[+] Found {len(hosts)} host(s)\n")
    for i, host in enumerate(hosts, 1):
        print(f"Host #{i}: {host.get('address', 'unknown')}")
        ports = host.get('ports', [])
        if ports:
            print(f" Open ports: {len(ports)}")
            for port in ports[:10]:  # Show first 10 ports
                print(f" - {port.get('port', 'unknown')}/"
                      f"{port.get('protocol', 'unknown')}: "
                      f"{port.get('service', 'unknown')} "
                      f"({port.get('state', 'unknown')})")
            if len(ports) > 10:
                print(f" ... and {len(ports) - 10} more ports")
        else:
            print(" No open ports detected")
        print()


async def main():
    """Run the example scan workflow against the target given in ``sys.argv[1]``.

    Steps: create the ``basic_scan`` session, run ``nmap -sV`` with a
    completion trigger, parse the XML report, print a summary, then kill the
    session and shut the server down.

    Exits with status 1 (via ``sys.exit``) when no target argument is given.
    Errors raised during the workflow are caught and reported on stdout;
    nothing is returned.
    """
    import shlex  # local import: only needed to quote the target below

    if len(sys.argv) < 2:
        print("Usage: python basic_scan.py <target>")
        print("Example: python basic_scan.py 192.168.1.0/24")
        sys.exit(1)

    target = sys.argv[1]

    # Single source of truth for values used in several requests below.
    session_id = "basic_scan"
    report_path = "/tmp/basic_scan.xml"
    scan_timeout = 600  # seconds; used for both the timeout trigger and max_timeout

    server = PentestMCPServer()
    try:
        print(f"[*] Starting basic network scan of {target}")
        print("[*] This example demonstrates the Pentest MCP Server capabilities\n")

        # Step 1: Create a persistent session
        print("[+] Creating scan session...")
        result = await server._handle_create_session({
            "session_id": session_id
        })
        if result["status"] != "created":
            print(f"[-] Failed to create session: {result.get('message')}")
            return
        print(f"[+] Session created: {result['session_id']}\n")

        # Step 2: Execute nmap scan with trigger
        print(f"[*] Running nmap scan on {target}...")
        print("[*] Using trigger to wait for completion...")
        # Quote the target so shell metacharacters in the argument are not
        # interpreted. NOTE(review): assumes the session runs the command
        # through a shell -- confirm against the server implementation.
        scan_result = await server._handle_execute({
            "session_id": session_id,
            "command": f"nmap -sV {shlex.quote(target)} -oX {report_path}",
            "triggers": [
                {"type": "regex", "pattern": "Nmap done", "name": "scan_complete"},
                {"type": "timeout", "timeout_seconds": scan_timeout},
            ],
            "max_timeout": scan_timeout,
        })

        if scan_result["status"] == "trigger_matched":
            print(f"[+] Scan completed in {scan_result['execution_time']:.2f} seconds")
            print(f"[+] Trigger matched: {scan_result['trigger']['trigger_name']}\n")
            _print_output_tail(scan_result["output"])

            # Step 3: Parse the XML results
            print("[*] Parsing scan results...")
            parse_result = await server._handle_parse_tool_output({
                "tool": "nmap",
                "file_path": report_path,
                "format": "xml",
            })
            if parse_result["status"] == "success":
                _print_hosts(parse_result["parsed_data"]["hosts"])
            else:
                print(f"[-] Failed to parse results: {parse_result.get('message')}")
        elif scan_result["status"] == "timeout":
            print(f"[-] Scan timed out after {scan_result['execution_time']:.2f} seconds")
        else:
            print(f"[-] Scan failed: {scan_result.get('error', 'Unknown error')}")
    except Exception as e:
        # Top-level boundary of the example: report the failure and fall
        # through to cleanup.
        print(f"[-] Error: {e}")
    finally:
        # Step 4: Cleanup
        print("\n[*] Cleaning up...")
        try:
            cleanup_result = await server._handle_kill_session({
                "session_id": session_id
            })
            if cleanup_result.get("status") == "killed":
                print("[+] Session terminated")
        except Exception as e:
            print(f"[-] Failed to terminate session: {e}")
        finally:
            # Shut the server down even when killing the session failed.
            await server.shutdown()
        print("[+] Done!")
if __name__ == "__main__":
    # Script entry point: run the async example workflow to completion.
    asyncio.run(main())