import argparse
import asyncio
import glob
import json
import os
import re
import shlex
import subprocess
import uuid
from typing import Any, List, Optional, Union
import psutil
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from shutil import which
from pathlib import Path
from tools.nuclei import run_nuclei, get_nuclei_output
from tools.nmap import run_nmap, get_nmap_output
from tools.gofang import run_gofang, get_gofang_output
from tools.curl_tool import run_curl, CurlError
from tools.arjun_tool import run_arjun, get_arjun_output, ArjunError
from tools.ad_tools import (
user_enum, smb_signing_check, shares_enum, password_spray,
asreproast, kerberoast, relay_setup, coerce_petitpotam,
coerce_printerbug, responder_poison, mitm6_attack,
bloodhound_collect, secrets_dump, dcsync, certipy_enum,
certipy_esc8, certipy_request, ldap_dump, check_credentials
)
# Default timeout applied to the blocking, subprocess-based tools (seconds).
DEFAULT_TIMEOUT = 300 # seconds
# Caps the number of concurrently running scans. NOTE(review): this is an
# asyncio.Semaphore, so it only provides limiting for coroutines that enter it
# with `async with`; synchronous callers cannot await its acquire() coroutine.
_scan_semaphore = asyncio.Semaphore(5)
# FastMCP server instance; every @mcp.tool() function below is registered on it.
mcp = FastMCP("pentestMCP")
@mcp.tool()
def launch_gofang_scan(url: str, args: str = "") -> dict:
    """
    Launch gofang web crawler against the given target URL.
    gofang is an all-in-one web crawler with extraction superpowers.
    :param url: The target URL to crawl (e.g. "https://example.com")
    :param args: Additional arguments for gofang.
    CRAWL:
    -d, --depth <int> maximum depth to crawl (default 3)
    -mp, --max-pages <int> maximum number of pages to crawl (default 500)
    -c, --concurrency <int> number of concurrent crawl workers (default 5)
    -rl, --rate-limit <duration> delay between requests (default 200ms)
    -ct, --crawl-duration <duration> maximum duration to crawl the target for (e.g. 30s, 5m, 1h)
    -s, --strategy <string> visit strategy: depth-first, breadth-first (default "depth-first")
    -iqp, --ignore-query-params ignore crawling same path with different query-param values
    REQUEST:
    -ua, --user-agent <string> custom user-agent string
    -t, --timeout <int> time to wait for request in seconds (default 10)
    -rt, --retry <int> number of times to retry a failed request (default 1)
    -mrs, --max-response-size <int> maximum response size to read in bytes (default 4194304)
    -px, --proxy <string> http/socks5 proxy to use
    -H, --header <string> custom header in "Key: Value" format (can be used multiple times)
    -r, --resolver <string> list of custom resolvers, comma separated
    -dr, --disable-redirects disable following redirects
    -tlsi, --tls-impersonate enable experimental client hello (ja3) tls randomization
    FEATURES:
    -e, --external follow and extract external links
    -jc, --js-crawl enable endpoint parsing / crawling in javascript files
    -jsl, --js-luice enable jsluice parsing in javascript files (memory intensive)
    -kf, --known-files <string> crawl known files: all, robotstxt, sitemapxml (min depth 3)
    -aff, --auto-form-fill enable automatic form filling (experimental)
    -fx, --form-extraction extract form, input, textarea & select elements in output
    -td, --tech-detect enable technology detection
    -f, --fetcher <string> fetcher mode: http, browser, auto (default "http")
    --no-robots ignore robots.txt restrictions
    OUTPUT:
    -si, --silent suppress all output except errors
    -v, --verbose show detailed extraction results per page
    -nc, --no-color disable colored output
    CONFIG:
    --config <string> path to crawler configuration file
    -fc, --form-config <string> path to custom form configuration file
    -flc, --field-config <string> path to custom field configuration file
    :return: Dictionary containing output_path and process pid
    """
    # Fire-and-forget: run_gofang starts the crawler in the background and
    # hands back where its output will land plus the child's PID.
    result_path, process_id = run_gofang(url, args)
    return {
        "output_path": result_path,
        "pid": process_id,
    }
