Skip to main content
Glama

Pentest Tools MCP Server

by ch1nhpd
pentest-tools-mcp-server.py (30.8 kB)
#!/usr/bin/env python3
"""Pentest Tools MCP server.

Exposes directory, API, XSS, SQLi and SSRF scanners, domain reconnaissance,
report generation and a raw HTTP request helper as MCP tools over stdio.
Most tools wrap external command-line programs; each tool reports a failing
program as text in its result instead of raising.
"""
from typing import Any, List, Optional, Dict
from mcp.server.fastmcp import FastMCP
import httpx
import subprocess
import json
import asyncio
import re
import tempfile
from pathlib import Path
from datetime import datetime
import yaml
import aiofiles
import aiohttp
from bs4 import BeautifulSoup
# import jwt
import base64
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
import os
from dotenv import load_dotenv

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='pentest.log'
)
logger = logging.getLogger('pentest-tools')

# Initialize FastMCP server
mcp = FastMCP("pentest-tools")

# Configuration
CONFIG = {
    "wordlists": {
        "dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt",
        "common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt",
        "api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt",
        "subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt",
        "passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt",
        "jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt",
        "xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt",
        "sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt",
        "lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt",
        "ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt"
    },
    "tools": {
        "xsstrike": "/root/tools/XSStrike/xsstrike.py",
        "nuclei_templates": "/root/tools/nuclei-templates",
        "crlfuzz": "/root/go/bin/crlfuzz",
        "graphql_path": "/root/tools/graphql-tools"
    },
    "reporting": {
        "output_dir": "reports",
        "template_dir": "templates"
    }
}

# Load environment variables
load_dotenv()

# OpenAPI path items may carry non-method keys ("parameters", "summary", ...);
# only these keys describe operations that can be requested.
_OPENAPI_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)

# Substrings treated as evidence of a database error leaking into a response.
_SQL_ERROR_MARKERS = (
    "SQL syntax",
    "mysql_fetch_array",
    "ORA-",
    "PostgreSQL",
    "SQLite3::",
)

# Methods accepted by send_http_request, split by whether a body is sent.
_BODY_METHODS = frozenset({"POST", "PUT", "PATCH"})
_BODYLESS_METHODS = frozenset({"GET", "DELETE", "HEAD", "OPTIONS"})


