"""Vulnerability scanning tools."""
import uuid
from typing import Dict, Any, List, Optional
import logging
from ..models import ScanResult, ScanStatus, Finding, Severity, AuditLogEntry
from ..config import ConfigManager
from ..utils.validators import ScopeValidator
from ..utils.executor import ToolExecutor
from ..utils.parser import OutputParser
from ..storage.database import DatabaseManager
logger = logging.getLogger(__name__)
class ScanningTools:
"""Vulnerability scanning tools."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
executor: ToolExecutor,
):
"""Initialize scanning tools.
Args:
config: Configuration manager
db: Database manager
executor: Tool executor
"""
self.config = config
self.db = db
self.executor = executor
self.parser = OutputParser()
async def nuclei_scan(
self,
program_id: str,
target: str,
severity_filter: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Run Nuclei vulnerability scanner.
Args:
program_id: Program identifier
target: Target URL or host
severity_filter: Filter by severity (critical, high, medium, low, info)
tags: Filter by tags
Returns:
Dictionary with scan results
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(target)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Build nuclei arguments
args = ["-u", target, "-json", "-silent"]
if severity_filter:
args.extend(["-severity", ",".join(severity_filter)])
if tags:
args.extend(["-tags", ",".join(tags)])
# Create scan record
scan_id = str(uuid.uuid4())
try:
result = await self.executor.execute("nuclei", args, timeout=600)
findings = []
if result.success:
findings = self.parser.parse_nuclei_output(result.output)
scan_result = ScanResult(
scan_id=scan_id,
program_id=program_id,
tool="nuclei",
target=target,
status=ScanStatus.COMPLETED,
findings=findings,
duration_seconds=result.execution_time,
metadata={
'severity_filter': severity_filter,
'tags': tags,
}
)
self.db.save_scan_result(scan_result)
# Log audit
self.db.log_audit(AuditLogEntry(
action="nuclei_scan",
program_id=program_id,
target=target,
tool="nuclei",
success=True,
details={'findings_count': len(findings)},
))
return {
'success': True,
'scan_id': scan_id,
'target': target,
'findings_count': len(findings),
'findings': [
{
'title': f.title,
'severity': f.severity,
'description': f.description,
'cwe_id': f.cwe_id,
}
for f in findings
],
}
except Exception as e:
logger.error(f"Error in nuclei scan: {str(e)}")
return {'success': False, 'error': str(e)}
async def xss_scan(
self,
program_id: str,
url: str,
payload_list: Optional[str] = None,
) -> Dict[str, Any]:
"""Scan for XSS vulnerabilities.
Args:
program_id: Program identifier
url: Target URL
payload_list: Custom payload list file
Returns:
Dictionary with XSS findings
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_url(url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Build dalfox arguments
args = ["url", url, "--silence"]
if payload_list:
args.extend(["--custom-payload", payload_list])
# Create scan record
scan_id = str(uuid.uuid4())
try:
result = await self.executor.execute("dalfox", args, timeout=300)
findings = []
if result.success:
findings = self.parser.parse_dalfox_output(result.output)
scan_result = ScanResult(
scan_id=scan_id,
program_id=program_id,
tool="dalfox",
target=url,
status=ScanStatus.COMPLETED,
findings=findings,
duration_seconds=result.execution_time,
)
self.db.save_scan_result(scan_result)
return {
'success': True,
'scan_id': scan_id,
'url': url,
'findings_count': len(findings),
'findings': [
{
'title': f.title,
'severity': f.severity,
'evidence': f.evidence,
}
for f in findings
],
}
except Exception as e:
logger.error(f"Error in XSS scan: {str(e)}")
return {'success': False, 'error': str(e)}
async def ssl_analysis(
self,
program_id: str,
domain: str,
) -> Dict[str, Any]:
"""Analyze SSL/TLS configuration.
Args:
program_id: Program identifier
domain: Target domain
Returns:
Dictionary with SSL/TLS findings
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(domain)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Build testssl.sh arguments
args = ["--json", domain]
scan_id = str(uuid.uuid4())
try:
result = await self.executor.execute("testssl.sh", args, timeout=300)
findings = []
if result.success:
findings = self.parser.parse_testssl_output(result.output)
scan_result = ScanResult(
scan_id=scan_id,
program_id=program_id,
tool="testssl",
target=domain,
status=ScanStatus.COMPLETED,
findings=findings,
duration_seconds=result.execution_time,
)
self.db.save_scan_result(scan_result)
return {
'success': True,
'scan_id': scan_id,
'domain': domain,
'findings_count': len(findings),
'findings': [
{
'title': f.title,
'severity': f.severity,
'description': f.description,
}
for f in findings
],
}
except Exception as e:
logger.error(f"Error in SSL analysis: {str(e)}")
return {'success': False, 'error': str(e)}