"""Active reconnaissance tools for advanced information gathering."""
import uuid
import os
import tempfile
from typing import Dict, Any, List, Optional
import logging
from urllib.parse import urlparse, parse_qs
from ..models import AuditLogEntry
from ..config import ConfigManager
from ..utils.validators import ScopeValidator
from ..utils.executor import ToolExecutor
from ..utils.parser import OutputParser
from ..storage.database import DatabaseManager
from ..storage.cache import CacheManager
logger = logging.getLogger(__name__)
class ActiveReconTools:
"""Advanced active reconnaissance tools for bug bounty hunting."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
cache: CacheManager,
executor: ToolExecutor,
):
"""Initialize active reconnaissance tools."""
self.config = config
self.db = db
self.cache = cache
self.executor = executor
self.parser = OutputParser()
async def advanced_subdomain_enum(
self,
program_id: str,
domain: str,
mode: str = "passive",
wordlist: Optional[str] = None,
) -> Dict[str, Any]:
"""Advanced subdomain enumeration using amass.
Args:
program_id: Program identifier
domain: Domain to enumerate
mode: Enumeration mode (passive/active/hybrid)
wordlist: Custom wordlist path (optional)
Returns:
Dictionary with discovered subdomains
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(domain)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Check cache
cache_key = f"amass_{program_id}_{domain}_{mode}"
cached = self.cache.get(cache_key)
if cached:
logger.info(f"Using cached amass results for {domain}")
return cached
scan_id = str(uuid.uuid4())
# Build amass command
args = ["enum", "-d", domain]
if mode == "passive":
args.append("-passive")
elif mode == "active":
args.append("-active")
# hybrid = no flag (default)
if wordlist and mode in ["active", "hybrid"]:
args.extend(["-brute", "-w", wordlist])
try:
# Execute amass
result = await self.executor.execute(
"amass",
args,
timeout=900, # 15 minutes for thorough scan
)
if result.success:
# Parse JSON output
subdomains = self.parser.parse_amass_output(result.output)
# Filter in-scope
in_scope = validator.filter_in_scope_targets(subdomains)
response = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'subdomains': sorted(in_scope),
'count': len(in_scope),
'total_found': len(subdomains),
'filtered_count': len(subdomains) - len(in_scope),
'method': mode,
}
# Cache results (24 hours)
self.cache.set(cache_key, response, ttl=86400)
# Audit log
self.db.log_audit(AuditLogEntry(
action="advanced_subdomain_enum",
program_id=program_id,
target=domain,
tool="amass",
success=True,
details={'subdomains_found': len(in_scope), 'mode': mode},
))
return response
else:
return {
'success': False,
'error': 'Amass execution failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in advanced subdomain enum: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def web_crawl(
self,
program_id: str,
url: str,
depth: int = 3,
max_pages: int = 500,
js_analysis: bool = True,
) -> Dict[str, Any]:
"""Crawl website to discover URLs, endpoints, and parameters.
Args:
program_id: Program identifier
url: Base URL to crawl
depth: Crawl depth
            max_pages: Maximum number of URLs to include in the results
js_analysis: Enable JavaScript analysis with katana
Returns:
Dictionary with discovered URLs and endpoints
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Check cache
cache_key = f"web_crawl_{program_id}_{url}_{depth}"
cached = self.cache.get(cache_key)
if cached:
logger.info(f"Using cached crawl results for {url}")
return cached
scan_id = str(uuid.uuid4())
all_urls = set()
all_params = set()
all_endpoints = set()
forms = []
js_files = []
try:
# Phase 1: Crawl with gospider
logger.info(f"Running gospider on {url}")
gospider_args = [
"-s", url,
"-d", str(depth),
"-c", str(max_pages),
"--json",
"--sitemap",
"--robots",
]
gospider_result = await self.executor.execute(
"gospider",
gospider_args,
timeout=600,
)
if gospider_result.success:
data = self.parser.parse_gospider_output(gospider_result.output)
all_urls.update(data.get('urls', []))
all_params.update(data.get('parameters', []))
forms.extend(data.get('forms', []))
js_files.extend(data.get('js_files', []))
# Phase 2: JS-heavy crawl with katana (if enabled)
if js_analysis:
logger.info(f"Running katana for JS analysis on {url}")
katana_args = [
"-u", url,
"-d", str(depth),
"-js-crawl",
"-jc", # JavaScript file crawling
"-json",
]
katana_result = await self.executor.execute(
"katana",
katana_args,
timeout=600,
)
if katana_result.success:
katana_data = self.parser.parse_katana_output(katana_result.output)
all_urls.update(katana_data.get('urls', []))
all_endpoints.update(katana_data.get('endpoints', []))
# Filter in-scope URLs
in_scope_urls = validator.filter_in_scope_targets(list(all_urls))
# Extract API endpoints
api_endpoints = self._extract_api_endpoints(in_scope_urls)
# Extract parameters
params = self._extract_parameters(in_scope_urls)
all_params.update(params)
response = {
'success': True,
'scan_id': scan_id,
'base_url': url,
'urls_found': len(in_scope_urls),
                'urls': sorted(in_scope_urls)[:max_pages],  # Cap output size at max_pages
'api_endpoints': sorted(list(api_endpoints)),
'parameters': sorted(list(all_params)),
'forms': forms[:100],
'js_files': sorted(list(set(js_files)))[:100],
}
# Cache results (12 hours)
self.cache.set(cache_key, response, ttl=43200)
# Audit log
self.db.log_audit(AuditLogEntry(
action="web_crawl",
program_id=program_id,
target=url,
tool="gospider+katana" if js_analysis else "gospider",
success=True,
details={'urls_found': len(in_scope_urls)},
))
return response
except Exception as e:
logger.error(f"Error in web crawl: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def network_scan(
self,
program_id: str,
cidr: str,
ports: str = "top-100",
rate: int = 1000,
) -> Dict[str, Any]:
"""Fast network scanning using masscan.
Args:
program_id: Program identifier
cidr: CIDR range to scan
ports: Ports to scan
rate: Packets per second
Returns:
Dictionary with discovered hosts and open ports
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(cidr)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
# Build masscan command
args = [cidr, "-p", ports, "--rate", str(rate), "--open", "-oJ", "-"]
try:
# Note: masscan requires root/sudo
result = await self.executor.execute(
"sudo masscan",
args,
timeout=1800, # 30 minutes for large scans
)
if result.success:
# Parse JSON output
hosts_data = self.parser.parse_masscan_output(result.output)
response = {
'success': True,
'scan_id': scan_id,
'cidr': cidr,
'hosts_found': len(hosts_data.get('hosts', [])),
'hosts': hosts_data.get('hosts', []),
'ports_scanned': ports,
'rate': rate,
}
# Audit log
self.db.log_audit(AuditLogEntry(
action="network_scan",
program_id=program_id,
target=cidr,
tool="masscan",
success=True,
details={'hosts_found': len(hosts_data.get('hosts', []))},
))
return response
else:
return {
'success': False,
'error': 'Masscan execution failed (may require sudo)',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in network scan: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def api_discovery(
self,
program_id: str,
base_url: str,
api_type: str = "auto",
) -> Dict[str, Any]:
"""Discover and enumerate API endpoints.
Args:
program_id: Program identifier
base_url: Base API URL
api_type: API type (rest/graphql/soap/auto)
Returns:
Dictionary with discovered API endpoints
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(base_url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
try:
endpoints = []
graphql_schema = None
# Auto-detect API type if needed
if api_type == "auto":
api_type = await self._detect_api_type(base_url)
# GraphQL discovery
if api_type in ["graphql", "auto"]:
logger.info("Attempting GraphQL introspection...")
graphql_data = await self._discover_graphql(base_url)
if graphql_data:
graphql_schema = graphql_data
# REST API discovery using path crawling
logger.info("Discovering REST endpoints...")
rest_endpoints = await self._discover_rest_endpoints(base_url)
endpoints.extend(rest_endpoints)
response = {
'success': True,
'scan_id': scan_id,
'base_url': base_url,
'api_type': api_type,
'endpoints': endpoints,
'graphql_schema': graphql_schema,
}
# Audit log
self.db.log_audit(AuditLogEntry(
action="api_discovery",
program_id=program_id,
target=base_url,
tool="custom",
success=True,
details={'endpoints_found': len(endpoints)},
))
return response
except Exception as e:
logger.error(f"Error in API discovery: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def screenshot_recon(
self,
program_id: str,
urls: List[str],
resolution: str = "1440x900",
) -> Dict[str, Any]:
"""Take screenshots of URLs for visual analysis.
Args:
program_id: Program identifier
urls: List of URLs to screenshot
resolution: Screenshot resolution
Returns:
Dictionary with screenshot information
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
# Validate all URLs
valid_urls = []
for url in urls[:50]: # Limit to 50 screenshots
if validator.validate_target(url)[0]:
valid_urls.append(url)
if not valid_urls:
return {'success': False, 'error': 'No valid in-scope URLs provided'}
scan_id = str(uuid.uuid4())
screenshot_dir = os.path.join(self.config.get_data_dir(), "screenshots", scan_id)
os.makedirs(screenshot_dir, exist_ok=True)
try:
# Create temp file with URLs
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write('\n'.join(valid_urls))
url_file = f.name
            # Run gowitness (v2-style CLI; resolution is passed via the separate X/Y flags
            # rather than a single --resolution option)
            res_x, res_y = resolution.lower().split('x', 1)
            args = [
                "file",
                "-f", url_file,
                "--screenshot-path", screenshot_dir,
                "--resolution-x", res_x,
                "--resolution-y", res_y,
            ]
            try:
                result = await self.executor.execute(
                    "gowitness",
                    args,
                    timeout=900,  # 15 minutes
                )
            finally:
                # Clean up the temporary URL list even if gowitness fails
                os.unlink(url_file)
if result.success:
# List generated screenshots
screenshots = []
if os.path.exists(screenshot_dir):
for filename in os.listdir(screenshot_dir):
if filename.endswith(('.png', '.jpg')):
screenshots.append({
'filename': filename,
'path': os.path.join(screenshot_dir, filename),
})
response = {
'success': True,
'scan_id': scan_id,
'urls_processed': len(valid_urls),
'screenshots_taken': len(screenshots),
'screenshot_dir': screenshot_dir,
'screenshots': screenshots,
}
# Audit log
self.db.log_audit(AuditLogEntry(
action="screenshot_recon",
program_id=program_id,
target=f"{len(valid_urls)} URLs",
tool="gowitness",
success=True,
details={'screenshots': len(screenshots)},
))
return response
else:
return {
'success': False,
'error': 'Gowitness execution failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in screenshot recon: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def git_recon(
self,
program_id: str,
company_name: str,
scan_repos: bool = True,
) -> Dict[str, Any]:
"""Discover Git repositories and potential secrets.
Args:
program_id: Program identifier
company_name: Company name to search
scan_repos: Scan discovered repos for secrets
Returns:
Dictionary with discovered repositories
"""
scan_id = str(uuid.uuid4())
try:
# Search GitHub for company repositories
# This would use GitHub API (requires token)
repositories = await self._search_github_repos(company_name)
secrets_found = []
if scan_repos and repositories:
# Scan repos with truffleHog
logger.info("Scanning repositories for secrets...")
for repo in repositories[:5]: # Limit to 5 repos
secrets = await self._scan_repo_secrets(repo['url'])
secrets_found.extend(secrets)
response = {
'success': True,
'scan_id': scan_id,
'company': company_name,
'repositories': repositories,
'secrets_found': secrets_found,
}
return response
except Exception as e:
logger.error(f"Error in git recon: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def cloud_asset_enum(
self,
program_id: str,
company_name: str,
        clouds: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Enumerate cloud assets for a company.
Args:
program_id: Program identifier
company_name: Company name
clouds: Cloud providers to check (aws/azure/gcp)
Returns:
Dictionary with discovered cloud assets
"""
if clouds is None:
clouds = ["aws", "azure", "gcp"]
scan_id = str(uuid.uuid4())
try:
results = {
'success': True,
'scan_id': scan_id,
'company': company_name,
}
# AWS S3 bucket enumeration
if "aws" in clouds:
logger.info("Enumerating AWS assets...")
aws_assets = await self._enumerate_aws_assets(company_name)
results['aws'] = aws_assets
# Azure blob storage
if "azure" in clouds:
logger.info("Enumerating Azure assets...")
azure_assets = await self._enumerate_azure_assets(company_name)
results['azure'] = azure_assets
# GCP storage
if "gcp" in clouds:
logger.info("Enumerating GCP assets...")
gcp_assets = await self._enumerate_gcp_assets(company_name)
results['gcp'] = gcp_assets
return results
except Exception as e:
logger.error(f"Error in cloud asset enum: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def cert_transparency_search(
self,
program_id: str,
domain: str,
days_back: int = 30,
) -> Dict[str, Any]:
"""Search certificate transparency logs.
Args:
program_id: Program identifier
domain: Domain to search
days_back: Days to search back
Returns:
Dictionary with certificate information
"""
scan_id = str(uuid.uuid4())
try:
# Search crt.sh for certificates
logger.info(f"Searching certificate transparency logs for {domain}")
certificates = await self._search_crtsh(domain, days_back)
# Extract subdomains from certificates
all_subdomains = set()
for cert in certificates:
all_subdomains.update(cert.get('san', []))
# Validate
program = self.config.get_program(program_id)
if program:
validator = ScopeValidator(program)
in_scope = validator.filter_in_scope_targets(list(all_subdomains))
else:
in_scope = list(all_subdomains)
response = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'certificates': certificates,
'total_certificates': len(certificates),
'subdomains_found': sorted(in_scope),
'subdomains_count': len(in_scope),
}
return response
except Exception as e:
logger.error(f"Error in cert transparency search: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def email_harvest(
self,
program_id: str,
domain: str,
        sources: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Harvest email addresses and employee information.
Args:
program_id: Program identifier
domain: Domain to search
sources: Sources to use (google/linkedin/hunter)
Returns:
Dictionary with discovered emails
"""
if sources is None:
sources = ["google", "linkedin"]
scan_id = str(uuid.uuid4())
try:
# Run theHarvester
logger.info(f"Harvesting emails for {domain}")
args = [
"-d", domain,
"-b", ",".join(sources),
"-l", "500", # Limit results
]
result = await self.executor.execute(
"theHarvester",
args,
timeout=600,
)
if result.success:
# Parse output
emails = self.parser.parse_harvester_output(result.output)
# Detect email pattern
email_pattern = self._detect_email_pattern(emails)
response = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'emails': sorted(list(set(emails))),
'email_count': len(set(emails)),
'email_pattern': email_pattern,
'sources': sources,
}
return response
else:
return {
'success': False,
'error': 'theHarvester execution failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in email harvest: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
async def ldap_enum(
self,
program_id: str,
target: str,
port: int = 389,
auth: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Enumerate LDAP/Active Directory information.
Args:
program_id: Program identifier
target: LDAP server target
port: LDAP port
auth: Authentication credentials (optional)
Returns:
Dictionary with LDAP enumeration results
"""
        # Validate
        program = self.config.get_program(program_id)
        if not program:
            return {'success': False, 'error': f"Program '{program_id}' not found"}
        validator = ScopeValidator(program)
        is_valid, reason = validator.validate_target(target)
        if not is_valid:
            return {'success': False, 'error': f"Target validation failed: {reason}"}
        scan_id = str(uuid.uuid4())
try:
# Build ldapsearch command
args = [
"-x", # Simple authentication
"-H", f"ldap://{target}:{port}",
"-b", "", # Base DN (will be discovered)
"-s", "base",
"objectclass=*",
]
if auth:
args.extend(["-D", auth.get('username', '')])
args.extend(["-w", auth.get('password', '')])
result = await self.executor.execute(
"ldapsearch",
args,
timeout=300,
)
if result.success:
# Parse LDAP output
ldap_data = self.parser.parse_ldap_output(result.output)
response = {
'success': True,
'scan_id': scan_id,
'target': target,
'port': port,
'base_dn': ldap_data.get('base_dn'),
'naming_contexts': ldap_data.get('naming_contexts', []),
'users': ldap_data.get('users', []),
'groups': ldap_data.get('groups', []),
}
return response
else:
return {
'success': False,
'error': 'LDAP enumeration failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in LDAP enum: {str(e)}")
return {'success': False, 'error': str(e), 'scan_id': scan_id}
# Helper methods
def _extract_api_endpoints(self, urls: List[str]) -> List[str]:
"""Extract potential API endpoints from URLs."""
api_patterns = ['/api/', '/v1/', '/v2/', '/v3/', '/graphql', '/rest/', '/ws/']
endpoints = []
for url in urls:
for pattern in api_patterns:
if pattern in url.lower():
endpoints.append(url)
break
return list(set(endpoints))
def _extract_parameters(self, urls: List[str]) -> List[str]:
"""Extract unique parameters from URLs."""
parameters = []
for url in urls:
if '?' in url:
parsed = urlparse(url)
params = parse_qs(parsed.query)
parameters.extend(params.keys())
return list(set(parameters))
async def _detect_api_type(self, base_url: str) -> str:
"""Auto-detect API type."""
# Simple heuristic: check common GraphQL paths
if 'graphql' in base_url.lower():
return 'graphql'
return 'rest'
async def _discover_graphql(self, base_url: str) -> Optional[Dict[str, Any]]:
"""Discover GraphQL schema via introspection."""
# This would send introspection query
# Simplified for now
return None
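    # A minimal sketch of what the introspection call described above could look like. This is
    # a hypothetical helper (not wired into api_discovery); it assumes the endpoint lives at
    # <base_url>/graphql, that aiohttp is available, and only asks for the query type name
    # rather than the full schema.
    async def _graphql_introspection_probe(self, base_url: str) -> Optional[Dict[str, Any]]:
        """Hypothetical example: send a minimal GraphQL introspection query."""
        import aiohttp
        query = {"query": "{ __schema { queryType { name } } }"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{base_url.rstrip('/')}/graphql",
                    json=query,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status == 200:
                        body = await resp.json(content_type=None)
                        # A server that allows introspection returns a populated 'data' object
                        if isinstance(body, dict) and body.get('data'):
                            return body['data']
        except Exception as exc:
            logger.debug(f"GraphQL introspection probe failed: {exc}")
        return None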
async def _discover_rest_endpoints(self, base_url: str) -> List[Dict[str, Any]]:
"""Discover REST API endpoints."""
# This would use crawling and pattern matching
return []
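    # A minimal sketch of the crawling-and-pattern-matching idea mentioned above: probe a few
    # well-known documentation/endpoint paths and record which ones respond. The path list and
    # helper name are illustrative assumptions, not part of the original implementation.
    async def _probe_common_rest_paths(self, base_url: str) -> List[Dict[str, Any]]:
        """Hypothetical example: check common REST/OpenAPI paths for responses."""
        import aiohttp
        candidate_paths = ["/api", "/api/v1", "/swagger.json", "/openapi.json", "/api-docs"]
        found = []
        try:
            async with aiohttp.ClientSession() as session:
                for path in candidate_paths:
                    url = f"{base_url.rstrip('/')}{path}"
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        # Anything other than 404 is worth recording for manual review
                        if resp.status != 404:
                            found.append({'url': url, 'status': resp.status})
        except Exception as exc:
            logger.debug(f"REST path probe failed: {exc}")
        return found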
async def _search_github_repos(self, company_name: str) -> List[Dict[str, Any]]:
"""Search GitHub for company repositories."""
# Would use GitHub API
return []
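    # A minimal sketch of the GitHub API call mentioned above. It uses the public repository
    # search endpoint and returns name/url pairs; the get_github_token() accessor on the config
    # is a hypothetical assumption, as is the helper name itself.
    async def _search_github_repos_example(self, company_name: str) -> List[Dict[str, Any]]:
        """Hypothetical example: search GitHub repositories for a company name."""
        import aiohttp
        headers = {"Accept": "application/vnd.github+json"}
        token = getattr(self.config, "get_github_token", lambda: None)()  # hypothetical accessor
        if token:
            headers["Authorization"] = f"Bearer {token}"
        try:
            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.get(
                    "https://api.github.com/search/repositories",
                    params={"q": company_name, "per_page": 20},
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return [
                            {'name': item.get('full_name'), 'url': item.get('html_url')}
                            for item in data.get('items', [])
                        ]
        except Exception as exc:
            logger.debug(f"GitHub search failed: {exc}")
        return []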
async def _scan_repo_secrets(self, repo_url: str) -> List[Dict[str, Any]]:
"""Scan repository for secrets using truffleHog."""
return []
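    # A minimal sketch of the truffleHog scan mentioned above, assuming the truffleHog v3 CLI
    # ("trufflehog git <url> --json", one JSON object per line) is installed and reachable via
    # the shared executor. Finding field names vary by version, so only the detector name and
    # file path are pulled out here; the helper name is illustrative.
    async def _scan_repo_secrets_example(self, repo_url: str) -> List[Dict[str, Any]]:
        """Hypothetical example: run truffleHog against a repository and collect findings."""
        import json
        findings = []
        result = await self.executor.execute(
            "trufflehog",
            ["git", repo_url, "--json"],
            timeout=600,
        )
        if result.success:
            for line in result.output.splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                findings.append({
                    'detector': item.get('DetectorName'),
                    'file': item.get('SourceMetadata', {}).get('Data', {}).get('Git', {}).get('file'),
                })
        return findings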
async def _enumerate_aws_assets(self, company_name: str) -> Dict[str, List[str]]:
"""Enumerate AWS assets."""
return {'s3_buckets': [], 'cloudfront': []}
async def _enumerate_azure_assets(self, company_name: str) -> Dict[str, List[str]]:
"""Enumerate Azure assets."""
return {'blob_storage': [], 'web_apps': []}
async def _enumerate_gcp_assets(self, company_name: str) -> Dict[str, List[str]]:
"""Enumerate GCP assets."""
return {'storage': [], 'app_engine': []}
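    # The three enumerators above all reduce to the same unauthenticated probe: build candidate
    # names from the company name and check each provider's public storage URL. This sketch
    # shows that shared pattern; the candidate-name permutations, helper name, and return shape
    # are assumptions for illustration only.
    async def _probe_storage_candidates(self, company_name: str) -> Dict[str, List[str]]:
        """Hypothetical example: probe public storage URLs for candidate bucket names."""
        import aiohttp
        base = company_name.lower().replace(' ', '')
        candidates = [base, f"{base}-backup", f"{base}-assets", f"{base}-dev"]
        url_templates = {
            's3_buckets': "https://{name}.s3.amazonaws.com",
            'gcp_storage': "https://storage.googleapis.com/{name}",
            'azure_blobs': "https://{name}.blob.core.windows.net/?comp=list",
        }
        hits: Dict[str, List[str]] = {key: [] for key in url_templates}
        async with aiohttp.ClientSession() as session:
            for key, template in url_templates.items():
                for name in candidates:
                    try:
                        url = template.format(name=name)
                        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                            # 200 = listable, 403 = exists but access denied; 404 means no bucket
                            if resp.status in (200, 403):
                                hits[key].append(name)
                    except Exception:
                        # DNS failures (common for nonexistent Azure accounts) are treated as misses
                        continue
        return hits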
    async def _search_crtsh(self, domain: str, days_back: int) -> List[Dict[str, Any]]:
        """Search crt.sh certificate transparency logs (days_back is not yet applied)."""
        import aiohttp
        try:
            async with aiohttp.ClientSession() as session:
                params = {"q": f"%.{domain}", "output": "json"}
                async with session.get("https://crt.sh/", params=params,
                                       timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        # crt.sh may serve JSON with a non-JSON content type, so skip the check
                        data = await response.json(content_type=None)
                        certificates = []
                        for entry in data[:100]:  # Limit results
                            # 'name_value' holds newline-separated SANs in crt.sh output
                            names = entry.get('name_value', '').split('\n')
                            certificates.append({
                                'common_name': entry.get('common_name'),
                                'issuer': entry.get('issuer_name'),
                                'not_before': entry.get('not_before'),
                                'san': [n.strip() for n in names if n.strip()],
                            })
                        return certificates
        except Exception as e:
            logger.error(f"Error searching crt.sh: {str(e)}")
        return []
    def _detect_email_pattern(self, emails: List[str]) -> Optional[str]:
        """Detect the most common email naming pattern."""
        if not emails:
            return None
        # Count common patterns such as first.last@, first-last@, firstlast@
        from collections import Counter
        patterns: Counter = Counter()
        for email in emails:
            if '@' in email:
                local_part = email.split('@')[0]
                if '.' in local_part:
                    patterns['{first}.{last}'] += 1
                elif '-' in local_part:
                    patterns['{first}-{last}'] += 1
                else:
                    patterns['{firstlast}'] += 1
        return patterns.most_common(1)[0][0] if patterns else None