import os
import threading
import uuid
import psutil
import subprocess
import glob
import shlex
import re
import json
from typing import Any, List, Union
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from shutil import which
from pathlib import Path
from tools.nuclei import run_nuclei, get_nuclei_output
from tools.nmap import run_nmap, get_nmap_output
from tools.ZAP_Active_Scan import ActiveScan
from tools.ZAP_Ajax_Scan import ajaxSpider
from tools.ZAP_Spider_Scan import run_zap_scan
from tools.curl_tool import run_curl, CurlError
from tools.arjun_tool import run_arjun, get_arjun_output, ArjunError
from tools.ad_tools import (
user_enum, smb_signing_check, shares_enum, password_spray,
asreproast, kerberoast, relay_setup, coerce_petitpotam,
coerce_printerbug, responder_poison, mitm6_attack,
bloodhound_collect, secrets_dump, dcsync, certipy_enum,
certipy_esc8, certipy_request, ldap_dump, check_credentials
)
# Maximum wall-clock time (seconds) a single blocking tool invocation may run.
DEFAULT_TIMEOUT = 300 # seconds
# Caps the number of concurrently running scans across all tools at 5.
_scan_semaphore = threading.Semaphore(5)
# Base URL of the local ZAP daemon used by the ZAP-backed tools below.
ZAP_BASE_URL = "http://localhost:8888"
# MCP server instance; the @mcp.tool() decorators below register each tool.
mcp = FastMCP("pentestMCP")
@mcp.tool()
def run_spider_tool(url: str, recurse: bool, max_children: int) -> str:
    """
    Run a ZAP Spider scan on the given URL.
    :param url: The target URL to scan.
    :param recurse: Whether to recurse into the site.
    :param max_children: The maximum number of child nodes to crawl.
    :return: the Scan Results.
    """
    # Delegates to the project helper; port 8888 matches ZAP_BASE_URL above.
    # NOTE(review): user_id and output look like hard-coded placeholders
    # ("ram_claud", "LoL") - confirm what run_zap_scan actually expects here.
    return run_zap_scan(
        url=url,
        recurse=recurse,
        max_children=max_children,
        port=8888,
        user_id="ram_claud",
        output="LoL"
    )
@mcp.tool()
def run_subfinder(url: str, args: str) -> str:
    """
    Run Subfinder against the given URL with the provided arguments.
    :param url: The target URL to scan.
    :param args: Additional arguments for Subfinder.
    SOURCE:
    -s, -sources string[] specific sources to use for discovery (-s crtsh,github). Use -ls to display all available sources.
    -recursive use only sources that can handle subdomains recursively rather than both recursive and non-recursive sources
    -all use all sources for enumeration (slow)
    -es, -exclude-sources string[] sources to exclude from enumeration (-es alienvault,zoomeyeapi)
    FILTER:
    -m, -match string[] subdomain or list of subdomain to match (file or comma separated)
    -f, -filter string[] subdomain or list of subdomain to filter (file or comma separated)
    RATE-LIMIT:
    -rl, -rate-limit int maximum number of http requests to send per second (global)
    -rls, -rate-limits value maximum number of http requests to send per second for providers in key=value format (-rls hackertarget=10/m) (default ["github=30/m", "fullhunt=60/m", "robtex=18446744073709551615/ms", "securitytrails=1/s", "shodan=1/s", "virustotal=4/m", "hackertarget=2/s", "waybackarchive=15/m", "whoisxmlapi=50/s", "securitytrails=2/s", "sitedossier=8/m", "netlas=1/s", "github=83/m", "hudsonrock=5/s"])
    -t int number of concurrent goroutines for resolving (-active only) (default 10)
    UPDATE:
    -up, -update update subfinder to latest version
    -duc, -disable-update-check disable automatic subfinder update check
    CONFIGURATION:
    -r string[] comma separated list of resolvers to use
    -rL, -rlist string file containing list of resolvers to use
    -nW, -active display active subdomains only
    -proxy string http proxy to use with subfinder
    -ei, -exclude-ip exclude IPs from the list of domains
    DEBUG:
    -silent show only subdomains in output
    -version show version of subfinder
    -v show verbose output
    -nc, -no-color disable color in output
    -ls, -list-sources list all available sources
    -stats report source statistics
    :return: The full output (stdout and stderr merged) from the subfinder execution.
    """
    # Context manager guarantees the slot is released even if Popen raises
    # (e.g. FileNotFoundError for a missing binary).
    with _scan_semaphore:
        # Binary path is pinned to the Go install location used on this host.
        command = ["/root/go/bin/subfinder", "-d", url] + shlex.split(args)
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True
        )
        try:
            output, _ = process.communicate(timeout=DEFAULT_TIMEOUT)
        except subprocess.TimeoutExpired:
            # Kill the hung scan and reap it so no zombie process is left.
            process.kill()
            process.communicate()
            return f"Subfinder timed out after {DEFAULT_TIMEOUT} seconds"
        return output
class SearchsploitError(RuntimeError):
    """Raised when searchsploit exits with a non-zero status."""
@mcp.tool()
def run_searchsploit(query: str, args: str = "") -> str:
    """
    Run Searchsploit against the given query with optional arguments.
    :param query: The search query for Searchsploit.
    :param args: Additional arguments for Searchsploit.
    =========
    Options
    =========
    ## Search Terms
    -c, --case [term] Perform a case-sensitive search (Default is inSEnsITiVe)
    -e, --exact [term] Perform an EXACT & order match on exploit title (Default is an AND match on each term) [Implies "-t"]
    e.g. "WordPress 4.1" would not be detect "WordPress Core 4.1")
    -s, --strict Perform a strict search, so input values must exist, disabling fuzzy search for version range
    e.g. "1.1" would not be detected in "1.0 < 1.3")
    -t, --title [term] Search JUST the exploit title (Default is title AND the file's path)
    --exclude="term" Remove values from results. By using "|" to separate, you can chain multiple values
    e.g. --exclude="term1|term2|term3"
    --cve [CVE] Search for Common Vulnerabilities and Exposures (CVE) value
    ## Output
    -j, --json [term] Show result in JSON format
    -o, --overflow [term] Exploit titles are allowed to overflow their columns
    -p, --path [EDB-ID] Show the full path to an exploit (and also copies the path to the clipboard if possible)
    -v, --verbose Display more information in output
    -w, --www [term] Show URLs to Exploit-DB.com rather than the local path
    --id Display the EDB-ID value rather than local path
    --disable-colour Disable colour highlighting in search results
    ## Non-Searching
    -m, --mirror [EDB-ID] Mirror (aka copies) an exploit to the current working directory
    -x, --examine [EDB-ID] Examine (aka opens) the exploit using $PAGER
    ## Non-Searching
    -h, --help Show this help screen
    :return: The output from the searchsploit execution.
    :raises FileNotFoundError: if searchsploit is not installed.
    :raises SearchsploitError: if searchsploit exits non-zero.
    """
    # Context manager guarantees the semaphore slot is released on every
    # exit path (missing binary, timeout, non-zero exit).
    with _scan_semaphore:
        if which("searchsploit") is None:
            raise FileNotFoundError(
                "searchsploit not found in PATH."
            )
        cmd = ["searchsploit"]
        if args:
            cmd.extend(shlex.split(args))
        # shlex.split so a multi-word query becomes separate AND-matched terms.
        cmd.extend(shlex.split(query))
        completed = subprocess.run(
            cmd,
            text=True,
            capture_output=True,
            timeout=DEFAULT_TIMEOUT
        )
        if completed.returncode != 0:
            raise SearchsploitError(
                f"searchsploit exited with status {completed.returncode}: {completed.stderr}"
            )
        return completed.stdout
