Skip to main content
Glama
bug_bounty_mcp.py•27.5 kB
#!/usr/bin/env python3
"""
Bug Bounty Hunter MCP Server
Professional MCP server with 100+ security tools for bug bounty hunting
"""

import asyncio
import os
import sys
from pathlib import Path

# Put this file's own directory first on sys.path so the ``src.*`` imports
# below resolve (presumably the ``src`` package sits next to this file).
sys.path.insert(0, str(Path(__file__).parent))

from fastmcp import FastMCP
from src.core.tool_manager import ToolManager
from src.utils.validators import validate_url, validate_domain
from src.utils.helpers import run_command, parse_output

# Initialize MCP server
mcp = FastMCP("Bug Bounty Hunter MCP")

# Initialize tool manager
tool_manager = ToolManager()

# NOTE: several tools below declare list literals as parameter defaults.
# The literals are kept so the declared tool signatures stay exactly as they
# were. A default list is a single object shared by every call, so each
# handler hands a fresh copy (``list(...)``) to the downstream implementation
# rather than the default object itself; a mutation further down can then
# never leak into later calls.
#
# Every handler imports its implementation inside the function body, so a
# tool's module is only loaded when that tool is actually invoked.


# ============================================================================
# RECONNAISSANCE TOOLS
# ============================================================================

@mcp.tool()
async def subdomain_enumeration(
    domain: str,
    tools: list[str] = ["subfinder", "amass", "assetfinder"],
    passive_only: bool = False
) -> dict:
    """
    Enumerate subdomains using multiple tools (subfinder, amass, assetfinder, etc.)

    Args:
        domain: Target domain (e.g., example.com)
        tools: List of tools to use (subfinder, amass, assetfinder, sublist3r)
        passive_only: Only use passive methods (no active DNS queries)

    Returns:
        Dictionary with subdomains found by each tool, or
        {"error": "Invalid domain format"} when the domain fails validation
    """
    from src.tools.recon.subdomain_enum import enumerate_subdomains

    if not validate_domain(domain):
        return {"error": "Invalid domain format"}

    return await enumerate_subdomains(domain, list(tools), passive_only)


@mcp.tool()
async def port_scan(
    target: str,
    ports: str = "top-1000",
    scan_type: str = "quick",
    service_detection: bool = True
) -> dict:
    """
    Scan ports using nmap/masscan/naabu

    Args:
        target: Target IP/domain or CIDR range
        ports: Port specification (top-1000, all, 1-65535, or custom)
        scan_type: quick, full, stealth, or aggressive
        service_detection: Enable service/version detection

    Returns:
        Dictionary with open ports and services
    """
    from src.tools.recon.port_scan import scan_ports

    return await scan_ports(target, ports, scan_type, service_detection)


@mcp.tool()
async def http_probe(
    targets: list[str],
    check_title: bool = True,
    check_status: bool = True,
    check_tech: bool = True,
    threads: int = 50
) -> dict:
    """
    Probe HTTP/HTTPS services using httpx

    Args:
        targets: List of domains/IPs to probe
        check_title: Extract page titles
        check_status: Check HTTP status codes
        check_tech: Detect technologies (Wappalyzer)
        threads: Number of concurrent threads

    Returns:
        Dictionary with live hosts and their details
    """
    from src.tools.recon.http_probe import probe_http

    return await probe_http(
        targets, check_title, check_status, check_tech, threads
    )


@mcp.tool()
async def dns_enumeration(
    domain: str,
    record_types: list[str] = ["A", "AAAA", "CNAME", "MX", "NS", "TXT"],
    bruteforce: bool = False
) -> dict:
    """
    Comprehensive DNS enumeration

    Args:
        domain: Target domain
        record_types: DNS record types to query
        bruteforce: Enable DNS bruteforce with wordlist

    Returns:
        Dictionary with DNS records
    """
    from src.tools.recon.dns_enum import enumerate_dns

    return await enumerate_dns(domain, list(record_types), bruteforce)


