#!/usr/bin/env python3
from typing import Any, List, Optional, Dict
from mcp.server.fastmcp import FastMCP
import httpx
import subprocess
import json
import asyncio
import re
from pathlib import Path
from datetime import datetime
import yaml
import aiofiles
import aiohttp
from bs4 import BeautifulSoup
# import jwt
import base64
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
import os
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='pentest.log'
)
logger = logging.getLogger('pentest-tools')
# Initialize FastMCP server
mcp = FastMCP("pentest-tools")
# Configuration
CONFIG = {
"wordlists": {
"dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt",
"common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt",
"api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt",
"subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt",
"passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt",
"jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt",
"xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt",
"sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt",
"lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt",
"ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt"
},
"tools": {
"xsstrike": "/root/tools/XSStrike/xsstrike.py",
"nuclei_templates": "/root/tools/nuclei-templates",
"crlfuzz": "/root/go/bin/crlfuzz",
"graphql_path": "/root/tools/graphql-tools"
},
"reporting": {
"output_dir": "reports",
"template_dir": "templates"
}
}
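# NOTE: the wordlist and tool paths above assume a Kali-style host with SecLists
# installed under /usr/share/wordlists/pentest-tools/ and helper tools cloned under
# /root/tools/ and /root/go/bin/ — adjust CONFIG to match your own environment.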
# Load environment variables
load_dotenv()
def format_ffuf_results(results):
"""Format the FFuf results in a readable format.
Args:
results: FFuf JSON results
Returns:
Formatted string with FFuf results
"""
formatted = []
# Add summary
if "stats" in results:
stats = results["stats"]
formatted.append(f"Total requests: {stats.get('total', 0)}")
formatted.append(f"Duration: {stats.get('elapsed', 0)} seconds")
formatted.append(f"Requests per second: {stats.get('req_sec', 0)}")
# Add results
if "results" in results:
formatted.append("\nDiscovered URLs:")
for item in results["results"]:
status = item.get("status", "")
url = item.get("url", "")
size = item.get("length", 0)
formatted.append(f"[Status: {status}] [{size} bytes] {url}")
return "\n".join(formatted)
@mcp.tool()
async def advanced_directory_scan(url: str, extensions: Optional[List[str]] = None) -> str:
"""Advanced directory and file scanning with multiple tools and techniques.
Args:
url: Target URL
extensions: List of file extensions to scan for
"""
if extensions is None:
extensions = ["php", "asp", "aspx", "jsp", "js", "txt", "conf", "bak", "backup", "swp", "old", "db", "sql"]
results = []
# 1. FFuf with advanced options
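    # ffuf flags used below: -recursion/-recursion-depth follow discovered
    # directories, -mc all reports every status code, -ac auto-calibrates to
    # filter wildcard/false-positive responses, and -o/-of write the JSON
    # output that is parsed back into the report further down.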
try:
ffuf_cmd = [
"ffuf",
"-u", f"{url}/FUZZ",
"-w", CONFIG["wordlists"]["dirsearch"],
"-e", ",".join(extensions),
"-recursion",
"-recursion-depth", "3",
"-mc", "all",
"-ac",
"-o", "ffuf.json",
"-of", "json"
]
subprocess.run(ffuf_cmd, check=True)
with open("ffuf.json") as f:
ffuf_results = json.load(f)
results.append("=== FFuf Results ===\n" + format_ffuf_results(ffuf_results))
except Exception as e:
results.append(f"FFuf error: {str(e)}")
# 2. Dirsearch with advanced options
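    # dirsearch flags: --deep-recursive/--force-recursive recurse into every
    # discovered path, --exclude-status 404 drops not-found noise, and the JSON
    # report is read back from dirsearch.json below.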
try:
dirsearch_cmd = [
"dirsearch",
"-u", url,
"-e", ",".join(extensions),
"--deep-recursive",
"--force-recursive",
"--exclude-status", "404",
"-o", "dirsearch.json",
"--format", "json"
]
subprocess.run(dirsearch_cmd, check=True)
with open("dirsearch.json") as f:
dirsearch_results = json.load(f)
results.append("=== Dirsearch Results ===\n" + json.dumps(dirsearch_results, indent=2))
except Exception as e:
results.append(f"Dirsearch error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def advanced_api_scan(url: str) -> str:
"""Advanced API security testing with multiple techniques.
Args:
url: Target API URL
"""
results = []
# 1. GraphQL Security Testing
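    # Introspection asks the server to describe its own schema; when it is left
    # enabled in production, every type, query and mutation becomes enumerable,
    # which is what the follow-up vulnerability checks rely on.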
if "/graphql" in url or "/graphiql" in url:
try:
# Introspection query
graphql_query = """
query IntrospectionQuery {
__schema {
types { name, fields { name, type { name } } }
queryType { name }
mutationType { name }
subscriptionType { name }
}
}
"""
async with httpx.AsyncClient(verify=False) as client:
response = await client.post(f"{url}", json={"query": graphql_query})
if response.status_code == 200:
results.append("=== GraphQL Schema ===\n" + json.dumps(response.json(), indent=2))
# Test for common GraphQL vulnerabilities
vulns = await test_graphql_vulnerabilities(url, response.json())
results.append("=== GraphQL Vulnerabilities ===\n" + vulns)
except Exception as e:
results.append(f"GraphQL testing error: {str(e)}")
# 2. REST API Testing
try:
# Test common REST endpoints
common_paths = ["/v1", "/v2", "/api", "/api/v1", "/api/v2", "/swagger", "/docs", "/openapi.json"]
async with httpx.AsyncClient(verify=False) as client:
for path in common_paths:
response = await client.get(f"{url}{path}")
if response.status_code != 404:
results.append(f"\nFound API endpoint: {path}")
results.append(f"Status: {response.status_code}")
results.append(f"Response: {response.text[:500]}...")
# If Swagger/OpenAPI found, parse and test endpoints
if "swagger" in path or "openapi" in path:
api_spec = response.json()
results.append("\n=== Testing API Endpoints ===")
for endpoint, methods in api_spec.get("paths", {}).items():
for method, details in methods.items():
test_result = await test_api_endpoint(url, endpoint, method, details)
results.append(test_result)
except Exception as e:
results.append(f"REST API testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def advanced_xss_scan(url: str) -> str:
"""Advanced XSS vulnerability scanning.
Args:
url: Target URL
"""
results = []
# 1. XSStrike with advanced options
try:
xsstrike_cmd = [
"python3", CONFIG["tools"]["xsstrike"],
"-u", url,
"--crawl",
"--params",
"--fuzzer",
"--blind",
"--vectors", CONFIG["wordlists"]["xss"]
]
xsstrike = subprocess.check_output(xsstrike_cmd, text=True)
results.append("=== XSStrike Results ===\n" + xsstrike)
except Exception as e:
results.append(f"XSStrike error: {str(e)}")
# 2. Custom XSS testing
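    # Heuristic reflected-XSS check: a payload echoed verbatim in the response
    # is only an indicator — it does not prove execution context or a bypass of
    # output encoding, so findings should be verified manually.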
try:
async with httpx.AsyncClient(verify=False) as client:
# Get all input parameters
response = await client.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
# Test input fields
for input_field in soup.find_all(['input', 'textarea']):
field_name = input_field.get('name', '')
if field_name:
# Test various XSS payloads
with open(CONFIG["wordlists"]["xss"]) as f:
for payload in f:
payload = payload.strip()
data = {field_name: payload}
response = await client.post(url, data=data)
if payload in response.text:
results.append(f"Potential XSS found in {field_name} with payload: {payload}")
except Exception as e:
results.append(f"Custom XSS testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def advanced_sqli_scan(url: str) -> str:
"""Advanced SQL injection testing.
Args:
url: Target URL
"""
results = []
# 1. SQLMap with advanced options
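    # sqlmap flags: --level 5/--risk 3 enable the most thorough (and most
    # intrusive) tests, --tamper chains WAF-evasion scripts, and --dump extracts
    # data on success — only use against targets you are authorised to test.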
try:
sqlmap_cmd = [
"sqlmap",
"-u", url,
"--batch",
"--random-agent",
"--level", "5",
"--risk", "3",
"--threads", "10",
"--tamper=space2comment,between,randomcase",
"--time-sec", "1",
"--dump"
]
sqlmap = subprocess.check_output(sqlmap_cmd, text=True)
results.append("=== SQLMap Results ===\n" + sqlmap)
except Exception as e:
results.append(f"SQLMap error: {str(e)}")
# 2. Custom SQL injection testing
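    # Error-based detection only: each payload is substituted into the query
    # string with a plain string replace (this assumes the parameter value
    # appears unencoded in the URL) and the response is checked for database
    # error signatures.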
try:
async with httpx.AsyncClient(verify=False) as client:
# Test parameters
params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
for param in params:
with open(CONFIG["wordlists"]["sqli"]) as f:
for payload in f:
payload = payload.strip()
test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}")
response = await client.get(test_url)
# Check for SQL errors
sql_errors = [
"SQL syntax",
"mysql_fetch_array",
"ORA-",
"PostgreSQL",
"SQLite3::"
]
for error in sql_errors:
if error in response.text:
results.append(f"Potential SQL injection in parameter {param} with payload: {payload}")
except Exception as e:
results.append(f"Custom SQLi testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def advanced_ssrf_scan(url: str) -> str:
"""Advanced Server-Side Request Forgery testing.
Args:
url: Target URL
"""
results = []
try:
async with httpx.AsyncClient(verify=False) as client:
# Test various SSRF payloads
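            # The payload list is expected to contain internal/metadata-style URLs
            # (e.g. localhost, 169.254.169.254); each one is substituted into every
            # query parameter in turn.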
with open(CONFIG["wordlists"]["ssrf"]) as f:
for payload in f:
payload = payload.strip()
# Test in different parameter positions
params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
for param in params:
test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}")
response = await client.get(test_url)
                        # Heuristic check only: a 200 response with a non-empty body
                        # may indicate the payload URL was fetched; confirm manually
if response.status_code == 200 and len(response.text) > 0:
results.append(f"Potential SSRF in parameter {param} with payload: {payload}")
results.append(f"Response length: {len(response.text)}")
results.append(f"Response preview: {response.text[:200]}...")
except Exception as e:
results.append(f"SSRF testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def test_graphql_vulnerabilities(url: str, schema: dict) -> str:
"""Test GraphQL specific vulnerabilities.
Args:
url: GraphQL endpoint URL
schema: GraphQL schema from introspection
"""
results = []
try:
async with httpx.AsyncClient(verify=False) as client:
# 1. Test for DoS via nested queries
nested_query = "query {\n " + "user { ".repeat(10) + "id " + "}".repeat(10) + "\n}"
response = await client.post(url, json={"query": nested_query})
if response.status_code != 200:
results.append("Potential DoS vulnerability - nested queries not properly limited")
# 2. Test for sensitive data exposure
sensitive_types = ["User", "Admin", "Password", "Token", "Secret"]
            for type_obj in schema.get("data", schema).get("__schema", {}).get("types", []):
if any(sensitive in type_obj["name"] for sensitive in sensitive_types):
results.append(f"Potential sensitive data exposure in type: {type_obj['name']}")
# 3. Test for batch query attacks
batch_query = [{"query": "{ __schema { types { name } } }"} for _ in range(100)]
response = await client.post(url, json=batch_query)
if response.status_code == 200:
results.append("Batch queries allowed - potential DoS vector")
except Exception as e:
results.append(f"GraphQL vulnerability testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def test_api_endpoint(base_url: str, endpoint: str, method: str, details: dict) -> str:
"""Test individual API endpoint for vulnerabilities.
Args:
base_url: Base API URL
endpoint: API endpoint path
method: HTTP method
details: Endpoint details from OpenAPI spec
"""
results = []
try:
async with httpx.AsyncClient(verify=False) as client:
# 1. Test without authentication
response = await client.request(method, f"{base_url}{endpoint}")
            if response.status_code not in (401, 403):
                results.append(f"Endpoint {endpoint} accessible without auth")
# 2. Test parameter fuzzing
if "parameters" in details:
for param in details["parameters"]:
if param["in"] == "query":
# Test SQL injection
test_url = f"{base_url}{endpoint}?{param['name']}=1' OR '1'='1"
response = await client.request(method, test_url)
if "error" in response.text.lower():
results.append(f"Potential SQL injection in parameter {param['name']}")
# Test XSS
test_url = f"{base_url}{endpoint}?{param['name']}=<script>alert(1)</script>"
response = await client.request(method, test_url)
if "<script>alert(1)</script>" in response.text:
results.append(f"Potential XSS in parameter {param['name']}")
# 3. Test for mass assignment
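            # A 200 here only shows the request was accepted; it does not confirm
            # that the injected privileged fields (isAdmin, role, privileges) were
            # actually persisted — verify with a follow-up read of the resource.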
if method.lower() in ["post", "put"] and "requestBody" in details:
schema = details["requestBody"]["content"]["application/json"]["schema"]
if "properties" in schema:
# Try to inject admin/privileged fields
payload = {
"isAdmin": True,
"role": "admin",
"privileges": ["admin"],
**{k: "test" for k in schema["properties"].keys()}
}
response = await client.request(method, f"{base_url}{endpoint}", json=payload)
if response.status_code == 200:
results.append(f"Potential mass assignment vulnerability in {endpoint}")
except Exception as e:
results.append(f"API endpoint testing error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def advanced_recon(domain: str) -> str:
"""Perform advanced reconnaissance on a target domain.
Args:
domain: Target domain name
"""
results = []
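    # Each numbered block below shells out to external CLIs assumed to be on PATH
    # (subfinder, amass, nmap, etc.); a single failing tool raises and skips the
    # rest of its block, so partial results are expected on hosts missing a tool.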
# 1. Subdomain Enumeration
try:
# Subfinder
subdomains = subprocess.check_output(["subfinder", "-d", domain, "-silent"], text=True)
results.append("=== Subfinder Results ===\n" + subdomains)
# Amass
amass = subprocess.check_output([
"amass", "enum",
"-d", domain,
"-passive",
"-silent"
], text=True)
results.append("=== Amass Results ===\n" + amass)
# Assetfinder
assetfinder = subprocess.check_output(["assetfinder", "--subs-only", domain], text=True)
results.append("=== Assetfinder Results ===\n" + assetfinder)
        # GitHub Subdomains (token read from the environment; set GITHUB_TOKEN in .env)
        github_subdomains = subprocess.check_output([
            "github-subdomains",
            "-d", domain,
            "-t", os.getenv("GITHUB_TOKEN", "")
        ], text=True)
results.append("=== GitHub Subdomains ===\n" + github_subdomains)
except Exception as e:
results.append(f"Subdomain enumeration error: {str(e)}")
# 2. Port Scanning & Service Detection
try:
# Quick Nmap scan
nmap_quick = subprocess.check_output([
"nmap",
"-sV", "-sC",
"--min-rate", "1000",
"-T4",
domain
], text=True)
results.append("=== Quick Nmap Scan ===\n" + nmap_quick)
# Detailed scan of web ports
nmap_web = subprocess.check_output([
"nmap",
"-p", "80,443,8080,8443",
"-sV", "--script=http-enum,http-headers,http-methods,http-title",
domain
], text=True)
results.append("=== Web Services Scan ===\n" + nmap_web)
except Exception as e:
results.append(f"Port scanning error: {str(e)}")
# 3. Technology Detection
try:
# Wappalyzer
wappalyzer = subprocess.check_output(["wappalyzer", f"https://{domain}"], text=True)
results.append("=== Technologies (Wappalyzer) ===\n" + wappalyzer)
# Whatweb
whatweb = subprocess.check_output(["whatweb", "-a", "3", domain], text=True)
results.append("=== Technologies (Whatweb) ===\n" + whatweb)
except Exception as e:
results.append(f"Technology detection error: {str(e)}")
# 4. DNS Information
try:
# DNSRecon
dnsrecon = subprocess.check_output(["dnsrecon", "-d", domain, "-t", "std,axfr,srv"], text=True)
results.append("=== DNS Information ===\n" + dnsrecon)
# DNS Zone Transfer
dig_axfr = subprocess.check_output(["dig", "axfr", domain], text=True)
results.append("=== DNS Zone Transfer ===\n" + dig_axfr)
except Exception as e:
results.append(f"DNS enumeration error: {str(e)}")
# 5. Web Archive
try:
# Waybackurls
wayback = subprocess.check_output(["waybackurls", domain], text=True)
results.append("=== Historical URLs ===\n" + wayback)
# Gau
gau = subprocess.check_output(["gau", domain], text=True)
results.append("=== GAU URLs ===\n" + gau)
except Exception as e:
results.append(f"Web archive error: {str(e)}")
# 6. SSL/TLS Analysis
try:
# SSLyze
sslyze = subprocess.check_output([
"sslyze",
"--regular",
domain
], text=True)
results.append("=== SSL/TLS Analysis ===\n" + sslyze)
except Exception as e:
results.append(f"SSL analysis error: {str(e)}")
# 7. Email Discovery
try:
# TheHarvester
harvester = subprocess.check_output([
"theHarvester",
"-d", domain,
"-b", "all"
], text=True)
results.append("=== Email Addresses ===\n" + harvester)
except Exception as e:
results.append(f"Email discovery error: {str(e)}")
# 8. GitHub Reconnaissance
try:
# GitHound
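        # Assumes a subdomains.txt file already exists in the working directory
        # (e.g. saved from the subfinder/amass output above); this script does not
        # write that file itself.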
githound = subprocess.check_output([
"githound",
"--subdomain-file", "subdomains.txt",
"--threads", "10",
domain
], text=True)
results.append("=== GitHub Secrets ===\n" + githound)
except Exception as e:
results.append(f"GitHub recon error: {str(e)}")
# 9. Content Discovery
try:
# Hakrawler
hakrawler = subprocess.check_output([
"hakrawler",
"-url", f"https://{domain}",
"-depth", "3",
"-plain"
], text=True)
results.append("=== Content Discovery ===\n" + hakrawler)
except Exception as e:
results.append(f"Content discovery error: {str(e)}")
return "\n\n".join(results)
@mcp.tool()
async def analyze_recon_data(domain: str, recon_results: str) -> str:
"""Analyze reconnaissance data and identify potential security issues.
Args:
domain: Target domain
recon_results: Results from reconnaissance
"""
findings = []
try:
# 1. Analyze subdomains
if "Subfinder Results" in recon_results:
            subdomains = re.findall(r'[\w\-\.]+\.' + re.escape(domain), recon_results)
findings.append(f"Found {len(subdomains)} subdomains")
# Check for interesting subdomains
interesting = [s for s in subdomains if any(x in s for x in ['dev', 'stage', 'test', 'admin', 'internal'])]
if interesting:
findings.append(f"Potentially sensitive subdomains: {', '.join(interesting)}")
# 2. Analyze ports
if "Nmap Scan" in recon_results:
open_ports = re.findall(r'(\d+)/tcp\s+open', recon_results)
findings.append(f"Found {len(open_ports)} open ports")
# Check for dangerous ports
dangerous = [p for p in open_ports if p in ['21', '23', '3389', '445', '135']]
if dangerous:
findings.append(f"Potentially dangerous ports open: {', '.join(dangerous)}")
# 3. Analyze technologies
if "Technologies" in recon_results:
# Check for outdated versions
outdated = re.findall(r'([\w\-]+) ([\d\.]+)', recon_results)
for tech, version in outdated:
findings.append(f"Detected {tech} version {version} - check for vulnerabilities")
# 4. Analyze SSL/TLS
if "SSL/TLS Analysis" in recon_results:
if "SSLv2" in recon_results or "SSLv3" in recon_results:
findings.append("WARNING: Outdated SSL protocols detected")
if "TLSv1.0" in recon_results:
findings.append("WARNING: TLS 1.0 is enabled")
# 5. Analyze DNS
if "DNS Information" in recon_results:
if "AXFR" in recon_results:
findings.append("WARNING: DNS Zone Transfer possible")
# 6. Analyze web content
if "Content Discovery" in recon_results:
sensitive_files = re.findall(r'([\w\-\/]+\.(php|asp|aspx|jsp|config|env|git))', recon_results)
if sensitive_files:
findings.append(f"Found {len(sensitive_files)} potentially sensitive files")
# 7. Analyze GitHub findings
if "GitHub Secrets" in recon_results:
if any(x in recon_results.lower() for x in ['password', 'secret', 'key', 'token']):
findings.append("WARNING: Potential secrets found in GitHub repositories")
except Exception as e:
findings.append(f"Analysis error: {str(e)}")
return "\n\n".join(findings)
@mcp.tool()
async def advanced_full_scan(target: str, options: Optional[dict] = None) -> str:
"""Perform comprehensive security assessment with advanced options.
Args:
target: Target domain/IP/application
options: Scan configuration options
"""
if options is None:
options = {
"recon": True,
"directory": True,
"api": True,
"xss": True,
"sqli": True,
"ssrf": True,
"report": True
}
results = {}
scan_tasks = []
# 1. Reconnaissance
if options.get("recon", True):
recon_results = await advanced_recon(target)
analysis = await analyze_recon_data(target, recon_results)
results["recon"] = {
"raw_results": recon_results,
"analysis": analysis
}
    # 2. Directory scanning and the other web scans are scheduled together and run
    # concurrently; task_names tracks which scans were actually scheduled so results
    # can be mapped back correctly even when some scans are disabled.
    task_names = []
    if options.get("directory", True):
        scan_tasks.append(advanced_directory_scan(f"https://{target}"))
        task_names.append("directory")
    if options.get("api", True):
        scan_tasks.append(advanced_api_scan(f"https://{target}"))
        task_names.append("api")
    if options.get("xss", True):
        scan_tasks.append(advanced_xss_scan(f"https://{target}"))
        task_names.append("xss")
    if options.get("sqli", True):
        scan_tasks.append(advanced_sqli_scan(f"https://{target}"))
        task_names.append("sqli")
    if options.get("ssrf", True):
        scan_tasks.append(advanced_ssrf_scan(f"https://{target}"))
        task_names.append("ssrf")
    # Run the scheduled scans concurrently
    scan_results = await asyncio.gather(*scan_tasks, return_exceptions=True)
    # Map each result back to the scan it belongs to; exceptions are stored as
    # strings so the JSON dump below cannot fail
    for task_name, task_result in zip(task_names, scan_results):
        results[task_name] = str(task_result) if isinstance(task_result, Exception) else task_result
# Generate report
if options.get("report", True):
report_type = options.get("report_type", "html")
await generate_report({"target": target, "results": results}, report_type)
return json.dumps(results, indent=2)
@mcp.tool()
async def generate_report(scan_results: dict, report_type: str = "html") -> str:
"""Generate a comprehensive security report.
Args:
scan_results: Dictionary containing all scan results
report_type: Output format (html, pdf, json)
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"{CONFIG['reporting']['output_dir']}/report_{timestamp}.{report_type}"
try:
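        # Assumes templates/report.html exists and contains a {{RESULTS}} placeholder;
        # results are injected as pretty-printed JSON rather than rendered per finding.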
if report_type == "html":
# Use template to generate HTML report
with open(f"{CONFIG['reporting']['template_dir']}/report.html") as f:
template = f.read()
# Replace placeholders with results
report_content = template.replace("{{RESULTS}}", json.dumps(scan_results, indent=2))
with open(report_file, "w") as f:
f.write(report_content)
elif report_type == "pdf":
# Convert HTML to PDF
import pdfkit
pdfkit.from_string(report_content, report_file)
elif report_type == "json":
with open(report_file, "w") as f:
json.dump(scan_results, f, indent=2)
return f"Report generated: {report_file}"
except Exception as e:
return f"Report generation error: {str(e)}"
@mcp.tool()
async def send_http_request(url: str, method: str = "GET", headers: Optional[dict] = None, data: Optional[str] = None,
                            verify_ssl: bool = False, timeout: int = 30) -> str:
"""Send an HTTP request to a URL and read the response with headers.
Args:
url: Target URL to send request to
method: HTTP method (GET, POST, PUT, DELETE, etc.)
headers: Dictionary of HTTP headers to include
data: Request body data
verify_ssl: Whether to verify SSL certificates
timeout: Request timeout in seconds
Returns:
String containing response headers and body
"""
try:
result = []
result.append(f"=== Request to {url} ===")
async with httpx.AsyncClient(verify=verify_ssl, timeout=float(timeout)) as client:
if method.upper() == "GET":
response = await client.get(url, headers=headers)
elif method.upper() == "POST":
response = await client.post(url, headers=headers, content=data)
elif method.upper() == "PUT":
response = await client.put(url, headers=headers, content=data)
elif method.upper() == "DELETE":
response = await client.delete(url, headers=headers)
elif method.upper() == "HEAD":
response = await client.head(url, headers=headers)
elif method.upper() == "OPTIONS":
response = await client.options(url, headers=headers)
elif method.upper() == "PATCH":
response = await client.patch(url, headers=headers, content=data)
else:
return f"Unsupported HTTP method: {method}"
# Add response information
result.append(f"Status Code: {response.status_code}")
# Add headers
result.append("\n=== Response Headers ===")
for header, value in response.headers.items():
result.append(f"{header}: {value}")
# Add response body
result.append("\n=== Response Body ===")
if 'application/json' in response.headers.get('content-type', ''):
try:
formatted_json = json.dumps(response.json(), indent=2)
result.append(formatted_json)
            except Exception:
result.append(response.text)
else:
result.append(response.text)
return "\n".join(result)
except Exception as e:
return f"Error sending HTTP request: {str(e)}"
if __name__ == "__main__":
# Create necessary directories
Path(CONFIG["reporting"]["output_dir"]).mkdir(parents=True, exist_ok=True)
# Initialize and run the server with stdio transport
mcp.run(transport='stdio')