"""Subdomain Enumeration Module"""
import asyncio
from typing import Dict, List
from src.utils.helpers import run_command, deduplicate_list, get_timestamp
from src.utils.validators import validate_domain
async def enumerate_subdomains(
domain: str,
tools: List[str],
passive_only: bool = False
) -> Dict:
"""
Enumerate subdomains using multiple tools
Args:
domain: Target domain
tools: List of tools to use
passive_only: Only passive enumeration
Returns:
Dictionary with subdomains found
"""
if not validate_domain(domain):
return {"error": "Invalid domain", "domain": domain}
results = {
"domain": domain,
"timestamp": get_timestamp(),
"tools_used": tools,
"subdomains": [],
"by_tool": {}
}
tasks = []
for tool in tools:
if tool == "subfinder":
tasks.append(run_subfinder(domain, passive_only))
elif tool == "amass":
tasks.append(run_amass(domain, passive_only))
elif tool == "assetfinder":
tasks.append(run_assetfinder(domain))
elif tool == "sublist3r":
tasks.append(run_sublist3r(domain))
# Run all tools concurrently
tool_results = await asyncio.gather(*tasks, return_exceptions=True)
# Merge results
all_subdomains = []
for i, tool in enumerate(tools):
if isinstance(tool_results[i], dict):
tool_subs = tool_results[i].get('subdomains', [])
results['by_tool'][tool] = {
"count": len(tool_subs),
"subdomains": tool_subs
}
all_subdomains.extend(tool_subs)
# Deduplicate and sort
results['subdomains'] = sorted(deduplicate_list(all_subdomains))
results['total_count'] = len(results['subdomains'])
return results
async def run_subfinder(domain: str, passive_only: bool = False) -> Dict:
"""Run subfinder"""
cmd = f"subfinder -d {domain} -silent"
if passive_only:
cmd += " -all"
result = await run_command(cmd, timeout=300)
if result['success']:
subdomains = [line.strip() for line in result['stdout'].split('\n') if line.strip()]
return {"subdomains": subdomains}
return {"subdomains": [], "error": result['stderr']}
async def run_amass(domain: str, passive_only: bool = False) -> Dict:
"""Run amass"""
mode = "enum" if not passive_only else "enum -passive"
cmd = f"amass {mode} -d {domain} -silent"
result = await run_command(cmd, timeout=600)
if result['success']:
subdomains = [line.strip() for line in result['stdout'].split('\n') if line.strip()]
return {"subdomains": subdomains}
return {"subdomains": [], "error": result['stderr']}
async def run_assetfinder(domain: str) -> Dict:
"""Run assetfinder"""
cmd = f"assetfinder --subs-only {domain}"
result = await run_command(cmd, timeout=300)
if result['success']:
subdomains = [line.strip() for line in result['stdout'].split('\n') if line.strip()]
return {"subdomains": subdomains}
return {"subdomains": [], "error": result['stderr']}
async def run_sublist3r(domain: str) -> Dict:
"""Run sublist3r"""
cmd = f"sublist3r -d {domain} -t 10"
result = await run_command(cmd, timeout=300)
if result['success']:
subdomains = [line.strip() for line in result['stdout'].split('\n') if line.strip()]
return {"subdomains": subdomains}
return {"subdomains": [], "error": result['stderr']}