@mcp.tool()
def fetch_whois_data(domain: str) -> str:
    """
    Fetch WHOIS data for the given domain.
    :param domain: The domain to fetch WHOIS data for.
    :return: The WHOIS data as a string, or an error message on failure.
    """
    try:
        # Bound the lookup: a wedged whois server must not hang the tool
        # forever (every other blocking tool here uses DEFAULT_TIMEOUT).
        result = subprocess.run(
            ['whois', domain],
            capture_output=True,
            text=True,
            timeout=DEFAULT_TIMEOUT,
        )
        if result.returncode != 0:
            return f"Error fetching data for {domain}: {result.stderr.strip()}"
        return result.stdout
    except subprocess.TimeoutExpired:
        return f"Error fetching data for {domain}: timed out after {DEFAULT_TIMEOUT} seconds"
    except Exception as e:
        # Best-effort tool: report the failure as a string rather than raise.
        return f"Error fetching data for {domain}: {e}"
@mcp.tool()
def launch_nuclei_scan(target: str, args: str = "") -> dict:
    """
    Launch a Nuclei scan against the given target with optional arguments.
    :param target: The target URL or IP address to scan.
    :param args: Additional arguments for Nuclei.
    TARGET:
    -u, -target string[] target URLs/hosts to scan
    -l, -list string path to file containing a list of target URLs/hosts to scan (one per line)
    -eh, -exclude-hosts string[] hosts to exclude to scan from the input list (ip, cidr, hostname)
    -resume string resume scan using resume.cfg (clustering will be disabled)
    -sa, -scan-all-ips scan all the IP's associated with dns record
    -iv, -ip-version string[] IP version to scan of hostname (4,6) - (default 4)
    TARGET-FORMAT:
    -im, -input-mode string mode of input file (list, burp, jsonl, yaml, openapi, swagger) (default "list")
    -ro, -required-only use only required fields in input format when generating requests
    -sfv, -skip-format-validation skip format validation (like missing vars) when parsing input file
    TEMPLATES:
    -nt, -new-templates run only new templates added in latest nuclei-templates release
    -ntv, -new-templates-version string[] run new templates added in specific version
    -as, -automatic-scan automatic web scan using wappalyzer technology detection to tags mapping
    -t, -templates string[] list of template or template directory to run (comma-separated, file)
    -turl, -template-url string[] template url or list containing template urls to run (comma-separated, file)
    -ai, -prompt string generate and run template using ai prompt
    -w, -workflows string[] list of workflow or workflow directory to run (comma-separated, file)
    -wurl, -workflow-url string[] workflow url or list containing workflow urls to run (comma-separated, file)
    -validate validate the passed templates to nuclei
    -nss, -no-strict-syntax disable strict syntax check on templates
    -td, -template-display displays the templates content
    -tl list all available templates
    -tgl list all available tags
    -sign signs the templates with the private key defined in NUCLEI_SIGNATURE_PRIVATE_KEY env variable
    -code enable loading code protocol-based templates
    -dut, -disable-unsigned-templates disable running unsigned templates or templates with mismatched signature
    -esc, -enable-self-contained enable loading self-contained templates
    -egm, -enable-global-matchers enable loading global matchers templates
    -file enable loading file templates
    FILTERING:
    -a, -author string[] templates to run based on authors (comma-separated, file)
    -tags string[] templates to run based on tags (comma-separated, file)
    -etags, -exclude-tags string[] templates to exclude based on tags (comma-separated, file)
    -itags, -include-tags string[] tags to be executed even if they are excluded either by default or configuration
    -id, -template-id string[] templates to run based on template ids (comma-separated, file, allow-wildcard)
    -eid, -exclude-id string[] templates to exclude based on template ids (comma-separated, file)
    -it, -include-templates string[] path to template file or directory to be executed even if they are excluded either by default or configuration
    -et, -exclude-templates string[] path to template file or directory to exclude (comma-separated, file)
    -em, -exclude-matchers string[] template matchers to exclude in result
    -s, -severity value[] templates to run based on severity. Possible values: info, low, medium, high, critical, unknown
    -es, -exclude-severity value[] templates to exclude based on severity. Possible values: info, low, medium, high, critical, unknown
    -pt, -type value[] templates to run based on protocol type. Possible values: dns, file, http, headless, tcp, workflow, ssl, websocket, whois, code, javascript
    -ept, -exclude-type value[] templates to exclude based on protocol type. Possible values: dns, file, http, headless, tcp, workflow, ssl, websocket, whois, code, javascript
    -tc, -template-condition string[] templates to run based on expression condition
    OUTPUT:
    -o, -output string output file to write found issues/vulnerabilities
    -sresp, -store-resp store all request/response passed through nuclei to output directory
    -srd, -store-resp-dir string store all request/response passed through nuclei to custom directory (default "output")
    -silent display findings only
    -nc, -no-color disable output content coloring (ANSI escape codes)
    -j, -jsonl write output in JSONL(ines) format
    -irr, -include-rr -omit-raw include request/response pairs in the JSON, JSONL, and Markdown outputs (for findings only) [DEPRECATED use -omit-raw] (default true)
    -or, -omit-raw omit request/response pairs in the JSON, JSONL, and Markdown outputs (for findings only)
    -ot, -omit-template omit encoded template in the JSON, JSONL output
    -nm, -no-meta disable printing result metadata in cli output
    -ts, -timestamp enables printing timestamp in cli output
    -rdb, -report-db string nuclei reporting database (always use this to persist report data)
    -ms, -matcher-status display match failure status
    -me, -markdown-export string directory to export results in markdown format
    -se, -sarif-export string file to export results in SARIF format
    -je, -json-export string file to export results in JSON format
    -jle, -jsonl-export string file to export results in JSONL(ine) format
    -rd, -redact string[] redact given list of keys from query parameter, request header and body
    CONFIGURATIONS:
    -config string path to the nuclei configuration file
    -tp, -profile string template profile config file to run
    -tpl, -profile-list list community template profiles
    -fr, -follow-redirects enable following redirects for http templates
    -fhr, -follow-host-redirects follow redirects on the same host
    -mr, -max-redirects int max number of redirects to follow for http templates (default 10)
    -dr, -disable-redirects disable redirects for http templates
    -rc, -report-config string nuclei reporting module configuration file
    -H, -header string[] custom header/cookie to include in all http request in header:value format (cli, file)
    -V, -var value custom vars in key=value format
    -r, -resolvers string file containing resolver list for nuclei
    -sr, -system-resolvers use system DNS resolving as error fallback
    -dc, -disable-clustering disable clustering of requests
    -passive enable passive HTTP response processing mode
    -fh2, -force-http2 force http2 connection on requests
    -ev, -env-vars enable environment variables to be used in template
    -cc, -client-cert string client certificate file (PEM-encoded) used for authenticating against scanned hosts
    -ck, -client-key string client key file (PEM-encoded) used for authenticating against scanned hosts
    -ca, -client-ca string client certificate authority file (PEM-encoded) used for authenticating against scanned hosts
    -sml, -show-match-line show match lines for file templates, works with extractors only
    -ztls use ztls library with autofallback to standard one for tls13 [Deprecated] autofallback to ztls is enabled by default
    -sni string tls sni hostname to use (default: input domain name)
    -dka, -dialer-keep-alive value keep-alive duration for network requests.
    -lfa, -allow-local-file-access allows file (payload) access anywhere on the system
    -lna, -restrict-local-network-access blocks connections to the local / private network
    -i, -interface string network interface to use for network scan
    -at, -attack-type string type of payload combinations to perform (batteringram,pitchfork,clusterbomb)
    -sip, -source-ip string source ip address to use for network scan
    -rsr, -response-size-read int max response size to read in bytes
    -rss, -response-size-save int max response size to read in bytes (default 1048576)
    -reset reset removes all nuclei configuration and data files (including nuclei-templates)
    -tlsi, -tls-impersonate enable experimental client hello (ja3) tls randomization
    -hae, -http-api-endpoint string experimental http api endpoint
    INTERACTSH:
    -iserver, -interactsh-server string interactsh server url for self-hosted instance (default: oast.pro,oast.live,oast.site,oast.online,oast.fun,oast.me)
    -itoken, -interactsh-token string authentication token for self-hosted interactsh server
    -interactions-cache-size int number of requests to keep in the interactions cache (default 5000)
    -interactions-eviction int number of seconds to wait before evicting requests from cache (default 60)
    -interactions-poll-duration int number of seconds to wait before each interaction poll request (default 5)
    -interactions-cooldown-period int extra time for interaction polling before exiting (default 5)
    -ni, -no-interactsh disable interactsh server for OAST testing, exclude OAST based templates
    FUZZING:
    -ft, -fuzzing-type string overrides fuzzing type set in template (replace, prefix, postfix, infix)
    -fm, -fuzzing-mode string overrides fuzzing mode set in template (multiple, single)
    -fuzz enable loading fuzzing templates (Deprecated: use -dast instead)
    -dast enable / run dast (fuzz) nuclei templates
    -dts, -dast-server enable dast server mode (live fuzzing)
    -dtr, -dast-report write dast scan report to file
    -dtst, -dast-server-token string dast server token (optional)
    -dtsa, -dast-server-address string dast server address (default "localhost:9055")
    -dfp, -display-fuzz-points display fuzz points in the output for debugging
    -fuzz-param-frequency int frequency of uninteresting parameters for fuzzing before skipping (default 10)
    -fa, -fuzz-aggression string fuzzing aggression level controls payload count for fuzz (low, medium, high) (default "low")
    -cs, -fuzz-scope string[] in scope url regex to be followed by fuzzer
    -cos, -fuzz-out-scope string[] out of scope url regex to be excluded by fuzzer
    UNCOVER:
    -uc, -uncover enable uncover engine
    -uq, -uncover-query string[] uncover search query
    -ue, -uncover-engine string[] uncover search engine (shodan,censys,fofa,shodan-idb,quake,hunter,zoomeye,netlas,criminalip,publicwww,hunterhow,google,odin,binaryedge) (default shodan)
    -uf, -uncover-field string uncover fields to return (ip,port,host) (default "ip:port")
    -ul, -uncover-limit int uncover results to return (default 100)
    -ur, -uncover-ratelimit int override ratelimit of engines with unknown ratelimit (default 60 req/min) (default 60)
    RATE-LIMIT:
    -rl, -rate-limit int maximum number of requests to send per second (default 150)
    -rld, -rate-limit-duration value maximum number of requests to send per second (default 1s)
    -rlm, -rate-limit-minute int maximum number of requests to send per minute (DEPRECATED)
    -bs, -bulk-size int maximum number of hosts to be analyzed in parallel per template (default 25)
    -c, -concurrency int maximum number of templates to be executed in parallel (default 25)
    -hbs, -headless-bulk-size int maximum number of headless hosts to be analyzed in parallel per template (default 10)
    -headc, -headless-concurrency int maximum number of headless templates to be executed in parallel (default 10)
    -jsc, -js-concurrency int maximum number of javascript runtimes to be executed in parallel (default 120)
    -pc, -payload-concurrency int max payload concurrency for each template (default 25)
    -prc, -probe-concurrency int http probe concurrency with httpx (default 50)
    OPTIMIZATIONS:
    -timeout int time to wait in seconds before timeout (default 10)
    -retries int number of times to retry a failed request (default 1)
    -ldp, -leave-default-ports leave default HTTP/HTTPS ports (eg. host:80,host:443)
    -mhe, -max-host-error int max errors for a host before skipping from scan (default 30)
    -te, -track-error string[] adds given error to max-host-error watchlist (standard, file)
    -nmhe, -no-mhe disable skipping host from scan based on errors
    -project use a project folder to avoid sending same request multiple times
    -spm, -stop-at-first-match stop processing HTTP requests after the first match (may break template/workflow logic)
    -stream stream mode - start elaborating without sorting the input
    -ss, -scan-strategy value strategy to use while scanning(auto/host-spray/template-spray) (default auto)
    -irt, -input-read-timeout value timeout on input read (default 3m0s)
    -nh, -no-httpx disable httpx probing for non-url input
    -no-stdin disable stdin processing
    HEADLESS:
    -headless enable templates that require headless browser support (root user on Linux will disable sandbox)
    -page-timeout int seconds to wait for each page in headless mode (default 20)
    -sb, -show-browser show the browser on the screen when running templates with headless mode
    -ho, -headless-options string[] start headless chrome with additional options
    -sc, -system-chrome use local installed Chrome browser instead of nuclei installed
    -lha, -list-headless-action list available headless actions
    DEBUG:
    -debug show all requests and responses
    -dreq, -debug-req show all sent requests
    -dresp, -debug-resp show all received responses
    -p, -proxy string[] list of http/socks5 proxy to use (comma separated or file input)
    -pi, -proxy-internal proxy all internal requests
    -ldf, -list-dsl-function list all supported DSL function signatures
    -tlog, -trace-log string file to write sent requests trace log
    -elog, -error-log string file to write sent requests error log
    -version show nuclei version
    -hm, -hang-monitor enable nuclei hang monitoring
    -v, -verbose show verbose output
    -profile-mem string generate memory (heap) profile & trace files
    -vv display templates loaded for scan
    -svd, -show-var-dump show variables dump for debugging
    -vdl, -var-dump-limit int limit the number of characters displayed in var dump (default 255)
    -ep, -enable-pprof enable pprof debugging server
    -tv, -templates-version shows the version of the installed nuclei-templates
    -hc, -health-check run diagnostic check up
    UPDATE:
    -up, -update update nuclei engine to the latest released version
    -ut, -update-templates update nuclei-templates to latest released version
    -ud, -update-template-dir string custom directory to install / update nuclei-templates
    -duc, -disable-update-check disable automatic nuclei/templates update check
    STATISTICS:
    -stats display statistics about the running scan
    -sj, -stats-json display statistics in JSONL(ines) format
    -si, -stats-interval int number of seconds to wait between showing a statistics update (default 5)
    -mp, -metrics-port int port to expose nuclei metrics on (default 9092)
    -hps, -http-stats enable http status capturing (experimental)
    CLOUD:
    -auth configure projectdiscovery cloud (pdcp) api key (default true)
    -tid, -team-id string upload scan results to given team id (optional) (default "none")
    -cup, -cloud-upload upload scan results to pdcp dashboard [DEPRECATED use -dashboard]
    -sid, -scan-id string upload scan results to existing scan id (optional)
    -sname, -scan-name string scan name to set (optional)
    -pd, -dashboard upload / view nuclei results in projectdiscovery cloud (pdcp) UI dashboard
    -pdu, -dashboard-upload string upload / view nuclei results file (jsonl) in projectdiscovery cloud (pdcp) UI dashboard
    AUTHENTICATION:
    -sf, -secret-file string[] path to config file containing secrets for nuclei authenticated scan
    -ps, -prefetch-secrets prefetch secrets from the secrets file
    :return: A dictionary containing the output path and process ID.
    """
    # Fire-and-forget: the project helper starts nuclei in the background and
    # returns the output file path plus the scanner's PID; poll with
    # fetch_nuclei_results to collect findings once the process exits.
    output_path, pid = run_nuclei(target, args)
    return {"output_path": output_path, "pid": pid}