@mcp.tool()
async def technology_detection(
    url: str,
    detailed: bool = True
) -> dict:
    """
    Detect technologies used by target (CMS, frameworks, servers, etc.)

    Args:
        url: Target URL
        detailed: Include detailed version information

    Returns:
        Dictionary with detected technologies, or
        {"error": "Invalid URL format"} when the URL fails validation
    """
    from src.tools.recon.tech_detect import detect_technologies

    if not validate_url(url):
        return {"error": "Invalid URL format"}

    return await detect_technologies(url, detailed)


@mcp.tool()
async def wayback_urls(
    domain: str,
    filter_extensions: list[str] = [],
    match_pattern: str = ""
) -> dict:
    """
    Fetch URLs from Wayback Machine and other archives

    Args:
        domain: Target domain
        filter_extensions: File extensions to filter (js, php, asp, etc.)
        match_pattern: Regex pattern to match URLs

    Returns:
        List of archived URLs
    """
    from src.tools.recon.wayback import fetch_wayback_urls

    return await fetch_wayback_urls(
        domain, list(filter_extensions), match_pattern
    )


# ============================================================================
# WEB CRAWLING & SPIDERING
# ============================================================================

@mcp.tool()
async def web_crawler(
    url: str,
    depth: int = 3,
    include_js: bool = True,
    extract_endpoints: bool = True
) -> dict:
    """
    Crawl website and extract URLs, endpoints, and parameters

    Args:
        url: Starting URL
        depth: Crawling depth
        include_js: Parse JavaScript files for endpoints
        extract_endpoints: Extract API endpoints

    Returns:
        Dictionary with discovered URLs and endpoints
    """
    from src.tools.recon.crawler import crawl_website

    return await crawl_website(url, depth, include_js, extract_endpoints)


@mcp.tool()
async def javascript_analysis(
    url: str,
    extract_endpoints: bool = True,
    extract_secrets: bool = True
) -> dict:
    """
    Analyze JavaScript files for endpoints, secrets, and vulnerabilities

    Args:
        url: Target URL or JS file
        extract_endpoints: Extract API endpoints
        extract_secrets: Look for hardcoded secrets (API keys, tokens)

    Returns:
        Dictionary with findings
    """
    from src.tools.recon.js_analysis import analyze_javascript

    return await analyze_javascript(url, extract_endpoints, extract_secrets)


@mcp.tool()
async def parameter_discovery(
    url: str,
    method: str = "all"
) -> dict:
    """
    Discover hidden parameters using various techniques

    Args:
        url: Target URL
        method: Discovery method (all, arjun, paramspider, wordlist)

    Returns:
        List of discovered parameters
    """
    from src.tools.recon.param_discovery import discover_parameters

    return await discover_parameters(url, method)


# ============================================================================
# VULNERABILITY SCANNING
# ============================================================================

@mcp.tool()
async def nuclei_scan(
    target: str,
    templates: list[str] = ["all"],
    severity: list[str] = ["critical", "high", "medium"],
    rate_limit: int = 150
) -> dict:
    """
    Run Nuclei vulnerability scanner with templates

    Args:
        target: Target URL or list file
        templates: Template categories (cves, exposures, misconfigurations, etc.)
        severity: Filter by severity levels
        rate_limit: Requests per second

    Returns:
        Dictionary with vulnerabilities found
    """
    from src.tools.vuln_scan.nuclei_runner import run_nuclei

    return await run_nuclei(
        target, list(templates), list(severity), rate_limit
    )


@mcp.tool()
async def xss_scanner(
    url: str,
    parameters: list[str] = [],
    payload_type: str = "all"
) -> dict:
    """
    Scan for XSS vulnerabilities (reflected, stored, DOM-based)

    Args:
        url: Target URL
        parameters: Specific parameters to test (empty = auto-detect)
        payload_type: Type of payloads (reflected, dom, stored, all)

    Returns:
        Dictionary with XSS vulnerabilities found
    """
    from src.tools.vuln_scan.xss_scanner import scan_xss

    return await scan_xss(url, list(parameters), payload_type)


@mcp.tool()
async def sql_injection_scan(
    url: str,
    parameters: list[str] = [],
    dbms: str = "auto",
    level: int = 3,
    risk: int = 2
) -> dict:
    """
    Test for SQL injection vulnerabilities using sqlmap

    Args:
        url: Target URL
        parameters: Parameters to test
        dbms: Database type (auto, mysql, mssql, postgresql, oracle)
        level: Test level (1-5)
        risk: Risk level (1-3)

    Returns:
        Dictionary with SQL injection findings
    """
    from src.tools.vuln_scan.sqli_scanner import scan_sqli

    return await scan_sqli(url, list(parameters), dbms, level, risk)


