# server.py • 23.1 kB
#!/usr/bin/env python3
"""
MCP Kali Pentest Server - Professional Automated Penetration Testing
Integrated with LM Studio for autonomous decision-making
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from pentest_engine import PentestEngine
from lm_studio_client import LMStudioClient
from tools import (
nmap_scan,
nikto_scan,
sqlmap_scan,
metasploit_search,
gobuster_scan,
wpscan,
wfuzz_scan,
hydra_bruteforce,
john_crack,
hashcat_crack,
aircrack_ng,
tcpdump_capture,
wireshark_analyze,
burpsuite_scan,
owasp_zap_scan,
nuclei_scan,
ffuf_fuzz,
exploit_db_search,
searchsploit,
enum4linux,
smbclient_enum,
snmp_check,
dns_enum,
ssl_scan,
vulnerability_assessment
)
from autonomous_engine import AutonomousDecisionEngine
# Configure logging.
# The log file lives in a fixed system directory that may be missing or not
# writable (fresh install, unprivileged user). Opening it is best-effort: on
# failure the server keeps running with console logging only instead of
# crashing while the module is being imported.
_log_handlers: List[logging.Handler] = []
_log_file_error: Optional[OSError] = None
try:
    _log_handlers.append(logging.FileHandler('/var/log/mcpkali/server.log'))
except OSError as exc:
    _log_file_error = exc
# StreamHandler() without arguments writes to stderr.
_log_handlers.append(logging.StreamHandler())

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
if _log_file_error is not None:
    logger.warning(
        "File logging disabled, using console only: %s", _log_file_error
    )

# Initialize MCP Server
server = Server("mcp-kali-pentest")

# Initialize components (the decision engine is handed the LM Studio client)
pentest_engine = PentestEngine()
lm_studio_client = LMStudioClient()
autonomous_engine = AutonomousDecisionEngine(lm_studio_client)

# Store active pentest sessions, keyed by session id
active_sessions: Dict[str, Dict[str, Any]] = {}
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
    """Enumerate active pentest sessions and available reports as resources.

    Returns:
        Session resources first (one per entry in ``active_sessions``),
        followed by one resource per report returned by the pentest engine.
    """
    # Every in-memory session is exposed under pentest://session/<id>.
    session_entries = [
        Resource(
            uri=f"pentest://session/{sid}",
            name=f"Pentest Session: {data['target']}",
            description=f"Active pentest session against {data['target']}",
            mimeType="application/json",
        )
        for sid, data in active_sessions.items()
    ]

    # Reports known to the engine are exposed under pentest://report/<id>.
    report_entries = [
        Resource(
            uri=f"pentest://report/{entry['id']}",
            name=f"Report: {entry['name']}",
            description=entry['description'],
            mimeType="application/json",
        )
        for entry in pentest_engine.get_available_reports()
    ]

    return session_entries + report_entries
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """Return the JSON text of a pentest session or a report.

    Args:
        uri: Resource URI of the form ``pentest://session/<id>`` or
            ``pentest://report/<id>``. The MCP SDK may deliver this as a URL
            object rather than a plain ``str``, so it is normalised with
            ``str()`` before any string operation is applied.

    Returns:
        The session dictionary, or the report returned by the pentest engine,
        serialised as JSON with two-space indentation.

    Raises:
        ValueError: If the session id is not in ``active_sessions`` or the
            URI matches neither supported prefix.
    """
    session_prefix = "pentest://session/"
    report_prefix = "pentest://report/"
    uri_text = str(uri)

    if uri_text.startswith(session_prefix):
        # Slice the prefix off instead of using str.replace(), which would
        # also delete any later occurrence of the prefix inside the id.
        session_id = uri_text[len(session_prefix):]
        if session_id not in active_sessions:
            raise ValueError(f"Session {session_id} not found")
        return json.dumps(active_sessions[session_id], indent=2)

    if uri_text.startswith(report_prefix):
        report_id = uri_text[len(report_prefix):]
        report = pentest_engine.get_report(report_id)
        return json.dumps(report, indent=2)

    raise ValueError(f"Unknown resource URI: {uri}")
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Describe every penetration testing tool this server advertises.

    Returns:
        One ``Tool`` per advertised tool, each carrying its name, a
        description and a JSON Schema object for its arguments. The order is:
        reconnaissance, vulnerability scanning, exploitation, brute force,
        password cracking, network analysis, autonomous pentest tools.
    """

    def text(description):
        # String argument that only carries a description.
        return {"type": "string", "description": description}

    def text_list(description):
        # Array-of-strings argument that only carries a description.
        return {
            "type": "array",
            "items": {"type": "string"},
            "description": description,
        }

    def number(description):
        # Integer argument that only carries a description.
        return {"type": "integer", "description": description}

    def choice(options, default):
        # String argument restricted to fixed options, with a default.
        return {"type": "string", "enum": options, "default": default}

    def define(name, description, properties, required):
        # Wrap the properties in the object schema shared by every tool.
        return Tool(
            name=name,
            description=description,
            inputSchema={
                "type": "object",
                "properties": properties,
                "required": required,
            },
        )

    # Reconnaissance Tools
    reconnaissance = [
        define(
            "nmap_scan",
            "Advanced port scanning and service detection using Nmap. Supports TCP/UDP, OS detection, script scanning, and version detection.",
            {
                "target": text("Target IP address or hostname"),
                "scan_type": choice(
                    ["quick", "full", "stealth", "aggressive", "udp"], "quick"
                ),
                "ports": text("Port range (e.g., '1-1000', '80,443,8080')"),
                "scripts": text_list("NSE scripts to run"),
            },
            ["target"],
        ),
        define(
            "nikto_scan",
            "Web server vulnerability scanner. Checks for outdated software, dangerous files, and misconfigurations.",
            {
                "target": text("Target URL"),
                "ssl": {"type": "boolean", "default": False},
                "port": {"type": "integer", "default": 80},
            },
            ["target"],
        ),
        define(
            "gobuster_scan",
            "Directory and file brute-forcing tool. Discovers hidden paths and files on web servers.",
            {
                "target": text("Target URL"),
                "wordlist": text("Path to wordlist file"),
                "extensions": text_list("File extensions to search"),
            },
            ["target"],
        ),
        define(
            "dns_enum",
            "DNS enumeration and subdomain discovery. Finds subdomains, DNS records, and zone transfers.",
            {
                "domain": text("Target domain"),
                "record_types": {
                    "type": "array",
                    "items": {"type": "string"},
                    "default": ["A", "AAAA", "MX", "NS", "TXT"],
                },
            },
            ["domain"],
        ),
        define(
            "enum4linux",
            "Enumerate SMB shares, users, and groups on Windows/Samba systems.",
            {
                "target": text("Target IP address"),
                "username": text("Username for authentication"),
                "password": text("Password for authentication"),
            },
            ["target"],
        ),
    ]

    # Vulnerability Scanning Tools
    vulnerability_scanning = [
        define(
            "sqlmap_scan",
            "Automated SQL injection detection and exploitation. Tests for SQLi vulnerabilities and can extract database contents.",
            {
                "target": text("Target URL"),
                "data": text("POST data"),
                "cookie": text("HTTP Cookie header"),
                "level": {"type": "integer", "minimum": 1, "maximum": 5, "default": 1},
                "risk": {"type": "integer", "minimum": 1, "maximum": 3, "default": 1},
            },
            ["target"],
        ),
        define(
            "nuclei_scan",
            "Fast vulnerability scanner using templates. Detects CVEs, misconfigurations, and security issues.",
            {
                "target": text("Target URL or IP"),
                "templates": text_list("Specific templates to use"),
                "severity": text_list("Severity levels to scan"),
            },
            ["target"],
        ),
        define(
            "wpscan",
            "WordPress vulnerability scanner. Detects vulnerable plugins, themes, and WordPress core issues.",
            {
                "target": text("WordPress site URL"),
                "enumerate": {
                    "type": "string",
                    "enum": ["vp", "ap", "tt", "cb", "dbe", "u"],
                    "description": "What to enumerate",
                },
                "api_token": text("WPScan API token"),
            },
            ["target"],
        ),
        define(
            "owasp_zap_scan",
            "OWASP ZAP automated security scanner. Performs active and passive scanning of web applications.",
            {
                "target": text("Target URL"),
                "scan_type": choice(["quick", "full", "api"], "quick"),
            },
            ["target"],
        ),
        define(
            "ssl_scan",
            "SSL/TLS configuration analyzer. Checks for weak ciphers, certificate issues, and vulnerabilities.",
            {
                "target": text("Target hostname"),
                "port": {"type": "integer", "default": 443},
            },
            ["target"],
        ),
    ]

    # Exploitation Tools
    exploitation = [
        define(
            "metasploit_search",
            "Search Metasploit database for exploits and modules.",
            {
                "query": text("Search query"),
                "type": {
                    "type": "string",
                    "enum": ["exploit", "auxiliary", "post", "payload"],
                    "description": "Module type",
                },
            },
            ["query"],
        ),
        define(
            "searchsploit",
            "Search Exploit-DB for known exploits and vulnerabilities.",
            {
                "query": text("Search query (software name, CVE, etc.)"),
            },
            ["query"],
        ),
    ]

    # Brute Force Tools
    brute_force = [
        define(
            "hydra_bruteforce",
            "Network login brute-force tool. Supports SSH, FTP, HTTP, RDP, and many other protocols.",
            {
                "target": text("Target IP or hostname"),
                "service": {
                    "type": "string",
                    "enum": ["ssh", "ftp", "http", "rdp", "smb", "mysql", "postgres"],
                },
                "username": text("Single username or path to username list"),
                "password_list": text("Path to password list"),
                "port": number("Custom port"),
            },
            ["target", "service", "password_list"],
        ),
        define(
            "ffuf_fuzz",
            "Fast web fuzzer for discovering paths, parameters, and vulnerabilities.",
            {
                "target": text("Target URL with FUZZ keyword"),
                "wordlist": text("Path to wordlist"),
                "method": choice(["GET", "POST", "PUT", "DELETE"], "GET"),
            },
            ["target", "wordlist"],
        ),
    ]

    # Password Cracking
    password_cracking = [
        define(
            "john_crack",
            "John the Ripper password cracker. Cracks password hashes using wordlists and rules.",
            {
                "hash_file": text("Path to file containing hashes"),
                "wordlist": text("Path to wordlist"),
                "format": text("Hash format (e.g., md5, sha256, ntlm)"),
            },
            ["hash_file"],
        ),
        define(
            "hashcat_crack",
            "Advanced password recovery using GPU acceleration.",
            {
                "hash": text("Hash to crack"),
                "hash_type": number("Hashcat hash type number"),
                "wordlist": text("Path to wordlist"),
                "attack_mode": {
                    "type": "integer",
                    "enum": [0, 1, 3, 6, 7],
                    "default": 0,
                },
            },
            ["hash", "hash_type"],
        ),
    ]

    # Network Analysis
    network_analysis = [
        define(
            "tcpdump_capture",
            "Capture network packets for analysis.",
            {
                "interface": text("Network interface"),
                "filter": text("BPF filter"),
                "duration": number("Capture duration in seconds"),
                "output_file": text("PCAP output file"),
            },
            ["interface"],
        ),
        define(
            "snmp_check",
            "SNMP enumeration tool. Extracts system information via SNMP.",
            {
                "target": text("Target IP address"),
                "community": {
                    "type": "string",
                    "default": "public",
                    "description": "SNMP community string",
                },
            },
            ["target"],
        ),
    ]

    # Autonomous Pentest Tools
    autonomous = [
        define(
            "start_autonomous_pentest",
            "Start a fully autonomous penetration test. The AI will make decisions and suggest next steps automatically.",
            {
                "target": text("Target IP, hostname, or URL"),
                "scope": text_list("Additional IPs/subnets in scope"),
                "rules_of_engagement": {
                    "type": "object",
                    "description": "Constraints and permissions",
                },
                "depth": choice(
                    [
                        "reconnaissance",
                        "vulnerability_scan",
                        "exploitation",
                        "post_exploitation",
                    ],
                    "vulnerability_scan",
                ),
            },
            ["target"],
        ),
        define(
            "get_ai_suggestion",
            "Get AI-powered suggestions for next steps based on current findings.",
            {
                "session_id": text("Pentest session ID"),
                "context": text("Additional context for the AI"),
            },
            ["session_id"],
        ),
        define(
            "vulnerability_assessment",
            "Comprehensive vulnerability assessment using multiple tools and AI analysis.",
            {
                "target": text("Target to assess"),
                "assessment_type": choice(
                    ["web", "network", "wireless", "comprehensive"],
                    "comprehensive",
                ),
            },
            ["target"],
        ),
        define(
            "generate_report",
            "Generate a comprehensive penetration test report with findings, risk ratings, and remediation recommendations.",
            {
                "session_id": text("Pentest session ID"),
                "format": choice(["json", "html", "pdf", "markdown"], "json"),
            },
            ["session_id"],
        ),
    ]

    return [
        *reconnaissance,
        *vulnerability_scanning,
        *exploitation,
        *brute_force,
        *password_cracking,
        *network_analysis,
        *autonomous,
    ]