@mcp.tool()
def fetch_gofang_results(output_path: str, pid: int) -> Union[str, List[str]]:
    """
    Non-blocking fetch for gofang crawl results:
    - Returns immediately if process is still running.
    - Otherwise attempts to read and return the crawl output.

    :param output_path: Path to the output file from launch_gofang_scan
    :param pid: Process ID of the gofang scan
    :return: Crawl output lines or a status message
    """
    try:
        # psutil.pid_exists(pid) returns True for zombie processes on Linux,
        # which made finished crawls report "still running" forever.
        # Mirror fetch_nmap_results: treat zombies as completed and reap them
        # so they don't linger as <defunct>.
        if psutil.pid_exists(pid):
            try:
                proc = psutil.Process(pid)
                if proc.status() != psutil.STATUS_ZOMBIE:
                    return f"Scan is still running (PID {pid})"
                if os.name != 'nt':
                    try:
                        os.waitpid(pid, os.WNOHANG)
                    except (ChildProcessError, OSError):
                        # Not our child (e.g. server restarted) — nothing to reap.
                        pass
            except psutil.NoSuchProcess:
                # Exited between the two checks; fall through to read output.
                pass
            except Exception:
                # If we can't inspect status, fall back to pid_exists behavior.
                return f"Scan is still running (PID {pid})"
        return get_gofang_output(output_path, pid)
    except FileNotFoundError:
        return "Results not yet available."
    except Exception as e:
        return f"Error fetching gofang results: {e}"
@mcp.tool()
def run_subfinder(url: str, args: str = "") -> str:
    """
    Run Subfinder against the given URL with the provided arguments.

    :param url: The target domain to scan (passed to subfinder -d).
    :param args: Additional arguments for Subfinder (use -h for help).
    :return: The full output (stdout and stderr merged) from the subfinder
             execution, or a timeout message if it ran too long.
    """
    # NOTE(review): the previous version called _scan_semaphore.acquire()
    # synchronously. _scan_semaphore is an asyncio.Semaphore, so acquire()
    # returned an un-awaited coroutine (no limiting happened) and the paired
    # release() over-incremented the counter shared with the async tools.
    # The broken acquire/release pair has been removed.
    command = ["/root/go/bin/subfinder", "-d", url] + shlex.split(args)
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True
    )
    try:
        output, _ = process.communicate(timeout=DEFAULT_TIMEOUT)
    except subprocess.TimeoutExpired:
        process.kill()
        process.communicate()  # drain pipes so the child can be reaped
        return f"Subfinder timed out after {DEFAULT_TIMEOUT} seconds"
    return output
class SearchsploitError(RuntimeError):
    # Raised by run_searchsploit when the searchsploit binary exits non-zero.
    pass
@mcp.tool()
def run_searchsploit(query: str, args: str = "") -> str:
    """
    Run Searchsploit against the given query with optional arguments.

    :param query: The search query for Searchsploit (may contain several terms).
    :param args: Additional arguments for Searchsploit (use -h for help).
    :return: The stdout from the searchsploit execution.
    :raises FileNotFoundError: if the searchsploit binary is not on PATH.
    :raises SearchsploitError: if searchsploit exits with a non-zero status.
    """
    # NOTE(review): removed the synchronous _scan_semaphore.acquire()/release()
    # pair — _scan_semaphore is an asyncio.Semaphore, so the sync acquire()
    # never actually blocked and the release() corrupted the counter used by
    # the async tools.
    if which("searchsploit") is None:
        raise FileNotFoundError(
            "searchsploit not found in PATH."
        )
    cmd = ["searchsploit"]
    if args:
        cmd.extend(shlex.split(args))
    # The query is split too, so multi-word queries become separate terms.
    cmd.extend(shlex.split(query))
    completed = subprocess.run(
        cmd,
        text=True,
        capture_output=True,
        timeout=DEFAULT_TIMEOUT
    )
    if completed.returncode != 0:
        raise SearchsploitError(
            f"searchsploit exited with status {completed.returncode}: {completed.stderr}"
        )
    return completed.stdout