@mcp.tool()
async def ssrf_scanner(
    url: str,
    payloads: str = "auto",
    callback_url: str = ""
) -> dict:
    """
    Test for SSRF (Server-Side Request Forgery) vulnerabilities

    Args:
        url: Target URL with injectable parameter
        payloads: Payload type (auto, cloud-metadata, internal-services)
        callback_url: Your callback URL for OOB testing (e.g., Burp Collaborator)

    Returns:
        Dictionary with SSRF findings
    """
    from src.tools.vuln_scan.ssrf_scanner import scan_ssrf

    return await scan_ssrf(url, payloads, callback_url)


@mcp.tool()
async def cors_misconfiguration(
    url: str
) -> dict:
    """
    Test for CORS misconfigurations

    Args:
        url: Target URL

    Returns:
        Dictionary with CORS issues found
    """
    from src.tools.vuln_scan.cors_scanner import scan_cors

    return await scan_cors(url)


# ============================================================================
# FUZZING & BRUTE-FORCE
# ============================================================================

@mcp.tool()
async def directory_fuzzing(
    url: str,
    wordlist: str = "common",
    extensions: list[str] = ["php", "html", "js", "txt"],
    threads: int = 50,
    recursion_depth: int = 2
) -> dict:
    """
    Fuzz directories and files using ffuf/gobuster

    Args:
        url: Target base URL
        wordlist: Wordlist to use (common, medium, large, or custom path)
        extensions: File extensions to append
        threads: Number of concurrent threads
        recursion_depth: How deep to recurse

    Returns:
        Dictionary with discovered paths
    """
    from src.tools.fuzzing.directory_fuzz import fuzz_directories

    return await fuzz_directories(
        url, wordlist, list(extensions), threads, recursion_depth
    )


@mcp.tool()
async def parameter_fuzzing(
    url: str,
    wordlist: str = "parameters",
    method: str = "GET",
    filter_status: list[int] = [404]
) -> dict:
    """
    Fuzz for hidden parameters

    Args:
        url: Target URL
        wordlist: Parameter wordlist (parameters, custom path)
        method: HTTP method (GET, POST)
        filter_status: Status codes to filter out

    Returns:
        Dictionary with discovered parameters
    """
    from src.tools.fuzzing.param_fuzz import fuzz_parameters

    return await fuzz_parameters(url, wordlist, method, list(filter_status))


@mcp.tool()
async def subdomain_bruteforce(
    domain: str,
    wordlist: str = "common",
    resolver: str = "8.8.8.8",
    threads: int = 100
) -> dict:
    """
    Bruteforce subdomains using DNS resolution

    Args:
        domain: Target domain
        wordlist: Subdomain wordlist (common, large, or custom path)
        resolver: DNS resolver to use
        threads: Number of concurrent threads

    Returns:
        List of valid subdomains
    """
    from src.tools.fuzzing.subdomain_bruteforce import bruteforce_subdomains

    return await bruteforce_subdomains(domain, wordlist, resolver, threads)


@mcp.tool()
async def vhost_fuzzing(
    ip: str,
    base_domain: str,
    wordlist: str = "vhosts",
    filter_length: bool = True
) -> dict:
    """
    Fuzz for virtual hosts

    Args:
        ip: Target IP address
        base_domain: Base domain for Host header
        wordlist: VHost wordlist
        filter_length: Filter by response length

    Returns:
        Dictionary with discovered vhosts
    """
    from src.tools.fuzzing.vhost_fuzz import fuzz_vhosts

    return await fuzz_vhosts(ip, base_domain, wordlist, filter_length)


# ============================================================================
# API TESTING
# ============================================================================

@mcp.tool()
async def api_discovery(
    url: str,
    crawl: bool = True
) -> dict:
    """
    Discover API endpoints and documentation

    Args:
        url: Target base URL
        crawl: Crawl for API endpoints

    Returns:
        Dictionary with discovered APIs
    """
    from src.tools.api.api_discovery import discover_apis

    return await discover_apis(url, crawl)