# Argument names whose values are credentials and must never reach the log.
_SENSITIVE_ARGUMENT_KEYS = frozenset({"password", "api_token", "cookie"})


def _redact_arguments(arguments: dict) -> dict:
    """Return a shallow copy of *arguments* with credential values masked."""
    return {
        key: "***REDACTED***" if key in _SENSITIVE_ARGUMENT_KEYS else value
        for key, value in arguments.items()
    }


def _tool_handlers() -> dict:
    """Map every routable tool name to the coroutine function behind it.

    The table is built on demand rather than at import time because some of
    the handlers are defined further down in this module.
    """
    # NOTE(review): burpsuite_scan and exploit_db_search are imported at the
    # top of the file but have never been routed here - confirm whether that
    # omission is intentional.
    return {
        "nmap_scan": nmap_scan,
        "nikto_scan": nikto_scan,
        "sqlmap_scan": sqlmap_scan,
        "metasploit_search": metasploit_search,
        "gobuster_scan": gobuster_scan,
        "wpscan": wpscan,
        "wfuzz_scan": wfuzz_scan,
        "hydra_bruteforce": hydra_bruteforce,
        "john_crack": john_crack,
        "hashcat_crack": hashcat_crack,
        "aircrack_ng": aircrack_ng,
        "tcpdump_capture": tcpdump_capture,
        "wireshark_analyze": wireshark_analyze,
        "owasp_zap_scan": owasp_zap_scan,
        "nuclei_scan": nuclei_scan,
        "ffuf_fuzz": ffuf_fuzz,
        "searchsploit": searchsploit,
        "enum4linux": enum4linux,
        "smbclient_enum": smbclient_enum,
        "snmp_check": snmp_check,
        "dns_enum": dns_enum,
        "ssl_scan": ssl_scan,
        "vulnerability_assessment": vulnerability_assessment,
        "start_autonomous_pentest": start_autonomous_pentest,
        "get_ai_suggestion": get_ai_suggestion,
        "generate_report": generate_report,
    }


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Run the named tool with the given arguments and return its result.

    Args:
        name: Tool name; must be one of the keys of the dispatch table.
        arguments: Keyword arguments forwarded to the tool as ``**arguments``.
            ``None`` is treated as "no arguments".

    Returns:
        A single-element list holding the tool result serialised as indented
        JSON. Failures (unknown tool, bad arguments, errors raised by the
        tool, unserialisable results) are not raised: they are logged with a
        traceback and returned as a text item of the form ``Error: <message>``.
    """
    try:
        arguments = arguments or {}
        # Mask credentials before logging; the log may be persisted to disk.
        logger.info(
            "Executing tool: %s with arguments: %s",
            name,
            _redact_arguments(arguments),
        )

        handler = _tool_handlers().get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        result = await handler(**arguments)

        logger.info("Tool %s completed successfully", name)
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
    except Exception as e:
        # Top-level boundary: report the failure to the client as text.
        logger.exception("Error executing tool %s: %s", name, e)
        return [TextContent(type="text", text=f"Error: {str(e)}")]
# Strong references to in-flight pentest tasks. The event loop only keeps weak
# references to tasks, so a task nothing else refers to may be
# garbage-collected before it finishes.
_background_pentest_tasks: set = set()


def _on_pentest_task_done(task: asyncio.Task) -> None:
    """Release a finished background task and log the error it ended with."""
    _background_pentest_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error(
            "Autonomous pentest task failed: %s", error, exc_info=error
        )


async def start_autonomous_pentest(
    target: str,
    scope: Optional[List[str]] = None,
    rules_of_engagement: Optional[Dict[str, Any]] = None,
    depth: str = "vulnerability_scan",
) -> Dict[str, Any]:
    """Register a new pentest session and launch it in the background.

    Args:
        target: Target of the test; also used as the scope when ``scope`` is
            empty or omitted.
        scope: Additional in-scope entries.
        rules_of_engagement: Constraints stored on the session under ``roe``.
        depth: Depth label stored on the session.

    Returns:
        A dictionary with the new ``session_id``, a ``status`` of
        ``"started"``, a human-readable ``message`` and the ``initial_plan``
        produced by the autonomous engine.
    """
    import uuid  # local import: only this function needs it

    # Read the clock once so the id and started_at describe the same instant.
    started = datetime.now()
    # A timestamp alone has one-second resolution, so two sessions started in
    # the same second would overwrite each other; add a random suffix.
    session_id = (
        f"pentest_{started.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
    )
    session = {
        "id": session_id,
        "target": target,
        "scope": scope or [target],
        "roe": rules_of_engagement or {},
        "depth": depth,
        "status": "running",
        "findings": [],
        "timeline": [],
        "started_at": started.isoformat(),
    }
    active_sessions[session_id] = session

    # Start autonomous pentest in background. The task is kept out of the
    # session dict because sessions are serialised with json.dumps elsewhere.
    task = asyncio.create_task(autonomous_engine.run_pentest(session))
    _background_pentest_tasks.add(task)
    task.add_done_callback(_on_pentest_task_done)

    return {
        "session_id": session_id,
        "status": "started",
        "message": f"Autonomous pentest initiated against {target}",
        "initial_plan": await autonomous_engine.create_initial_plan(session),
    }
async def get_ai_suggestion(session_id: str, context: Optional[str] = None) -> Dict[str, Any]:
    """Ask the autonomous engine for next-step suggestions for a session.

    Args:
        session_id: Key of the session in ``active_sessions``.
        context: Optional extra text passed through to the engine.

    Returns:
        The session id, the engine's suggestions, the session's current
        status and the number of findings recorded on it.

    Raises:
        ValueError: If ``session_id`` is not an active session.
    """
    if session_id in active_sessions:
        current = active_sessions[session_id]
        advice = await autonomous_engine.get_suggestions(current, context)
        # Status and findings are read after the await, as the engine call
        # may take a while and the session can change in the meantime.
        return {
            "session_id": session_id,
            "suggestions": advice,
            "current_status": current["status"],
            "findings_count": len(current["findings"]),
        }
    raise ValueError(f"Session {session_id} not found")
async def generate_report(session_id: str, format: str = "json") -> Dict[str, Any]:
    """Build a report for an active session via the pentest engine.

    Args:
        session_id: Key of the session in ``active_sessions``.
        format: Output format name handed to the engine unchanged.

    Returns:
        Whatever the pentest engine's ``generate_report`` produces.

    Raises:
        ValueError: If ``session_id`` is not an active session.
    """
    if session_id in active_sessions:
        return await pentest_engine.generate_report(
            active_sessions[session_id], format
        )
    raise ValueError(f"Session {session_id} not found")
async def main():
    """Connect to LM Studio, then serve MCP requests over stdio."""
    logger.info("Starting MCP Kali Pentest Server...")

    # The LM Studio connection is opened before any MCP traffic is served.
    await lm_studio_client.connect()

    async with stdio_server() as streams:
        incoming, outgoing = streams
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="mcp-kali-pentest",
            server_version="1.0.0",
            capabilities=capabilities,
        )
        await server.run(incoming, outgoing, init_options)


if __name__ == "__main__":
    asyncio.run(main())