"""Fuzzing tools for parameter and path discovery."""
import uuid
from typing import Dict, Any, List, Optional
import logging
from ..models import ScanResult, ScanStatus
from ..config import ConfigManager
from ..utils.validators import ScopeValidator
from ..utils.executor import ToolExecutor
from ..utils.parser import OutputParser
from ..storage.database import DatabaseManager
logger = logging.getLogger(__name__)
class FuzzingTools:
    """Fuzzing tools for discovery.

    Each public coroutine looks up the program, checks the target URL with
    ``ScopeValidator``, runs ``ffuf`` through the injected executor, parses
    the output with ``OutputParser`` and saves a ``ScanResult`` to the
    database.
    """

    def __init__(
        self,
        config: ConfigManager,
        db: DatabaseManager,
        executor: ToolExecutor,
    ):
        """Initialize fuzzing tools.

        Args:
            config: Configuration manager
            db: Database manager
            executor: Tool executor
        """
        self.config = config
        self.db = db
        self.executor = executor
        self.parser = OutputParser()

    @staticmethod
    def _path_fuzz_url(base_url: str) -> str:
        """Return ``base_url`` with the ffuf ``FUZZ`` keyword as final path segment."""
        # Strip trailing slashes so "https://host/" does not become
        # "https://host//FUZZ".
        return f"{base_url.rstrip('/')}/FUZZ"

    @staticmethod
    def _param_fuzz_target_args(url: str, method: str) -> List[str]:
        """Return the ffuf target arguments that place ``FUZZ`` as a parameter name.

        GET requests carry the keyword in the query string; every other
        method carries it in a form-encoded request body.
        """
        verb = method.upper()
        if verb == "GET":
            if "?" not in url:
                joiner = "?"
            elif url.endswith(("?", "&")):
                # The URL already ends with a separator; add nothing.
                joiner = ""
            else:
                # Extend the existing query string instead of adding a second "?".
                joiner = "&"
            return ["-u", f"{url}{joiner}FUZZ=test", "-X", verb]
        # ffuf requires the FUZZ keyword somewhere in the request; for
        # non-GET methods the URL is left untouched, so put it in the body.
        return [
            "-u", url,
            "-X", verb,
            "-d", "FUZZ=test",
            "-H", "Content-Type: application/x-www-form-urlencoded",
        ]

    def _check_target(self, program_id: str, url: str) -> Optional[Dict[str, Any]]:
        """Return an error dictionary if the program or target is invalid, else None."""
        program = self.config.get_program(program_id)
        if not program:
            return {'success': False, 'error': f"Program '{program_id}' not found"}
        is_valid, reason = ScopeValidator(program).validate_url(url)
        if not is_valid:
            return {'success': False, 'error': f"Target validation failed: {reason}"}
        return None

    def _rate_limit_args(self) -> List[str]:
        """Return ffuf ``-rate`` arguments when a rate limit is configured for ffuf."""
        tool_config = self.config.get_tool_config("ffuf")
        if tool_config and tool_config.rate_limit:
            return ["-rate", str(tool_config.rate_limit)]
        return []

    async def _run_ffuf(self, args: List[str], timeout: int) -> tuple:
        """Run ffuf and return ``(parsed_findings, execution_time)``.

        An unsuccessful run yields an empty findings list; a warning is
        logged so the failure is not mistaken for a clean, empty scan.
        """
        result = await self.executor.execute("ffuf", args, timeout=timeout)
        if result.success:
            findings = self.parser.parse_ffuf_output(result.output)
        else:
            logger.warning("ffuf run was not successful; reporting no findings")
            findings = []
        return findings, result.execution_time

    def _save_scan(
        self,
        scan_id: str,
        program_id: str,
        target: str,
        duration: Any,
        metadata: Dict[str, Any],
    ) -> None:
        """Persist a record of an ffuf scan through the database manager."""
        self.db.save_scan_result(
            ScanResult(
                scan_id=scan_id,
                program_id=program_id,
                tool="ffuf",
                target=target,
                status=ScanStatus.COMPLETED,
                duration_seconds=duration,
                metadata=metadata,
            )
        )

    async def path_fuzzing(
        self,
        program_id: str,
        base_url: str,
        wordlist: str = "common.txt",
        extensions: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Fuzz for hidden paths and files.

        Args:
            program_id: Program identifier
            base_url: Base URL to fuzz; a trailing slash is tolerated
            wordlist: Wordlist file name
            extensions: File extensions to try

        Returns:
            Dictionary with discovered paths. On any failure the dictionary
            has ``success`` set to False and an ``error`` message instead.
        """
        failure = self._check_target(program_id, base_url)
        if failure is not None:
            return failure

        # Build ffuf arguments
        args = [
            "-u", self._path_fuzz_url(base_url),
            "-w", wordlist,
            "-mc", "200,204,301,302,307,401,403",
            "-json",
            "-s",  # Silent mode
        ]
        if extensions:
            args.extend(["-e", ",".join(extensions)])
        args.extend(self._rate_limit_args())

        scan_id = str(uuid.uuid4())
        try:
            discovered_paths, duration = await self._run_ffuf(args, timeout=600)
            self._save_scan(
                scan_id,
                program_id,
                base_url,
                duration,
                {
                    'wordlist': wordlist,
                    'extensions': extensions,
                    'paths_found': len(discovered_paths),
                },
            )
            return {
                'success': True,
                'scan_id': scan_id,
                'base_url': base_url,
                'paths_found': len(discovered_paths),
                'paths': discovered_paths,
            }
        except Exception as e:
            # Boundary handler: callers receive an error dictionary, the
            # traceback goes to the log.
            logger.exception("Error in path fuzzing: %s", e)
            return {'success': False, 'error': str(e)}

    async def parameter_fuzzing(
        self,
        program_id: str,
        url: str,
        wordlist: str = "parameters.txt",
        method: str = "GET",
    ) -> Dict[str, Any]:
        """Fuzz for hidden parameters.

        Args:
            program_id: Program identifier
            url: Target URL; may already contain a query string
            wordlist: Parameter wordlist
            method: HTTP method. GET fuzzes query parameter names; any other
                method fuzzes form-encoded body parameter names.

        Returns:
            Dictionary with discovered parameters. On any failure the
            dictionary has ``success`` set to False and an ``error`` message
            instead.
        """
        failure = self._check_target(program_id, url)
        if failure is not None:
            return failure

        # Build ffuf arguments for parameter fuzzing
        args = self._param_fuzz_target_args(url, method)
        args.extend([
            "-w", wordlist,
            "-mc", "all",
            "-fc", "404",
            "-json",
            "-s",
        ])
        # Apply the same configured rate limit as path fuzzing.
        args.extend(self._rate_limit_args())

        scan_id = str(uuid.uuid4())
        try:
            discovered_params, duration = await self._run_ffuf(args, timeout=300)
            # Persist the scan so the returned scan_id refers to a stored record.
            self._save_scan(
                scan_id,
                program_id,
                url,
                duration,
                {
                    'wordlist': wordlist,
                    'method': method.upper(),
                    'parameters_found': len(discovered_params),
                },
            )
            return {
                'success': True,
                'scan_id': scan_id,
                'url': url,
                'method': method,
                'parameters_found': len(discovered_params),
                'parameters': discovered_params,
            }
        except Exception as e:
            # Boundary handler: callers receive an error dictionary, the
            # traceback goes to the log.
            logger.exception("Error in parameter fuzzing: %s", e)
            return {'success': False, 'error': str(e)}