@mcp.tool()
async def swagger_parser(
    url: str
) -> dict:
    """
    Parse and analyze Swagger/OpenAPI documentation

    Args:
        url: URL to Swagger JSON/YAML

    Returns:
        Dictionary with parsed API endpoints and schemas
    """
    from src.tools.api.swagger_parser import parse_swagger

    return await parse_swagger(url)


@mcp.tool()
async def graphql_testing(
    url: str,
    introspection: bool = True,
    test_mutations: bool = True
) -> dict:
    """
    Test GraphQL endpoints for vulnerabilities

    Args:
        url: GraphQL endpoint URL
        introspection: Attempt introspection query
        test_mutations: Test for dangerous mutations

    Returns:
        Dictionary with GraphQL findings
    """
    from src.tools.api.graphql_tester import test_graphql

    return await test_graphql(url, introspection, test_mutations)


@mcp.tool()
async def api_rate_limit_test(
    url: str,
    requests: int = 100,
    concurrent: int = 10
) -> dict:
    """
    Test API rate limiting

    Args:
        url: Target API endpoint
        requests: Total requests to send
        concurrent: Concurrent requests

    Returns:
        Dictionary with rate limit findings
    """
    from src.tools.api.rate_limit import test_rate_limit

    return await test_rate_limit(url, requests, concurrent)


# ============================================================================
# INJECTION ATTACKS
# ============================================================================

@mcp.tool()
async def command_injection_test(
    url: str,
    parameters: list[str] = [],
    callback_url: str = ""
) -> dict:
    """
    Test for command injection vulnerabilities

    Args:
        url: Target URL
        parameters: Parameters to test
        callback_url: Callback URL for OOB detection

    Returns:
        Dictionary with command injection findings
    """
    from src.tools.injection.command_injection import test_command_injection

    return await test_command_injection(url, list(parameters), callback_url)


@mcp.tool()
async def xxe_injection_test(
    url: str,
    method: str = "POST"
) -> dict:
    """
    Test for XXE (XML External Entity) injection

    Args:
        url: Target URL accepting XML
        method: HTTP method

    Returns:
        Dictionary with XXE findings
    """
    from src.tools.injection.xxe_injection import test_xxe

    return await test_xxe(url, method)


@mcp.tool()
async def ssti_scanner(
    url: str,
    parameters: list[str] = [],
    template_engines: list[str] = ["all"]
) -> dict:
    """
    Test for SSTI (Server-Side Template Injection)

    Args:
        url: Target URL
        parameters: Parameters to test
        template_engines: Engines to test (jinja2, twig, freemarker, velocity, all)

    Returns:
        Dictionary with SSTI findings
    """
    from src.tools.injection.ssti_scanner import scan_ssti

    return await scan_ssti(url, list(parameters), list(template_engines))


@mcp.tool()
async def ldap_injection_test(
    url: str,
    parameters: list[str] = []
) -> dict:
    """
    Test for LDAP injection vulnerabilities

    Args:
        url: Target URL
        parameters: Parameters to test

    Returns:
        Dictionary with LDAP injection findings
    """
    from src.tools.injection.ldap_injection import test_ldap_injection

    return await test_ldap_injection(url, list(parameters))


@mcp.tool()
async def nosql_injection_test(
    url: str,
    parameters: list[str] = [],
    database: str = "auto"
) -> dict:
    """
    Test for NoSQL injection vulnerabilities

    Args:
        url: Target URL
        parameters: Parameters to test
        database: Database type (auto, mongodb, couchdb, etc.)

    Returns:
        Dictionary with NoSQL injection findings
    """
    from src.tools.injection.nosql_injection import test_nosql_injection

    return await test_nosql_injection(url, list(parameters), database)


# ============================================================================
# ACCESS CONTROL
# ============================================================================

@mcp.tool()
async def idor_scanner(
    url: str,
    parameter: str,
    min_value: int = 1,
    max_value: int = 1000,
    step: int = 1
) -> dict:
    """
    Test for IDOR (Insecure Direct Object Reference) vulnerabilities

    Args:
        url: Target URL with ID parameter
        parameter: Parameter name to fuzz
        min_value: Starting value
        max_value: Ending value
        step: Increment step

    Returns:
        Dictionary with IDOR findings
    """
    from src.tools.access_control.idor_scanner import scan_idor

    return await scan_idor(url, parameter, min_value, max_value, step)