@mcp.tool()
def fetch_whois_data(domain: str) -> str:
    """
    Fetch WHOIS data for the given domain.

    :param domain: The domain to fetch WHOIS data for.
    :return: The WHOIS data as a string, or an error message on failure.
    """
    try:
        # Timeout added: whois can hang indefinitely on unresponsive registry
        # servers; a TimeoutExpired falls into the generic handler below.
        result = subprocess.run(
            ['whois', domain],
            capture_output=True,
            text=True,
            timeout=DEFAULT_TIMEOUT,
        )
        if result.returncode != 0:
            return f"Error fetching data for {domain}: {result.stderr.strip()}"
        return result.stdout
    except Exception as e:
        return f"Error fetching data for {domain}: {e}"
@mcp.tool()
def launch_nuclei_scan(target: str, args: str = "") -> dict:
    """
    Launch a Nuclei scan against the given target with optional arguments.

    :param target: The target URL or IP address to scan.
    :param args: Additional arguments for Nuclei (use -h for help).
    :return: A dictionary containing the output path and process ID.
    """
    # Delegate to the nuclei helper; it starts the scan in the background.
    result_path, process_id = run_nuclei(target, args)
    return {
        "output_path": result_path,
        "pid": process_id,
    }
@mcp.tool()
def fetch_nuclei_results(output_path: str, pid: int) -> Union[str, List[Any]]:
    """
    Non-blocking fetch for Nuclei scan results:
    - Returns immediately if process is still running.
    - Otherwise attempts to read and parse the JSON output.

    :param output_path: Path to the output file from launch_nuclei_scan
    :param pid: Process ID of the Nuclei scan
    :return: Parsed scan results or a status message
    """
    try:
        # psutil.pid_exists(pid) returns True for zombie processes on Linux,
        # which made finished scans report "still running" forever.
        # Mirror fetch_nmap_results: treat zombies as completed and reap them
        # so they don't linger as <defunct>.
        if psutil.pid_exists(pid):
            try:
                proc = psutil.Process(pid)
                if proc.status() != psutil.STATUS_ZOMBIE:
                    return f"Scan is still running (PID {pid})"
                if os.name != 'nt':
                    try:
                        os.waitpid(pid, os.WNOHANG)
                    except (ChildProcessError, OSError):
                        # Not our child (e.g. server restarted) — nothing to reap.
                        pass
            except psutil.NoSuchProcess:
                # Exited between the two checks; fall through to read output.
                pass
            except Exception:
                # If we can't inspect status, fall back to pid_exists behavior.
                return f"Scan is still running (PID {pid})"
        return get_nuclei_output(output_path, pid)
    except FileNotFoundError:
        return "Results not yet available."
    except Exception as e:
        return f"Error fetching Nuclei results: {e}"
@mcp.tool()
def launch_nmap_scan(target: str, args: str = "") -> dict:
    """
    Launch a Nmap scan against the given target with optional arguments.

    :param target: The target URL or IP address to scan.
    :param args: Additional arguments for Nmap (use -h for help).
    :return: A dictionary containing the output path and process ID.
    """
    # Delegate to the nmap helper; it starts the scan in the background.
    result_path, process_id = run_nmap(target, args)
    return {
        "output_path": result_path,
        "pid": process_id,
    }
@mcp.tool()
def fetch_nmap_results(output_path: str, pid: int) -> Union[str, List[str]]:
    """
    Non-blocking fetch for Nmap scan results:
    - Returns immediately if process is still running.
    - Otherwise attempts to read and parse the Nmap output.

    :param output_path: Path to the output file from launch_nmap_scan
    :param pid: Process ID of the Nmap scan
    :return: Parsed scan output, or a status/error message string
    """
    try:
        # psutil.pid_exists(pid) returns True for zombie processes on Linux.
        # Treat zombies as completed and reap them so they don't remain as <defunct>.
        if psutil.pid_exists(pid):
            try:
                p = psutil.Process(pid)
                if p.status() != psutil.STATUS_ZOMBIE:
                    return f"Scan is still running (PID {pid})"
                # Zombie: the scan is done. Reap it (POSIX only) so its PID
                # table entry is released; WNOHANG keeps this non-blocking.
                if os.name != 'nt':
                    try:
                        os.waitpid(pid, os.WNOHANG)
                    except (ChildProcessError, OSError):
                        # Not our child (e.g. server restarted) — nothing to reap.
                        pass
            except psutil.NoSuchProcess:
                # Process exited between the two checks; fall through to read output.
                pass
            except Exception:
                # If we can't inspect status, fall back to pid_exists behavior.
                return f"Scan is still running (PID {pid})"
        return get_nmap_output(output_path, pid)
    except FileNotFoundError:
        return "Results not yet available."
    except Exception as e:
        return f"Error fetching Nmap results: {e}"