@mcp.tool()
def fetch_nuclei_results(output_path: str, pid: int) -> Union[str, List[Any]]:
    """
    Non-blocking fetch for Nuclei scan results:
    - Returns immediately if process is still running.
    - Otherwise attempts to read and parse the JSON output.
    :param output_path: The output file path returned by launch_nuclei_scan.
    :param pid: The process ID returned by launch_nuclei_scan.
    :return: A progress message, or the parsed results once the scan ends.
    """
    try:
        # pid_exists() alone is not enough: an exited-but-unreaped child
        # lingers as a zombie and would report "still running" forever.
        # Treat zombies as finished (mirrors check_gobuster_status).
        if psutil.pid_exists(pid):
            try:
                if psutil.Process(pid).status() != psutil.STATUS_ZOMBIE:
                    return f"Scan is still running (PID {pid})"
            except psutil.NoSuchProcess:
                pass  # exited between the two checks: fall through to read
        return get_nuclei_output(output_path, pid)
    except FileNotFoundError:
        return "Results not yet available."
    except Exception as e:
        return f"Error fetching Nuclei results: {e}"
@mcp.tool()
def launch_nmap_scan(target: str, args: str = "") -> dict:
    """
    Launch a Nmap scan against the given target with optional arguments.
    :param target: The target URL or IP address to scan.
    :param args: Additional arguments for Nmap.
    TARGET SPECIFICATION:
    Can pass hostnames, IP addresses, networks, etc.
    Ex: scanme.nmap.org, microsoft.com/24, 192.168.0.1; 10.0.0-255.1-254
    -iL <inputfilename>: Input from list of hosts/networks
    -iR <num hosts>: Choose random targets
    --exclude <host1[,host2][,host3],...>: Exclude hosts/networks
    --excludefile <exclude_file>: Exclude list from file
    HOST DISCOVERY:
    -sL: List Scan - simply list targets to scan
    -sn: Ping Scan - disable port scan
    -Pn: Treat all hosts as online -- skip host discovery
    -PS/PA/PU/PY[portlist]: TCP SYN/ACK, UDP or SCTP discovery to given ports
    -PE/PP/PM: ICMP echo, timestamp, and netmask request discovery probes
    -PO[protocol list]: IP Protocol Ping
    -n/-R: Never do DNS resolution/Always resolve [default: sometimes]
    --dns-servers <serv1[,serv2],...>: Specify custom DNS servers
    --system-dns: Use OS's DNS resolver
    --traceroute: Trace hop path to each host
    SCAN TECHNIQUES:
    -sS/sT/sA/sW/sM: TCP SYN/Connect()/ACK/Window/Maimon scans
    -sU: UDP Scan
    -sN/sF/sX: TCP Null, FIN, and Xmas scans
    --scanflags <flags>: Customize TCP scan flags
    -sI <zombie host[:probeport]>: Idle scan
    -sY/sZ: SCTP INIT/COOKIE-ECHO scans
    -sO: IP protocol scan
    -b <FTP relay host>: FTP bounce scan
    PORT SPECIFICATION AND SCAN ORDER:
    -p <port ranges>: Only scan specified ports
    Ex: -p22; -p1-65535; -p U:53,111,137,T:21-25,80,139,8080,S:9
    --exclude-ports <port ranges>: Exclude the specified ports from scanning
    -F: Fast mode - Scan fewer ports than the default scan
    -r: Scan ports sequentially - don't randomize
    --top-ports <number>: Scan <number> most common ports
    --port-ratio <ratio>: Scan ports more common than <ratio>
    SERVICE/VERSION DETECTION:
    -sV: Probe open ports to determine service/version info
    --version-intensity <level>: Set from 0 (light) to 9 (try all probes)
    --version-light: Limit to most likely probes (intensity 2)
    --version-all: Try every single probe (intensity 9)
    --version-trace: Show detailed version scan activity (for debugging)
    SCRIPT SCAN:
    -sC: equivalent to --script=default
    --script=<Lua scripts>: <Lua scripts> is a comma separated list of
    directories, script-files or script-categories
    --script-args=<n1=v1,[n2=v2,...]>: provide arguments to scripts
    --script-args-file=filename: provide NSE script args in a file
    --script-trace: Show all data sent and received
    --script-updatedb: Update the script database.
    --script-help=<Lua scripts>: Show help about scripts.
    <Lua scripts> is a comma-separated list of script-files or
    script-categories.
    OS DETECTION:
    -O: Enable OS detection
    --osscan-limit: Limit OS detection to promising targets
    --osscan-guess: Guess OS more aggressively
    TIMING AND PERFORMANCE:
    Options which take <time> are in seconds, or append 'ms' (milliseconds),
    's' (seconds), 'm' (minutes), or 'h' (hours) to the value (e.g. 30m).
    -T<0-5>: Set timing template (higher is faster)
    --min-hostgroup/max-hostgroup <size>: Parallel host scan group sizes
    --min-parallelism/max-parallelism <numprobes>: Probe parallelization
    --min-rtt-timeout/max-rtt-timeout/initial-rtt-timeout <time>: Specifies
    probe round trip time.
    --max-retries <tries>: Caps number of port scan probe retransmissions.
    --host-timeout <time>: Give up on target after this long
    --scan-delay/--max-scan-delay <time>: Adjust delay between probes
    --min-rate <number>: Send packets no slower than <number> per second
    --max-rate <number>: Send packets no faster than <number> per second
    FIREWALL/IDS EVASION AND SPOOFING:
    -f; --mtu <val>: fragment packets (optionally w/given MTU)
    -D <decoy1,decoy2[,ME],...>: Cloak a scan with decoys
    -S <IP_Address>: Spoof source address
    -e <iface>: Use specified interface
    -g/--source-port <portnum>: Use given port number
    --proxies <url1,[url2],...>: Relay connections through HTTP/SOCKS4 proxies
    --data <hex string>: Append a custom payload to sent packets
    --data-string <string>: Append a custom ASCII string to sent packets
    --data-length <num>: Append random data to sent packets
    --ip-options <options>: Send packets with specified ip options
    --ttl <val>: Set IP time-to-live field
    --spoof-mac <mac address/prefix/vendor name>: Spoof your MAC address
    --badsum: Send packets with a bogus TCP/UDP/SCTP checksum
    :return: A dictionary containing the output path and process ID.
    """
    # Fire-and-forget: the project helper starts nmap in the background and
    # returns the output file path plus the scanner's PID; poll with
    # fetch_nmap_results to collect the report once the process exits.
    output_path, pid = run_nmap(target, args)
    return {"output_path": output_path, "pid": pid}
