#!/usr/bin/env python3
"""
Bug Bounty Hunter MCP Server
Professional MCP server with 100+ security tools for bug bounty hunting
"""
import asyncio
import os
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent))
from fastmcp import FastMCP
from src.core.tool_manager import ToolManager
from src.utils.validators import validate_url, validate_domain
from src.utils.helpers import run_command, parse_output
# Initialize MCP server
mcp = FastMCP("Bug Bounty Hunter MCP")
# Initialize tool manager
tool_manager = ToolManager()
# ============================================================================
# RECONNAISSANCE TOOLS
# ============================================================================
@mcp.tool()
async def subdomain_enumeration(
domain: str,
tools: list[str] = ["subfinder", "amass", "assetfinder"],
passive_only: bool = False
) -> dict:
"""
Enumerate subdomains using multiple tools (subfinder, amass, assetfinder, etc.)
Args:
domain: Target domain (e.g., example.com)
tools: List of tools to use (subfinder, amass, assetfinder, sublist3r)
passive_only: Only use passive methods (no active DNS queries)
Returns:
Dictionary with subdomains found by each tool
"""
from src.tools.recon.subdomain_enum import enumerate_subdomains
if not validate_domain(domain):
return {"error": "Invalid domain format"}
result = await enumerate_subdomains(domain, tools, passive_only)
return result
@mcp.tool()
async def port_scan(
target: str,
ports: str = "top-1000",
scan_type: str = "quick",
service_detection: bool = True
) -> dict:
"""
Scan ports using nmap/masscan/naabu
Args:
target: Target IP/domain or CIDR range
ports: Port specification (top-1000, all, 1-65535, or custom)
scan_type: quick, full, stealth, or aggressive
service_detection: Enable service/version detection
Returns:
Dictionary with open ports and services
"""
from src.tools.recon.port_scan import scan_ports
result = await scan_ports(target, ports, scan_type, service_detection)
return result
@mcp.tool()
async def http_probe(
targets: list[str],
check_title: bool = True,
check_status: bool = True,
check_tech: bool = True,
threads: int = 50
) -> dict:
"""
Probe HTTP/HTTPS services using httpx
Args:
targets: List of domains/IPs to probe
check_title: Extract page titles
check_status: Check HTTP status codes
check_tech: Detect technologies (Wappalyzer)
threads: Number of concurrent threads
Returns:
Dictionary with live hosts and their details
"""
from src.tools.recon.http_probe import probe_http
result = await probe_http(targets, check_title, check_status, check_tech, threads)
return result
@mcp.tool()
async def dns_enumeration(
domain: str,
record_types: list[str] = ["A", "AAAA", "CNAME", "MX", "NS", "TXT"],
bruteforce: bool = False
) -> dict:
"""
Comprehensive DNS enumeration
Args:
domain: Target domain
record_types: DNS record types to query
bruteforce: Enable DNS bruteforce with wordlist
Returns:
Dictionary with DNS records
"""
from src.tools.recon.dns_enum import enumerate_dns
result = await enumerate_dns(domain, record_types, bruteforce)
return result
@mcp.tool()
async def technology_detection(
url: str,
detailed: bool = True
) -> dict:
"""
Detect technologies used by target (CMS, frameworks, servers, etc.)
Args:
url: Target URL
detailed: Include detailed version information
Returns:
Dictionary with detected technologies
"""
from src.tools.recon.tech_detect import detect_technologies
if not validate_url(url):
return {"error": "Invalid URL format"}
result = await detect_technologies(url, detailed)
return result
@mcp.tool()
async def wayback_urls(
domain: str,
filter_extensions: list[str] = [],
match_pattern: str = ""
) -> dict:
"""
Fetch URLs from Wayback Machine and other archives
Args:
domain: Target domain
filter_extensions: File extensions to filter (js, php, asp, etc.)
match_pattern: Regex pattern to match URLs
Returns:
List of archived URLs
"""
from src.tools.recon.wayback import fetch_wayback_urls
result = await fetch_wayback_urls(domain, filter_extensions, match_pattern)
return result
# ============================================================================
# WEB CRAWLING & SPIDERING
# ============================================================================
@mcp.tool()
async def web_crawler(
url: str,
depth: int = 3,
include_js: bool = True,
extract_endpoints: bool = True
) -> dict:
"""
Crawl website and extract URLs, endpoints, and parameters
Args:
url: Starting URL
depth: Crawling depth
include_js: Parse JavaScript files for endpoints
extract_endpoints: Extract API endpoints
Returns:
Dictionary with discovered URLs and endpoints
"""
from src.tools.recon.crawler import crawl_website
result = await crawl_website(url, depth, include_js, extract_endpoints)
return result
@mcp.tool()
async def javascript_analysis(
url: str,
extract_endpoints: bool = True,
extract_secrets: bool = True
) -> dict:
"""
Analyze JavaScript files for endpoints, secrets, and vulnerabilities
Args:
url: Target URL or JS file
extract_endpoints: Extract API endpoints
extract_secrets: Look for hardcoded secrets (API keys, tokens)
Returns:
Dictionary with findings
"""
from src.tools.recon.js_analysis import analyze_javascript
result = await analyze_javascript(url, extract_endpoints, extract_secrets)
return result
@mcp.tool()
async def parameter_discovery(
url: str,
method: str = "all"
) -> dict:
"""
Discover hidden parameters using various techniques
Args:
url: Target URL
method: Discovery method (all, arjun, paramspider, wordlist)
Returns:
List of discovered parameters
"""
from src.tools.recon.param_discovery import discover_parameters
result = await discover_parameters(url, method)
return result
# ============================================================================
# VULNERABILITY SCANNING
# ============================================================================
@mcp.tool()
async def nuclei_scan(
target: str,
templates: list[str] = ["all"],
severity: list[str] = ["critical", "high", "medium"],
rate_limit: int = 150
) -> dict:
"""
Run Nuclei vulnerability scanner with templates
Args:
target: Target URL or list file
templates: Template categories (cves, exposures, misconfigurations, etc.)
severity: Filter by severity levels
rate_limit: Requests per second
Returns:
Dictionary with vulnerabilities found
"""
from src.tools.vuln_scan.nuclei_runner import run_nuclei
result = await run_nuclei(target, templates, severity, rate_limit)
return result
@mcp.tool()
async def xss_scanner(
url: str,
parameters: list[str] = [],
payload_type: str = "all"
) -> dict:
"""
Scan for XSS vulnerabilities (reflected, stored, DOM-based)
Args:
url: Target URL
parameters: Specific parameters to test (empty = auto-detect)
payload_type: Type of payloads (reflected, dom, stored, all)
Returns:
Dictionary with XSS vulnerabilities found
"""
from src.tools.vuln_scan.xss_scanner import scan_xss
result = await scan_xss(url, parameters, payload_type)
return result
@mcp.tool()
async def sql_injection_scan(
url: str,
parameters: list[str] = [],
dbms: str = "auto",
level: int = 3,
risk: int = 2
) -> dict:
"""
Test for SQL injection vulnerabilities using sqlmap
Args:
url: Target URL
parameters: Parameters to test
dbms: Database type (auto, mysql, mssql, postgresql, oracle)
level: Test level (1-5)
risk: Risk level (1-3)
Returns:
Dictionary with SQL injection findings
"""
from src.tools.vuln_scan.sqli_scanner import scan_sqli
result = await scan_sqli(url, parameters, dbms, level, risk)
return result
@mcp.tool()
async def ssrf_scanner(
url: str,
payloads: str = "auto",
callback_url: str = ""
) -> dict:
"""
Test for SSRF (Server-Side Request Forgery) vulnerabilities
Args:
url: Target URL with injectable parameter
payloads: Payload type (auto, cloud-metadata, internal-services)
callback_url: Your callback URL for OOB testing (e.g., Burp Collaborator)
Returns:
Dictionary with SSRF findings
"""
from src.tools.vuln_scan.ssrf_scanner import scan_ssrf
result = await scan_ssrf(url, payloads, callback_url)
return result
@mcp.tool()
async def cors_misconfiguration(
url: str
) -> dict:
"""
Test for CORS misconfigurations
Args:
url: Target URL
Returns:
Dictionary with CORS issues found
"""
from src.tools.vuln_scan.cors_scanner import scan_cors
result = await scan_cors(url)
return result
# ============================================================================
# FUZZING & BRUTE-FORCE
# ============================================================================
@mcp.tool()
async def directory_fuzzing(
url: str,
wordlist: str = "common",
extensions: list[str] = ["php", "html", "js", "txt"],
threads: int = 50,
recursion_depth: int = 2
) -> dict:
"""
Fuzz directories and files using ffuf/gobuster
Args:
url: Target base URL
wordlist: Wordlist to use (common, medium, large, or custom path)
extensions: File extensions to append
threads: Number of concurrent threads
recursion_depth: How deep to recurse
Returns:
Dictionary with discovered paths
"""
from src.tools.fuzzing.directory_fuzz import fuzz_directories
result = await fuzz_directories(url, wordlist, extensions, threads, recursion_depth)
return result
@mcp.tool()
async def parameter_fuzzing(
url: str,
wordlist: str = "parameters",
method: str = "GET",
filter_status: list[int] = [404]
) -> dict:
"""
Fuzz for hidden parameters
Args:
url: Target URL
wordlist: Parameter wordlist (parameters, custom path)
method: HTTP method (GET, POST)
filter_status: Status codes to filter out
Returns:
Dictionary with discovered parameters
"""
from src.tools.fuzzing.param_fuzz import fuzz_parameters
result = await fuzz_parameters(url, wordlist, method, filter_status)
return result
@mcp.tool()
async def subdomain_bruteforce(
domain: str,
wordlist: str = "common",
resolver: str = "8.8.8.8",
threads: int = 100
) -> dict:
"""
Bruteforce subdomains using DNS resolution
Args:
domain: Target domain
wordlist: Subdomain wordlist (common, large, or custom path)
resolver: DNS resolver to use
threads: Number of concurrent threads
Returns:
List of valid subdomains
"""
from src.tools.fuzzing.subdomain_bruteforce import bruteforce_subdomains
result = await bruteforce_subdomains(domain, wordlist, resolver, threads)
return result
@mcp.tool()
async def vhost_fuzzing(
ip: str,
base_domain: str,
wordlist: str = "vhosts",
filter_length: bool = True
) -> dict:
"""
Fuzz for virtual hosts
Args:
ip: Target IP address
base_domain: Base domain for Host header
wordlist: VHost wordlist
filter_length: Filter by response length
Returns:
Dictionary with discovered vhosts
"""
from src.tools.fuzzing.vhost_fuzz import fuzz_vhosts
result = await fuzz_vhosts(ip, base_domain, wordlist, filter_length)
return result
# ============================================================================
# API TESTING
# ============================================================================
@mcp.tool()
async def api_discovery(
url: str,
crawl: bool = True
) -> dict:
"""
Discover API endpoints and documentation
Args:
url: Target base URL
crawl: Crawl for API endpoints
Returns:
Dictionary with discovered APIs
"""
from src.tools.api.api_discovery import discover_apis
result = await discover_apis(url, crawl)
return result
@mcp.tool()
async def swagger_parser(
url: str
) -> dict:
"""
Parse and analyze Swagger/OpenAPI documentation
Args:
url: URL to Swagger JSON/YAML
Returns:
Dictionary with parsed API endpoints and schemas
"""
from src.tools.api.swagger_parser import parse_swagger
result = await parse_swagger(url)
return result
@mcp.tool()
async def graphql_testing(
url: str,
introspection: bool = True,
test_mutations: bool = True
) -> dict:
"""
Test GraphQL endpoints for vulnerabilities
Args:
url: GraphQL endpoint URL
introspection: Attempt introspection query
test_mutations: Test for dangerous mutations
Returns:
Dictionary with GraphQL findings
"""
from src.tools.api.graphql_tester import test_graphql
result = await test_graphql(url, introspection, test_mutations)
return result
@mcp.tool()
async def api_rate_limit_test(
url: str,
requests: int = 100,
concurrent: int = 10
) -> dict:
"""
Test API rate limiting
Args:
url: Target API endpoint
requests: Total requests to send
concurrent: Concurrent requests
Returns:
Dictionary with rate limit findings
"""
from src.tools.api.rate_limit import test_rate_limit
result = await test_rate_limit(url, requests, concurrent)
return result
# ============================================================================
# INJECTION ATTACKS
# ============================================================================
@mcp.tool()
async def command_injection_test(
url: str,
parameters: list[str] = [],
callback_url: str = ""
) -> dict:
"""
Test for command injection vulnerabilities
Args:
url: Target URL
parameters: Parameters to test
callback_url: Callback URL for OOB detection
Returns:
Dictionary with command injection findings
"""
from src.tools.injection.command_injection import test_command_injection
result = await test_command_injection(url, parameters, callback_url)
return result
@mcp.tool()
async def xxe_injection_test(
url: str,
method: str = "POST"
) -> dict:
"""
Test for XXE (XML External Entity) injection
Args:
url: Target URL accepting XML
method: HTTP method
Returns:
Dictionary with XXE findings
"""
from src.tools.injection.xxe_injection import test_xxe
result = await test_xxe(url, method)
return result
@mcp.tool()
async def ssti_scanner(
url: str,
parameters: list[str] = [],
template_engines: list[str] = ["all"]
) -> dict:
"""
Test for SSTI (Server-Side Template Injection)
Args:
url: Target URL
parameters: Parameters to test
template_engines: Engines to test (jinja2, twig, freemarker, velocity, all)
Returns:
Dictionary with SSTI findings
"""
from src.tools.injection.ssti_scanner import scan_ssti
result = await scan_ssti(url, parameters, template_engines)
return result
@mcp.tool()
async def ldap_injection_test(
url: str,
parameters: list[str] = []
) -> dict:
"""
Test for LDAP injection vulnerabilities
Args:
url: Target URL
parameters: Parameters to test
Returns:
Dictionary with LDAP injection findings
"""
from src.tools.injection.ldap_injection import test_ldap_injection
result = await test_ldap_injection(url, parameters)
return result
@mcp.tool()
async def nosql_injection_test(
url: str,
parameters: list[str] = [],
database: str = "auto"
) -> dict:
"""
Test for NoSQL injection vulnerabilities
Args:
url: Target URL
parameters: Parameters to test
database: Database type (auto, mongodb, couchdb, etc.)
Returns:
Dictionary with NoSQL injection findings
"""
from src.tools.injection.nosql_injection import test_nosql_injection
result = await test_nosql_injection(url, parameters, database)
return result
# ============================================================================
# ACCESS CONTROL
# ============================================================================
@mcp.tool()
async def idor_scanner(
url: str,
parameter: str,
min_value: int = 1,
max_value: int = 1000,
step: int = 1
) -> dict:
"""
Test for IDOR (Insecure Direct Object Reference) vulnerabilities
Args:
url: Target URL with ID parameter
parameter: Parameter name to fuzz
min_value: Starting value
max_value: Ending value
step: Increment step
Returns:
Dictionary with IDOR findings
"""
from src.tools.access_control.idor_scanner import scan_idor
result = await scan_idor(url, parameter, min_value, max_value, step)
return result
@mcp.tool()
async def path_traversal_test(
url: str,
parameters: list[str] = [],
os_type: str = "auto"
) -> dict:
"""
Test for path traversal/directory traversal vulnerabilities
Args:
url: Target URL
parameters: Parameters to test
os_type: Operating system (auto, linux, windows)
Returns:
Dictionary with path traversal findings
"""
from src.tools.access_control.path_traversal import test_path_traversal
result = await test_path_traversal(url, parameters, os_type)
return result
@mcp.tool()
async def lfi_rfi_scanner(
url: str,
parameters: list[str] = [],
test_rfi: bool = True
) -> dict:
"""
Test for LFI (Local File Inclusion) and RFI (Remote File Inclusion)
Args:
url: Target URL
parameters: Parameters to test
test_rfi: Also test for RFI
Returns:
Dictionary with LFI/RFI findings
"""
from src.tools.access_control.lfi_rfi_scanner import scan_lfi_rfi
result = await scan_lfi_rfi(url, parameters, test_rfi)
return result
# ============================================================================
# AUTHENTICATION & SESSION
# ============================================================================
@mcp.tool()
async def jwt_analyzer(
token: str,
wordlist: str = ""
) -> dict:
"""
Analyze and test JWT tokens for vulnerabilities
Args:
token: JWT token to analyze
wordlist: Wordlist for secret bruteforce (optional)
Returns:
Dictionary with JWT analysis and vulnerabilities
"""
from src.tools.auth.jwt_analyzer import analyze_jwt
result = await analyze_jwt(token, wordlist)
return result
@mcp.tool()
async def session_analysis(
url: str,
cookie: str = ""
) -> dict:
"""
Analyze session management security
Args:
url: Target URL
cookie: Session cookie to analyze
Returns:
Dictionary with session security findings
"""
from src.tools.auth.session_analyzer import analyze_session
result = await analyze_session(url, cookie)
return result
@mcp.tool()
async def oauth_tester(
client_id: str,
redirect_uri: str,
authorization_endpoint: str
) -> dict:
"""
Test OAuth implementation for vulnerabilities
Args:
client_id: OAuth client ID
redirect_uri: Redirect URI
authorization_endpoint: Authorization endpoint URL
Returns:
Dictionary with OAuth vulnerabilities
"""
from src.tools.auth.oauth_tester import test_oauth
result = await test_oauth(client_id, redirect_uri, authorization_endpoint)
return result
# ============================================================================
# CLOUD SECURITY
# ============================================================================
@mcp.tool()
async def s3_bucket_scanner(
domain: str,
permutations: bool = True
) -> dict:
"""
Scan for AWS S3 buckets and test permissions
Args:
domain: Target domain
permutations: Generate bucket name permutations
Returns:
Dictionary with S3 buckets found and their permissions
"""
from src.tools.cloud.s3_scanner import scan_s3_buckets
result = await scan_s3_buckets(domain, permutations)
return result
@mcp.tool()
async def subdomain_takeover_check(
subdomains: list[str]
) -> dict:
"""
Check for subdomain takeover vulnerabilities
Args:
subdomains: List of subdomains to check
Returns:
Dictionary with vulnerable subdomains
"""
from src.tools.cloud.takeover_checker import check_takeover
result = await check_takeover(subdomains)
return result
@mcp.tool()
async def cloud_metadata_test(
url: str
) -> dict:
"""
Test for cloud metadata endpoints (AWS, Azure, GCP)
Args:
url: Target URL with potential SSRF
Returns:
Dictionary with metadata access findings
"""
from src.tools.cloud.metadata_tester import test_metadata
result = await test_metadata(url)
return result
# ============================================================================
# CONTENT DISCOVERY
# ============================================================================
@mcp.tool()
async def sensitive_file_scanner(
url: str,
check_backups: bool = True,
check_configs: bool = True,
check_git: bool = True
) -> dict:
"""
Scan for sensitive files and exposures
Args:
url: Target base URL
check_backups: Check for backup files (.bak, .old, etc.)
check_configs: Check for config files
check_git: Check for .git exposure
Returns:
Dictionary with sensitive files found
"""
from src.tools.content.sensitive_files import scan_sensitive_files
result = await scan_sensitive_files(url, check_backups, check_configs, check_git)
return result
@mcp.tool()
async def git_exposure_scanner(
url: str,
dump: bool = True
) -> dict:
"""
Scan and dump exposed .git directories
Args:
url: Target URL
dump: Attempt to dump git repository
Returns:
Dictionary with git exposure details
"""
from src.tools.content.git_scanner import scan_git_exposure
result = await scan_git_exposure(url, dump)
return result
@mcp.tool()
async def robots_sitemap_analyzer(
url: str
) -> dict:
"""
Analyze robots.txt and sitemap.xml for interesting paths
Args:
url: Target base URL
Returns:
Dictionary with paths from robots.txt and sitemap
"""
from src.tools.content.robots_analyzer import analyze_robots_sitemap
result = await analyze_robots_sitemap(url)
return result
# ============================================================================
# SSL/TLS TESTING
# ============================================================================
@mcp.tool()
async def ssl_tls_scanner(
domain: str,
port: int = 443
) -> dict:
"""
Comprehensive SSL/TLS security testing
Args:
domain: Target domain
port: HTTPS port (default 443)
Returns:
Dictionary with SSL/TLS findings
"""
from src.tools.ssl.ssl_scanner import scan_ssl_tls
result = await scan_ssl_tls(domain, port)
return result
@mcp.tool()
async def certificate_transparency(
domain: str
) -> dict:
"""
Query certificate transparency logs for subdomains
Args:
domain: Target domain
Returns:
List of subdomains from CT logs
"""
from src.tools.ssl.ct_logs import query_ct_logs
result = await query_ct_logs(domain)
return result
# ============================================================================
# WORKFLOWS & AUTOMATION
# ============================================================================
@mcp.tool()
async def full_reconnaissance(
domain: str,
deep_scan: bool = False
) -> dict:
"""
Complete reconnaissance workflow combining multiple tools
Args:
domain: Target domain
deep_scan: Enable deep scanning (slower but more thorough)
Returns:
Comprehensive reconnaissance report
"""
from src.workflows.full_recon import run_full_recon
result = await run_full_recon(domain, deep_scan)
return result
@mcp.tool()
async def web_vulnerability_scan(
url: str,
scan_level: str = "medium"
) -> dict:
"""
Automated web vulnerability scanning workflow
Args:
url: Target URL
scan_level: Scan intensity (quick, medium, thorough)
Returns:
Dictionary with all vulnerabilities found
"""
from src.workflows.web_vuln_scan import run_web_vuln_scan
result = await run_web_vuln_scan(url, scan_level)
return result
@mcp.tool()
async def api_security_test(
api_url: str,
documentation_url: str = ""
) -> dict:
"""
Comprehensive API security testing workflow
Args:
api_url: Base API URL
documentation_url: Optional Swagger/OpenAPI documentation URL
Returns:
Dictionary with API security findings
"""
from src.workflows.api_testing import run_api_security_test
result = await run_api_security_test(api_url, documentation_url)
return result
@mcp.tool()
async def generate_report(
results: dict,
format: str = "markdown",
output_file: str = ""
) -> dict:
"""
Generate professional bug bounty report
Args:
results: Dictionary with scan results
format: Report format (markdown, html, json, pdf)
output_file: Output file path (optional)
Returns:
Dictionary with report location and summary
"""
from src.core.reporter import generate_bug_bounty_report
result = await generate_bug_bounty_report(results, format, output_file)
return result
@mcp.tool()
async def validate_tools(
) -> dict:
"""
Validate that all required security tools are installed
Returns:
Dictionary with tool installation status
"""
from src.utils.validators import validate_tool_installation
result = await validate_tool_installation()
return result
if __name__ == "__main__":
mcp.run()