@mcp.tool()
def launch_gobuster_scan(scan_type: str, target_url: str, wordlist_path: str, args: str = "") -> tuple:
    """
    Run a Gobuster scan against the given target URL with the specified wordlist and arguments.
    :param scan_type: The type of scan "dir" or "vhost".
    :param target_url: The target URL to scan.
    :param wordlist_path: The path to the wordlist file.
    /opt/pentest-mcp/seclists/Discovery This Directory is used for wordlists and is popular seclist directory
    :param args: Additional arguments for Gobuster.
    :return: A tuple containing the output file path and the process ID.
    """
    # NOTE(review): this block previously started with the stray statement
    # `Activescanner = ActiveScan()` (ActiveScan is undefined -> NameError at
    # import) and the function header was missing entirely, leaving the
    # docstring and body as bare module-level code. Reconstructed the
    # signature from the docstring and from check_gobuster_status, which
    # consumes (output_file, pid).
    ts = datetime.now().strftime("%Y%m%d%H%M%S")
    uid = uuid.uuid4().hex
    job_id = f"{ts}_{uid}"
    output_dir = os.path.join(os.getcwd(), "Gobuster_output")
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"{job_id}.txt")
    command = [
        "gobuster", scan_type,
        "-u", target_url,
        "-w", wordlist_path,
        "-o", output_file,
    ] + shlex.split(args)  # shlex.split for parity with the other tools (handles quoting)
    try:
        # Fully detach the child's stdio: under the stdio MCP transport any
        # inherited stdout would corrupt the protocol stream.
        proc = subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            close_fds=True,
        )
        return output_file, proc.pid
    except Exception as e:
        raise RuntimeError(f"Error starting Gobuster scan: {e}")
@mcp.tool()
def check_gobuster_status(output_file: str, pid: int) -> str:
    """
    Check the status of a Gobuster scan.

    :param output_file: Path to the output file
    :param pid: Process ID of the Gobuster scan
    :return: Status message while running, or the scan results once finished
    """
    if not os.path.exists(output_file):
        return "Output file not yet created. Scan is likely starting..."
    # Cross-platform liveness check via psutil; a zombie counts as finished.
    still_running = False
    if psutil.pid_exists(pid):
        try:
            still_running = psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
        except psutil.NoSuchProcess:
            still_running = False
    if still_running:
        return "Scan is still running..."
    try:
        with open(output_file, "r", encoding="utf-8", errors="ignore") as fh:
            return "Scan completed. Results:\n\n" + fh.read()
    except Exception as e:
        return f"Scan ended, but failed to read output file: {e}"
@mcp.tool()
def run_dig_tool(domain: str, options: str) -> str:
    """
    Run a dig command against the given domain with the provided options.

    :param domain: The target domain to query.
    :param options: Additional options for dig.
    :return: The stdout from the dig command.
    :raises subprocess.CalledProcessError: if dig exits non-zero (check=True).
    :raises subprocess.TimeoutExpired: if dig exceeds DEFAULT_TIMEOUT.
    """
    # NOTE(review): removed the synchronous _scan_semaphore.acquire()/release()
    # pair — _scan_semaphore is an asyncio.Semaphore, so the sync acquire()
    # never actually blocked and the release() corrupted the counter used by
    # the async tools.
    cmd = ["dig"] + shlex.split(options) + [domain]
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        check=True,
        timeout=DEFAULT_TIMEOUT
    )
    return result.stdout