@mcp.tool()
def fetch_nmap_results(output_path: str, pid: int) -> Union[str, List[str]]:
    """
    Non-blocking fetch for Nmap scan results:
    - Returns immediately if process is still running.
    - Otherwise attempts to read and parse the Nmap output.
    :param output_path: The output file path returned by launch_nmap_scan.
    :param pid: The process ID returned by launch_nmap_scan.
    :return: A progress message, or the parsed results once the scan ends.
    """
    try:
        # pid_exists() alone is not enough: an exited-but-unreaped child
        # lingers as a zombie and would report "still running" forever.
        # Treat zombies as finished (mirrors check_gobuster_status).
        if psutil.pid_exists(pid):
            try:
                if psutil.Process(pid).status() != psutil.STATUS_ZOMBIE:
                    return f"Scan is still running (PID {pid})"
            except psutil.NoSuchProcess:
                pass  # exited between the two checks: fall through to read
        return get_nmap_output(output_path, pid)
    except FileNotFoundError:
        return "Results not yet available."
    except Exception as e:
        return f"Error fetching Nmap results: {e}"
# Single shared ZAP Active Scan client reused by the three tools below.
Activescanner = ActiveScan()
@mcp.tool()
def run_zap_active_scan(url: str, recurse: bool = False) -> str:
    """
    Run a ZAP Active Scan on the given URL.
    :param url: The target URL to scan.
    :param recurse: Whether to recurse into the site.
    :return: The scan ID of the started scan.
    """
    # Kick off the scan via the shared client and report its ID back.
    return f"Scan Started with ID: {Activescanner.start_scan(url, recurse)}"