@mcp.tool()
async def path_traversal_test(
    url: str,
    parameters: list[str] = [],
    os_type: str = "auto"
) -> dict:
    """
    Test for path traversal/directory traversal vulnerabilities

    Args:
        url: Target URL
        parameters: Parameters to test
        os_type: Operating system (auto, linux, windows)

    Returns:
        Dictionary with path traversal findings
    """
    from src.tools.access_control.path_traversal import test_path_traversal

    return await test_path_traversal(url, list(parameters), os_type)


@mcp.tool()
async def lfi_rfi_scanner(
    url: str,
    parameters: list[str] = [],
    test_rfi: bool = True
) -> dict:
    """
    Test for LFI (Local File Inclusion) and RFI (Remote File Inclusion)

    Args:
        url: Target URL
        parameters: Parameters to test
        test_rfi: Also test for RFI

    Returns:
        Dictionary with LFI/RFI findings
    """
    from src.tools.access_control.lfi_rfi_scanner import scan_lfi_rfi

    return await scan_lfi_rfi(url, list(parameters), test_rfi)


# ============================================================================
# AUTHENTICATION & SESSION
# ============================================================================

@mcp.tool()
async def jwt_analyzer(
    token: str,
    wordlist: str = ""
) -> dict:
    """
    Analyze and test JWT tokens for vulnerabilities

    Args:
        token: JWT token to analyze
        wordlist: Wordlist for secret bruteforce (optional)

    Returns:
        Dictionary with JWT analysis and vulnerabilities
    """
    from src.tools.auth.jwt_analyzer import analyze_jwt

    return await analyze_jwt(token, wordlist)


@mcp.tool()
async def session_analysis(
    url: str,
    cookie: str = ""
) -> dict:
    """
    Analyze session management security

    Args:
        url: Target URL
        cookie: Session cookie to analyze

    Returns:
        Dictionary with session security findings
    """
    from src.tools.auth.session_analyzer import analyze_session

    return await analyze_session(url, cookie)


@mcp.tool()
async def oauth_tester(
    client_id: str,
    redirect_uri: str,
    authorization_endpoint: str
) -> dict:
    """
    Test OAuth implementation for vulnerabilities

    Args:
        client_id: OAuth client ID
        redirect_uri: Redirect URI
        authorization_endpoint: Authorization endpoint URL

    Returns:
        Dictionary with OAuth vulnerabilities
    """
    from src.tools.auth.oauth_tester import test_oauth

    return await test_oauth(client_id, redirect_uri, authorization_endpoint)


# ============================================================================
# CLOUD SECURITY
# ============================================================================

@mcp.tool()
async def s3_bucket_scanner(
    domain: str,
    permutations: bool = True
) -> dict:
    """
    Scan for AWS S3 buckets and test permissions

    Args:
        domain: Target domain
        permutations: Generate bucket name permutations

    Returns:
        Dictionary with S3 buckets found and their permissions
    """
    from src.tools.cloud.s3_scanner import scan_s3_buckets

    return await scan_s3_buckets(domain, permutations)


@mcp.tool()
async def subdomain_takeover_check(
    subdomains: list[str]
) -> dict:
    """
    Check for subdomain takeover vulnerabilities

    Args:
        subdomains: List of subdomains to check

    Returns:
        Dictionary with vulnerable subdomains
    """
    from src.tools.cloud.takeover_checker import check_takeover

    return await check_takeover(subdomains)


@mcp.tool()
async def cloud_metadata_test(
    url: str
) -> dict:
    """
    Test for cloud metadata endpoints (AWS, Azure, GCP)

    Args:
        url: Target URL with potential SSRF

    Returns:
        Dictionary with metadata access findings
    """
    from src.tools.cloud.metadata_tester import test_metadata

    return await test_metadata(url)


# ============================================================================
# CONTENT DISCOVERY
# ============================================================================