@mcp.tool()
def run_sqlmap_tool(target: str, options: str) -> dict:
"""
Run a SQLMap scan against the given target with the provided options.
:param target: The target URL to scan.
:param options: Additional options for SQLMap (use -h for help).
:return: A dictionary containing the process ID and output directory.
"""
base_dir = os.path.abspath("sqlmap_output")
os.makedirs(base_dir, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
unique_id = uuid.uuid4().hex
workdir = os.path.join(base_dir, f"{timestamp}_{unique_id}")
os.makedirs(workdir)
cmd = ["sqlmap", "-u", target, "--batch", "--output-dir", workdir] + shlex.split(options)
proc = subprocess.Popen(cmd)
return {"pid": proc.pid, "output_dir": workdir}
@mcp.tool()
def check_sqlmap_status(pid: int, output_dir: str) -> str:
    """
    Non-blocking status check for SQLMap scan:
    - Returns immediately if process is still running.
    - Otherwise summarizes results from the output directory.

    :param pid: Process ID returned by run_sqlmap_tool
    :param output_dir: Output directory returned by run_sqlmap_tool
    :return: A multi-line status/summary string
    """
    try:
        # Best-effort liveness check; a zombie (<defunct>) child counts as done.
        if psutil.pid_exists(pid):
            if psutil.Process(pid).status() != psutil.STATUS_ZOMBIE:
                return f"Scan is still running (PID {pid})"
    except Exception:
        # Deliberate best-effort: if the process can't be inspected, fall
        # through and report whatever output exists.
        pass
    if not os.path.isdir(output_dir):
        return "Output directory not found; scan may not have started."
    summary = [f"Scan completed. Results in: {output_dir}"]
    base = output_dir
    tgt_files = glob.glob(os.path.join(base, '**', 'target.txt'), recursive=True)
    if tgt_files:
        for tf in tgt_files:
            summary.append(f"-- target.txt: {os.path.relpath(tf, base)} --")
            try:
                with open(tf, 'r', encoding='utf-8', errors='ignore') as fh:
                    content = fh.read().strip()
                summary.append(content or '(empty)')
            except Exception as e:
                summary.append(f"Failed to read {tf}: {e}")
    else:
        summary.append("No target.txt found.")
    log_files = glob.glob(os.path.join(base, '**', 'log'), recursive=True) + glob.glob(os.path.join(base, '**', '*.log'), recursive=True)
    if log_files:
        for lf in log_files:
            summary.append(f"-- Log: {os.path.relpath(lf, base)} --")
            try:
                with open(lf, 'r', encoding='utf-8', errors='ignore') as fh:
                    lines = fh.read().splitlines()
                for line in lines[-5:]:  # last 5 log lines only
                    summary.append(line)
            except Exception as e:
                summary.append(f"Failed to read {lf}: {e}")
    else:
        summary.append("No log file found.")
    # BUG FIX: was "".join(summary), which fused every summary line into one
    # unreadable string with no separators.
    return "\n".join(summary)
@mcp.tool()
def run_harvester(domain: str, sources: str = "all", extra_args: str = "") -> tuple[int, str]:
    """
    Fire off theHarvester asynchronously.

    Parameters
    ----------
    domain : target domain (passed to -d)
    sources : comma-separated list of search engines (-b); default 'all'
    extra_args : raw string of any other theHarvester CLI flags (e.g. '-l 100 -s')

    Returns
    -------
    (pid, json_path)
        pid       : PID of the child theHarvester process
        json_path : absolute path where <uuid>/<domain>.json will be written
    """
    def safe_name(domain: str) -> str:
        """Convert e.g. 'ramkansal.com' → 'ramkansal-com' (only a-z, 0-9, _ or -)."""
        return re.sub(r"[^A-Za-z0-9_-]", "-", domain.strip().lower())
    base_output_dir = Path.home() / "theHarvester_output"
    out_dir = base_output_dir / str(uuid.uuid4())  # unique dir per job
    out_dir.mkdir(parents=True, exist_ok=True)
    base_name = safe_name(domain)
    output_prefix = out_dir / base_name  # *no* extension; theHarvester adds .json/.xml
    cmd = [
        "theHarvester",
        "-d",
        domain,
        "-b",
        sources,
        "-f",
        str(output_prefix),
    ]
    if extra_args:
        cmd.extend(shlex.split(extra_args))
    # Detach the child's stdio (as the Gobuster launcher does): under the
    # stdio MCP transport any child output on inherited stdout would corrupt
    # the protocol stream. Results are read from json_path instead.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        close_fds=True,
    )
    json_path = f"{output_prefix}.json"
    return proc.pid, json_path
