"""Output parsers for various security tools."""
import json
import logging
import re
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qsl, urlsplit
from ..models import Finding, Severity
logger = logging.getLogger(__name__)
class OutputParser:
"""Parses output from various security tools."""
@staticmethod
def parse_json_output(output: str) -> Dict[str, Any]:
"""Parse JSON output from tools.
Args:
output: Raw JSON output
Returns:
Parsed JSON as dictionary
"""
try:
return json.loads(output)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON: {e}")
return {}
@staticmethod
def parse_nuclei_output(output: str) -> List[Finding]:
"""Parse Nuclei JSON output into findings.
Args:
output: Nuclei JSON output (one JSON object per line)
Returns:
List of Finding objects
"""
        findings = []
        # Built once, outside the per-line loop
        severity_map = {
            'critical': Severity.CRITICAL,
            'high': Severity.HIGH,
            'medium': Severity.MEDIUM,
            'low': Severity.LOW,
            'info': Severity.INFO,
        }
        for line in output.strip().split('\n'):
            if not line.strip():
                continue
            try:
                data = json.loads(line)
finding = Finding(
title=data.get('info', {}).get('name', 'Unknown'),
severity=severity_map.get(
data.get('info', {}).get('severity', 'info').lower(),
Severity.INFO
),
description=data.get('info', {}).get('description', ''),
                    # classification/cwe-id may be missing, null, or an empty list
                    cwe_id=((data.get('info', {}).get('classification') or {}).get('cwe-id') or [None])[0],
evidence={
'matched_at': data.get('matched-at', ''),
'matcher_name': data.get('matcher-name', ''),
'template_id': data.get('template-id', ''),
'type': data.get('type', ''),
},
                    references=data.get('info', {}).get('reference') or [],  # may be null
)
findings.append(finding)
except json.JSONDecodeError:
logger.warning(f"Failed to parse Nuclei line: {line[:100]}")
continue
return findings
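    # Illustrative Nuclei JSONL record (field layout varies across Nuclei
    # versions; only the keys the parser reads are shown):
    #   {"template-id": "exposed-panel", "type": "http",
    #    "matched-at": "https://example.com/admin",
    #    "info": {"name": "Exposed Admin Panel", "severity": "medium"}}
    # One such line yields one Finding with severity MEDIUM and
    # evidence['template_id'] == 'exposed-panel'.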
@staticmethod
def parse_nmap_output(output: str) -> Dict[str, Any]:
"""Parse Nmap output (simple text parsing).
Args:
output: Nmap text output
Returns:
Dictionary with parsed results
"""
result = {
'open_ports': [],
'services': {},
'os_detection': None,
}
# Parse open ports
port_pattern = r'(\d+)/(\w+)\s+open\s+(\S+)'
for match in re.finditer(port_pattern, output):
port = match.group(1)
protocol = match.group(2)
service = match.group(3)
result['open_ports'].append({
'port': int(port),
'protocol': protocol,
'service': service,
})
result['services'][f"{port}/{protocol}"] = service
# Try to extract OS information
os_match = re.search(r'OS details: (.+)', output)
if os_match:
result['os_detection'] = os_match.group(1)
return result
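    # Illustrative Nmap line: "22/tcp   open  ssh" becomes
    # {'port': 22, 'protocol': 'tcp', 'service': 'ssh'} in result['open_ports']
    # and result['services']['22/tcp'] == 'ssh'. Note that "open|filtered"
    # states (common in UDP scans) are not matched by the pattern above.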
@staticmethod
def parse_subfinder_output(output: str) -> List[str]:
"""Parse Subfinder output (list of subdomains).
Args:
output: Subfinder output (one subdomain per line)
Returns:
List of discovered subdomains
"""
subdomains = []
for line in output.strip().split('\n'):
subdomain = line.strip()
if subdomain and not subdomain.startswith('#'):
subdomains.append(subdomain)
return subdomains
@staticmethod
def parse_ffuf_output(output: str) -> List[Dict[str, Any]]:
"""Parse ffuf JSON output.
Args:
output: ffuf JSON output
Returns:
List of discovered paths/parameters
"""
results = []
try:
data = json.loads(output)
for result in data.get('results', []):
results.append({
'url': result.get('url', ''),
'status': result.get('status', 0),
'length': result.get('length', 0),
'words': result.get('words', 0),
'lines': result.get('lines', 0),
})
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse ffuf JSON output: {e}")
return results
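    # Illustrative ffuf JSON (as produced with -of json; only the keys the
    # parser reads are shown):
    #   {"results": [{"url": "https://example.com/admin", "status": 200,
    #                 "length": 1234, "words": 56, "lines": 7}]}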
@staticmethod
def parse_sqlmap_output(output: str) -> Optional[Finding]:
"""Parse SQLMap output for SQL injection findings.
Args:
output: SQLMap output
Returns:
Finding object if vulnerability found, None otherwise
"""
# Check if SQL injection was found
if 'is vulnerable' not in output.lower():
return None
# Extract database type
db_type = None
        # Capture multi-word DBMS names (e.g. "Microsoft SQL Server") but stop
        # before version qualifiers such as ">= 5.0"
        db_match = re.search(r'back-end DBMS: ([\w ]+)', output, re.IGNORECASE)
        if db_match:
            db_type = db_match.group(1).strip()
# Extract injection type
injection_type = None
type_match = re.search(r'Type: (.+)', output)
if type_match:
injection_type = type_match.group(1).strip()
# Extract payload
payload = None
payload_match = re.search(r'Payload: (.+)', output)
if payload_match:
payload = payload_match.group(1).strip()
return Finding(
title="SQL Injection Vulnerability",
severity=Severity.HIGH,
description=f"SQL injection vulnerability detected. Database: {db_type or 'Unknown'}",
cwe_id="CWE-89",
evidence={
'database_type': db_type,
'injection_type': injection_type,
'payload': payload,
},
remediation="Use parameterized queries/prepared statements. Avoid dynamic SQL construction with user input.",
)
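    # Illustrative SQLMap output fragment that this parser keys on (real
    # output varies by version and verbosity):
    #   GET parameter 'id' is vulnerable
    #   Type: boolean-based blind
    #   Payload: id=1 AND 1=1
    #   back-end DBMS: MySQL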
@staticmethod
def parse_dalfox_output(output: str) -> List[Finding]:
"""Parse Dalfox output for XSS findings.
Args:
output: Dalfox output
Returns:
List of XSS findings
"""
findings = []
# Dalfox outputs in format: [TYPE] URL PARAM PAYLOAD
xss_pattern = r'\[(\w+)\]\s+(.+?)\s+(.+?)\s+(.+)'
for match in re.finditer(xss_pattern, output):
vuln_type = match.group(1)
url = match.group(2)
param = match.group(3)
payload = match.group(4)
finding = Finding(
title=f"Cross-Site Scripting (XSS) - {vuln_type}",
severity=Severity.MEDIUM,
description=f"XSS vulnerability found in parameter '{param}'",
cwe_id="CWE-79",
evidence={
'url': url,
'parameter': param,
'payload': payload,
'type': vuln_type,
},
remediation="Implement proper input validation and output encoding. Use Content Security Policy (CSP).",
)
findings.append(finding)
return findings
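    # Illustrative Dalfox line matching the pattern above (actual Dalfox
    # output differs across versions and output modes):
    #   [V] http://example.com/search q <script>alert(1)</script>
    # yields a MEDIUM finding for parameter 'q'.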
@staticmethod
def parse_testssl_output(output: str) -> List[Finding]:
"""Parse testssl.sh output for SSL/TLS issues.
Args:
output: testssl.sh output
Returns:
List of SSL/TLS findings
"""
findings = []
# Look for vulnerabilities
vuln_patterns = {
'Heartbleed': (Severity.CRITICAL, 'CWE-119'),
'POODLE': (Severity.HIGH, 'CWE-310'),
'BEAST': (Severity.MEDIUM, 'CWE-326'),
'CRIME': (Severity.MEDIUM, 'CWE-310'),
'Weak cipher': (Severity.LOW, 'CWE-326'),
}
        for vuln_name, (severity, cwe) in vuln_patterns.items():
            # Check line by line so that "not vulnerable (OK)" results for
            # other tests elsewhere in the output do not cause false positives.
            for line in output.splitlines():
                lowered = line.lower()
                if (vuln_name.lower() in lowered
                        and 'vulnerable' in lowered
                        and 'not vulnerable' not in lowered):
                    finding = Finding(
                        title=f"SSL/TLS Vulnerability: {vuln_name}",
                        severity=severity,
                        description=f"SSL/TLS configuration is vulnerable to {vuln_name}",
                        cwe_id=cwe,
                        evidence={'vulnerability': vuln_name},
                        remediation=f"Update the SSL/TLS configuration to address {vuln_name}",
                    )
                    findings.append(finding)
                    break
return findings
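    # Illustrative testssl.sh lines (wording approximates real output):
    #   "Heartbleed (CVE-2014-0160)    VULNERABLE (NOT ok)"  -> CRITICAL finding
    #   "POODLE, SSL (CVE-2014-3566)   not vulnerable (OK)"  -> no finding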
@staticmethod
def parse_amass_output(output: str) -> List[str]:
"""Parse Amass output (plain text format).
Args:
output: Amass text output
Returns:
List of discovered subdomains
"""
subdomains = []
for line in output.strip().split('\n'):
if not line.strip():
continue
# Amass output format: "subdomain.domain.com (FQDN) --> record_type --> value"
# We want to extract the subdomain part
            if '(FQDN)' in line:
                subdomain = line.split('(FQDN)')[0].strip()
                if subdomain and '.' in subdomain:
                    subdomains.append(subdomain)
elif line.strip() and not line.startswith('#'):
# Sometimes amass just outputs the subdomain directly
subdomain = line.strip()
if '.' in subdomain:
subdomains.append(subdomain)
        # Deduplicate and sort for deterministic output
        return sorted(set(subdomains))
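    # Illustrative Amass lines handled above (exact format depends on Amass
    # version and flags):
    #   "www.example.com (FQDN) --> a_record --> 93.184.216.34"
    #   "api.example.com"
    # Both contribute a subdomain; duplicates are collapsed.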
@staticmethod
def parse_gospider_output(output: str) -> Dict[str, Any]:
"""Parse GoSpider JSON output.
Args:
output: GoSpider JSON output
Returns:
Dictionary with discovered URLs, parameters, forms, and JS files
"""
result = {
'urls': [],
'parameters': set(),
'forms': [],
'js_files': [],
}
for line in output.strip().split('\n'):
if not line.strip():
continue
try:
data = json.loads(line)
# Extract URL
url = data.get('output', '')
if url:
result['urls'].append(url)
                    parsed = urlsplit(url)
                    # Check for JS files on the path, so query strings don't hide them
                    if parsed.path.endswith('.js'):
                        result['js_files'].append(url)
                    # Extract query parameter names
                    for param_name, _ in parse_qsl(parsed.query, keep_blank_values=True):
                        result['parameters'].add(param_name)
# Extract form data
if data.get('output_type') == 'form':
result['forms'].append({
'url': url,
'method': data.get('method', 'GET'),
})
except json.JSONDecodeError:
continue
        result['parameters'] = sorted(result['parameters'])
return result
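    # Illustrative GoSpider JSONL record (only the keys the parser reads are
    # shown; real records carry more fields):
    #   {"output": "https://example.com/search?q=test&page=2", "output_type": "url"}
    # adds the URL and the parameter names 'q' and 'page'.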
@staticmethod
def parse_katana_output(output: str) -> Dict[str, Any]:
"""Parse Katana JSON output.
Args:
output: Katana JSON output
Returns:
Dictionary with discovered URLs and endpoints
"""
result = {
'urls': [],
'endpoints': [],
}
for line in output.strip().split('\n'):
if not line.strip():
continue
try:
data = json.loads(line)
url = data.get('request', {}).get('url', '')
if url:
result['urls'].append(url)
# Detect endpoints
if any(api_str in url.lower() for api_str in ['/api/', '/v1/', '/v2/', '/graphql']):
result['endpoints'].append(url)
except json.JSONDecodeError:
# Katana might output plain URLs
line = line.strip()
if line.startswith('http'):
result['urls'].append(line)
return result
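    # Illustrative Katana inputs: a JSONL record such as
    #   {"request": {"url": "https://example.com/api/v1/users"}}
    # lands in both 'urls' and 'endpoints', while a bare line like
    #   https://example.com/about
    # is caught by the plain-URL fallback. (Katana's JSON schema differs
    # between versions; only the keys read above are assumed.)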
@staticmethod
def parse_masscan_output(output: str) -> Dict[str, Any]:
"""Parse Masscan JSON output.
Args:
output: Masscan JSON output
Returns:
Dictionary with discovered hosts and ports
"""
result = {'hosts': []}
try:
# Masscan outputs JSON array
data = json.loads(output)
# Group by IP
hosts_dict = {}
            for item in data:
                # Skip malformed records (missing IP or empty port list)
                if 'ip' not in item or not item.get('ports'):
                    continue
                ip = item['ip']
                port_info = item['ports'][0]  # Masscan emits one port per record
                if ip not in hosts_dict:
                    hosts_dict[ip] = {
                        'ip': ip,
                        'ports': [],
                    }
                hosts_dict[ip]['ports'].append({
                    'port': port_info.get('port'),
                    'protocol': port_info.get('proto', 'tcp'),
                    'status': port_info.get('status', 'open'),
                })
result['hosts'] = list(hosts_dict.values())
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse masscan JSON output: {e}")
return result
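    # Illustrative Masscan JSON (schema approximated from the keys read
    # above); two records for the same IP merge into one host entry:
    #   [{"ip": "10.0.0.5", "ports": [{"port": 80, "proto": "tcp", "status": "open"}]},
    #    {"ip": "10.0.0.5", "ports": [{"port": 443, "proto": "tcp", "status": "open"}]}]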
@staticmethod
def parse_harvester_output(output: str) -> List[str]:
"""Parse theHarvester output for email addresses.
Args:
output: theHarvester output
Returns:
List of discovered email addresses
"""
emails = []
# Email regex pattern
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
for match in re.finditer(email_pattern, output):
email = match.group(0)
emails.append(email.lower())
        # Deduplicate and sort for deterministic output
        return sorted(set(emails))
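    # Any RFC-822-style address anywhere in the raw output is captured, e.g.
    # "Found: Alice <ALICE@Example.COM>" yields "alice@example.com".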
@staticmethod
def parse_ldap_output(output: str) -> Dict[str, Any]:
"""Parse LDAP search output.
Args:
output: LDAP search output
Returns:
Dictionary with LDAP enumeration results
"""
result = {
'base_dn': None,
'naming_contexts': [],
'users': [],
'groups': [],
}
# Parse base DN
base_dn_match = re.search(r'dn:\s*(.+)', output)
if base_dn_match:
result['base_dn'] = base_dn_match.group(1).strip()
# Parse naming contexts
for match in re.finditer(r'namingContexts:\s*(.+)', output):
result['naming_contexts'].append(match.group(1).strip())
        # Parse cn entries (simplified: every cn value is treated as a user;
        # separating users from groups would require objectClass attributes)
        for match in re.finditer(r'cn:\s*(.+)', output):
            result['users'].append(match.group(1).strip())
return result
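

if __name__ == "__main__":
    # Minimal smoke test on synthetic fixtures (illustrative only; real tool
    # output is more varied than these hand-written samples).
    sample_nmap = (
        "22/tcp   open  ssh\n"
        "80/tcp   open  http\n"
        "OS details: Linux 5.4\n"
    )
    print(OutputParser.parse_nmap_output(sample_nmap))
    sample_subfinder = "www.example.com\napi.example.com\n# comment\n"
    print(OutputParser.parse_subfinder_output(sample_subfinder))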