@mcp.tool()
def fetch_zap_scan(scan_id: str) -> str:
    """
    Report the progress of a running ZAP Active Scan.
    :param scan_id: The scan ID returned by run_zap_active_scan.
    :return: A "Scan Status: <value>%" progress string.
    """
    status = Activescanner.checkStatus(scan_id)
    return f"Scan Status: {status}%"
@mcp.tool()
def active_scan_results(url: str, scan_id: str) -> str:
    """
    Save and report the results of a completed ZAP Active Scan.
    :param url: The target URL that was scanned.
    :param scan_id: The scan ID returned by run_zap_active_scan.
    :return: The saved results file path, a "no results" message, or a
             still-in-progress message.
    """
    status = Activescanner.checkStatus(scan_id)
    # NOTE(review): confirm checkStatus returns an int here - the ZAP API
    # commonly reports status as a string percentage, in which case this
    # comparison would never be true.
    if status == 100:
        # NOTE(review): scanResults receives `status` as its third argument;
        # verify it does not actually expect the scan_id.
        results = Activescanner.scanResults(ZAP_BASE_URL, url, status)
        if results:
            filepath = Activescanner.save_results(results, "ZAP_Active_Scan")
            return f"Scan Results saved to: {filepath}"
        else:
            return "No results found."
    else:
        return "Scan is still in progress."
# NOTE(review): this shared instance is never used - each Ajax tool below
# constructs its own ajaxSpider; confirm the constructor has no required
# side effects before removing it.
Ajaxscanner = ajaxSpider()
@mcp.tool()
def run_ajax_spider(target_url: str):
    """
    Start a ZAP Ajax Spider crawl against the given URL.
    :param target_url: The target URL to crawl.
    :return: The raw start-scan response from ZAP.
    """
    spider = ajaxSpider()
    start_response = spider.startScan(ZAP_BASE_URL, target_url)
    return start_response
@mcp.tool()
def check_ajax_spider_status():
    """
    Check the status of the current Ajax Spider crawl.
    :return: The status reported by ZAP.
    """
    spider = ajaxSpider()
    return spider.checkStatus(ZAP_BASE_URL)
@mcp.tool()
def get_ajax_spider_results():
    """
    Fetch and persist the results of a finished Ajax Spider crawl.
    :return: A (results, saved-path message) tuple on success, or a plain
             error string when nothing was fetched - callers must handle
             both shapes.
    """
    spider = ajaxSpider()
    results = spider.scanResults(ZAP_BASE_URL)
    if results:
        filepath = spider.save_results(results, "Ajax_Spider")
        return results, f"Results saved to {filepath}"
    else:
        return "No results found or failed to fetch results."
@mcp.tool()
def run_gobuster_scan(scan_type: str, target_url: str, wordlist_path: str, args: str = "") -> tuple[str, int]:
    """
    Run a Gobuster scan against the given target URL with the specified wordlist and arguments.
    :param scan_type: The type of scan "dir" or "vhost".
    :param target_url: The target URL to scan.
    :param wordlist_path: The path to the wordlist file.
    /opt/pentest-mcp/seclists/Discovery This Directory is used for wordlists and is popular seclist directory
    :param args: Additional arguments for Gobuster.
    :return: A tuple containing the output file path and the process ID.
    :raises RuntimeError: if the gobuster process cannot be started.
    """
    # Unique job id (timestamp + random hex) so parallel scans never
    # clobber each other's output files.
    ts = datetime.now().strftime("%Y%m%d%H%M%S")
    uid = uuid.uuid4().hex
    job_id = f"{ts}_{uid}"
    output_dir = os.path.join(os.getcwd(), "Gobuster_output")
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"{job_id}.txt")
    # shlex.split (not str.split) so quoted extra arguments survive intact,
    # matching how every other tool in this module parses its args.
    command = [
        "gobuster", scan_type,
        "-u", target_url,
        "-w", wordlist_path,
        "-o", output_file,
    ] + shlex.split(args)
    try:
        # Fully detach from our stdio; results are read from output_file
        # by check_gobuster_status.
        proc = subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            close_fds=True,
        )
        return output_file, proc.pid
    except Exception as e:
        raise RuntimeError(f"Error starting Gobuster scan: {e}")
@mcp.tool()
def check_gobuster_status(output_file: str, pid: int) -> str:
    """
    Check whether a Gobuster scan has finished and return its output if so.
    :param output_file: The output file path returned by run_gobuster_scan.
    :param pid: The process ID returned by run_gobuster_scan.
    :return: A progress message while running, or the scan results once the
             process has ended.
    """
    if not os.path.exists(output_file):
        return "Output file not yet created. Scan is likely starting..."
    try:
        # Reap the child if it already exited; WNOHANG keeps this non-blocking
        # (returns pid 0 while the child is still alive).
        reaped_pid, _ = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # pid is not our child (or was already reaped); let the psutil
        # check below decide whether it is still alive.
        reaped_pid = pid
    if reaped_pid == 0:
        return "Scan is still running..."
    if psutil.pid_exists(pid):
        try:
            p = psutil.Process(pid)
            # A zombie means the process exited but was not reaped; any other
            # live state under this pid is treated as still running.
            if p.status() != psutil.STATUS_ZOMBIE:
                return "Scan is still running..."
        except psutil.NoSuchProcess:
            pass  # exited between the two checks: fall through to read
    try:
        with open(output_file, "r", encoding="utf-8", errors="ignore") as f:
            return "Scan completed. Results:\n\n" + f.read()
    except Exception as e:
        return f"Scan ended, but failed to read output file: {e}"
@mcp.tool()
def run_dig_tool(domain: str, options: str) -> str:
    """
    Run a dig command against the given domain with the provided options.
    :param domain: The target domain to query.
    :param options: Additional options for dig.
    :return: The output from the dig command.
    :raises subprocess.CalledProcessError: if dig exits non-zero (check=True).
    :raises subprocess.TimeoutExpired: if dig exceeds DEFAULT_TIMEOUT.
    """
    # Context manager guarantees the semaphore slot is released even when
    # dig fails or times out (check=True raises on non-zero exit).
    with _scan_semaphore:
        cmd = ["dig"] + shlex.split(options) + [domain]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=DEFAULT_TIMEOUT
        )
        return result.stdout