@mcp.tool()
def check_harvester_output(pid: int, json_path: str):
    """
    One-shot, non-blocking check: has theHarvester produced its JSON yet?

    If the results file exists, its parsed content (dict/list) is returned;
    otherwise the literal string 'scan still running' comes back. The
    function never polls or waits — call it as often as needed.
    """
    expanded = os.path.expanduser(json_path)
    if not os.path.exists(expanded):
        return "scan still running"
    with open(expanded, "r", encoding="utf-8") as handle:
        return json.load(handle)
@mcp.tool()
def run_curl_tool(target: str, opts: str) -> dict:
    """
    Run a curl command against the given target with the provided options.

    :param target: URL or hostname (e.g. "https://example.com")
    :param opts: space-separated curl options (e.g. "-I -L", use -h for help)
    :return: A dictionary containing status, stdout, and stderr
    """
    # NOTE(review): removed the synchronous _scan_semaphore.acquire()/release()
    # pair — _scan_semaphore is an asyncio.Semaphore, so the sync acquire()
    # never actually blocked and the release() corrupted the counter used by
    # the async tools.
    try:
        return run_curl(target, opts)
    except CurlError as e:
        return {
            "status": "error",
            "return_code": -1,
            "error": str(e),
            "stdout": "",
            "stderr": str(e)
        }
@mcp.tool()
def launch_arjun_scan(url: str, args: str = "") -> dict:
    """
    Launch Arjun (HTTP parameter discovery tool) to find hidden HTTP parameters.

    :param url: The target URL to scan (e.g. "https://example.com/page")
    :param args: Additional arguments for Arjun (use -h for help)
    :return: Dictionary containing output_path and process_id
    """
    # NOTE(review): removed the synchronous _scan_semaphore.acquire()/release()
    # pair — _scan_semaphore is an asyncio.Semaphore, so the sync acquire()
    # never actually blocked and the release() corrupted the counter used by
    # the async tools.
    try:
        output_path, pid = run_arjun(url, args)
        return {
            "status": "started",
            "output_path": output_path,
            "pid": pid,
            "message": f"Arjun scan started with PID {pid}"
        }
    except ArjunError as e:
        return {
            "status": "error",
            "error": str(e),
            "message": f"Failed to start Arjun scan: {str(e)}"
        }
@mcp.tool()
def fetch_arjun_results(output_path: str, pid: int) -> dict:
    """
    Fetch results from a previously launched Arjun scan.

    :param output_path: Path to the output file from the scan
    :param pid: Process ID of the Arjun scan
    :return: Dictionary containing scan results and status
    """
    try:
        return get_arjun_output(output_path, pid)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Error fetching Arjun results: {str(exc)}",
            "pid": pid
        }
@mcp.tool()
async def ad_user_enum(target: str, domain: str, username: Optional[str] = None, password: Optional[str] = None) -> dict:
    """
    Enumerate domain users on a target via NetExec.

    :param target: Target domain controller or host.
    :param domain: Domain name.
    :param username: Optional username for authentication.
    :param password: Optional password for authentication.
    :return: Enumeration results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await user_enum(target, domain, username, password)
        except Exception as exc:
            return {
                "success": False,
                "error": f"User enumeration failed: {str(exc)}"
            }
@mcp.tool()
async def ad_smb_signing_check(targets: str, output_file: str = "/tmp/relay_targets.txt") -> dict:
    """
    Check SMB signing status to identify relay targets.

    :param targets: Target hosts/network range to check.
    :param output_file: File where relay-capable targets are written.
    :return: Signing status and relay targets, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await smb_signing_check(targets, output_file)
        except Exception as exc:
            return {
                "success": False,
                "error": f"SMB signing check failed: {str(exc)}"
            }
