Skip to main content
Glama

Kali Pentest MCP Server

by SalmanFaris7
kali_pentest_server.py10.9 kB
#!/usr/bin/env python3 """ Simple Kali Pentest MCP Server - Educational web penetration testing tools """ import os import sys import logging import subprocess import re from mcp.server.fastmcp import FastMCP # Configure logging to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stderr ) logger = logging.getLogger("kali-pentest-server") # Initialize MCP server - NO PROMPT PARAMETER! mcp = FastMCP("kali-pentest") # Configuration TARGET_WHITELIST = os.environ.get("PENTEST_TARGET_WHITELIST", "localhost,127.0.0.1").split(",") # === UTILITY FUNCTIONS === def sanitize_input(input_str: str) -> str: """Sanitize input to prevent command injection.""" # Remove dangerous characters dangerous_chars = [';', '&', '|', '`', '$', '(', ')', '<', '>', '\n', '\r'] clean = input_str for char in dangerous_chars: clean = clean.replace(char, '') return clean.strip() def is_valid_target(target: str) -> bool: """Check if target is in whitelist.""" # Extract domain/IP from URL if needed if target.startswith(('http://', 'https://')): parts = target.split('/') if len(parts) >= 3: target = parts[2].split(':')[0] # Check against whitelist for allowed in TARGET_WHITELIST: if allowed.strip() in target or target in allowed.strip(): return True return False def run_command_safe(command: list, timeout: int = 60) -> str: """Run command safely with timeout.""" try: result = subprocess.run( command, capture_output=True, text=True, timeout=timeout ) if result.returncode == 0: return result.stdout else: return f"Error: {result.stderr}" except subprocess.TimeoutExpired: return "Command timed out after {} seconds".format(timeout) except Exception as e: return f"Command error: {str(e)}" # === MCP TOOLS === @mcp.tool() async def nmap_scan(target: str = "", scan_type: str = "basic") -> str: """Perform network port scanning with nmap on whitelisted targets.""" logger.info(f"Executing nmap scan on {target}") if not target.strip(): return "❌ Error: Target is required" target = sanitize_input(target) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist. Add to PENTEST_TARGET_WHITELIST environment variable." try: # Build nmap command based on scan type if scan_type == "basic": cmd = ["nmap", "-sT", "-T4", "--top-ports", "100", target] elif scan_type == "full": cmd = ["nmap", "-sT", "-T4", "-p-", target] elif scan_type == "service": cmd = ["nmap", "-sV", "-T4", "--top-ports", "1000", target] else: cmd = ["nmap", "-sT", "-T4", "--top-ports", "100", target] result = run_command_safe(cmd, timeout=120) return f"🔍 Nmap Scan Results:\n\n{result}" except Exception as e: logger.error(f"Nmap error: {e}") return f"❌ Error running nmap: {str(e)}" @mcp.tool() async def nikto_scan(target: str = "") -> str: """Run Nikto web vulnerability scanner on whitelisted targets.""" logger.info(f"Executing nikto scan on {target}") if not target.strip(): return "❌ Error: Target URL is required" target = sanitize_input(target) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist" try: # Ensure URL has protocol if not target.startswith(('http://', 'https://')): target = f"http://{target}" cmd = ["nikto", "-h", target, "-Tuning", "123456789", "-timeout", "30"] result = run_command_safe(cmd, timeout=180) return f"🌐 Nikto Scan Results:\n\n{result}" except Exception as e: logger.error(f"Nikto error: {e}") return f"❌ Error running nikto: {str(e)}" @mcp.tool() async def dirb_scan(target: str = "", wordlist: str = "common") -> str: """Run directory enumeration with dirb on whitelisted targets.""" logger.info(f"Executing dirb scan on {target}") if not target.strip(): return "❌ Error: Target URL is required" target = sanitize_input(target) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist" try: # Ensure URL has protocol if not target.startswith(('http://', 'https://')): target = f"http://{target}" # Select wordlist if wordlist == "common": wl_path = "/usr/share/dirb/wordlists/common.txt" elif wordlist == "small": wl_path = "/usr/share/dirb/wordlists/small.txt" else: wl_path = "/usr/share/dirb/wordlists/common.txt" cmd = ["dirb", target, wl_path, "-S", "-r"] result = run_command_safe(cmd, timeout=120) return f"📁 Directory Enumeration Results:\n\n{result}" except Exception as e: logger.error(f"Dirb error: {e}") return f"❌ Error running dirb: {str(e)}" @mcp.tool() async def wpscan_check(target: str = "") -> str: """Run WordPress vulnerability scanner on whitelisted targets.""" logger.info(f"Executing wpscan on {target}") if not target.strip(): return "❌ Error: Target URL is required" target = sanitize_input(target) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist" try: # Ensure URL has protocol if not target.startswith(('http://', 'https://')): target = f"http://{target}" cmd = ["wpscan", "--url", target, "--enumerate", "vp,vt,u", "--plugins-detection", "passive"] result = run_command_safe(cmd, timeout=180) return f"📝 WordPress Scan Results:\n\n{result}" except Exception as e: logger.error(f"WPScan error: {e}") return f"❌ Error running wpscan: {str(e)}" @mcp.tool() async def sqlmap_test(target: str = "", param: str = "") -> str: """Test for SQL injection vulnerabilities on whitelisted targets.""" logger.info(f"Executing sqlmap test on {target}") if not target.strip(): return "❌ Error: Target URL is required" target = sanitize_input(target) param = sanitize_input(param) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist" try: # Basic SQL injection test cmd = ["sqlmap", "-u", target, "--batch", "--level=1", "--risk=1", "--timeout=30"] # Add parameter if specified if param.strip(): cmd.extend(["-p", param]) result = run_command_safe(cmd, timeout=120) # Extract important findings if "might be injectable" in result or "is vulnerable" in result: return f"⚠️ SQL Injection Test Results:\n\n{result}" else: return f"✅ SQL Injection Test Results:\n\n{result}" except Exception as e: logger.error(f"SQLMap error: {e}") return f"❌ Error running sqlmap: {str(e)}" @mcp.tool() async def searchsploit(search_term: str = "") -> str: """Search for exploits in the Exploit Database.""" logger.info(f"Searching exploits for: {search_term}") if not search_term.strip(): return "❌ Error: Search term is required" search_term = sanitize_input(search_term) try: cmd = ["searchsploit", search_term, "--colour"] result = run_command_safe(cmd, timeout=30) if result and "No Results" not in result: return f"🔒 Exploit Search Results for '{search_term}':\n\n{result}" else: return f"📊 No exploits found for '{search_term}'" except Exception as e: logger.error(f"Searchsploit error: {e}") return f"❌ Error running searchsploit: {str(e)}" @mcp.tool() async def check_target_whitelist() -> str: """Display currently whitelisted targets for testing.""" logger.info("Checking target whitelist") try: whitelist = TARGET_WHITELIST if whitelist: formatted_list = "\n".join([f" • {target.strip()}" for target in whitelist]) return f"✅ Current Whitelisted Targets:\n{formatted_list}\n\nTo add more targets, set PENTEST_TARGET_WHITELIST environment variable." else: return "⚠️ No targets whitelisted. Set PENTEST_TARGET_WHITELIST environment variable." except Exception as e: logger.error(f"Error checking whitelist: {e}") return f"❌ Error: {str(e)}" @mcp.tool() async def quick_recon(target: str = "") -> str: """Run quick reconnaissance scan combining multiple tools on whitelisted target.""" logger.info(f"Running quick recon on {target}") if not target.strip(): return "❌ Error: Target is required" target = sanitize_input(target) if not is_valid_target(target): return f"❌ Error: Target '{target}' is not in whitelist" try: results = [] # Quick nmap scan nmap_cmd = ["nmap", "-sT", "-T4", "--top-ports", "20", target] nmap_result = run_command_safe(nmap_cmd, timeout=60) results.append(f"=== Port Scan ===\n{nmap_result[:500]}") # Check if HTTP/HTTPS is available if "80/tcp" in nmap_result or "443/tcp" in nmap_result or "8080/tcp" in nmap_result: # Ensure URL format if not target.startswith(('http://', 'https://')): url_target = f"http://{target}" else: url_target = target # Quick dirb scan with limited recursion dirb_cmd = ["dirb", url_target, "/usr/share/dirb/wordlists/small.txt", "-S", "-r"] dirb_result = run_command_safe(dirb_cmd, timeout=60) results.append(f"\n=== Directory Scan ===\n{dirb_result[:500]}") return f"⚡ Quick Reconnaissance Results:\n\n" + "\n".join(results) except Exception as e: logger.error(f"Quick recon error: {e}") return f"❌ Error during reconnaissance: {str(e)}" # === SERVER STARTUP === if __name__ == "__main__": logger.info("Starting Kali Pentest MCP server...") logger.info(f"Whitelisted targets: {', '.join(TARGET_WHITELIST)}") # Warning message logger.warning("This server is for educational purposes only!") logger.warning("Only test systems you own or have explicit permission to test.") try: mcp.run(transport='stdio') except Exception as e: logger.error(f"Server error: {e}", exc_info=True) sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SalmanFaris7/kali-pentest-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server