"""Main MCP server for bug bounty hunting."""
import asyncio
import logging
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .config import ConfigManager
from .storage.database import DatabaseManager
from .storage.cache import CacheManager
from .utils.executor import ToolExecutor
from .tools.management import ManagementTools
from .tools.recon import ReconTools
from .tools.active_recon import ActiveReconTools
from .tools.scanning import ScanningTools
from .tools.fuzzing import FuzzingTools
from .tools.reporting import ReportingTools
from .tools.proxy import ProxyTools
from .tools.advanced_scanning import AdvancedScanningTools
from .tools.cloud_security import CloudSecurityTools
from .tools.phase2_tools import Phase2Tools
# Set up process-wide logging (INFO and above) with a
# "timestamp - logger name - level - message" layout, then create the
# logger used throughout this module.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Create the MCP server instance. The list_tools/call_tool handlers defined
# below register on it through its decorators, and main() runs it over stdio.
app = Server("bugbounty-mcp-server")
# Global instances (initialized in main).
# These are bare annotations: they declare each name and its type but bind no
# value, so every name below is undefined until main() assigns it via `global`.

# Core components
config: ConfigManager
db: DatabaseManager
cache: CacheManager
executor: ToolExecutor

# Tool modules that call_tool() dispatches to
management_tools: ManagementTools
recon_tools: ReconTools
active_recon_tools: ActiveReconTools
scanning_tools: ScanningTools
fuzzing_tools: FuzzingTools
reporting_tools: ReportingTools
proxy_tools: ProxyTools
advanced_scanning_tools: AdvancedScanningTools
cloud_security_tools: CloudSecurityTools
phase2_tools: Phase2Tools
def _prop(json_type: str, description: str, **extras: Any) -> dict[str, Any]:
    """Build one property schema.

    Keys are emitted as ``type``, then *extras* in call order, then
    ``description``.
    """
    return {"type": json_type, **extras, "description": description}


def _text(description: str, **extras: Any) -> dict[str, Any]:
    """Build a ``string`` property (optionally with ``enum`` / ``default``)."""
    return _prop("string", description, **extras)


def _number(description: str, default: Any) -> dict[str, Any]:
    """Build a ``number`` property that carries a default."""
    return _prop("number", description, default=default)


def _flag(description: str, default: bool) -> dict[str, Any]:
    """Build a ``boolean`` property that carries a default."""
    return _prop("boolean", description, default=default)


def _text_list(description: str) -> dict[str, Any]:
    """Build an ``array`` property whose items are strings."""
    return _prop("array", description, items={"type": "string"})


def _program_id() -> dict[str, Any]:
    """Build a fresh copy of the ``program_id`` property shared by most tools."""
    return _text("Program identifier")


def _tool(
    name: str,
    description: str,
    properties: dict[str, Any],
    required: tuple[str, ...] = (),
) -> Tool:
    """Build a Tool with an object input schema.

    The ``required`` key is left out of the schema when *required* is empty.
    """
    schema: dict[str, Any] = {"type": "object", "properties": properties}
    if required:
        schema["required"] = list(required)
    return Tool(name=name, description=description, inputSchema=schema)