@mcp.tool()
async def ad_shares_enum(target: str, username: str, password: str, domain: Optional[str] = None) -> dict:
    """
    Enumerate SMB shares and their permissions on a host.

    :param target: Target host.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param domain: Optional domain name.
    :return: Share enumeration results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await shares_enum(target, username, password, domain)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Share enumeration failed: {str(exc)}"
            }
@mcp.tool()
async def ad_password_spray(target: str, domain: str, userfile: str, password: str, delay: int = 1) -> dict:
    """
    Spray a single password across many domain accounts.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param userfile: Path to a file of usernames (one per line).
    :param password: Password to spray.
    :param delay: Delay between attempts in seconds.
    :return: Spray results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await password_spray(target, domain, userfile, password, delay)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Password spray failed: {str(exc)}"
            }
@mcp.tool()
async def ad_asreproast(target: str, domain: str, userfile: Optional[str] = None, format: str = "hashcat") -> dict:
    """
    AS-REP Roast: harvest Kerberos hashes for accounts without pre-auth.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param userfile: Optional file with usernames to target.
    :param format: Hash output format ("hashcat" or "john").
    :return: Roasting results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await asreproast(target, domain, userfile, format)
        except Exception as exc:
            return {
                "success": False,
                "error": f"AS-REP roasting failed: {str(exc)}"
            }
@mcp.tool()
async def ad_kerberoast(target: str, domain: str, username: str, password: str, format: str = "hashcat") -> dict:
    """
    Kerberoast: extract crackable service-account ticket hashes.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param format: Hash output format ("hashcat" or "john").
    :return: Kerberoasting results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await kerberoast(target, domain, username, password, format)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Kerberoasting failed: {str(exc)}"
            }
@mcp.tool()
async def ad_relay_setup(targets: str, smb2support: bool = True, escalate_user: Optional[str] = None, dump_sam: bool = True, dump_lsass: bool = True, interface: str = "eth0") -> dict:
    """
    Set up NTLM relay attack infrastructure.

    :param targets: Target hosts for relay attacks.
    :param smb2support: Enable SMB2 support.
    :param escalate_user: User to escalate privileges to.
    :param dump_sam: Dump the SAM database on success.
    :param dump_lsass: Dump the LSASS process on success.
    :param interface: Network interface to bind to.
    :return: Relay setup status, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await relay_setup(targets, smb2support, escalate_user, dump_sam, dump_lsass, interface)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Relay setup failed: {str(exc)}"
            }
@mcp.tool()
async def ad_coerce_petitpotam(target: str, listener: str, pipe: str = "lsarpc", username: Optional[str] = None, password: Optional[str] = None) -> dict:
    """
    Coerce authentication from a host using PetitPotam (MS-EFSRPC).

    :param target: Target host to coerce.
    :param listener: Relay listener IP.
    :param pipe: Named pipe to use.
    :param username: Optional username for authentication.
    :param password: Optional password for authentication.
    :return: Coercion results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await coerce_petitpotam(target, listener, pipe, username, password)
        except Exception as exc:
            return {
                "success": False,
                "error": f"PetitPotam coercion failed: {str(exc)}"
            }
@mcp.tool()
async def ad_coerce_printerbug(target: str, listener: str, username: str, password: str, domain: str) -> dict:
    """
    Coerce authentication from a host using PrinterBug (MS-RPRN).

    :param target: Target host to coerce.
    :param listener: Relay listener IP.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param domain: Domain name.
    :return: Coercion results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await coerce_printerbug(target, listener, username, password, domain)
        except Exception as exc:
            return {
                "success": False,
                "error": f"PrinterBug coercion failed: {str(exc)}"
            }
@mcp.tool()
async def ad_responder_poison(interface: str, analyze: bool = False, wpad: bool = True, force_wpad_auth: bool = False, disable_smb: bool = False, disable_http: bool = False) -> dict:
    """
    Start Responder to poison LLMNR/NBT-NS/mDNS name resolution.

    :param interface: Network interface to bind to.
    :param analyze: Analysis mode only (no poisoning).
    :param wpad: Enable WPAD poisoning.
    :param force_wpad_auth: Force WPAD authentication.
    :param disable_smb: Disable the rogue SMB server.
    :param disable_http: Disable the rogue HTTP server.
    :return: Responder status, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await responder_poison(interface, analyze, wpad, force_wpad_auth, disable_smb, disable_http)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Responder failed: {str(exc)}"
            }