async def _run_command(cmd: List[str]) -> str:
    """Run an external tool in a worker thread and return its stdout.

    The child's stdout/stderr are captured and stdin is detached: the server
    speaks MCP over its own stdio, so a child that inherits those streams
    would corrupt the protocol. Running in a thread keeps the event loop free
    so several scans can progress concurrently.

    Raises:
        RuntimeError: if the tool exits with a non-zero status. The message
            names only the program (not its arguments, which may contain
            secrets such as API tokens) plus the tail of stderr.
        OSError: if the program cannot be started (e.g. not installed).
    """
    try:
        completed = await asyncio.to_thread(
            subprocess.run,
            cmd,
            check=True,
            capture_output=True,
            text=True,
            errors="replace",
            stdin=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError as exc:
        stderr_tail = (exc.stderr or "").strip()[-500:]
        logger.warning("%s exited with status %s: %s", cmd[0], exc.returncode, stderr_tail)
        raise RuntimeError(
            f"{cmd[0]} exited with status {exc.returncode}: {stderr_tail}"
        ) from None
    return completed.stdout


def _read_wordlist(name: str) -> List[str]:
    """Return the non-empty, stripped lines of the configured wordlist *name*.

    Blank lines are dropped because an empty payload is "contained" in every
    response body and would produce a false positive for every field.
    """
    with open(CONFIG["wordlists"][name], encoding="utf-8", errors="replace") as handle:
        return [line.strip() for line in handle if line.strip()]


def _query_param_names(url: str) -> List[str]:
    """Return the distinct query parameter names of *url* in first-seen order."""
    pairs = urllib.parse.parse_qsl(urllib.parse.urlsplit(url).query, keep_blank_values=True)
    return list(dict.fromkeys(name for name, _ in pairs))


def _with_query_param(url: str, param: str, value: str) -> str:
    """Return *url* with the first occurrence of *param* set to *value*.

    The query string is parsed and re-encoded rather than patched with
    str.replace, so already-encoded original values are matched correctly and
    the injected value is percent-encoded.
    """
    parts = urllib.parse.urlsplit(url)
    rebuilt = []
    replaced = False
    for key, current in urllib.parse.parse_qsl(parts.query, keep_blank_values=True):
        if key == param and not replaced:
            rebuilt.append((key, value))
            replaced = True
        else:
            rebuilt.append((key, current))
    return parts._replace(query=urllib.parse.urlencode(rebuilt)).geturl()


def format_ffuf_results(results):
    """Format the FFuf results in a readable format.

    Args:
        results: FFuf JSON results

    Returns:
        Formatted string with FFuf results
    """
    formatted = []

    # Add summary
    if "stats" in results:
        stats = results["stats"]
        formatted.append(f"Total requests: {stats.get('total', 0)}")
        formatted.append(f"Duration: {stats.get('elapsed', 0)} seconds")
        formatted.append(f"Requests per second: {stats.get('req_sec', 0)}")

    # Add results
    if "results" in results:
        formatted.append("\nDiscovered URLs:")
        for item in results["results"]:
            status = item.get("status", "")
            url = item.get("url", "")
            size = item.get("length", 0)
            formatted.append(f"[Status: {status}] [{size} bytes] {url}")

    return "\n".join(formatted)


@mcp.tool()
async def advanced_directory_scan(url: str, extensions: Optional[List[str]] = None) -> str:
    """Advanced directory and file scanning with multiple tools and techniques.

    Runs ffuf and dirsearch against the target and returns their results as
    text. A failure of one tool is reported in the output and does not stop
    the other.

    Args:
        url: Target URL
        extensions: List of file extensions to scan for
    """
    if extensions is None:
        extensions = ["php", "asp", "aspx", "jsp", "js", "txt", "conf", "bak",
                      "backup", "swp", "old", "db", "sql"]

    extension_arg = ",".join(extensions)
    results = []

    # Tool output goes to a private temp directory so that concurrent scans
    # do not overwrite each other's JSON files.
    with tempfile.TemporaryDirectory(prefix="pentest-dirscan-") as workdir:
        # 1. FFuf with advanced options
        ffuf_out = str(Path(workdir) / "ffuf.json")
        try:
            await _run_command([
                "ffuf",
                "-u", f"{url.rstrip('/')}/FUZZ",
                "-w", CONFIG["wordlists"]["dirsearch"],
                "-e", extension_arg,
                "-recursion",
                "-recursion-depth", "3",
                "-mc", "all",
                "-ac",
                "-o", ffuf_out,
                "-of", "json"
            ])
            with open(ffuf_out, encoding="utf-8") as f:
                ffuf_results = json.load(f)
            results.append("=== FFuf Results ===\n" + format_ffuf_results(ffuf_results))
        except Exception as e:
            results.append(f"FFuf error: {str(e)}")

        # 2. Dirsearch with advanced options
        dirsearch_out = str(Path(workdir) / "dirsearch.json")
        try:
            await _run_command([
                "dirsearch",
                "-u", url,
                "-e", extension_arg,
                "--deep-recursive",
                "--force-recursive",
                "--exclude-status", "404",
                "-o", dirsearch_out,
                "--format", "json"
            ])
            with open(dirsearch_out, encoding="utf-8") as f:
                dirsearch_results = json.load(f)
            results.append("=== Dirsearch Results ===\n" + json.dumps(dirsearch_results, indent=2))
        except Exception as e:
            results.append(f"Dirsearch error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def advanced_api_scan(url: str) -> str:
    """Advanced API security testing with multiple techniques.

    For GraphQL-looking URLs an introspection query is sent and the schema is
    checked with test_graphql_vulnerabilities. A fixed list of common REST
    paths is then probed; when a probed swagger/openapi path returns a JSON
    spec, each documented operation is exercised with test_api_endpoint.

    Args:
        url: Target API URL
    """
    results = []

    # 1. GraphQL Security Testing
    if "/graphql" in url or "/graphiql" in url:
        try:
            # Introspection query
            graphql_query = """
            query IntrospectionQuery {
                __schema {
                    types { name, fields { name, type { name } } }
                    queryType { name }
                    mutationType { name }
                    subscriptionType { name }
                }
            }
            """
            async with httpx.AsyncClient(verify=False) as client:
                response = await client.post(url, json={"query": graphql_query})
            if response.status_code == 200:
                schema = response.json()
                results.append("=== GraphQL Schema ===\n" + json.dumps(schema, indent=2))

                # Test for common GraphQL vulnerabilities
                vulns = await test_graphql_vulnerabilities(url, schema)
                results.append("=== GraphQL Vulnerabilities ===\n" + vulns)
        except Exception as e:
            results.append(f"GraphQL testing error: {str(e)}")

    # 2. REST API Testing
    try:
        # Test common REST endpoints
        common_paths = ["/v1", "/v2", "/api", "/api/v1", "/api/v2",
                        "/swagger", "/docs", "/openapi.json"]
        base_url = url.rstrip("/")
        async with httpx.AsyncClient(verify=False) as client:
            for path in common_paths:
                response = await client.get(f"{base_url}{path}")
                if response.status_code == 404:
                    continue

                results.append(f"\nFound API endpoint: {path}")
                results.append(f"Status: {response.status_code}")
                results.append(f"Response: {response.text[:500]}...")

                # If Swagger/OpenAPI found, parse and test endpoints
                if "swagger" not in path and "openapi" not in path:
                    continue
                try:
                    api_spec = response.json()
                except ValueError:
                    # e.g. the HTML Swagger UI page: nothing to parse, but the
                    # remaining paths must still be probed.
                    continue
                if not isinstance(api_spec, dict):
                    continue

                results.append("\n=== Testing API Endpoints ===")
                for endpoint, operations in api_spec.get("paths", {}).items():
                    if not isinstance(operations, dict):
                        continue
                    for method, details in operations.items():
                        # Skip path-level keys that are not HTTP operations.
                        if method.lower() not in _OPENAPI_METHODS or not isinstance(details, dict):
                            continue
                        test_result = await test_api_endpoint(base_url, endpoint, method, details)
                        results.append(test_result)
    except Exception as e:
        results.append(f"REST API testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def advanced_xss_scan(url: str) -> str:
    """Advanced XSS vulnerability scanning.

    Runs XSStrike, then posts every payload of the XSS wordlist to each named
    <input>/<textarea> found on the page and reports payloads that are
    reflected verbatim in the response.

    Args:
        url: Target URL
    """
    results = []

    # 1. XSStrike with advanced options
    try:
        xsstrike = await _run_command([
            "python3", CONFIG["tools"]["xsstrike"],
            "-u", url,
            "--crawl",
            "--params",
            "--fuzzer",
            "--blind",
            "--vectors", CONFIG["wordlists"]["xss"]
        ])
        results.append("=== XSStrike Results ===\n" + xsstrike)
    except Exception as e:
        results.append(f"XSStrike error: {str(e)}")

    # 2. Custom XSS testing
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # Get all input parameters
            response = await client.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            field_names = list(dict.fromkeys(
                field.get('name', '') for field in soup.find_all(['input', 'textarea'])
            ))
            field_names = [name for name in field_names if name]

            # Read the wordlist once, and only if there is something to test.
            payloads = _read_wordlist("xss") if field_names else []

            # Test input fields
            for field_name in field_names:
                for payload in payloads:
                    reply = await client.post(url, data={field_name: payload})
                    if payload in reply.text:
                        results.append(
                            f"Potential XSS found in {field_name} with payload: {payload}"
                        )
    except Exception as e:
        results.append(f"Custom XSS testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def advanced_sqli_scan(url: str) -> str:
    """Advanced SQL injection testing.

    Runs sqlmap, then substitutes every payload of the SQLi wordlist into each
    query parameter of the URL and reports responses containing a known
    database error marker.

    Args:
        url: Target URL
    """
    results = []

    # 1. SQLMap with advanced options
    try:
        sqlmap = await _run_command([
            "sqlmap",
            "-u", url,
            "--batch",
            "--random-agent",
            "--level", "5",
            "--risk", "3",
            "--threads", "10",
            "--tamper=space2comment,between,randomcase",
            "--time-sec", "1",
            "--dump"
        ])
        results.append("=== SQLMap Results ===\n" + sqlmap)
    except Exception as e:
        results.append(f"SQLMap error: {str(e)}")

    # 2. Custom SQL injection testing
    try:
        param_names = _query_param_names(url)
        payloads = _read_wordlist("sqli") if param_names else []
        async with httpx.AsyncClient(verify=False) as client:
            # Test parameters
            for param in param_names:
                for payload in payloads:
                    response = await client.get(_with_query_param(url, param, payload))

                    # Check for SQL errors; report each (param, payload) once
                    # even if several markers match.
                    if any(marker in response.text for marker in _SQL_ERROR_MARKERS):
                        results.append(
                            f"Potential SQL injection in parameter {param} with payload: {payload}"
                        )
    except Exception as e:
        results.append(f"Custom SQLi testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def advanced_ssrf_scan(url: str) -> str:
    """Advanced Server-Side Request Forgery testing.

    Substitutes every payload of the SSRF wordlist into each query parameter
    of the URL and reports every non-empty 200 response.

    Args:
        url: Target URL
    """
    results = []

    try:
        param_names = _query_param_names(url)
        payloads = _read_wordlist("ssrf") if param_names else []
        async with httpx.AsyncClient(verify=False) as client:
            # Test various SSRF payloads
            for payload in payloads:
                # Test in different parameter positions
                for param in param_names:
                    response = await client.get(_with_query_param(url, param, payload))

                    # Check for successful SSRF
                    # NOTE(review): a non-empty 200 is a weak signal; comparing
                    # against a baseline response would reduce false positives.
                    if response.status_code == 200 and len(response.text) > 0:
                        results.append(f"Potential SSRF in parameter {param} with payload: {payload}")
                        results.append(f"Response length: {len(response.text)}")
                        results.append(f"Response preview: {response.text[:200]}...")
    except Exception as e:
        results.append(f"SSRF testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def test_graphql_vulnerabilities(url: str, schema: dict) -> str:
    """Test GraphQL specific vulnerabilities.

    Args:
        url: GraphQL endpoint URL
        schema: GraphQL schema from introspection; either the raw HTTP
            response body ({"data": {"__schema": ...}}) or the already
            unwrapped {"__schema": ...} object
    """
    results = []

    try:
        async with httpx.AsyncClient(verify=False) as client:
            # 1. Test for DoS via nested queries
            nested_query = "query {\n " + "user { " * 10 + "id " + "}" * 10 + "\n}"
            response = await client.post(url, json={"query": nested_query})
            if response.status_code != 200:
                results.append("Potential DoS vulnerability - nested queries not properly limited")

            # 2. Test for sensitive data exposure
            sensitive_types = ["User", "Admin", "Password", "Token", "Secret"]
            # Introspection responses wrap the schema in a "data" envelope.
            schema_root = schema.get("data") or schema
            type_list = (schema_root.get("__schema") or {}).get("types") or []
            for type_obj in type_list:
                type_name = type_obj.get("name") or ""
                if any(sensitive in type_name for sensitive in sensitive_types):
                    results.append(f"Potential sensitive data exposure in type: {type_name}")

            # 3. Test for batch query attacks
            batch_query = [{"query": "{ __schema { types { name } } }"} for _ in range(100)]
            response = await client.post(url, json=batch_query)
            if response.status_code == 200:
                results.append("Batch queries allowed - potential DoS vector")
    except Exception as e:
        results.append(f"GraphQL vulnerability testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def test_api_endpoint(base_url: str, endpoint: str, method: str, details: dict) -> str:
    """Test individual API endpoint for vulnerabilities.

    Args:
        base_url: Base API URL
        endpoint: API endpoint path
        method: HTTP method
        details: Endpoint details from OpenAPI spec
    """
    results = []
    target = f"{base_url}{endpoint}"

    try:
        async with httpx.AsyncClient(verify=False) as client:
            # 1. Test without authentication
            response = await client.request(method, target)
            # Only a successful response shows the endpoint is reachable
            # without credentials; 403/404/405/5xx do not.
            if 200 <= response.status_code < 300:
                results.append(f"Endpoint {endpoint} accessible without auth")

            # 2. Test parameter fuzzing
            for param in details.get("parameters", []):
                # "$ref" parameters have neither "in" nor "name"; skip them.
                if not isinstance(param, dict) or param.get("in") != "query" or "name" not in param:
                    continue
                name = param["name"]

                # Test SQL injection (params= lets httpx encode the payload)
                response = await client.request(method, target, params={name: "1' OR '1'='1"})
                if "error" in response.text.lower():
                    results.append(f"Potential SQL injection in parameter {name}")

                # Test XSS
                xss_probe = "<script>alert(1)</script>"
                response = await client.request(method, target, params={name: xss_probe})
                if xss_probe in response.text:
                    results.append(f"Potential XSS in parameter {name}")

            # 3. Test for mass assignment
            if method.lower() in ["post", "put"] and "requestBody" in details:
                body_schema = (
                    details["requestBody"]
                    .get("content", {})
                    .get("application/json", {})
                    .get("schema", {})
                )
                if "properties" in body_schema:
                    # Try to inject admin/privileged fields
                    payload = {
                        "isAdmin": True,
                        "role": "admin",
                        "privileges": ["admin"],
                        **{k: "test" for k in body_schema["properties"]}
                    }
                    response = await client.request(method, target, json=payload)
                    if response.status_code == 200:
                        results.append(f"Potential mass assignment vulnerability in {endpoint}")
    except Exception as e:
        results.append(f"API endpoint testing error: {str(e)}")

    return "\n\n".join(results)


@mcp.tool()
async def advanced_recon(domain: str) -> str:
    """Perform advanced reconnaissance on a target domain.

    Runs a fixed sequence of external tools and returns their outputs as
    titled sections. Each tool runs independently: a missing or failing tool
    is reported in place and the remaining tools still run. The GitHub
    subdomain search reads its token from the GITHUB_TOKEN environment
    variable (loaded from .env if present) and is skipped when it is unset.

    Args:
        domain: Target domain name
    """
    # The domain is passed as a positional argument to several tools; a
    # leading "-" would be parsed by them as an option.
    if not domain or domain.startswith("-") or any(ch.isspace() for ch in domain):
        return f"Invalid domain: {domain!r}"

    github_token = os.getenv("GITHUB_TOKEN")
    github_cmd = (
        ["github-subdomains", "-d", domain, "-t", github_token] if github_token else None
    )

    # (section title, error label, command). Titles are matched by
    # analyze_recon_data, so they must not change.
    steps = [
        # 1. Subdomain Enumeration
        ("Subfinder Results", "Subdomain enumeration error",
         ["subfinder", "-d", domain, "-silent"]),
        ("Amass Results", "Subdomain enumeration error",
         ["amass", "enum", "-d", domain, "-passive", "-silent"]),
        ("Assetfinder Results", "Subdomain enumeration error",
         ["assetfinder", "--subs-only", domain]),
        ("GitHub Subdomains", "Subdomain enumeration error", github_cmd),
        # 2. Port Scanning & Service Detection
        ("Quick Nmap Scan", "Port scanning error",
         ["nmap", "-sV", "-sC", "--min-rate", "1000", "-T4", domain]),
        ("Web Services Scan", "Port scanning error",
         ["nmap", "-p", "80,443,8080,8443", "-sV",
          "--script=http-enum,http-headers,http-methods,http-title", domain]),
        # 3. Technology Detection
        ("Technologies (Wappalyzer)", "Technology detection error",
         ["wappalyzer", f"https://{domain}"]),
        ("Technologies (Whatweb)", "Technology detection error",
         ["whatweb", "-a", "3", domain]),
        # 4. DNS Information
        ("DNS Information", "DNS enumeration error",
         ["dnsrecon", "-d", domain, "-t", "std,axfr,srv"]),
        ("DNS Zone Transfer", "DNS enumeration error",
         ["dig", "axfr", domain]),
        # 5. Web Archive
        ("Historical URLs", "Web archive error", ["waybackurls", domain]),
        ("GAU URLs", "Web archive error", ["gau", domain]),
        # 6. SSL/TLS Analysis
        ("SSL/TLS Analysis", "SSL analysis error", ["sslyze", "--regular", domain]),
        # 7. Email Discovery
        ("Email Addresses", "Email discovery error",
         ["theHarvester", "-d", domain, "-b", "all"]),
        # 8. GitHub Reconnaissance
        # NOTE(review): nothing in this file writes subdomains.txt; confirm
        # where GitHound is expected to find it.
        ("GitHub Secrets", "GitHub recon error",
         ["githound", "--subdomain-file", "subdomains.txt", "--threads", "10", domain]),
        # 9. Content Discovery
        ("Content Discovery", "Content discovery error",
         ["hakrawler", "-url", f"https://{domain}", "-depth", "3", "-plain"]),
    ]

    results = []
    for title, error_label, cmd in steps:
        if cmd is None:
            results.append(f"{error_label}: {title} skipped - GITHUB_TOKEN is not set")
            continue
        try:
            output = await _run_command(cmd)
        except Exception as e:
            results.append(f"{error_label}: {str(e)}")
        else:
            results.append(f"=== {title} ===\n" + output)

    return "\n\n".join(results)


@mcp.tool()
async def analyze_recon_data(domain: str, recon_results: str) -> str:
    """Analyze reconnaissance data and identify potential security issues.

    Looks for the section titles produced by advanced_recon in the text and
    applies simple pattern checks to the whole text for each section present.

    Args:
        domain: Target domain
        recon_results: Results from reconnaissance
    """
    findings = []

    try:
        # 1. Analyze subdomains
        if "Subfinder Results" in recon_results:
            # Escape the domain so its dots match literally.
            subdomains = re.findall(r'[\w\-\.]+\.' + re.escape(domain), recon_results)
            findings.append(f"Found {len(subdomains)} subdomains")

            # Check for interesting subdomains
            interesting = [s for s in subdomains
                           if any(x in s for x in ['dev', 'stage', 'test', 'admin', 'internal'])]
            if interesting:
                findings.append(f"Potentially sensitive subdomains: {', '.join(interesting)}")

        # 2. Analyze ports
        if "Nmap Scan" in recon_results:
            open_ports = re.findall(r'(\d+)/tcp\s+open', recon_results)
            findings.append(f"Found {len(open_ports)} open ports")

            # Check for dangerous ports
            dangerous = [p for p in open_ports if p in ['21', '23', '3389', '445', '135']]
            if dangerous:
                findings.append(f"Potentially dangerous ports open: {', '.join(dangerous)}")

        # 3. Analyze technologies
        if "Technologies" in recon_results:
            # Check for outdated versions
            outdated = re.findall(r'([\w\-]+) ([\d\.]+)', recon_results)
            for tech, version in outdated:
                findings.append(f"Detected {tech} version {version} - check for vulnerabilities")

        # 4. Analyze SSL/TLS
        if "SSL/TLS Analysis" in recon_results:
            if "SSLv2" in recon_results or "SSLv3" in recon_results:
                findings.append("WARNING: Outdated SSL protocols detected")
            if "TLSv1.0" in recon_results:
                findings.append("WARNING: TLS 1.0 is enabled")

        # 5. Analyze DNS
        if "DNS Information" in recon_results:
            if "AXFR" in recon_results:
                findings.append("WARNING: DNS Zone Transfer possible")

        # 6. Analyze web content
        if "Content Discovery" in recon_results:
            sensitive_files = re.findall(
                r'([\w\-\/]+\.(php|asp|aspx|jsp|config|env|git))', recon_results
            )
            if sensitive_files:
                findings.append(f"Found {len(sensitive_files)} potentially sensitive files")

        # 7. Analyze GitHub findings
        if "GitHub Secrets" in recon_results:
            if any(x in recon_results.lower() for x in ['password', 'secret', 'key', 'token']):
                findings.append("WARNING: Potential secrets found in GitHub repositories")
    except Exception as e:
        findings.append(f"Analysis error: {str(e)}")

    return "\n\n".join(findings)


@mcp.tool()
async def advanced_full_scan(target: str, options: Optional[dict] = None) -> str:
    """Perform comprehensive security assessment with advanced options.

    Reconnaissance runs first; the enabled scanners then run concurrently
    against https://<target>. Returns the collected results as a JSON string
    keyed by scan name ("recon", "directory", "api", "xss", "sqli", "ssrf"),
    plus "report" with the report generator's message when a report is made.

    Args:
        target: Target domain/IP/application
        options: Scan configuration options; boolean flags named after the
            scans plus "report", and optionally "report_type". Every flag
            defaults to enabled.
    """
    if options is None:
        options = {
            "recon": True,
            "directory": True,
            "api": True,
            "xss": True,
            "sqli": True,
            "ssrf": True,
            "report": True
        }

    results = {}

    # 1. Reconnaissance
    if options.get("recon", True):
        recon_results = await advanced_recon(target)
        analysis = await analyze_recon_data(target, recon_results)
        results["recon"] = {
            "raw_results": recon_results,
            "analysis": analysis
        }

    # 2. Remaining scans. Names are kept paired with their coroutines so a
    # disabled scan cannot shift results onto the wrong key.
    scanners = [
        ("directory", advanced_directory_scan),
        ("api", advanced_api_scan),
        ("xss", advanced_xss_scan),
        ("sqli", advanced_sqli_scan),
        ("ssrf", advanced_ssrf_scan),
    ]
    enabled = [(name, scan) for name, scan in scanners if options.get(name, True)]
    base_url = f"https://{target}"

    # Run remaining tasks concurrently
    outcomes = await asyncio.gather(
        *(scan(base_url) for _, scan in enabled), return_exceptions=True
    )
    for (name, _), outcome in zip(enabled, outcomes):
        # Exceptions are not JSON-serializable; report them as text.
        if isinstance(outcome, BaseException):
            results[name] = f"{name} scan error: {outcome}"
        else:
            results[name] = outcome

    # Generate report
    if options.get("report", True):
        report_type = options.get("report_type", "html")
        report_status = await generate_report({"target": target, "results": results}, report_type)
        results["report"] = report_status

    return json.dumps(results, indent=2)


@mcp.tool()
async def generate_report(scan_results: dict, report_type: str = "html") -> str:
    """Generate a comprehensive security report.

    Writes reports/report_<timestamp>.<type> (directory taken from CONFIG).
    HTML and PDF reports are rendered from templates/report.html by replacing
    the {{RESULTS}} placeholder with the JSON-formatted results. Failures are
    returned as text, not raised.

    Args:
        scan_results: Dictionary containing all scan results
        report_type: Output format (html, pdf, json)
    """
    # Validate first: the type becomes the file extension, and an unknown type
    # would otherwise "succeed" without writing anything.
    if report_type not in ("html", "pdf", "json"):
        return f"Report generation error: unsupported report type '{report_type}'"

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_file = f"{CONFIG['reporting']['output_dir']}/report_{timestamp}.{report_type}"

    try:
        # The directory is otherwise only created when run as a script.
        Path(CONFIG["reporting"]["output_dir"]).mkdir(parents=True, exist_ok=True)

        if report_type == "json":
            with open(report_file, "w", encoding="utf-8") as f:
                json.dump(scan_results, f, indent=2)
        else:
            # Use template to generate HTML report (also the PDF source)
            with open(f"{CONFIG['reporting']['template_dir']}/report.html", encoding="utf-8") as f:
                template = f.read()

            # Replace placeholders with results
            # NOTE(review): results contain target-controlled text and are
            # inserted unescaped; whether html.escape is appropriate depends on
            # where the template places {{RESULTS}} - confirm.
            report_content = template.replace("{{RESULTS}}", json.dumps(scan_results, indent=2))

            if report_type == "html":
                with open(report_file, "w", encoding="utf-8") as f:
                    f.write(report_content)
            else:
                # Convert HTML to PDF
                import pdfkit
                pdfkit.from_string(report_content, report_file)

        return f"Report generated: {report_file}"
    except Exception as e:
        return f"Report generation error: {str(e)}"


@mcp.tool()
async def send_http_request(url: str, method: str = "GET", headers: dict = None,
                            data: str = None, verify_ssl: bool = False,
                            timeout: int = 30) -> str:
    """Send an HTTP request to a URL and read the response with headers.

    Args:
        url: Target URL to send request to
        method: HTTP method (GET, POST, PUT, DELETE, HEAD, OPTIONS, PATCH)
        headers: Dictionary of HTTP headers to include
        data: Request body data (sent only with POST, PUT and PATCH)
        verify_ssl: Whether to verify SSL certificates
        timeout: Request timeout in seconds

    Returns:
        String containing response headers and body
    """
    verb = method.upper()
    if verb not in _BODY_METHODS and verb not in _BODYLESS_METHODS:
        return f"Unsupported HTTP method: {method}"

    try:
        result = []
        result.append(f"=== Request to {url} ===")

        request_kwargs = {"headers": headers}
        if verb in _BODY_METHODS:
            request_kwargs["content"] = data

        async with httpx.AsyncClient(verify=verify_ssl, timeout=float(timeout)) as client:
            response = await client.request(verb, url, **request_kwargs)

        # Add response information
        result.append(f"Status Code: {response.status_code}")

        # Add headers
        result.append("\n=== Response Headers ===")
        for header, value in response.headers.items():
            result.append(f"{header}: {value}")

        # Add response body
        result.append("\n=== Response Body ===")
        if 'application/json' in response.headers.get('content-type', ''):
            try:
                formatted_json = json.dumps(response.json(), indent=2)
                result.append(formatted_json)
            except ValueError:
                # Body is labelled JSON but does not parse; show it raw.
                result.append(response.text)
        else:
            result.append(response.text)

        return "\n".join(result)
    except Exception as e:
        return f"Error sending HTTP request: {str(e)}"


if __name__ == "__main__":
    # Create necessary directories
    Path(CONFIG["reporting"]["output_dir"]).mkdir(parents=True, exist_ok=True)

    # Initialize and run the server with stdio transport
    mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ch1nhpd/Pentest-Tools-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.