@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the catalogue of tools this server advertises.

    Each entry carries the tool name, a human-readable description and the
    JSON schema describing its arguments.
    """
    return [
        # Program Management Tools
        _tool(
            "add_program",
            "Add a new bug bounty program with scope configuration",
            {
                "program_name": _text("Name of the program"),
                "platform": _text(
                    "Bug bounty platform",
                    enum=["hackerone", "bugcrowd", "intigriti", "yeswehack", "custom"],
                ),
                "scope_domains": _text_list(
                    "In-scope domains (supports wildcards like *.example.com)"
                ),
                "scope_ips": _text_list("In-scope IP addresses or CIDR ranges"),
                "out_of_scope": _text_list("Out-of-scope targets"),
                "url": _text("Program URL"),
                "api_token": _text("API token for platform"),
            },
            required=("program_name", "platform", "scope_domains"),
        ),
        _tool(
            "list_programs",
            "List all configured bug bounty programs",
            {
                "platform": _text("Filter by platform (optional)"),
                # Written out literally: here "description" precedes "default".
                "enrolled_only": {
                    "type": "boolean",
                    "description": "Only show enrolled programs",
                    "default": False,
                },
            },
        ),
        _tool(
            "get_program_scope",
            "Get detailed scope information for a program",
            {"program_id": _program_id()},
            required=("program_id",),
        ),
        _tool(
            "validate_target",
            "Validate if a target is in scope for a program (CRITICAL: Use before any testing)",
            {
                "program_id": _program_id(),
                "target": _text("Target to validate (domain, IP, or URL)"),
            },
            required=("program_id", "target"),
        ),
        # Reconnaissance Tools
        _tool(
            "subdomain_enum",
            "Enumerate subdomains for a domain (automatically filters to in-scope only)",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to enumerate"),
                "method": _text(
                    "Enumeration method",
                    enum=["passive", "active", "all"],
                    default="all",
                ),
            },
            required=("program_id", "domain"),
        ),
        _tool(
            "port_scan",
            "Perform port scanning on a target",
            {
                "program_id": _program_id(),
                "target": _text("Target IP or domain"),
                "scan_type": _text(
                    "Type of scan",
                    enum=["quick", "full", "custom"],
                    default="quick",
                ),
                "ports": _prop(
                    "array",
                    "Specific ports for custom scan",
                    items={"type": "number"},
                ),
            },
            required=("program_id", "target"),
        ),
        _tool(
            "technology_detection",
            "Detect web technologies and frameworks used on a website",
            {"url": _text("URL to analyze")},
            required=("url",),
        ),
        _tool(
            "dns_enumeration",
            "Enumerate DNS records for a domain",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to enumerate"),
            },
            required=("program_id", "domain"),
        ),
        # Active Reconnaissance Tools (Advanced)
        _tool(
            "advanced_subdomain_enum",
            "Advanced subdomain enumeration using amass",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to enumerate"),
                "mode": _text(
                    "Enumeration mode",
                    enum=["passive", "active", "hybrid"],
                    default="passive",
                ),
                "wordlist": _text("Custom wordlist path (optional)"),
            },
            required=("program_id", "domain"),
        ),
        _tool(
            "web_crawl",
            "Crawl website to discover URLs, endpoints, and parameters",
            {
                "program_id": _program_id(),
                "url": _text("Base URL to crawl"),
                "depth": _number("Crawl depth", 3),
                "max_pages": _number("Maximum pages to crawl", 500),
                "js_analysis": _flag("Enable JavaScript analysis", True),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "network_scan",
            "Fast network scanning using masscan",
            {
                "program_id": _program_id(),
                "cidr": _text("CIDR range to scan"),
                "ports": _text("Ports to scan", default="top-100"),
                "rate": _number("Packets per second", 1000),
            },
            required=("program_id", "cidr"),
        ),
        _tool(
            "api_discovery",
            "Discover and enumerate API endpoints",
            {
                "program_id": _program_id(),
                "base_url": _text("Base API URL"),
                "api_type": _text(
                    "API type",
                    enum=["rest", "graphql", "soap", "auto"],
                    default="auto",
                ),
            },
            required=("program_id", "base_url"),
        ),
        _tool(
            "screenshot_recon",
            "Take screenshots of URLs for visual analysis",
            {
                "program_id": _program_id(),
                "urls": _text_list("List of URLs to screenshot"),
                "resolution": _text("Screenshot resolution", default="1440x900"),
            },
            required=("program_id", "urls"),
        ),
        _tool(
            "git_recon",
            "Discover Git repositories and potential secrets",
            {
                "program_id": _program_id(),
                "company_name": _text("Company name to search"),
                "scan_repos": _flag("Scan discovered repos for secrets", True),
            },
            required=("program_id", "company_name"),
        ),
        _tool(
            "cloud_asset_enum",
            "Enumerate cloud assets for a company",
            {
                "program_id": _program_id(),
                "company_name": _text("Company name"),
                "clouds": _text_list("Cloud providers to check (aws/azure/gcp)"),
            },
            required=("program_id", "company_name"),
        ),
        _tool(
            "cert_transparency_search",
            "Search certificate transparency logs",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to search"),
                "days_back": _number("Days to search back", 30),
            },
            required=("program_id", "domain"),
        ),
        _tool(
            "email_harvest",
            "Harvest email addresses and employee information",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to search"),
                "sources": _text_list("Sources to use (google/linkedin/hunter)"),
            },
            required=("program_id", "domain"),
        ),
        _tool(
            "ldap_enum",
            "Enumerate LDAP/Active Directory information",
            {
                "program_id": _program_id(),
                "target": _text("LDAP server target"),
                "port": _number("LDAP port", 389),
                "auth": _prop("object", "Authentication credentials (optional)"),
            },
            required=("program_id", "target"),
        ),
        # Vulnerability Scanning Tools
        _tool(
            "nuclei_scan",
            "Run Nuclei vulnerability scanner with various templates",
            {
                "program_id": _program_id(),
                "target": _text("Target URL or host"),
                "severity_filter": _prop(
                    "array",
                    "Filter by severity levels",
                    items={
                        "type": "string",
                        "enum": ["critical", "high", "medium", "low", "info"],
                    },
                ),
                "tags": _text_list("Filter by tags (e.g., 'xss', 'sqli', 'rce')"),
            },
            required=("program_id", "target"),
        ),
        _tool(
            "xss_scan",
            "Scan for Cross-Site Scripting (XSS) vulnerabilities",
            {
                "program_id": _program_id(),
                "url": _text("Target URL"),
                "payload_list": _text("Custom payload list file"),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "ssl_analysis",
            "Analyze SSL/TLS configuration for vulnerabilities",
            {
                "program_id": _program_id(),
                "domain": _text("Target domain"),
            },
            required=("program_id", "domain"),
        ),
        # Fuzzing Tools
        _tool(
            "path_fuzzing",
            "Fuzz for hidden paths and files",
            {
                "program_id": _program_id(),
                "base_url": _text("Base URL to fuzz"),
                "wordlist": _text("Wordlist file name", default="common.txt"),
                "extensions": _text_list(
                    "File extensions to try (e.g., ['php', 'asp', 'jsp'])"
                ),
            },
            required=("program_id", "base_url"),
        ),
        _tool(
            "parameter_fuzzing",
            "Fuzz for hidden parameters in requests",
            {
                "program_id": _program_id(),
                "url": _text("Target URL"),
                "wordlist": _text("Parameter wordlist", default="parameters.txt"),
                "method": _text("HTTP method", enum=["GET", "POST"], default="GET"),
            },
            required=("program_id", "url"),
        ),
        # Proxy/Traffic Interception Tools
        _tool(
            "start_traffic_intercept",
            "Start mitmproxy/mitmdump for traffic interception and analysis",
            {
                "program_id": _program_id(),
                "listen_host": _text("Host to listen on", default="127.0.0.1"),
                "listen_port": _number("Port to listen on", 8080),
                "mode": _text(
                    "Proxy mode",
                    enum=["regular", "transparent", "reverse", "upstream", "socks5"],
                    default="regular",
                ),
                "save_flows": _flag("Whether to save captured flows", True),
                "filter_pattern": _text("Optional filter pattern for traffic"),
            },
            required=("program_id",),
        ),
        _tool(
            "analyze_traffic_flows",
            "Analyze captured traffic flows from mitmdump",
            {
                "program_id": _program_id(),
                "flow_file": _text("Path to .mitm flow file"),
                "target_filter": _text("Optional filter for specific targets"),
            },
            required=("program_id", "flow_file"),
        ),
        _tool(
            "extract_api_endpoints",
            "Extract API endpoints from captured traffic flows",
            {
                "program_id": _program_id(),
                "flow_file": _text("Path to .mitm flow file"),
            },
            required=("program_id", "flow_file"),
        ),
        # Phase 1: Advanced Scanning Tools
        _tool(
            "cors_scan",
            "Scan for CORS misconfigurations and vulnerabilities",
            {
                "program_id": _program_id(),
                "url": _text("Target URL to test"),
                "custom_origins": _text_list("Optional custom origins to test"),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "security_headers_scan",
            "Scan for missing or weak security headers",
            {
                "program_id": _program_id(),
                "url": _text("Target URL to test"),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "secret_scan",
            "Scan for exposed secrets and API keys",
            {
                "program_id": _program_id(),
                "url": _text("Target URL to scan"),
                "scan_js_files": _flag("Whether to scan JavaScript files", True),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "s3_scanner",
            "Scan for S3 bucket misconfigurations",
            {
                "program_id": _program_id(),
                "target": _text("Domain or bucket name to scan"),
                "check_permissions": _flag("Whether to check bucket permissions", True),
            },
            required=("program_id", "target"),
        ),
        _tool(
            "jwt_analyzer",
            "Analyze JWT token for security issues",
            {
                "program_id": _program_id(),
                "token": _text("JWT token to analyze"),
                "url": _text("Optional URL where token was found"),
            },
            required=("program_id", "token"),
        ),
        # Phase 2: Advanced Testing Tools
        _tool(
            "graphql_scanner",
            "Scan GraphQL endpoint for security issues",
            {
                "program_id": _program_id(),
                "url": _text("GraphQL endpoint URL"),
                "test_auth": _flag("Whether to test authentication", True),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "js_analyzer",
            "Analyze JavaScript files for endpoints and secrets",
            {
                "program_id": _program_id(),
                "url": _text("URL to analyze"),
                "extract_endpoints": _flag("Whether to extract API endpoints", True),
                "extract_secrets": _flag("Whether to extract secrets", True),
            },
            required=("program_id", "url"),
        ),
        _tool(
            "wayback_analyzer",
            "Enhanced Wayback Machine analysis for historical endpoints",
            {
                "program_id": _program_id(),
                "domain": _text("Domain to analyze"),
                "extract_params": _flag("Whether to extract parameters", True),
            },
            required=("program_id", "domain"),
        ),
        # Reporting Tools
        _tool(
            "generate_report",
            "Generate a comprehensive vulnerability report",
            {
                "program_id": _program_id(),
                "scan_ids": _text_list("List of scan IDs to include"),
                "format": _text(
                    "Report format",
                    enum=["markdown", "json"],
                    default="markdown",
                ),
            },
            required=("program_id", "scan_ids"),
        ),
        _tool(
            "export_findings",
            "Export findings from a specific scan",
            {
                "scan_id": _text("Scan ID"),
                "format": _text("Export format", enum=["json", "csv"], default="json"),
            },
            required=("scan_id",),
        ),
        _tool(
            "get_statistics",
            "Get statistics on scans and findings",
            {"program_id": _text("Program identifier (optional)")},
        ),
    ]
# Argument keys whose values are credentials and must not reach the log.
_SENSITIVE_ARGUMENT_KEYS = frozenset({"api_token", "token", "auth"})

# Global name of each tool-module instance -> the tool names it implements.
# Every tool name is also the name of the coroutine method on that instance.
_TOOL_GROUPS: dict[str, tuple[str, ...]] = {
    "management_tools": (
        "add_program",
        "list_programs",
        "get_program_scope",
        "validate_target",
        "get_statistics",
    ),
    "recon_tools": (
        "subdomain_enum",
        "port_scan",
        "technology_detection",
        "dns_enumeration",
    ),
    "active_recon_tools": (
        "advanced_subdomain_enum",
        "web_crawl",
        "network_scan",
        "api_discovery",
        "screenshot_recon",
        "git_recon",
        "cloud_asset_enum",
        "cert_transparency_search",
        "email_harvest",
        "ldap_enum",
    ),
    "scanning_tools": ("nuclei_scan", "xss_scan", "ssl_analysis"),
    "fuzzing_tools": ("path_fuzzing", "parameter_fuzzing"),
    "proxy_tools": (
        "start_traffic_intercept",
        "analyze_traffic_flows",
        "extract_api_endpoints",
    ),
    "advanced_scanning_tools": (
        "cors_scan",
        "security_headers_scan",
        "secret_scan",
    ),
    "cloud_security_tools": ("s3_scanner", "jwt_analyzer"),
    "phase2_tools": ("graphql_scanner", "js_analyzer", "wayback_analyzer"),
    "reporting_tools": ("generate_report", "export_findings"),
}

# Flattened lookup: tool name -> global name of the instance that serves it.
_TOOL_OWNERS: dict[str, str] = {
    tool: owner for owner, tools in _TOOL_GROUPS.items() for tool in tools
}


def _redact_arguments(arguments: Any) -> Any:
    """Return a copy of *arguments* that is safe to log.

    Values of credential-bearing keys are masked. Anything that is not a
    dict is returned unchanged.
    """
    if not isinstance(arguments, dict):
        return arguments
    return {
        key: "***REDACTED***" if key in _SENSITIVE_ARGUMENT_KEYS else value
        for key, value in arguments.items()
    }


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch a tool call to the tool module that implements it.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: Keyword arguments for the tool; ``None`` is treated as
            no arguments.

    Returns:
        A single-element list holding the tool result serialized as JSON.
        Unknown tools and any exception raised while running the tool are
        reported as ``{"success": false, "error": ...}`` instead of being
        raised to the caller.
    """
    # Local import: json is not part of this module's top-level imports.
    import json

    try:
        kwargs = arguments or {}
        # Log a redacted view so tokens and credentials never land in logs.
        logger.info(
            "Tool called: %s with arguments: %s", name, _redact_arguments(kwargs)
        )

        owner_name = _TOOL_OWNERS.get(name)
        if owner_name is None:
            result = {"success": False, "error": f"Unknown tool: {name}"}
        else:
            # Resolve at call time: the instances are only bound by main().
            owner = globals().get(owner_name)
            if owner is None:
                raise RuntimeError(
                    f"Tool module '{owner_name}' is not initialized"
                )
            result = await getattr(owner, name)(**kwargs)

        # default=str keeps non-JSON-native values (e.g. datetimes) printable.
        payload = json.dumps(result, indent=2, default=str)
        return [TextContent(type="text", text=payload)]
    except Exception as e:
        logger.exception("Error executing tool %s: %s", name, e)
        error_result = {"success": False, "error": str(e)}
        return [TextContent(type="text", text=json.dumps(error_result, indent=2))]
