#!/usr/bin/env python3
"""
Professional Penetration Testing MCP Server
Advanced security testing platform for authorized penetration testing
"""
import os
import sys
import logging
import subprocess
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Tuple
from mcp.server.fastmcp import FastMCP
# Send every log record to stderr.
# NOTE(review): presumably stdout is reserved for the MCP transport — confirm.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(stream=sys.stderr, format=_LOG_FORMAT, level=logging.INFO)

# Module-wide logger shared by all helpers and tools in this file.
logger = logging.getLogger("pentest-server-pro")
# Initialize MCP server
mcp = FastMCP("pentest-pro")

# Workspace and results directories, plus the location of the wordlists.
WORKSPACE_DIR = Path("/app/workspaces")
RESULTS_DIR = Path("/app/results")
WORDLISTS_DIR = Path("/usr/share/wordlists")

# Create the writable directories if they don't exist. parents=True is needed
# because both live under /app: without it a missing /app makes mkdir raise
# FileNotFoundError and the module fails to import.
WORKSPACE_DIR.mkdir(parents=True, exist_ok=True)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
# === UTILITY FUNCTIONS ===
# Characters deleted by sanitize_input: shell metacharacters and quotes, plus
# NUL / CR / LF. An embedded NUL makes subprocess raise ValueError, and line
# breaks can split a value for tools that parse their argument line by line.
_SANITIZE_TABLE = str.maketrans("", "", ';&|`$()<>\\"\'' + "\x00\r\n")


def sanitize_input(input_str: str) -> str:
    """Sanitize input to prevent command injection.

    Deletes every shell metacharacter, quote, NUL and line-break character
    from the value and trims surrounding whitespace.

    NOTE(review): characters such as ``&`` and ``$`` are removed even from
    values where they are legitimate (query strings, crypt-style hashes).

    Args:
        input_str: Raw, untrusted value. ``None`` and ``""`` are accepted.

    Returns:
        The cleaned string, or ``""`` when the input is empty or ``None``.
    """
    if not input_str:
        return ""
    # One C-level pass instead of a chain of str.replace calls.
    return input_str.translate(_SANITIZE_TABLE).strip()
