"""Reconnaissance tools for information gathering."""
import uuid
from typing import Dict, Any, List, Optional
from datetime import datetime
import logging
from ..models import ScanResult, ScanStatus, AuditLogEntry
from ..config import ConfigManager
from ..utils.validators import ScopeValidator
from ..utils.executor import ToolExecutor
from ..utils.parser import OutputParser
from ..storage.database import DatabaseManager
from ..storage.cache import CacheManager
logger = logging.getLogger(__name__)
class ReconTools:
"""Reconnaissance tools for bug bounty hunting."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
cache: CacheManager,
executor: ToolExecutor,
):
"""Initialize reconnaissance tools.
Args:
config: Configuration manager
db: Database manager
cache: Cache manager
executor: Tool executor
"""
self.config = config
self.db = db
self.cache = cache
self.executor = executor
self.parser = OutputParser()
async def subdomain_enum(
self,
program_id: str,
domain: str,
method: str = "all",
) -> Dict[str, Any]:
"""Enumerate subdomains for a domain.
Args:
program_id: Program identifier
domain: Domain to enumerate
method: Method (passive/active/all)
Returns:
Dictionary with discovered subdomains
"""
# Validate program and target
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(domain)
if not is_valid:
self.db.log_audit(AuditLogEntry(
action="subdomain_enum",
program_id=program_id,
target=domain,
success=False,
validation_result=reason,
))
return {
'success': False,
'error': f"Target validation failed: {reason}"
}
# Check cache
cache_key = f"subdomain_{program_id}_{domain}_{method}"
cached = self.cache.get(cache_key)
if cached:
logger.info(f"Using cached subdomain results for {domain}")
return cached
# Create scan record
scan_id = str(uuid.uuid4())
scan_result = ScanResult(
scan_id=scan_id,
program_id=program_id,
tool="subdomain_enum",
target=domain,
status=ScanStatus.RUNNING,
)
all_subdomains = set()
errors = []
try:
# Run subfinder
if method in ["passive", "all"]:
result = await self.executor.execute(
"subfinder",
["-d", domain, "-silent"],
timeout=300,
)
if result.success:
subdomains = self.parser.parse_subfinder_output(result.output)
all_subdomains.update(subdomains)
else:
errors.extend(result.errors)
# Filter in-scope subdomains
in_scope_subdomains = validator.filter_in_scope_targets(list(all_subdomains))
scan_result.status = ScanStatus.COMPLETED
scan_result.metadata = {
'method': method,
'total_found': len(all_subdomains),
'in_scope': len(in_scope_subdomains),
'filtered_out': len(all_subdomains) - len(in_scope_subdomains),
}
response = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'subdomains': sorted(in_scope_subdomains),
'count': len(in_scope_subdomains),
'total_found': len(all_subdomains),
'filtered_count': len(all_subdomains) - len(in_scope_subdomains),
'errors': errors if errors else None,
}
# Cache results (24 hour TTL)
self.cache.set(cache_key, response, ttl=86400)
# Save to database
self.db.save_scan_result(scan_result)
# Log audit
self.db.log_audit(AuditLogEntry(
action="subdomain_enum",
program_id=program_id,
target=domain,
tool="subdomain_enum",
success=True,
details={'subdomains_found': len(in_scope_subdomains)},
))
return response
except Exception as e:
logger.error(f"Error in subdomain enumeration: {str(e)}")
scan_result.status = ScanStatus.FAILED
scan_result.error_message = str(e)
self.db.save_scan_result(scan_result)
return {
'success': False,
'error': str(e),
'scan_id': scan_id,
}
async def port_scan(
self,
program_id: str,
target: str,
scan_type: str = "quick",
ports: Optional[List[int]] = None,
) -> Dict[str, Any]:
"""Perform port scanning on a target.
Args:
program_id: Program identifier
target: Target to scan
scan_type: Type of scan (quick/full/custom)
ports: Specific ports for custom scan
Returns:
Dictionary with open ports
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(target)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Build nmap arguments
args = ["-Pn"] # Skip ping
if scan_type == "quick":
args.extend(["-F"]) # Fast mode, top 100 ports
elif scan_type == "full":
args.extend(["-p-"]) # All ports
elif scan_type == "custom" and ports:
port_list = ",".join(str(p) for p in ports)
args.extend(["-p", port_list])
args.extend(["-sV", target]) # Service version detection
# Create scan record
scan_id = str(uuid.uuid4())
try:
result = await self.executor.execute("nmap", args, timeout=600)
if result.success:
parsed = self.parser.parse_nmap_output(result.output)
scan_result = ScanResult(
scan_id=scan_id,
program_id=program_id,
tool="nmap",
target=target,
status=ScanStatus.COMPLETED,
metadata=parsed,
duration_seconds=result.execution_time,
)
self.db.save_scan_result(scan_result)
return {
'success': True,
'scan_id': scan_id,
'target': target,
'open_ports': parsed.get('open_ports', []),
'services': parsed.get('services', {}),
'os_detection': parsed.get('os_detection'),
}
else:
return {
'success': False,
'error': 'Nmap scan failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in port scan: {str(e)}")
return {'success': False, 'error': str(e)}
async def technology_detection(
self,
url: str,
) -> Dict[str, Any]:
"""Detect web technologies used on a website.
Args:
url: URL to analyze
Returns:
Dictionary with detected technologies
"""
# Check cache
cache_key = f"tech_{url}"
cached = self.cache.get(cache_key)
if cached:
return cached
try:
# Use httpx for probing
result = await self.executor.execute(
"httpx",
["-u", url, "-tech-detect", "-json", "-silent"],
timeout=60,
)
if result.success:
try:
data = self.parser.parse_json_output(result.output)
response = {
'success': True,
'url': url,
'technologies': data.get('technologies', []),
'status_code': data.get('status_code'),
'title': data.get('title'),
'webserver': data.get('webserver'),
}
# Cache for 7 days
self.cache.set(cache_key, response, ttl=604800)
return response
except Exception:
return {
'success': False,
'error': 'Failed to parse technology detection output',
}
else:
return {
'success': False,
'error': 'Technology detection failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in technology detection: {str(e)}")
return {'success': False, 'error': str(e)}
async def dns_enumeration(
self,
program_id: str,
domain: str,
) -> Dict[str, Any]:
"""Enumerate DNS records for a domain.
Args:
program_id: Program identifier
domain: Domain to enumerate
Returns:
Dictionary with DNS records
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(domain)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
try:
# Use dnsx for DNS enumeration
result = await self.executor.execute(
"dnsx",
["-d", domain, "-a", "-aaaa", "-cname", "-mx", "-txt", "-json", "-silent"],
timeout=60,
)
if result.success:
records = {}
for line in result.output.strip().split('\n'):
if line:
try:
data = self.parser.parse_json_output(line)
record_type = data.get('type', 'unknown')
if record_type not in records:
records[record_type] = []
records[record_type].append(data.get('value'))
except Exception:
pass
return {
'success': True,
'domain': domain,
'records': records,
}
else:
return {
'success': False,
'error': 'DNS enumeration failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in DNS enumeration: {str(e)}")
return {'success': False, 'error': str(e)}