@mcp.tool()
async def sensitive_file_scanner(
    url: str,
    check_backups: bool = True,
    check_configs: bool = True,
    check_git: bool = True
) -> dict:
    """
    Scan for sensitive files and exposures

    Args:
        url: Target base URL
        check_backups: Check for backup files (.bak, .old, etc.)
        check_configs: Check for config files
        check_git: Check for .git exposure

    Returns:
        Dictionary with sensitive files found
    """
    from src.tools.content.sensitive_files import scan_sensitive_files

    return await scan_sensitive_files(
        url, check_backups, check_configs, check_git
    )


@mcp.tool()
async def git_exposure_scanner(
    url: str,
    dump: bool = True
) -> dict:
    """
    Scan and dump exposed .git directories

    Args:
        url: Target URL
        dump: Attempt to dump git repository

    Returns:
        Dictionary with git exposure details
    """
    from src.tools.content.git_scanner import scan_git_exposure

    return await scan_git_exposure(url, dump)


@mcp.tool()
async def robots_sitemap_analyzer(
    url: str
) -> dict:
    """
    Analyze robots.txt and sitemap.xml for interesting paths

    Args:
        url: Target base URL

    Returns:
        Dictionary with paths from robots.txt and sitemap
    """
    from src.tools.content.robots_analyzer import analyze_robots_sitemap

    return await analyze_robots_sitemap(url)


# ============================================================================
# SSL/TLS TESTING
# ============================================================================

@mcp.tool()
async def ssl_tls_scanner(
    domain: str,
    port: int = 443
) -> dict:
    """
    Comprehensive SSL/TLS security testing

    Args:
        domain: Target domain
        port: HTTPS port (default 443)

    Returns:
        Dictionary with SSL/TLS findings
    """
    from src.tools.ssl.ssl_scanner import scan_ssl_tls

    return await scan_ssl_tls(domain, port)


@mcp.tool()
async def certificate_transparency(
    domain: str
) -> dict:
    """
    Query certificate transparency logs for subdomains

    Args:
        domain: Target domain

    Returns:
        List of subdomains from CT logs
    """
    from src.tools.ssl.ct_logs import query_ct_logs

    return await query_ct_logs(domain)


# ============================================================================
# WORKFLOWS & AUTOMATION
# ============================================================================

@mcp.tool()
async def full_reconnaissance(
    domain: str,
    deep_scan: bool = False
) -> dict:
    """
    Complete reconnaissance workflow combining multiple tools

    Args:
        domain: Target domain
        deep_scan: Enable deep scanning (slower but more thorough)

    Returns:
        Comprehensive reconnaissance report
    """
    from src.workflows.full_recon import run_full_recon

    return await run_full_recon(domain, deep_scan)


@mcp.tool()
async def web_vulnerability_scan(
    url: str,
    scan_level: str = "medium"
) -> dict:
    """
    Automated web vulnerability scanning workflow

    Args:
        url: Target URL
        scan_level: Scan intensity (quick, medium, thorough)

    Returns:
        Dictionary with all vulnerabilities found
    """
    from src.workflows.web_vuln_scan import run_web_vuln_scan

    return await run_web_vuln_scan(url, scan_level)


@mcp.tool()
async def api_security_test(
    api_url: str,
    documentation_url: str = ""
) -> dict:
    """
    Comprehensive API security testing workflow

    Args:
        api_url: Base API URL
        documentation_url: Optional Swagger/OpenAPI documentation URL

    Returns:
        Dictionary with API security findings
    """
    from src.workflows.api_testing import run_api_security_test

    return await run_api_security_test(api_url, documentation_url)


@mcp.tool()
async def generate_report(
    results: dict,
    format: str = "markdown",
    output_file: str = ""
) -> dict:
    """
    Generate professional bug bounty report

    Args:
        results: Dictionary with scan results
        format: Report format (markdown, html, json, pdf)
        output_file: Output file path (optional)

    Returns:
        Dictionary with report location and summary
    """
    from src.core.reporter import generate_bug_bounty_report

    return await generate_bug_bounty_report(results, format, output_file)


@mcp.tool()
async def validate_tools() -> dict:
    """
    Validate that all required security tools are installed

    Returns:
        Dictionary with tool installation status
    """
    from src.utils.validators import validate_tool_installation

    return await validate_tool_installation()


if __name__ == "__main__":
    mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MauricioDuarte100/BugBountyMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.