@mcp.tool()
def run_sqlmap_tool(target: str, options: str) -> dict:
    """
    Run a SQLMap scan against the given target with the provided options.
    :param target: The target URL to scan.
    :param options: Additional options for SQLMap.
    -h, --help Show basic help message and exit
    -hh Show advanced help message and exit
    --version Show program's version number and exit
    -v VERBOSE Verbosity level: 0-6 (default 1)
    Target:
    At least one of these options has to be provided to define the
    target(s)
    -u URL, --url=URL Target URL (e.g. "http://www.site.com/vuln.php?id=1")
    -g GOOGLEDORK Process Google dork results as target URLs
    Request:
    These options can be used to specify how to connect to the target URL
    --data=DATA Data string to be sent through POST (e.g. "id=1")
    --cookie=COOKIE HTTP Cookie header value (e.g. "PHPSESSID=a8d127e..")
    --random-agent Use randomly selected HTTP User-Agent header value
    --proxy=PROXY Use a proxy to connect to the target URL
    --tor Use Tor anonymity network
    --check-tor Check to see if Tor is used properly
    Injection:
    These options can be used to specify which parameters to test for,
    provide custom injection payloads and optional tampering scripts
    -p TESTPARAMETER Testable parameter(s)
    --dbms=DBMS Force back-end DBMS to provided value
    Detection:
    These options can be used to customize the detection phase
    --level=LEVEL Level of tests to perform (1-5, default 1)
    --risk=RISK Risk of tests to perform (1-3, default 1)
    Techniques:
    These options can be used to tweak testing of specific SQL injection
    techniques
    --technique=TECH.. SQL injection techniques to use (default "BEUSTQ")
    Enumeration:
    These options can be used to enumerate the back-end database
    management system information, structure and data contained in the
    tables
    -a, --all Retrieve everything
    -b, --banner Retrieve DBMS banner
    --current-user Retrieve DBMS current user
    --current-db Retrieve DBMS current database
    --passwords Enumerate DBMS users password hashes
    --dbs Enumerate DBMS databases
    --tables Enumerate DBMS database tables
    --columns Enumerate DBMS database table columns
    --schema Enumerate DBMS schema
    --dump Dump DBMS database table entries
    --dump-all Dump all DBMS databases tables entries
    -D DB DBMS database to enumerate
    -T TBL DBMS database table(s) to enumerate
    -C COL DBMS database table column(s) to enumerate
    Operating system access:
    These options can be used to access the back-end database management
    system underlying operating system
    --os-shell Prompt for an interactive operating system shell
    --os-pwn Prompt for an OOB shell, Meterpreter or VNC
    General:
    These options can be used to set some general working parameters
    --batch Never ask for user input, use the default behavior
    --flush-session Flush session files for current target
    :return: A dictionary containing the process ID and output directory.
    """
    # Local import: the file-level `from datetime import datetime` does not
    # bring `timezone` into scope.
    from datetime import timezone
    base_dir = os.path.abspath("sqlmap_output")
    os.makedirs(base_dir, exist_ok=True)
    # datetime.utcnow() is deprecated (3.12+) and naive; an aware UTC stamp
    # matches the trailing "Z" (Zulu/UTC) in the format string.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    unique_id = uuid.uuid4().hex
    workdir = os.path.join(base_dir, f"{timestamp}_{unique_id}")
    os.makedirs(workdir)
    cmd = ["sqlmap", "-u", target, "--batch", "--output-dir", workdir] + shlex.split(options)
    # Detach all stdio (same as the Gobuster launcher): results are written
    # under workdir via --output-dir, and stray child stdout would corrupt an
    # MCP stdio transport.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        close_fds=True,
    )
    return {"pid": proc.pid, "output_dir": workdir}
@mcp.tool()
def check_sqlmap_status(pid: int, output_dir: str) -> str:
    """
    Non-blocking status check for SQLMap scan:
    - Returns immediately if process is still running.
    - Otherwise summarizes results from the output directory.
    :param pid: PID returned by run_sqlmap_tool.
    :param output_dir: Output directory returned by run_sqlmap_tool.
    :return: Progress message, or a newline-separated results summary.
    """
    # Reap the child if it has exited (the launcher never wait()s on it); an
    # un-reaped zombie would make psutil.pid_exists() return True forever and
    # the scan would never be reported as complete.
    try:
        os.waitpid(pid, os.WNOHANG)
    except (ChildProcessError, OSError):
        pass  # not our child / already reaped -- fall through to psutil
    try:
        if psutil.pid_exists(pid):
            # A zombie has already exited; any other status is still running.
            if psutil.Process(pid).status() != psutil.STATUS_ZOMBIE:
                return f"Scan is still running (PID {pid})"
    except Exception:
        pass
    if not os.path.isdir(output_dir):
        return "Output directory not found; scan may not have started."
    summary = [f"Scan completed. Results in: {output_dir}"]
    base = output_dir
    tgt_files = glob.glob(os.path.join(base, '**', 'target.txt'), recursive=True)
    if tgt_files:
        for tf in tgt_files:
            summary.append(f"-- target.txt: {os.path.relpath(tf, base)} --")
            try:
                # `with` closes the handle (original leaked it via bare open()).
                with open(tf, 'r', encoding='utf-8', errors='ignore') as fh:
                    content = fh.read().strip()
                summary.append(content or '(empty)')
            except Exception as e:
                summary.append(f"Failed to read {tf}: {e}")
    else:
        summary.append("No target.txt found.")
    log_files = glob.glob(os.path.join(base, '**', 'log'), recursive=True) + glob.glob(os.path.join(base, '**', '*.log'), recursive=True)
    if log_files:
        for lf in log_files:
            summary.append(f"-- Log: {os.path.relpath(lf, base)} --")
            try:
                with open(lf, 'r', encoding='utf-8', errors='ignore') as fh:
                    lines = fh.read().splitlines()
                # Only the tail of the log is useful as a status summary.
                for line in lines[-5:]:
                    summary.append(line)
            except Exception as e:
                summary.append(f"Failed to read {lf}: {e}")
    else:
        summary.append("No log file found.")
    # BUG FIX: was "".join(summary), which mashed every entry onto one line.
    return "\n".join(summary)
@mcp.tool()
def run_curl(target: str, opts: str) -> subprocess.CompletedProcess:
    """
    Run a curl command against the given target with the provided options.

    NOTE(review): this module-level definition shadows the `run_curl` imported
    from tools.curl_tool at the top of the file, so `run_curl_tool` below
    resolves to THIS function -- confirm which implementation is intended.

    :param target: URL or hostname (e.g. "https://example.com")
    :param opts: space-separated curl options (e.g. "-I -L")
    :return: CompletedProcess from subprocess.run()
    :raises subprocess.TimeoutExpired: if curl runs longer than DEFAULT_TIMEOUT.
    """
    cmd = ["curl"] + shlex.split(opts) + [target]
    # Bound the request so a hung server cannot wedge the tool indefinitely
    # (consistent with the other subprocess.run-based tools in this file).
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=DEFAULT_TIMEOUT)
    return result