async def main():
    """Build the shared components and tool modules, then serve over stdio.

    Binds every module-level instance that ``call_tool`` relies on and then
    runs the MCP server on the stdio streams.
    """
    global config, db, cache, executor
    global management_tools, recon_tools, active_recon_tools
    global scanning_tools, fuzzing_tools, reporting_tools, proxy_tools
    global advanced_scanning_tools, cloud_security_tools, phase2_tools

    logger.info("Initializing Bug Bounty MCP Server...")

    # Core components: configuration, persistence, cache.
    config = ConfigManager()
    db = DatabaseManager(config.server_config.database_path)
    cache = CacheManager()

    # The executor only gets a proxy URL when proxying is enabled in config.
    upstream_proxy = None
    if config.server_config.proxy_enabled:
        upstream_proxy = config.server_config.proxy_url
    executor = ToolExecutor(proxy_url=upstream_proxy)

    # Tool modules.
    management_tools = ManagementTools(config, db)
    recon_tools = ReconTools(config, db, cache, executor)
    active_recon_tools = ActiveReconTools(config, db, cache, executor)
    scanning_tools = ScanningTools(config, db, executor)
    fuzzing_tools = FuzzingTools(config, db, executor)
    reporting_tools = ReportingTools(config, db)
    proxy_tools = ProxyTools(config, db, cache, executor)

    # Phase 1 & 2 tool modules.
    advanced_scanning_tools = AdvancedScanningTools(config, db, executor)
    cloud_security_tools = CloudSecurityTools(config, db, executor)
    phase2_tools = Phase2Tools(config, db, cache, executor)

    logger.info("Bug Bounty MCP Server initialized successfully")
    logger.info(f"Loaded {len(config.programs)} programs")
    logger.info("Starting stdio server...")

    # Serve until the stdio streams close.
    async with stdio_server() as (read_stream, write_stream):
        init_options = app.create_initialization_options()
        await app.run(read_stream, write_stream, init_options)
# Start the server only when this file is executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())