@mcp.tool()
async def ad_bloodhound_collect(target: str, domain: str, username: str, password: str, collection_method: str = "All", dns_server: Optional[str] = None) -> dict:
    """
    Collect BloodHound data for attack-path analysis.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param collection_method: Collection method (All, DCOnly, etc.).
    :param dns_server: Optional DNS server to use for resolution.
    :return: Collection results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await bloodhound_collect(target, domain, username, password, collection_method, dns_server)
        except Exception as exc:
            return {
                "success": False,
                "error": f"BloodHound collection failed: {str(exc)}"
            }
@mcp.tool()
async def ad_secrets_dump(target: str, username: str, password: str, domain: Optional[str] = None, use_hash: bool = False, just_dc: bool = False) -> dict:
    """
    Dump secrets (SAM/SYSTEM/NTDS) from a target host.

    :param target: Target host.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param domain: Optional domain name.
    :param use_hash: Authenticate with an NT hash instead of a password.
    :param just_dc: Only dump domain-controller (NTDS) hashes.
    :return: Dump results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await secrets_dump(target, username, password, domain, use_hash, just_dc)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Secrets dump failed: {str(exc)}"
            }
@mcp.tool()
async def ad_dcsync(target: str, domain: str, username: str, password: str, target_user: Optional[str] = None, use_hash: bool = False) -> dict:
    """
    Perform a DCSync attack to replicate credentials from a DC.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param target_user: Specific user whose secrets to pull.
    :param use_hash: Authenticate with an NT hash instead of a password.
    :return: DCSync results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await dcsync(target, domain, username, password, target_user, use_hash)
        except Exception as exc:
            return {
                "success": False,
                "error": f"DCSync failed: {str(exc)}"
            }
@mcp.tool()
async def ad_certipy_enum(target: str, domain: str, username: str, password: str, vulnerable: bool = False) -> dict:
    """
    Enumerate AD Certificate Services (ADCS) for misconfigurations.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :param vulnerable: Only report vulnerable configurations.
    :return: Enumeration results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await certipy_enum(target, domain, username, password, vulnerable)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Certipy enumeration failed: {str(exc)}"
            }
@mcp.tool()
async def ad_ldap_dump(target: str, domain: str, username: str, password: str) -> dict:
    """
    Perform a comprehensive LDAP dump of the domain.

    :param target: Target domain controller.
    :param domain: Domain name.
    :param username: Username for authentication.
    :param password: Password for authentication.
    :return: Dump results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await ldap_dump(target, domain, username, password)
        except Exception as exc:
            return {
                "success": False,
                "error": f"LDAP dump failed: {str(exc)}"
            }
@mcp.tool()
async def ad_check_credentials(target: str, domain: str, username: str, password: str) -> dict:
    """
    Quickly validate a set of domain credentials against a host.

    :param target: Target host.
    :param domain: Domain name.
    :param username: Username to validate.
    :param password: Password to validate.
    :return: Validation results, or {"success": False, "error": ...} on failure.
    """
    async with _scan_semaphore:  # cap concurrent scans
        try:
            return await check_credentials(target, domain, username, password)
        except Exception as exc:
            return {
                "success": False,
                "error": f"Credential check failed: {str(exc)}"
            }
if __name__ == "__main__":
    # CLI entry point: pick the MCP transport and (for HTTP) the bind address.
    parser = argparse.ArgumentParser(description="PentestMCP Security Tools Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http"],
        default="stdio",
        help="Transport protocol (default: stdio)"
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host to bind HTTP server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port for HTTP server (default: 8080)"
    )
    args = parser.parse_args()
    if args.transport == "streamable-http":
        # NOTE(review): host/port passed as mcp.run() kwargs — confirm the
        # installed FastMCP version accepts these here (some versions take
        # them via the FastMCP constructor/settings instead).
        mcp.run(transport="streamable-http", host=args.host, port=args.port)
    else:
        # Default stdio transport: child processes must not write to stdout
        # (see the DEVNULL redirection in the scan launchers).
        mcp.run()