@mcp.tool()
def run_harvester(domain: str, sources: str = "all", extra_args: str = "") -> tuple[int, str]:
    """
    Fire off theHarvester asynchronously.
    Parameters
    ----------
    domain : target domain (passed to -d)
    sources : comma-separated list of search engines (-b); default 'all'
    extra_args : raw string of any other theHarvester CLI flags (e.g. '-l 100 -s')
    Returns
    -------
    (pid, json_path)
    pid : PID of the child theHarvester process
    json_path : absolute path where <uuid>/<domain>.json will be written
    """
    def safe_name(domain: str) -> str:
        """Convert e.g. 'ramkansal.com' → 'ramkansal-com' (only a-z, 0-9, _ or -)."""
        return re.sub(r"[^A-Za-z0-9_-]", "-", domain.strip().lower())
    base_output_dir = Path.home() / "theHarvester_output"
    # Per-run UUID directory keeps concurrent scans from clobbering each other.
    out_dir = base_output_dir / str(uuid.uuid4())
    out_dir.mkdir(parents=True, exist_ok=True)
    base_name = safe_name(domain)
    output_prefix = out_dir / base_name # *no* extension
    cmd = [
    "theHarvester",
    "-d",
    domain,
    "-b",
    sources,
    "-f",
    str(output_prefix),
    ]
    if extra_args:
        cmd.extend(shlex.split(extra_args))
    # Detach all stdio (same as the Gobuster launcher): results are written to
    # the -f output files, and stray child stdout would corrupt an MCP stdio
    # transport.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        close_fds=True,
    )
    json_path = f"{output_prefix}.json"
    return proc.pid, json_path
