kali_pentest_server.py•10.9 kB
#!/usr/bin/env python3
"""
Simple Kali Pentest MCP Server - Educational web penetration testing tools
"""
import os
import sys
import logging
import subprocess
import re
from mcp.server.fastmcp import FastMCP
# Configure logging to stderr
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr
)
logger = logging.getLogger("kali-pentest-server")
# Initialize MCP server - NO PROMPT PARAMETER!
mcp = FastMCP("kali-pentest")
# Configuration
TARGET_WHITELIST = os.environ.get("PENTEST_TARGET_WHITELIST", "localhost,127.0.0.1").split(",")
# === UTILITY FUNCTIONS ===
def sanitize_input(input_str: str) -> str:
"""Sanitize input to prevent command injection."""
# Remove dangerous characters
dangerous_chars = [';', '&', '|', '`', '$', '(', ')', '<', '>', '\n', '\r']
clean = input_str
for char in dangerous_chars:
clean = clean.replace(char, '')
return clean.strip()
def is_valid_target(target: str) -> bool:
"""Check if target is in whitelist."""
# Extract domain/IP from URL if needed
if target.startswith(('http://', 'https://')):
parts = target.split('/')
if len(parts) >= 3:
target = parts[2].split(':')[0]
# Check against whitelist
for allowed in TARGET_WHITELIST:
if allowed.strip() in target or target in allowed.strip():
return True
return False
def run_command_safe(command: list, timeout: int = 60) -> str:
"""Run command safely with timeout."""
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=timeout
)
if result.returncode == 0:
return result.stdout
else:
return f"Error: {result.stderr}"
except subprocess.TimeoutExpired:
return "Command timed out after {} seconds".format(timeout)
except Exception as e:
return f"Command error: {str(e)}"
# === MCP TOOLS ===
@mcp.tool()
async def nmap_scan(target: str = "", scan_type: str = "basic") -> str:
"""Perform network port scanning with nmap on whitelisted targets."""
logger.info(f"Executing nmap scan on {target}")
if not target.strip():
return "❌ Error: Target is required"
target = sanitize_input(target)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist. Add to PENTEST_TARGET_WHITELIST environment variable."
try:
# Build nmap command based on scan type
if scan_type == "basic":
cmd = ["nmap", "-sT", "-T4", "--top-ports", "100", target]
elif scan_type == "full":
cmd = ["nmap", "-sT", "-T4", "-p-", target]
elif scan_type == "service":
cmd = ["nmap", "-sV", "-T4", "--top-ports", "1000", target]
else:
cmd = ["nmap", "-sT", "-T4", "--top-ports", "100", target]
result = run_command_safe(cmd, timeout=120)
return f"🔍 Nmap Scan Results:\n\n{result}"
except Exception as e:
logger.error(f"Nmap error: {e}")
return f"❌ Error running nmap: {str(e)}"
@mcp.tool()
async def nikto_scan(target: str = "") -> str:
"""Run Nikto web vulnerability scanner on whitelisted targets."""
logger.info(f"Executing nikto scan on {target}")
if not target.strip():
return "❌ Error: Target URL is required"
target = sanitize_input(target)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist"
try:
# Ensure URL has protocol
if not target.startswith(('http://', 'https://')):
target = f"http://{target}"
cmd = ["nikto", "-h", target, "-Tuning", "123456789", "-timeout", "30"]
result = run_command_safe(cmd, timeout=180)
return f"🌐 Nikto Scan Results:\n\n{result}"
except Exception as e:
logger.error(f"Nikto error: {e}")
return f"❌ Error running nikto: {str(e)}"
@mcp.tool()
async def dirb_scan(target: str = "", wordlist: str = "common") -> str:
"""Run directory enumeration with dirb on whitelisted targets."""
logger.info(f"Executing dirb scan on {target}")
if not target.strip():
return "❌ Error: Target URL is required"
target = sanitize_input(target)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist"
try:
# Ensure URL has protocol
if not target.startswith(('http://', 'https://')):
target = f"http://{target}"
# Select wordlist
if wordlist == "common":
wl_path = "/usr/share/dirb/wordlists/common.txt"
elif wordlist == "small":
wl_path = "/usr/share/dirb/wordlists/small.txt"
else:
wl_path = "/usr/share/dirb/wordlists/common.txt"
cmd = ["dirb", target, wl_path, "-S", "-r"]
result = run_command_safe(cmd, timeout=120)
return f"📁 Directory Enumeration Results:\n\n{result}"
except Exception as e:
logger.error(f"Dirb error: {e}")
return f"❌ Error running dirb: {str(e)}"
@mcp.tool()
async def wpscan_check(target: str = "") -> str:
"""Run WordPress vulnerability scanner on whitelisted targets."""
logger.info(f"Executing wpscan on {target}")
if not target.strip():
return "❌ Error: Target URL is required"
target = sanitize_input(target)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist"
try:
# Ensure URL has protocol
if not target.startswith(('http://', 'https://')):
target = f"http://{target}"
cmd = ["wpscan", "--url", target, "--enumerate", "vp,vt,u", "--plugins-detection", "passive"]
result = run_command_safe(cmd, timeout=180)
return f"📝 WordPress Scan Results:\n\n{result}"
except Exception as e:
logger.error(f"WPScan error: {e}")
return f"❌ Error running wpscan: {str(e)}"
@mcp.tool()
async def sqlmap_test(target: str = "", param: str = "") -> str:
"""Test for SQL injection vulnerabilities on whitelisted targets."""
logger.info(f"Executing sqlmap test on {target}")
if not target.strip():
return "❌ Error: Target URL is required"
target = sanitize_input(target)
param = sanitize_input(param)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist"
try:
# Basic SQL injection test
cmd = ["sqlmap", "-u", target, "--batch", "--level=1", "--risk=1", "--timeout=30"]
# Add parameter if specified
if param.strip():
cmd.extend(["-p", param])
result = run_command_safe(cmd, timeout=120)
# Extract important findings
if "might be injectable" in result or "is vulnerable" in result:
return f"⚠️ SQL Injection Test Results:\n\n{result}"
else:
return f"✅ SQL Injection Test Results:\n\n{result}"
except Exception as e:
logger.error(f"SQLMap error: {e}")
return f"❌ Error running sqlmap: {str(e)}"
@mcp.tool()
async def searchsploit(search_term: str = "") -> str:
"""Search for exploits in the Exploit Database."""
logger.info(f"Searching exploits for: {search_term}")
if not search_term.strip():
return "❌ Error: Search term is required"
search_term = sanitize_input(search_term)
try:
cmd = ["searchsploit", search_term, "--colour"]
result = run_command_safe(cmd, timeout=30)
if result and "No Results" not in result:
return f"🔒 Exploit Search Results for '{search_term}':\n\n{result}"
else:
return f"📊 No exploits found for '{search_term}'"
except Exception as e:
logger.error(f"Searchsploit error: {e}")
return f"❌ Error running searchsploit: {str(e)}"
@mcp.tool()
async def check_target_whitelist() -> str:
"""Display currently whitelisted targets for testing."""
logger.info("Checking target whitelist")
try:
whitelist = TARGET_WHITELIST
if whitelist:
formatted_list = "\n".join([f" • {target.strip()}" for target in whitelist])
return f"✅ Current Whitelisted Targets:\n{formatted_list}\n\nTo add more targets, set PENTEST_TARGET_WHITELIST environment variable."
else:
return "⚠️ No targets whitelisted. Set PENTEST_TARGET_WHITELIST environment variable."
except Exception as e:
logger.error(f"Error checking whitelist: {e}")
return f"❌ Error: {str(e)}"
@mcp.tool()
async def quick_recon(target: str = "") -> str:
"""Run quick reconnaissance scan combining multiple tools on whitelisted target."""
logger.info(f"Running quick recon on {target}")
if not target.strip():
return "❌ Error: Target is required"
target = sanitize_input(target)
if not is_valid_target(target):
return f"❌ Error: Target '{target}' is not in whitelist"
try:
results = []
# Quick nmap scan
nmap_cmd = ["nmap", "-sT", "-T4", "--top-ports", "20", target]
nmap_result = run_command_safe(nmap_cmd, timeout=60)
results.append(f"=== Port Scan ===\n{nmap_result[:500]}")
# Check if HTTP/HTTPS is available
if "80/tcp" in nmap_result or "443/tcp" in nmap_result or "8080/tcp" in nmap_result:
# Ensure URL format
if not target.startswith(('http://', 'https://')):
url_target = f"http://{target}"
else:
url_target = target
# Quick dirb scan with limited recursion
dirb_cmd = ["dirb", url_target, "/usr/share/dirb/wordlists/small.txt", "-S", "-r"]
dirb_result = run_command_safe(dirb_cmd, timeout=60)
results.append(f"\n=== Directory Scan ===\n{dirb_result[:500]}")
return f"⚡ Quick Reconnaissance Results:\n\n" + "\n".join(results)
except Exception as e:
logger.error(f"Quick recon error: {e}")
return f"❌ Error during reconnaissance: {str(e)}"
# === SERVER STARTUP ===
if __name__ == "__main__":
logger.info("Starting Kali Pentest MCP server...")
logger.info(f"Whitelisted targets: {', '.join(TARGET_WHITELIST)}")
# Warning message
logger.warning("This server is for educational purposes only!")
logger.warning("Only test systems you own or have explicit permission to test.")
try:
mcp.run(transport='stdio')
except Exception as e:
logger.error(f"Server error: {e}", exc_info=True)
sys.exit(1)