import logging
import asyncio
from typing import Dict, Any, List
from .recon.smart_recon import SmartRecon
from .recon.cloud_mapper import CloudMapper
from .scanning.smart_scanner import SmartScanner
from .exploitation.web_exploits import WebExploiter
from .analysis.context_analyzer import ContextAnalyzer
from .exploitation.chain_reactor import ChainReactor
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Orchestrator")
class Orchestrator:
"""
The Brain of the Unified Bug Bounty MCP.
Orchestrates complex workflows by combining specialized modules.
"""
def __init__(self):
self.recon = SmartRecon()
self.cloud = CloudMapper()
self.scanner = SmartScanner()
self.exploiter = WebExploiter()
self.analyzer = ContextAnalyzer()
self.reactor = ChainReactor()
async def analyze_attack_surface(self, file_path: str) -> Dict[str, Any]:
"""
Ingests recon data (URLs) from a file and identifies high-value targets.
"""
try:
with open(file_path, 'r') as f:
urls = [line.strip() for line in f if line.strip()]
logger.info(f"🧠 Analyzing {len(urls)} URLs from {file_path}")
return self.analyzer.analyze_urls(urls)
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
async def suggest_p1_chains(self, finding_type: str) -> Dict[str, Any]:
"""
Suggests critical attack chains based on a finding.
"""
return await self.reactor.suggest_chains(finding_type, "")
async def run_ultimate_recon(self, target: str) -> Dict[str, Any]:
"""
Executes a 'God Level' reconnaissance workflow.
Combines Subfinder, Amass, Puredns, Permutations, and HTTPx.
"""
logger.info(f"🚀 Starting ULTIMATE RECON on {target}")
# 1. Passive & Active Subdomain Enumeration
subdomains = await self.recon.enumerate_subdomains(target)
# 2. DNS Resolution & Validation (Puredns logic)
live_subdomains = await self.recon.validate_dns(subdomains)
# 3. HTTP Probing (HTTPx with tech detect)
http_assets = await self.recon.probe_http(live_subdomains)
return {
"target": target,
"subdomains_found": len(subdomains),
"live_subdomains": len(live_subdomains),
"http_assets": http_assets
}
async def run_cloud_intelligence(self, target: str) -> Dict[str, Any]:
"""
Executes Cloud Asset Discovery (Elite-Recon Logic).
Checks for CNAME takeovers, S3 buckets, and JS secrets.
"""
logger.info(f"☁️ Starting CLOUD INTELLIGENCE on {target}")
results = {
"cname_takeovers": await self.cloud.check_cname_takeovers(target),
"open_buckets": await self.cloud.scan_buckets(target),
"js_secrets": await self.cloud.extract_js_secrets(target)
}
return results
async def run_smart_vuln_scan(self, target: str) -> Dict[str, Any]:
"""
Executes an Intelligent Vulnerability Scan.
Adapts strategy based on WAF detection.
"""
logger.info(f"🛡️ Starting SMART VULN SCAN on {target}")
# 1. WAF Detection
waf_info = await self.scanner.detect_waf(target)
# 2. Adaptive Scanning
if waf_info['detected']:
logger.warning(f"⚠️ WAF Detected: {waf_info['name']}. Engaging EVASION mode.")
scan_results = await self.scanner.scan_with_evasion(target)
else:
logger.info("✅ No WAF detected. Engaging AGGRESSIVE mode.")
scan_results = await self.scanner.scan_aggressive(target)
return scan_results
async def run_web_app_attack(self, url: str) -> Dict[str, Any]:
"""
Executes focused Web Application Exploitation.
SQLi, XSS, SSRF, IDOR.
"""
logger.info(f"⚔️ Starting WEB ATTACK on {url}")
return await self.exploiter.attack_all(url)