@mcp.tool()
def check_harvester_output(pid: int, json_path: str):
    """
    One-shot check: has theHarvester produced its JSON results yet?
    • If the file exists and parses → returns the parsed dict/list from JSON.
    • If not → returns the literal string 'scan still running'.
    This function does *not* poll or wait; call it as often as you need.
    :param pid: PID returned by run_harvester (not consulted here).
    :param json_path: Path to the expected <uuid>/<domain>.json file.
    """
    json_path_expanded = os.path.expanduser(json_path)
    if os.path.exists(json_path_expanded):
        try:
            with open(json_path_expanded, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except json.JSONDecodeError:
            # The file exists but is incomplete (theHarvester is mid-write);
            # report "still running" instead of surfacing a parse error.
            return "scan still running"
    return "scan still running"
@mcp.tool()
def run_curl_tool(target: str, opts: str) -> dict:
    """
    Run a curl command against the given target with the provided options.
    This tool is useful for making HTTP requests, testing endpoints, checking headers,
    and debugging web services without requiring a full browser.
    :param target: URL or hostname (e.g. "https://example.com")
    :param opts: space-separated curl options (e.g. "-I -L")
    :return: A dictionary containing status, stdout, and stderr
    Common curl options:
    -I, --head Fetch headers only (HEAD request)
    -L, --location Follow redirects
    -X, --request METHOD Specify HTTP method (GET, POST, PUT, DELETE, etc.)
    -d, --data DATA Send POST data
    -H, --header HEADER Add custom header
    -A, --user-agent AGENT Set User-Agent
    -b, --cookie COOKIE Send cookies
    -u, --user USER:PASS HTTP authentication
    -v, --verbose Verbose output
    -s, --silent Silent mode
    -o, --output FILE Write output to file
    -O, --remote-name Write to remote filename
    --proxy PROXY Use proxy
    --insecure Allow insecure SSL
    -m, --max-time SECONDS Timeout in seconds
    """
    _scan_semaphore.acquire()
    try:
        result = run_curl(target, opts)
        # BUG FIX: the module-level run_curl tool defined above shadows the
        # run_curl imported from tools.curl_tool, so `result` may be a
        # CompletedProcess rather than a dict. Normalize so this tool always
        # honors its declared dict contract.
        if isinstance(result, subprocess.CompletedProcess):
            return {
                "status": "success" if result.returncode == 0 else "error",
                "return_code": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
            }
        return result
    except (CurlError, subprocess.SubprocessError) as e:
        return {
            "status": "error",
            "return_code": -1,
            "error": str(e),
            "stdout": "",
            "stderr": str(e)
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def launch_arjun_scan(url: str, args: str = "") -> dict:
    """
    Launch Arjun (HTTP parameter discovery tool) against the given target.
    Arjun discovers hidden/undisclosed HTTP parameters in web applications
    that may be exploitable.
    :param url: The target URL to scan (e.g. "https://example.com/page")
    :param args: Additional arguments for Arjun
    SCANNING:
    -t, --threads INT Number of threads to use (default 5)
    -d, --delay FLOAT Delay between requests in seconds
    --timeout INT Request timeout in seconds
    --proxy PROXY HTTP proxy to use (e.g. http://127.0.0.1:8080)
    --headers FILE File containing HTTP headers
    -c, --cookie COOKIE Cookie string to use
    -H, --header HEADER Custom header (e.g. "Authorization: Bearer token")
    OUTPUT:
    -o, --output FILE Save output to file
    -j, --json Output in JSON format
    --verbose Verbose output
    -q, --quiet Quiet mode
    BEHAVIOR:
    --get Use GET requests only
    --post Use POST requests
    --both Try both GET and POST
    --stable Assume parameters are stable (skip verification)
    -x, --method METHOD HTTP method to use
    --no-color Disable colored output
    :return: Dictionary containing output_path and process_id
    """
    # The semaphore caps the number of concurrently launching scans; the
    # context manager releases it on every exit path.
    with _scan_semaphore:
        try:
            output_path, pid = run_arjun(url, args)
        except ArjunError as e:
            return {
                "status": "error",
                "error": str(e),
                "message": f"Failed to start Arjun scan: {str(e)}"
            }
        return {
            "status": "started",
            "output_path": output_path,
            "pid": pid,
            "message": f"Arjun scan started with PID {pid}"
        }
@mcp.tool()
def fetch_arjun_results(output_path: str, pid: int) -> dict:
    """
    Retrieve results of an Arjun scan launched earlier.
    :param output_path: Path to the output file written by the scan
    :param pid: Process ID of the Arjun scan
    :return: Dictionary containing scan results and status
    """
    try:
        return get_arjun_output(output_path, pid)
    except Exception as e:
        # Surface failures as a structured error payload rather than raising.
        return {
            "status": "error",
            "message": f"Error fetching Arjun results: {str(e)}",
            "pid": pid
        }
@mcp.tool()
def ad_user_enum(target: str, domain: str, username: Union[str, None] = None, password: Union[str, None] = None) -> dict:
    """
    Enumerate domain users using NetExec
    :param target: Target domain controller or host
    :param domain: Domain name
    :param username: Optional username for authentication
    :param password: Optional password for authentication
    :return: Dictionary with user enumeration results
    """
    # BUG FIX: annotations used typing.Optional, which is not imported at the
    # top of this file (only Any, List, Union are) -> NameError at def time.
    # Union[str, None] is the equivalent spelling with an imported name.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(user_enum(target, domain, username, password))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"User enumeration failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_smb_signing_check(targets: str, output_file: str = "/tmp/relay_targets.txt") -> dict:
    """
    Check SMB signing status to identify relay targets.
    :param targets: Target hosts/network range to check
    :param output_file: Output file for relay targets
    :return: Dictionary with signing status and relay targets
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(smb_signing_check(targets, output_file))
        except Exception as e:
            return {
                "success": False,
                "error": f"SMB signing check failed: {str(e)}"
            }
@mcp.tool()
def ad_shares_enum(target: str, username: str, password: str, domain: Union[str, None] = None) -> dict:
    """
    Enumerate SMB shares and permissions
    :param target: Target host
    :param username: Username for authentication
    :param password: Password for authentication
    :param domain: Optional domain name
    :return: Dictionary with share enumeration results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(shares_enum(target, username, password, domain))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Share enumeration failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_password_spray(target: str, domain: str, userfile: str, password: str, delay: int = 1) -> dict:
    """
    Run a password-spraying attack against domain accounts.
    :param target: Target domain controller
    :param domain: Domain name
    :param userfile: Path to file containing usernames
    :param password: Password to spray
    :param delay: Delay between attempts in seconds
    :return: Dictionary with spray results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(password_spray(target, domain, userfile, password, delay))
        except Exception as e:
            return {
                "success": False,
                "error": f"Password spray failed: {str(e)}"
            }
@mcp.tool()
def ad_asreproast(target: str, domain: str, userfile: Union[str, None] = None, format: str = "hashcat") -> dict:
    """
    AS-REP Roasting attack to harvest Kerberos hashes
    :param target: Target domain controller
    :param domain: Domain name
    :param userfile: Optional file with usernames to target
    :param format: Hash format (hashcat, john)
    :return: Dictionary with AS-REP roasting results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(asreproast(target, domain, userfile, format))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"AS-REP roasting failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_kerberoast(target: str, domain: str, username: str, password: str, format: str = "hashcat") -> dict:
    """
    Kerberoasting attack to extract service account hashes.
    :param target: Target domain controller
    :param domain: Domain name
    :param username: Username for authentication
    :param password: Password for authentication
    :param format: Hash format (hashcat, john)
    :return: Dictionary with Kerberoasting results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(kerberoast(target, domain, username, password, format))
        except Exception as e:
            return {
                "success": False,
                "error": f"Kerberoasting failed: {str(e)}"
            }
@mcp.tool()
def ad_relay_setup(targets: str, smb2support: bool = True, escalate_user: Union[str, None] = None, dump_sam: bool = True, dump_lsass: bool = True, interface: str = "eth0") -> dict:
    """
    Setup NTLM relay attack infrastructure
    :param targets: Target hosts for relay attacks
    :param smb2support: Enable SMB2 support
    :param escalate_user: User to escalate privileges to
    :param dump_sam: Dump SAM database
    :param dump_lsass: Dump LSASS process
    :param interface: Network interface to bind to
    :return: Dictionary with relay setup status
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(relay_setup(targets, smb2support, escalate_user, dump_sam, dump_lsass, interface))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Relay setup failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_coerce_petitpotam(target: str, listener: str, pipe: str = "lsarpc", username: Union[str, None] = None, password: Union[str, None] = None) -> dict:
    """
    Coerce authentication using PetitPotam (MS-EFSRPC)
    :param target: Target host to coerce
    :param listener: Relay listener IP
    :param pipe: Named pipe to use
    :param username: Optional username for authentication
    :param password: Optional password for authentication
    :return: Dictionary with coercion results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(coerce_petitpotam(target, listener, pipe, username, password))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"PetitPotam coercion failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_coerce_printerbug(target: str, listener: str, username: str, password: str, domain: str) -> dict:
    """
    Coerce authentication using PrinterBug (MS-RPRN).
    :param target: Target host to coerce
    :param listener: Relay listener IP
    :param username: Username for authentication
    :param password: Password for authentication
    :param domain: Domain name
    :return: Dictionary with coercion results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(coerce_printerbug(target, listener, username, password, domain))
        except Exception as e:
            return {
                "success": False,
                "error": f"PrinterBug coercion failed: {str(e)}"
            }
@mcp.tool()
def ad_responder_poison(interface: str, analyze: bool = False, wpad: bool = True, force_wpad_auth: bool = False, disable_smb: bool = False, disable_http: bool = False) -> dict:
    """
    Start Responder to poison LLMNR/NBT-NS/mDNS.
    :param interface: Network interface to bind to
    :param analyze: Analysis mode only
    :param wpad: Enable WPAD poisoning
    :param force_wpad_auth: Force WPAD authentication
    :param disable_smb: Disable SMB server
    :param disable_http: Disable HTTP server
    :return: Dictionary with Responder status
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(responder_poison(interface, analyze, wpad, force_wpad_auth, disable_smb, disable_http))
        except Exception as e:
            return {
                "success": False,
                "error": f"Responder failed: {str(e)}"
            }
@mcp.tool()
def ad_bloodhound_collect(target: str, domain: str, username: str, password: str, collection_method: str = "All", dns_server: Union[str, None] = None) -> dict:
    """
    Collect BloodHound data for attack path analysis
    :param target: Target domain controller
    :param domain: Domain name
    :param username: Username for authentication
    :param password: Password for authentication
    :param collection_method: Collection method (All, DCOnly, etc.)
    :param dns_server: Optional DNS server
    :return: Dictionary with BloodHound collection results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(bloodhound_collect(target, domain, username, password, collection_method, dns_server))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"BloodHound collection failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_secrets_dump(target: str, username: str, password: str, domain: Union[str, None] = None, use_hash: bool = False, just_dc: bool = False) -> dict:
    """
    Dump secrets (SAM/SYSTEM/NTDS) from target
    :param target: Target host
    :param username: Username for authentication
    :param password: Password for authentication
    :param domain: Optional domain name
    :param use_hash: Use hash instead of password
    :param just_dc: Only dump DC hashes
    :return: Dictionary with secrets dump results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(secrets_dump(target, username, password, domain, use_hash, just_dc))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Secrets dump failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_dcsync(target: str, domain: str, username: str, password: str, target_user: Union[str, None] = None, use_hash: bool = False) -> dict:
    """
    Perform DCSync attack
    :param target: Target domain controller
    :param domain: Domain name
    :param username: Username for authentication
    :param password: Password for authentication
    :param target_user: Specific user to target
    :param use_hash: Use hash instead of password
    :return: Dictionary with DCSync results
    """
    # BUG FIX: typing.Optional was used but never imported (file imports only
    # Any, List, Union) -> NameError at def time; Union[str, None] is equivalent.
    _scan_semaphore.acquire()
    try:
        import asyncio
        result = asyncio.run(dcsync(target, domain, username, password, target_user, use_hash))
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"DCSync failed: {str(e)}"
        }
    finally:
        _scan_semaphore.release()
@mcp.tool()
def ad_certipy_enum(target: str, domain: str, username: str, password: str, vulnerable: bool = False) -> dict:
    """
    Enumerate ADCS for vulnerabilities.
    :param target: Target domain controller
    :param domain: Domain name
    :param username: Username for authentication
    :param password: Password for authentication
    :param vulnerable: Only show vulnerable configurations
    :return: Dictionary with ADCS enumeration results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(certipy_enum(target, domain, username, password, vulnerable))
        except Exception as e:
            return {
                "success": False,
                "error": f"Certipy enumeration failed: {str(e)}"
            }
@mcp.tool()
def ad_ldap_dump(target: str, domain: str, username: str, password: str) -> dict:
    """
    Perform a comprehensive LDAP dump.
    :param target: Target domain controller
    :param domain: Domain name
    :param username: Username for authentication
    :param password: Password for authentication
    :return: Dictionary with LDAP dump results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(ldap_dump(target, domain, username, password))
        except Exception as e:
            return {
                "success": False,
                "error": f"LDAP dump failed: {str(e)}"
            }
@mcp.tool()
def ad_check_credentials(target: str, domain: str, username: str, password: str) -> dict:
    """
    Quickly validate a set of domain credentials.
    :param target: Target host
    :param domain: Domain name
    :param username: Username to validate
    :param password: Password to validate
    :return: Dictionary with credential validation results
    """
    import asyncio
    # Cap concurrent scans; the context manager releases on every exit path.
    with _scan_semaphore:
        try:
            return asyncio.run(check_credentials(target, domain, username, password))
        except Exception as e:
            return {
                "success": False,
                "error": f"Credential check failed: {str(e)}"
            }
# Entry point: start the FastMCP server and serve the registered tools.
if __name__ == "__main__":
    mcp.run()