# Dotted-quad shape only; the numeric range of each octet is checked separately.
_IPV4_SHAPE_RE = re.compile(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')
# Dot-separated labels of letters, digits and inner hyphens (max 63 chars each).
_DOMAIN_RE = re.compile(
    r'[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*'
)


def validate_ip_or_domain(target: str) -> bool:
    """Validate that target is a reasonable IPv4 address, IPv4 CIDR, or domain.

    Compared with a purely pattern-based check this also enforces numeric
    ranges (octets 0-255, prefix length 0-32) and matches the whole string,
    so a trailing newline is no longer accepted.

    Args:
        target: Candidate host, e.g. ``"10.0.0.1"``, ``"10.0.0.0/8"`` or
            ``"example.com"``.

    Returns:
        True when the target is acceptable, False otherwise (including for
        empty input).
    """
    if not target:
        return False

    host, slash, prefix = target.partition("/")

    if _IPV4_SHAPE_RE.fullmatch(host):
        # Looks like an IPv4 address: every octet must fit in one byte.
        if any(int(octet) > 255 for octet in host.split(".")):
            return False
        if not slash:
            return True
        # CIDR form: the prefix length must be 0-32.
        return bool(re.fullmatch(r'[0-9]{1,2}', prefix)) and int(prefix) <= 32

    # Only IPv4 networks may carry a "/prefix" suffix.
    if slash:
        return False
    return bool(_DOMAIN_RE.fullmatch(target))
def validate_port(port: str) -> bool:
    """Validate port number.

    Only plain ASCII decimal digits are accepted. ``int()`` alone would also
    accept forms such as ``"8_0"``, ``"+80"`` or ``" 80"``, and callers pass
    the original string, not the parsed integer, on to the external tool.

    Args:
        port: Port as text (an ``int`` is tolerated and converted).

    Returns:
        True when the value is a decimal number in the range 1-65535.
    """
    text = str(port)
    if not (text.isascii() and text.isdigit()):
        return False
    try:
        return 1 <= int(text) <= 65535
    except ValueError:
        # Extremely long digit strings can exceed int()'s conversion limit.
        return False
def run_command(command: List[str], timeout: int = 30, capture_output: bool = True) -> Tuple[int, str, str]:
    """Execute command safely with timeout.

    The command runs without a shell, so its elements are passed to the
    program as-is and shell metacharacters are not interpreted.

    Args:
        command: Program and arguments as a list.
        timeout: Maximum run time in seconds.
        capture_output: When False the child's output is not captured and
            empty strings are returned in its place.

    Returns:
        ``(returncode, stdout, stderr)``. On timeout or any failure to run
        the command the tuple is ``(-1, "", <error message>)``.
    """
    try:
        result = subprocess.run(
            command,
            capture_output=capture_output,
            text=True,
            # Tool output is not guaranteed to be valid text; substitute bad
            # bytes instead of discarding the whole result on a decode error.
            errors="replace",
            timeout=timeout,
            shell=False,
        )
    except subprocess.TimeoutExpired:
        return -1, "", "Command timed out"
    except Exception as e:
        return -1, "", str(e)
    # stdout/stderr are None when capture_output is False; keep the declared
    # return type by normalising them to empty strings.
    return result.returncode, result.stdout or "", result.stderr or ""
def save_results(filename: str, content: str) -> str:
    """Save results to a file inside RESULTS_DIR.

    Callers build ``filename`` from user-supplied targets, so it may contain
    path separators (a CIDR such as ``10.0.0.0/24``, a URL) or ``..``. Every
    character outside ``[A-Za-z0-9._-]`` is replaced with ``_`` so the result
    always lands directly inside RESULTS_DIR.

    Args:
        filename: Desired file name (not a path).
        content: Text to write.

    Returns:
        The path of the written file, or ``""`` if saving failed (the error
        is logged, never raised).
    """
    try:
        # Leading dots are dropped so the name can be neither ".." nor hidden.
        safe_name = re.sub(r'[^A-Za-z0-9._-]', '_', filename).lstrip('.')
        if not safe_name:
            raise ValueError("empty result filename")
        filepath = RESULTS_DIR / safe_name
        # Explicit encoding: tool output may hold characters the locale's
        # default encoding cannot represent.
        filepath.write_text(content, encoding="utf-8")
        return str(filepath)
    except Exception as e:
        logger.error(f"Failed to save results: {e}")
        return ""
# === NETWORK RECONNAISSANCE TOOLS ===
# Extra nmap flags for each named scan type; any other value falls back to a
# TCP connect scan (-sT).
_NMAP_SCAN_FLAGS = {
    "stealth": ["-sS", "-O"],
    "version": ["-sV", "-sC"],
    "aggressive": ["-A", "-T4"],
    "full": ["-p-", "-sV", "-sC", "-A"],
    "udp": ["-sU"],
    "vuln": ["--script", "vuln"],
}


@mcp.tool()
async def nmap_scan(target: str = "", scan_type: str = "basic", ports: str = "") -> str:
    """Advanced Nmap network scanning with multiple scan types and output formats.

    Args:
        target: IPv4 address, IPv4 CIDR range or domain name.
        scan_type: One of "stealth", "version", "aggressive", "full", "udp"
            or "vuln"; anything else runs a TCP connect scan.
        ports: Optional port specification passed to ``-p``.

    Returns:
        Nmap's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing nmap scan on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    if not validate_ip_or_domain(target):
        return "❌ Error: Invalid target format"
    scan_type = sanitize_input(scan_type.lower())
    ports = sanitize_input(ports)
    try:
        command = ["nmap", *_NMAP_SCAN_FLAGS.get(scan_type, ["-sT"])]
        # Port specification
        if ports:
            command.extend(["-p", ports])
        command.append(target)
        returncode, stdout, stderr = run_command(command, timeout=300)
        if returncode != 0:
            return f"❌ Nmap scan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # A CIDR target contains "/", which would be read as a directory
        # separator in the file name and make the save fail.
        file_label = target.replace("/", "_")
        save_results(f"nmap_{file_label}_{timestamp}.txt", stdout)
        return f"✅ Nmap scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def masscan_scan(target: str = "", ports: str = "1-65535", rate: str = "1000") -> str:
    """High-speed port scanner using Masscan for large-scale network discovery.

    Args:
        target: IPv4 address, IPv4 CIDR range or domain name (as accepted by
            validate_ip_or_domain).
        ports: Port specification passed to ``-p``.
        rate: Value passed to ``--rate``.

    Returns:
        Masscan's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing masscan on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    if not validate_ip_or_domain(target):
        return "❌ Error: Invalid target format"
    ports = sanitize_input(ports)
    rate = sanitize_input(rate)
    try:
        command = ["masscan", target, "-p", ports, "--rate", rate]
        returncode, stdout, stderr = run_command(command, timeout=600)
        if returncode != 0:
            return f"❌ Masscan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # A CIDR target contains "/", which would be read as a directory
        # separator in the file name and make the save fail.
        file_label = target.replace("/", "_")
        save_results(f"masscan_{file_label}_{timestamp}.txt", stdout)
        return f"⚡ Masscan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def dnsenum_scan(domain: str = "") -> str:
    """Comprehensive DNS enumeration including subdomains, nameservers, and zone transfers.

    Args:
        domain: Domain to enumerate; must pass validate_ip_or_domain.

    Returns:
        dnsenum's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing dnsenum on {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    if not validate_ip_or_domain(domain):
        return "❌ Error: Invalid domain format"
    try:
        exit_code, out, err = run_command(["dnsenum", "--enum", domain], timeout=120)
        if exit_code != 0:
            return f"❌ DNS enumeration failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"dnsenum_{domain}_{stamp}.txt", out)
        return f"🔍 DNS enumeration completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def amass_enum(domain: str = "", mode: str = "passive") -> str:
    """Advanced subdomain enumeration using OWASP Amass with passive or active reconnaissance.

    Args:
        domain: Domain to enumerate; must pass validate_ip_or_domain.
        mode: "active" for active reconnaissance; any other value runs a
            passive enumeration.

    Returns:
        Amass's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing Amass enumeration on {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    # Same check as dnsenum_scan: the domain ends up both on the command line
    # and in the result file name, so reject anything that is not host-shaped.
    if not validate_ip_or_domain(domain):
        return "❌ Error: Invalid domain format"
    mode = sanitize_input(mode.lower())
    try:
        mode_flag = "-active" if mode == "active" else "-passive"
        command = ["amass", "enum", mode_flag, "-d", domain]
        returncode, stdout, stderr = run_command(command, timeout=300)
        if returncode != 0:
            return f"❌ Amass enumeration failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"amass_{domain}_{timestamp}.txt", stdout)
        return f"🎯 Amass enumeration completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
# === WEB APPLICATION TESTING TOOLS ===
@mcp.tool()
async def nikto_scan(target: str = "", port: str = "80", ssl: str = "no") -> str:
    """Comprehensive web vulnerability scanner using Nikto with SSL/TLS support.

    Args:
        target: Host to scan; must pass validate_ip_or_domain.
        port: Port number, validated by validate_port.
        ssl: "yes" (any letter case) adds Nikto's ``-ssl`` flag.

    Returns:
        Nikto's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing nikto scan on {target}:{port}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    port = sanitize_input(port)
    if not validate_ip_or_domain(target):
        return "❌ Error: Invalid target format"
    if not validate_port(port):
        return "❌ Error: Invalid port number"
    try:
        argv = ["nikto", "-h", f"{target}:{port}"]
        argv += ["-ssl"] if ssl.lower() == "yes" else []
        exit_code, out, err = run_command(argv, timeout=300)
        if exit_code != 0:
            return f"❌ Nikto scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"nikto_{target}_{stamp}.txt", out)
        return f"🔍 Nikto scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def sqlmap_scan(url: str = "", data: str = "", cookie: str = "", level: str = "1", risk: str = "1") -> str:
    """SQL injection detection and exploitation using SQLMap with customizable levels and risk.

    Args:
        url: Target URL.
        data: Optional request body passed to ``--data``.
        cookie: Optional cookie header value passed to ``--cookie``.
        level: Test level, "1" to "5".
        risk: Risk level, "1" to "3".

    Returns:
        SQLMap's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing SQLMap on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    # NOTE(review): sanitize_input removes "&" and ";", so multi-parameter
    # URLs, POST bodies and cookies reach sqlmap altered — confirm whether
    # that is intended, given that run_command never invokes a shell.
    url = sanitize_input(url)
    data = sanitize_input(data)
    cookie = sanitize_input(cookie)
    # level and risk go straight onto the command line, so accept only the
    # values sqlmap documents instead of forwarding arbitrary text.
    level = str(level).strip()
    risk = str(risk).strip()
    if level not in {"1", "2", "3", "4", "5"}:
        return "❌ Error: Level must be between 1 and 5"
    if risk not in {"1", "2", "3"}:
        return "❌ Error: Risk must be between 1 and 3"
    try:
        command = ["sqlmap", "-u", url, "--batch", "--level", level, "--risk", risk]
        if data:
            command.extend(["--data", data])
        if cookie:
            command.extend(["--cookie", cookie])
        returncode, stdout, stderr = run_command(command, timeout=600)
        if returncode != 0:
            return f"❌ SQLMap scan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"sqlmap_{timestamp}.txt", stdout)
        return f"💉 SQLMap scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def wpscan_scan(url: str = "", enumerate: str = "vp,vt,u") -> str:
    """WordPress vulnerability scanner with plugin, theme, and user enumeration.

    Args:
        url: URL of the WordPress site.
        enumerate: Value passed to WPScan's ``--enumerate`` option.

    Returns:
        WPScan's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing WPScan on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    enumerate = sanitize_input(enumerate)
    try:
        exit_code, out, err = run_command(
            ["wpscan", "--url", url, "--enumerate", enumerate, "--random-user-agent"],
            timeout=300,
        )
        if exit_code != 0:
            return f"❌ WPScan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"wpscan_{stamp}.txt", out)
        return f"🔍 WPScan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def ffuf_scan(url: str = "", wordlist: str = "common", mode: str = "dir") -> str:
    """Fast web fuzzer for directory, file, parameter, and vhost discovery.

    Args:
        url: Base URL. In directory mode ``/FUZZ`` is appended unless the URL
            already contains the ``FUZZ`` keyword.
        wordlist: "big" selects dirb's big.txt; anything else uses common.txt.
        mode: "vhost" fuzzes the Host header; any other value fuzzes paths.

    Returns:
        ffuf's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing FFUF on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    wordlist = sanitize_input(wordlist.lower())
    # Normalise mode like the other tools do, so "VHOST" or " vhost " is not
    # silently treated as directory mode.
    mode = sanitize_input(mode.lower())
    # Select wordlist
    if wordlist == "big":
        wordlist_path = "/usr/share/wordlists/dirb/big.txt"
    else:
        wordlist_path = "/usr/share/wordlists/dirb/common.txt"
    try:
        if mode == "vhost":
            command = ["ffuf", "-u", url, "-H", "Host: FUZZ", "-w", wordlist_path]
        else:
            # Respect a caller-placed FUZZ keyword, and avoid "//FUZZ" when
            # the URL ends with a slash.
            fuzz_url = url if "FUZZ" in url else f"{url.rstrip('/')}/FUZZ"
            command = ["ffuf", "-u", fuzz_url, "-w", wordlist_path]
        returncode, stdout, stderr = run_command(command, timeout=300)
        if returncode != 0:
            return f"❌ FFUF scan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"ffuf_{timestamp}.txt", stdout)
        return f"⚡ FFUF scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def nuclei_scan(target: str = "", templates: str = "cves") -> str:
    """Fast vulnerability scanner using Nuclei templates for CVEs, misconfigurations, and more.

    Args:
        target: Target passed to Nuclei's ``-u`` option.
        templates: Value passed to Nuclei's ``-t`` option (lower-cased).

    Returns:
        Nuclei's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing Nuclei scan on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    templates = sanitize_input(templates.lower())
    try:
        exit_code, out, err = run_command(["nuclei", "-u", target, "-t", templates], timeout=300)
        if exit_code != 0:
            return f"❌ Nuclei scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"nuclei_{stamp}.txt", out)
        return f"⚡ Nuclei scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def wafw00f_scan(url: str = "") -> str:
    """Web Application Firewall detection and fingerprinting.

    Args:
        url: URL to probe with wafw00f.

    Returns:
        wafw00f's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing wafw00f on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    try:
        exit_code, out, err = run_command(["wafw00f", url], timeout=60)
        if exit_code != 0:
            return f"❌ WAF detection failed:\n{err}"
        return f"🛡️ WAF detection completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
# === EXPLOITATION TOOLS ===
@mcp.tool()
async def searchsploit_lookup(search_term: str = "", exact: str = "no") -> str:
    """Search exploit database with options for exact matching and detailed information.

    Args:
        search_term: Text to search for.
        exact: "yes" (any letter case) adds searchsploit's ``--exact`` flag.

    Returns:
        searchsploit's stdout on success, otherwise an error message.
    """
    logger.info(f"Searching exploits for: {search_term}")
    if not search_term.strip():
        return "❌ Error: Search term is required"
    search_term = sanitize_input(search_term)
    try:
        flags = ["--exact"] if exact.lower() == "yes" else []
        argv = ["searchsploit", *flags, search_term]
        exit_code, out, err = run_command(argv, timeout=30)
        if exit_code != 0:
            return f"❌ SearchSploit failed:\n{err}"
        return f"🔍 SearchSploit results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def msfconsole_search(search_term: str = "") -> str:
    """Search Metasploit Framework modules for exploits, auxiliaries, and payloads.

    Args:
        search_term: Text handed to msfconsole's ``search`` command.

    Returns:
        msfconsole's stdout on success, otherwise an error message.
    """
    logger.info(f"Searching Metasploit modules for: {search_term}")
    if not search_term.strip():
        return "❌ Error: Search term is required"
    search_term = sanitize_input(search_term)
    try:
        # Run a single quiet msfconsole session: search, then exit.
        console_script = f"search {search_term}; exit"
        exit_code, out, err = run_command(["msfconsole", "-q", "-x", console_script], timeout=60)
        if exit_code != 0:
            return f"❌ Metasploit search failed:\n{err}"
        return f"🎯 Metasploit search results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
# === PASSWORD CRACKING & BRUTE FORCING ===
@mcp.tool()
async def hydra_bruteforce(target: str = "", service: str = "ssh", username: str = "",
                           password_list: str = "rockyou") -> str:
    """Network login brute-forcing with Hydra supporting SSH, FTP, HTTP, RDP, and more.

    Args:
        target: Host to attack.
        service: Hydra service name (lower-cased before use).
        username: Single login name passed to ``-l``.
        password_list: Currently has no effect; the rockyou list is always
            used.

    Returns:
        Hydra's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing Hydra brute-force on {target} for {service}")
    if not (target.strip() and username.strip()):
        return "❌ Error: Target and username are required"
    target = sanitize_input(target)
    service = sanitize_input(service.lower())
    username = sanitize_input(username)
    # Every password_list value maps to the same file.
    pass_file = "/usr/share/wordlists/rockyou.txt"
    try:
        argv = ["hydra", "-l", username, "-P", pass_file, target, service, "-t", "4"]
        exit_code, out, err = run_command(argv, timeout=600)
        if exit_code != 0:
            return f"❌ Hydra brute-force failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"hydra_{target}_{service}_{stamp}.txt", out)
        return f"🔓 Hydra brute-force completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def john_crack(hash_file: str = "", format: str = "auto", wordlist: str = "rockyou") -> str:
    """Password hash cracking using John the Ripper with multiple hash format support.

    Args:
        hash_file: Path of the file containing the hashes.
        format: Hash format passed as ``--format=``; "auto" omits the option.
        wordlist: Currently has no effect; the rockyou list is always used.

    Returns:
        John's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing John the Ripper on {hash_file}")
    if not hash_file.strip():
        return "❌ Error: Hash file path is required"
    hash_file = sanitize_input(hash_file)
    format_type = sanitize_input(format.lower())
    # Every wordlist value maps to the same file.
    wlist = "/usr/share/wordlists/rockyou.txt"
    try:
        argv = ["john", hash_file, f"--wordlist={wlist}"]
        if format_type != "auto":
            argv.append(f"--format={format_type}")
        exit_code, out, err = run_command(argv, timeout=600)
        if exit_code != 0:
            return f"❌ John the Ripper failed:\n{err}"
        return f"🔓 John the Ripper results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def hashcat_crack(hash_file: str = "", hash_type: str = "0", attack_mode: str = "0") -> str:
    """GPU-accelerated password cracking using Hashcat with various attack modes.

    Args:
        hash_file: Path of the file containing the hashes.
        hash_type: Value passed to Hashcat's ``-m`` option.
        attack_mode: Value passed to Hashcat's ``-a`` option.

    Returns:
        Hashcat's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing Hashcat on {hash_file}")
    if not hash_file.strip():
        return "❌ Error: Hash file path is required"
    hash_file = sanitize_input(hash_file)
    hash_type = sanitize_input(hash_type)
    attack_mode = sanitize_input(attack_mode)
    try:
        # The rockyou list is the only wordlist this tool uses.
        argv = [
            "hashcat",
            "-m", hash_type,
            "-a", attack_mode,
            hash_file,
            "/usr/share/wordlists/rockyou.txt",
        ]
        exit_code, out, err = run_command(argv, timeout=600)
        if exit_code != 0:
            return f"❌ Hashcat failed:\n{err}"
        return f"🔓 Hashcat results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def hashid_identify(hash_value: str = "") -> str:
    """Identify hash types for password cracking tool selection.

    Args:
        hash_value: The hash to identify. It is passed to hashid unmodified
            apart from trimming surrounding whitespace.

    Returns:
        hashid's stdout on success, otherwise an error message.
    """
    logger.info("Identifying hash type")
    if not hash_value.strip():
        return "❌ Error: Hash value is required"
    # sanitize_input is deliberately not used here: it deletes "$", which is
    # a structural part of crypt-style hashes ("$1$...", "$2y$...", "$6$...")
    # and stripping it makes the identification wrong. run_command executes
    # without a shell, so the characters are inert.
    hash_value = hash_value.strip()
    # Still refuse control characters and anything that would be parsed as a
    # command-line option.
    if hash_value.startswith("-") or not hash_value.isprintable():
        return "❌ Error: Invalid hash value"
    try:
        command = ["hashid", hash_value]
        returncode, stdout, stderr = run_command(command, timeout=10)
        if returncode != 0:
            return f"❌ Hash identification failed:\n{stderr}"
        return f"🔍 Hash identification:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
# === POST-EXPLOITATION TOOLS ===
@mcp.tool()
async def enum4linux_scan(target: str = "") -> str:
    """Enumerate SMB shares, users, groups, and policies on Windows/Samba systems.

    Args:
        target: Host to enumerate.

    Returns:
        enum4linux's stdout on success (also saved via save_results),
        otherwise an error message.
    """
    logger.info(f"Executing enum4linux on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    try:
        exit_code, out, err = run_command(["enum4linux", "-a", target], timeout=120)
        if exit_code != 0:
            return f"❌ Enum4linux scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"enum4linux_{target}_{stamp}.txt", out)
        return f"📋 Enum4linux scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def smbmap_scan(target: str = "", username: str = "guest", password: str = "") -> str:
    """SMB share enumeration and permission auditing with authentication support.

    Args:
        target: Host to enumerate.
        username: Account name passed to ``-u`` (omitted when empty).
        password: Password passed to ``-p`` exactly as given (omitted when
            empty).

    Returns:
        smbmap's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing smbmap on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    username = sanitize_input(username)
    # The password is intentionally not passed through sanitize_input: that
    # would silently delete characters such as "$", "&" or "'" and turn a
    # correct password into a wrong one. run_command executes without a
    # shell, so those characters have no special meaning.
    password = password or ""
    try:
        command = ["smbmap", "-H", target]
        if username:
            command.extend(["-u", username])
        if password:
            command.extend(["-p", password])
        returncode, stdout, stderr = run_command(command, timeout=60)
        if returncode != 0:
            return f"❌ SMBMap scan failed:\n{stderr}"
        return f"📂 SMBMap scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
# === SSL/TLS TESTING ===
@mcp.tool()
async def sslscan_test(target: str = "", port: str = "443") -> str:
    """Comprehensive SSL/TLS security testing including cipher suites and protocol support.

    Args:
        target: Host to test.
        port: Port number, validated by validate_port.

    Returns:
        sslscan's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing SSLScan on {target}:{port}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    port = sanitize_input(port)
    # Same port check as nikto_scan, which builds the same "host:port" form.
    if not validate_port(port):
        return "❌ Error: Invalid port number"
    try:
        command = ["sslscan", f"{target}:{port}"]
        returncode, stdout, stderr = run_command(command, timeout=60)
        if returncode != 0:
            return f"❌ SSLScan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"sslscan_{target}_{timestamp}.txt", stdout)
        return f"🔒 SSLScan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def testssl_scan(target: str = "") -> str:
    """Advanced SSL/TLS testing for vulnerabilities including Heartbleed, POODLE, BEAST, etc.

    Args:
        target: Target handed to testssl.sh.

    Returns:
        testssl.sh's stdout on success (also saved via save_results),
        otherwise an error message.
    """
    logger.info(f"Executing testssl.sh on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    try:
        command = ["testssl.sh", target]
        returncode, stdout, stderr = run_command(command, timeout=120)
        if returncode != 0:
            return f"❌ Testssl.sh scan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # The target may be a URL or host:port; "/" and ":" must not reach
        # the file name, where "/" would be read as a directory separator.
        file_label = re.sub(r'[^A-Za-z0-9._-]', '_', target)
        save_results(f"testssl_{file_label}_{timestamp}.txt", stdout)
        return f"🔒 Testssl.sh scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
# === WIRELESS TESTING ===
@mcp.tool()
async def aircrack_analyze(capture_file: str = "", wordlist: str = "rockyou") -> str:
    """WPA/WPA2 password cracking from capture files using Aircrack-ng.

    Args:
        capture_file: Path of the capture file.
        wordlist: Currently has no effect; the rockyou list is always used.

    Returns:
        Aircrack-ng's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing Aircrack-ng on {capture_file}")
    if not capture_file.strip():
        return "❌ Error: Capture file path is required"
    capture_file = sanitize_input(capture_file)
    # Every wordlist value maps to the same file.
    wlist = "/usr/share/wordlists/rockyou.txt"
    try:
        exit_code, out, err = run_command(["aircrack-ng", capture_file, "-w", wlist], timeout=600)
        if exit_code != 0:
            return f"❌ Aircrack-ng analysis failed:\n{err}"
        return f"📡 Aircrack-ng analysis completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
# === ADDITIONAL UTILITY TOOLS ===
@mcp.tool()
async def whatweb_scan(target: str = "", aggression: str = "1") -> str:
    """Web technology fingerprinting with adjustable aggression levels.

    Args:
        target: URL or host; ``http://`` is prepended when no scheme is given.
        aggression: Value passed to WhatWeb's ``-a`` option.

    Returns:
        WhatWeb's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing WhatWeb scan on {target}")
    if not target.strip():
        return "❌ Error: Target URL is required"
    target = sanitize_input(target)
    aggression = sanitize_input(aggression)
    has_scheme = target.startswith(('http://', 'https://'))
    if not has_scheme:
        target = f"http://{target}"
    try:
        exit_code, out, err = run_command(["whatweb", "-a", aggression, target], timeout=60)
        if exit_code != 0:
            return f"❌ WhatWeb scan failed:\n{err}"
        return f"🌐 WhatWeb scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def gobuster_scan(target: str = "", mode: str = "dir", wordlist: str = "common") -> str:
    """Fast directory, DNS, and vhost enumeration with customizable wordlists.

    Args:
        target: Domain (dns mode) or URL/host (vhost and dir modes; a missing
            scheme is replaced with ``http://``).
        mode: "dns", "vhost", or anything else for directory mode.
        wordlist: "big" selects dirb's big.txt; anything else uses common.txt.

    Returns:
        Gobuster's stdout on success (also saved via save_results), otherwise
        an error message.
    """
    logger.info(f"Executing Gobuster {mode} scan on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    mode = sanitize_input(mode.lower())
    wordlist = sanitize_input(wordlist.lower())
    # Select wordlist
    wlist = (
        "/usr/share/wordlists/dirb/big.txt"
        if wordlist == "big"
        else "/usr/share/wordlists/dirb/common.txt"
    )
    try:
        if mode == "dns":
            if not validate_ip_or_domain(target):
                return "❌ Error: Invalid domain format for DNS mode"
            argv = ["gobuster", "dns", "-d", target, "-w", wlist]
        else:
            # vhost and dir modes both take a URL.
            if not target.startswith(('http://', 'https://')):
                target = f"http://{target}"
            subcommand = "vhost" if mode == "vhost" else "dir"
            argv = ["gobuster", subcommand, "-u", target, "-w", wlist]
        exit_code, out, err = run_command(argv, timeout=300)
        if exit_code != 0:
            return f"❌ Gobuster scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"gobuster_{mode}_{stamp}.txt", out)
        return f"⚡ Gobuster {mode} scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def dirb_scan(target: str = "", wordlist: str = "common") -> str:
    """Web content directory brute-forcing with multiple wordlist options.

    Args:
        target: URL or host; ``http://`` is prepended when no scheme is given.
        wordlist: "big" or "small" select the matching dirb list; anything
            else uses common.txt.

    Returns:
        DIRB's stdout on success (also saved via save_results), otherwise an
        error message.
    """
    logger.info(f"Executing DIRB scan on {target}")
    if not target.strip():
        return "❌ Error: Target URL is required"
    target = sanitize_input(target)
    wordlist = sanitize_input(wordlist.lower())
    if not target.startswith(('http://', 'https://')):
        target = f"http://{target}"
    try:
        named_lists = {
            "big": "/usr/share/wordlists/dirb/big.txt",
            "small": "/usr/share/wordlists/dirb/small.txt",
        }
        wordlist_path = named_lists.get(wordlist, "/usr/share/wordlists/dirb/common.txt")
        exit_code, out, err = run_command(["dirb", target, wordlist_path], timeout=300)
        if exit_code != 0:
            return f"❌ DIRB scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"dirb_{stamp}.txt", out)
        return f"📁 DIRB scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def nslookup_query(domain: str = "", query_type: str = "A") -> str:
    """DNS record queries supporting A, AAAA, MX, NS, TXT, CNAME, SOA, and more.

    Args:
        domain: Name to look up; must pass validate_ip_or_domain.
        query_type: Record type (upper-cased); unsupported values fall back
            to "A".

    Returns:
        nslookup's stdout on success, otherwise an error message.
    """
    logger.info(f"Performing DNS lookup for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    query_type = sanitize_input(query_type.upper())
    if not validate_ip_or_domain(domain):
        return "❌ Error: Invalid domain format"
    supported = ("A", "AAAA", "MX", "NS", "TXT", "CNAME", "SOA", "PTR", "ANY")
    record_type = query_type if query_type in supported else "A"
    try:
        exit_code, out, err = run_command(["nslookup", "-type=" + record_type, domain], timeout=15)
        if exit_code != 0:
            return f"❌ DNS lookup failed:\n{err}"
        return f"🌐 DNS lookup results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def whois_lookup(domain: str = "") -> str:
    """WHOIS domain registration information lookup.

    Args:
        domain: Name to look up.

    Returns:
        whois's stdout on success, otherwise an error message.
    """
    logger.info(f"Performing WHOIS lookup for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    try:
        exit_code, out, err = run_command(["whois", domain], timeout=30)
        if exit_code != 0:
            return f"❌ WHOIS lookup failed:\n{err}"
        return f"📋 WHOIS lookup results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def traceroute_scan(target: str = "") -> str:
    """Network path tracing to identify routing and network topology.

    Args:
        target: Destination host.

    Returns:
        traceroute's stdout on success, otherwise an error message.
    """
    logger.info(f"Executing traceroute to {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    try:
        exit_code, out, err = run_command(["traceroute", target], timeout=60)
        if exit_code != 0:
            return f"❌ Traceroute failed:\n{err}"
        return f"🗺️ Traceroute results:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
# === ADVANCED WEB TESTING TOOLS ===
@mcp.tool()
async def feroxbuster_scan(url: str = "", wordlist: str = "common", depth: str = "4", threads: str = "50") -> str:
    """Fast recursive content discovery with Feroxbuster - modern replacement for dirb/gobuster.

    Args:
        url: URL or host; ``http://`` is prepended when no scheme is given.
        wordlist: "big" selects dirb's big.txt; anything else uses common.txt.
        depth: Value passed to ``-d``.
        threads: Value passed to ``-t``.

    Returns:
        Feroxbuster's stdout when it exits with 0 or prints anything (also
        saved via save_results), otherwise an error message.
    """
    logger.info(f"Executing Feroxbuster on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    depth = sanitize_input(depth)
    threads = sanitize_input(threads)
    if not url.startswith(('http://', 'https://')):
        url = f"http://{url}"
    # Select wordlist
    wlist = (
        "/usr/share/wordlists/dirb/big.txt"
        if wordlist == "big"
        else "/usr/share/wordlists/dirb/common.txt"
    )
    try:
        argv = ["feroxbuster", "-u", url, "-w", wlist, "-d", depth, "-t", threads, "--silent"]
        exit_code, out, err = run_command(argv, timeout=300)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Feroxbuster scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"feroxbuster_{stamp}.txt", out)
        return f"🦀 Feroxbuster scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def httpx_probe(target: str = "", mode: str = "basic") -> str:
    """Fast HTTP probing with httpx for web service discovery and technology detection.

    Args:
        target: Target passed to httpx's ``-u`` option.
        mode: "tech", "full" or "paths" select extra probe flags; anything
            else reports status code and title only.

    Returns:
        httpx's stdout when it exits with 0 or prints anything (also saved
        via save_results), otherwise an error message.
    """
    logger.info(f"Executing httpx probe on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    mode = sanitize_input(mode.lower())
    try:
        flags_by_mode = {
            "tech": ["-tech-detect", "-status-code", "-title"],
            "full": ["-tech-detect", "-status-code", "-title", "-server", "-content-length", "-follow-redirects"],
            "paths": ["-path", "/admin,/login,/dashboard,/api", "-status-code"],
        }
        argv = ["httpx", "-u", target, *flags_by_mode.get(mode, ["-status-code", "-title"])]
        exit_code, out, err = run_command(argv, timeout=60)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Httpx probe failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"httpx_{stamp}.txt", out)
        return f"⚡ Httpx probe completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def subfinder_enum(domain: str = "", sources: str = "all") -> str:
    """Fast passive subdomain enumeration using multiple data sources with Subfinder.

    Args:
        domain: Domain to enumerate.
        sources: Value passed to ``-sources``; "all" omits the option.

    Returns:
        Subfinder's stdout when it exits with 0 or prints anything (also
        saved via save_results), otherwise an error message.
    """
    logger.info(f"Executing Subfinder on {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    sources = sanitize_input(sources.lower())
    try:
        argv = ["subfinder", "-d", domain, "-silent"]
        if sources != "all":
            argv += ["-sources", sources]
        exit_code, out, err = run_command(argv, timeout=120)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Subfinder enumeration failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"subfinder_{domain}_{stamp}.txt", out)
        return f"🔍 Subfinder enumeration completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def katana_crawl(url: str = "", depth: str = "3", js_crawl: str = "yes") -> str:
    """Advanced web crawler with JavaScript parsing using Katana for comprehensive site mapping.

    Args:
        url: Start URL.
        depth: Value passed to ``-d``.
        js_crawl: "yes" (any letter case) adds Katana's ``-jc`` flag.

    Returns:
        Katana's stdout when it exits with 0 or prints anything (also saved
        via save_results), otherwise an error message.
    """
    logger.info(f"Executing Katana crawler on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    depth = sanitize_input(depth)
    try:
        argv = ["katana", "-u", url, "-d", depth, "-silent"]
        argv += ["-jc"] if js_crawl.lower() == "yes" else []
        exit_code, out, err = run_command(argv, timeout=300)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Katana crawl failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"katana_{stamp}.txt", out)
        return f"🕷️ Katana crawl completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def kiterunner_api(url: str = "", wordlist: str = "routes") -> str:
    """API endpoint and route discovery using Kiterunner for REST API reconnaissance.

    Args:
        url: Target URL.
        wordlist: Value passed to Kiterunner's ``-w`` option; falls back to
            "routes" when nothing is left after sanitizing.

    Returns:
        Kiterunner's stdout when it exits with 0 or prints anything (also
        saved via save_results), otherwise an error message.
    """
    logger.info(f"Executing Kiterunner on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    # The wordlist goes onto the command line, so clean it like every other
    # user-supplied value in this file.
    wordlist = sanitize_input(wordlist) or "routes"
    try:
        # Kiterunner has built-in wordlists
        command = ["kr", "scan", url, "-w", wordlist]
        returncode, stdout, stderr = run_command(command, timeout=300)
        # Any output counts as success even with a non-zero exit code.
        if returncode != 0 and not stdout:
            return f"❌ Kiterunner scan failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"kiterunner_{timestamp}.txt", stdout)
        return f"🪁 Kiterunner API scan completed:\n{stdout}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def arjun_params(url: str = "", method: str = "GET") -> str:
    """HTTP parameter discovery using Arjun to find hidden GET/POST parameters.

    Args:
        url: Target URL.
        method: HTTP method passed to ``-m`` (upper-cased).

    Returns:
        Arjun's stdout when it exits with 0 or prints anything (also saved
        via save_results), otherwise an error message.
    """
    logger.info(f"Executing Arjun parameter discovery on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    method = sanitize_input(method.upper())
    try:
        exit_code, out, err = run_command(["arjun", "-u", url, "-m", method], timeout=180)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Arjun scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"arjun_{stamp}.txt", out)
        return f"🔍 Arjun parameter discovery completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def dalfox_xss(url: str = "", mode: str = "basic") -> str:
    """Advanced XSS vulnerability scanner using Dalfox with WAF bypass capabilities.

    Args:
        url: Target URL.
        mode: "deep" adds ``--deep-domxss --mining-dict``, "blind" adds
            ``--blind``; anything else adds no extra flags.

    Returns:
        Dalfox's stdout when it exits with 0 or prints anything (also saved
        via save_results), otherwise an error message.
    """
    logger.info(f"Executing Dalfox XSS scan on {url}")
    if not url.strip():
        return "❌ Error: URL is required"
    url = sanitize_input(url)
    mode = sanitize_input(mode.lower())
    try:
        extra_flags = {
            "deep": ["--deep-domxss", "--mining-dict"],
            "blind": ["--blind"],
        }
        argv = ["dalfox", "url", url, *extra_flags.get(mode, [])]
        exit_code, out, err = run_command(argv, timeout=300)
        # Any output counts as success even with a non-zero exit code.
        if exit_code != 0 and not out:
            return f"❌ Dalfox scan failed:\n{err}"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"dalfox_{stamp}.txt", out)
        return f"🎯 Dalfox XSS scan completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {exc}"
@mcp.tool()
async def gau_urls(domain: str = "", providers: str = "all") -> str:
    """Fetch known URLs from AlienVault, Wayback Machine, Common Crawl, and URLScan using gau.

    Args:
        domain: Domain whose known URLs are collected.
        providers: Value passed to ``--providers``; "all" (or an empty value)
            omits the option.

    Returns:
        Up to the first 5000 characters of gau's stdout when it exits with 0
        or prints anything (the full output is saved via save_results),
        otherwise an error message.
    """
    logger.info(f"Executing gau URL gathering for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    # providers goes onto the command line, so clean it like every other
    # user-supplied value in this file.
    providers = sanitize_input(providers)
    try:
        command = ["gau", domain]
        if providers and providers != "all":
            command.extend(["--providers", providers])
        returncode, stdout, stderr = run_command(command, timeout=120)
        # Any output counts as success even with a non-zero exit code.
        if returncode != 0 and not stdout:
            return f"❌ Gau collection failed:\n{stderr}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"gau_{domain}_{timestamp}.txt", stdout)
        # Truncate long output; add the ellipsis only when something was
        # actually cut, so short results do not look incomplete.
        preview = stdout if len(stdout) <= 5000 else f"{stdout[:5000]}..."
        return f"📚 Gau URL collection completed:\n{preview}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def waybackurls_fetch(domain: str = "") -> str:
    """Fetch all URLs from Wayback Machine archives for historical endpoint discovery.

    Args:
        domain: Domain to look up (required).

    Returns:
        At most the first 5000 characters of the waybackurls output (the
        complete output is handed to save_results), or a message starting
        with "❌" when the input is invalid or the command fails.
    """
    logger.info(f"Fetching Wayback URLs for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    # The domain is a positional argument: an empty value is useless and one
    # starting with "-" would be read by the tool as an option.
    if not domain or domain.startswith("-"):
        return "❌ Error: Invalid domain"
    max_chars = 5000
    try:
        command = ["waybackurls", domain]
        returncode, stdout, stderr = run_command(command, timeout=120)
        # Any captured output counts as a result even on a non-zero exit.
        if returncode == 0 or stdout:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # sanitize_input leaves "/" in place; keep the results filename
            # a single path component.
            safe_domain = re.sub(r"[^A-Za-z0-9._-]", "_", domain)
            save_results(f"waybackurls_{safe_domain}_{timestamp}.txt", stdout)
            # Only mark the preview as truncated when it really is.
            suffix = "..." if len(stdout) > max_chars else ""
            return f"📜 Wayback URLs fetched:\n{stdout[:max_chars]}{suffix}"
        else:
            return f"❌ Wayback fetch failed:\n{stderr}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def hakrawler_spider(url: str = "", depth: str = "2") -> str:
    """Fast web crawler for gathering endpoints, URLs, and JavaScript files using hakrawler."""
    logger.info(f"Executing hakrawler on {url}")

    # Nothing to crawl without a start URL.
    if url.strip() == "":
        return "❌ Error: URL is required"

    start_url = sanitize_input(url)
    crawl_depth = sanitize_input(depth)
    argv = ["hakrawler", "-url", start_url, "-depth", crawl_depth, "-plain"]

    try:
        exit_code, out, err = run_command(argv, timeout=180)

        # Only a non-zero exit with empty stdout is reported as a failure.
        if exit_code != 0 and not out:
            return f"❌ Hakrawler failed:\n{err}"

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"hakrawler_{stamp}.txt", out)
        return f"🕸️ Hakrawler spider completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {str(exc)}"
@mcp.tool()
async def getjs_extract(url: str = "") -> str:
    """Extract JavaScript files from a target for analysis and endpoint discovery."""
    logger.info(f"Extracting JavaScript files from {url}")

    # Bail out early when no URL was supplied.
    if url.strip() == "":
        return "❌ Error: URL is required"

    page_url = sanitize_input(url)

    try:
        exit_code, out, err = run_command(
            ["getJS", "--url", page_url, "--complete"],
            timeout=120,
        )

        # Output on stdout is treated as success regardless of exit status.
        succeeded = exit_code == 0 or bool(out)
        if not succeeded:
            return f"❌ JavaScript extraction failed:\n{err}"

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"getjs_{stamp}.txt", out)
        return f"📜 JavaScript files extracted:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {str(exc)}"
@mcp.tool()
async def crlfuzz_test(url: str = "") -> str:
    """CRLF injection vulnerability scanner for header injection and HTTP response splitting."""
    logger.info(f"Testing CRLF injection on {url}")

    # The target URL is mandatory.
    if url.strip() == "":
        return "❌ Error: URL is required"

    target_url = sanitize_input(url)
    argv = ["crlfuzz", "-u", target_url, "-silent"]

    try:
        exit_code, out, err = run_command(argv, timeout=120)

        # Report failure only when the tool exited non-zero and printed nothing.
        if exit_code != 0 and not out:
            return f"❌ CRLF test failed:\n{err}"

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"crlfuzz_{stamp}.txt", out)
        return f"🔍 CRLF injection test completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {str(exc)}"
@mcp.tool()
async def naabu_portscan(target: str = "", ports: str = "top-1000", rate: str = "1000") -> str:
    """Fast port scanner using Naabu with SYN/CONNECT scan capabilities.

    Args:
        target: Value for naabu's -host option (required).
        ports: "top-1000" (default) scans naabu's top 1000 ports, "full"
            scans every port, anything else is passed to -p unchanged.
        rate: Value for naabu's -rate option; must be a positive integer.

    Returns:
        The scan output on success, or a message starting with "❌" when the
        input is invalid or the scan fails.
    """
    logger.info(f"Executing Naabu port scan on {target}")
    if not target.strip():
        return "❌ Error: Target is required"
    target = sanitize_input(target)
    # An empty port selection falls back to the documented default.
    ports = sanitize_input(ports) or "top-1000"
    rate = sanitize_input(rate)
    if not target:
        return "❌ Error: Target is required"
    # Reject a non-numeric rate here instead of letting naabu fail on it.
    if not rate.isdecimal() or int(rate) < 1:
        return "❌ Error: Rate must be a positive integer"
    try:
        command = ["naabu", "-host", target, "-rate", rate, "-silent"]
        if ports == "full":
            command.extend(["-p", "-"])
        elif ports == "top-1000":
            # Without an explicit option naabu uses its own, smaller default
            # port list, so the advertised top-1000 must be requested.
            command.extend(["-top-ports", "1000"])
        else:
            command.extend(["-p", ports])
        returncode, stdout, stderr = run_command(command, timeout=300)
        # Any captured output counts as a result even on a non-zero exit.
        if returncode == 0 or stdout:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Targets such as CIDR ranges contain "/", which sanitize_input
            # keeps; reduce the target to a single safe path component.
            safe_target = re.sub(r"[^A-Za-z0-9._-]", "_", target)
            save_results(f"naabu_{safe_target}_{timestamp}.txt", stdout)
            return f"⚡ Naabu port scan completed:\n{stdout}"
        else:
            return f"❌ Naabu scan failed:\n{stderr}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def dnsx_resolve(domain: str = "", query_type: str = "A") -> str:
    """Fast DNS resolution and enumeration using dnsx with multiple record type support.

    Args:
        domain: Domain to resolve (required).
        query_type: DNS record type, case-insensitive. One of A (default),
            AAAA, CNAME, NS, TXT, SRV, PTR, MX, SOA, ANY, AXFR, CAA.

    Returns:
        The dnsx output on success, or a message starting with "❌" when the
        input is invalid or the command fails.
    """
    logger.info(f"Executing dnsx resolution for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    # An empty record type falls back to the default A lookup.
    query_type = sanitize_input(query_type.upper()) or "A"
    if not domain:
        return "❌ Error: Domain is required"
    supported_types = (
        "A", "AAAA", "CNAME", "NS", "TXT", "SRV",
        "PTR", "MX", "SOA", "ANY", "AXFR", "CAA",
    )
    if query_type not in supported_types:
        return f"❌ Error: Unsupported query type {query_type}"
    try:
        # NOTE(review): dnsx documents -d as the domain list for wordlist
        # brute-forcing; confirm this invocation resolves a single name with
        # the installed version (hosts are normally given via -l or stdin).
        command = ["dnsx", "-d", domain, "-silent"]
        if query_type != "A":
            # dnsx selects record types with dedicated flags (-aaaa, -mx, ...);
            # its -t option sets the thread count and rejects a type name.
            command.append(f"-{query_type.lower()}")
        returncode, stdout, stderr = run_command(command, timeout=60)
        # Any captured output counts as a result even on a non-zero exit.
        if returncode == 0 or stdout:
            return f"🌐 DNSX resolution completed:\n{stdout}"
        else:
            return f"❌ DNSX resolution failed:\n{stderr}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def assetfinder_discover(domain: str = "") -> str:
    """Find related domains and subdomains using OSINT sources with assetfinder.

    Args:
        domain: Domain to search for (required).

    Returns:
        The assetfinder output on success, or a message starting with "❌"
        when the input is invalid or the command fails.
    """
    logger.info(f"Discovering assets for {domain}")
    if not domain.strip():
        return "❌ Error: Domain is required"
    domain = sanitize_input(domain)
    # The domain is a positional argument: an empty value is useless and one
    # starting with "-" would be read by the tool as an option.
    if not domain or domain.startswith("-"):
        return "❌ Error: Invalid domain"
    try:
        command = ["assetfinder", "--subs-only", domain]
        returncode, stdout, stderr = run_command(command, timeout=120)
        # Any captured output counts as a result even on a non-zero exit.
        if returncode == 0 or stdout:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # sanitize_input leaves "/" in place; keep the results filename
            # a single path component.
            safe_domain = re.sub(r"[^A-Za-z0-9._-]", "_", domain)
            save_results(f"assetfinder_{safe_domain}_{timestamp}.txt", stdout)
            return f"🔎 Assetfinder discovery completed:\n{stdout}"
        else:
            return f"❌ Assetfinder failed:\n{stderr}"
    except Exception as e:
        logger.error(f"Error: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def gospider_crawl(url: str = "", depth: str = "2", concurrent: str = "5") -> str:
    """Fast web spider for discovering endpoints and extracting URLs using gospider."""
    logger.info(f"Executing gospider on {url}")

    # A start URL is mandatory.
    if url.strip() == "":
        return "❌ Error: URL is required"

    # All three user-supplied values go through sanitize_input.
    site = sanitize_input(url)
    max_depth = sanitize_input(depth)
    workers = sanitize_input(concurrent)

    argv = ["gospider", "-s", site, "-d", max_depth, "-c", workers]
    argv += ["--sitemap", "--robots"]

    try:
        exit_code, out, err = run_command(argv, timeout=180)

        # Only a non-zero exit with empty stdout is reported as a failure.
        if exit_code != 0 and not out:
            return f"❌ Gospider crawl failed:\n{err}"

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_results(f"gospider_{stamp}.txt", out)
        return f"🕷️ Gospider crawl completed:\n{out}"
    except Exception as exc:
        logger.error(f"Error: {exc}")
        return f"❌ Error: {str(exc)}"
# === RESULTS MANAGEMENT ===
@mcp.tool()
async def list_results(limit: str = "10") -> str:
    """List recently saved scan results with timestamps.

    Args:
        limit: Maximum number of entries to show, newest first. Anything
            that is not a positive integer falls back to 10.

    Returns:
        One line per entry (name, size in bytes, modification time), a
        "no results" notice, or a message starting with "❌" on error.
    """
    logger.info("Listing saved results")
    try:
        # isdecimal() only accepts characters int() can parse, whereas
        # isdigit() also accepts e.g. superscript digits that int() rejects.
        limit_text = limit.strip()
        limit_int = int(limit_text) if limit_text.isdecimal() else 10
        if limit_int < 1:
            # A zero limit would otherwise report "no results" misleadingly.
            limit_int = 10

        # stat() each entry exactly once and skip entries that vanish
        # between the directory listing and the stat call.
        entries = []
        for path in RESULTS_DIR.glob("*"):
            try:
                entries.append((path.stat(), path))
            except OSError:
                continue
        entries.sort(key=lambda entry: entry[0].st_mtime, reverse=True)
        entries = entries[:limit_int]

        if not entries:
            return "📂 No results found"

        lines = []
        for stat, path in entries:
            timestamp = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            lines.append(f" • {path.name} ({stat.st_size} bytes) - {timestamp}\n")
        return "📂 Recent scan results:\n\n" + "".join(lines)
    except Exception as e:
        logger.error(f"Error listing results: {e}")
        return f"❌ Error: {str(e)}"
@mcp.tool()
async def read_result(filename: str = "") -> str:
    """Read contents of a saved scan result file.

    Args:
        filename: Name of a file inside the results directory (required).

    Returns:
        The file's text content, or a message starting with "❌" when the
        name is missing, points outside the results directory, does not
        refer to a regular file, or cannot be read.
    """
    logger.info(f"Reading result file: {filename}")
    if not filename.strip():
        return "❌ Error: Filename is required"
    filename = sanitize_input(filename)
    if not filename:
        return "❌ Error: Filename is required"
    try:
        results_root = RESULTS_DIR.resolve()
        filepath = (results_root / filename).resolve()
        # sanitize_input strips neither "/" nor "..", and joining with an
        # absolute name discards the results directory entirely, so the
        # resolved path must be checked to lie inside the results directory.
        if results_root not in filepath.parents:
            return "❌ Error: Invalid filename"
        # is_file() also rejects directories, which exists() would accept.
        if not filepath.is_file():
            return f"❌ Error: File {filename} not found"
        content = filepath.read_text()
        return f"📄 Content of {filename}:\n\n{content}"
    except Exception as e:
        logger.error(f"Error reading result: {e}")
        return f"❌ Error: {str(e)}"
# === SERVER STARTUP ===
if __name__ == "__main__":
    # Imported here because only this startup check needs it.
    import shutil

    logger.info("Starting Professional Penetration Testing MCP Server...")
    # Startup checks for critical tools. Missing tools are only reported;
    # the server starts regardless.
    required_tools = [
        "nmap", "masscan", "nikto", "sqlmap", "wpscan", "ffuf", "nuclei",
        "searchsploit", "msfconsole", "hydra", "john", "hashcat",
        "enum4linux", "smbmap", "sslscan", "testssl.sh", "aircrack-ng",
        "whatweb", "gobuster", "dirb", "dnsenum", "amass", "wafw00f",
        "feroxbuster", "httpx", "subfinder", "katana", "kr", "arjun",
        "dalfox", "gau", "waybackurls", "hakrawler", "getJS", "crlfuzz",
        "naabu", "dnsx", "assetfinder", "gospider"
    ]
    available_tools = []
    missing_tools = []
    for tool in required_tools:
        # shutil.which searches PATH in-process: no subprocess per tool, and
        # no crash when the external `which` binary itself is not installed.
        if shutil.which(tool):
            available_tools.append(tool)
            logger.info(f"✓ {tool} is available")
        else:
            missing_tools.append(tool)
            logger.warning(f"⚠ {tool} not found in PATH")
    logger.info(f"Available tools: {len(available_tools)}/{len(required_tools)}")
    if missing_tools:
        logger.warning(f"Missing tools: {', '.join(missing_tools)}")
    try:
        mcp.run(transport='stdio')
    except Exception as e:
        # Log the full traceback and exit non-zero so the failure is visible
        # to whatever launched the server.
        logger.error(f"Server error: {e}", exc_info=True)
        sys.exit(1)