"""Advanced security scanning tools for web applications."""
import asyncio
import json
import re
import uuid
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse
import logging
from ..config import ConfigManager
from ..storage.database import DatabaseManager
from ..utils.executor import ToolExecutor
from ..utils.validators import ScopeValidator
# Module-level logger, named after the package path for hierarchical config.
logger = logging.getLogger(__name__)
class AdvancedScanningTools:
    """Advanced security scanning tools.

    Provides three web-application scans — CORS misconfiguration,
    security-header analysis, and secret/API-key exposure — driven by
    ``curl`` through the injected executor.  Every scan validates the
    target against the program's scope before touching the network and
    persists its findings through the database manager.
    """

    # Ranking used to pick the highest severity among a set of findings.
    _SEVERITY_ORDER: Dict[str, int] = {
        'info': 0,
        'low': 1,
        'medium': 2,
        'high': 3,
        'critical': 4,
    }

    # Expected security headers and the issue to report when one is absent.
    # Hoisted to a class constant so the dict is built once, not per scan.
    _SECURITY_HEADERS: Dict[str, Dict[str, str]] = {
        'strict-transport-security': {
            'severity': 'medium',
            'description': 'Missing HSTS header',
            'impact': 'Site vulnerable to SSL stripping attacks',
            'recommendation': 'Strict-Transport-Security: max-age=31536000; includeSubDomains; preload',
        },
        'x-frame-options': {
            'severity': 'medium',
            'description': 'Missing X-Frame-Options header',
            'impact': 'Site vulnerable to clickjacking attacks',
            'recommendation': 'X-Frame-Options: DENY or SAMEORIGIN',
        },
        'x-content-type-options': {
            'severity': 'low',
            'description': 'Missing X-Content-Type-Options header',
            'impact': 'Browser may MIME-sniff responses',
            'recommendation': 'X-Content-Type-Options: nosniff',
        },
        'x-xss-protection': {
            'severity': 'info',
            'description': 'Missing X-XSS-Protection header',
            'impact': 'Legacy XSS protection disabled',
            'recommendation': 'X-XSS-Protection: 1; mode=block',
        },
        'content-security-policy': {
            'severity': 'high',
            'description': 'Missing Content-Security-Policy header',
            'impact': 'No CSP protection against XSS and injection attacks',
            'recommendation': "Content-Security-Policy: default-src 'self'",
        },
        'referrer-policy': {
            'severity': 'info',
            'description': 'Missing Referrer-Policy header',
            'impact': 'Referrer information may leak',
            'recommendation': 'Referrer-Policy: strict-origin-when-cross-origin',
        },
        'permissions-policy': {
            'severity': 'info',
            'description': 'Missing Permissions-Policy header',
            'impact': 'No control over browser features',
            'recommendation': 'Permissions-Policy: geolocation=(), camera=(), microphone=()',
        },
    }

    # Secret detection patterns.  Case sensitivity is intentional: token
    # prefixes such as AKIA / ghp_ / sk_live_ are case-exact, and the
    # generic patterns encode their own case-insensitivity via classes.
    _SECRET_PATTERNS: Dict[str, str] = {
        'AWS Access Key': r'AKIA[0-9A-Z]{16}',
        'AWS Secret Key': r'aws(.{0,20})?[\'"][0-9a-zA-Z/+]{40}[\'"]',
        'Google API Key': r'AIza[0-9A-Za-z\-_]{35}',
        'Google OAuth': r'[0-9]+-[0-9A-Za-z_]{32}\.apps\.googleusercontent\.com',
        'GitHub Token': r'ghp_[0-9a-zA-Z]{36}',
        'GitHub OAuth': r'gho_[0-9a-zA-Z]{36}',
        'Slack Token': r'xox[baprs]-[0-9]{12}-[0-9]{12}-[0-9a-zA-Z]{24}',
        'Slack Webhook': r'https://hooks\.slack\.com/services/T[a-zA-Z0-9_]{8}/B[a-zA-Z0-9_]{8}/[a-zA-Z0-9_]{24}',
        'Stripe API Key': r'sk_live_[0-9a-zA-Z]{24}',
        'Stripe Publishable': r'pk_live_[0-9a-zA-Z]{24}',
        'Twilio API Key': r'SK[0-9a-fA-F]{32}',
        'Generic API Key': r'[aA][pP][iI][_-]?[kK][eE][yY][\'"]?\s*[:=]\s*[\'"][0-9a-zA-Z\-_]{20,}[\'"]',
        'Generic Secret': r'[sS][eE][cC][rR][eE][tT][\'"]?\s*[:=]\s*[\'"][0-9a-zA-Z\-_]{20,}[\'"]',
        'Private Key': r'-----BEGIN (RSA |DSA |EC |OPENSSH )?PRIVATE KEY-----',
        'JWT Token': r'eyJ[A-Za-z0-9-_=]+\.[A-Za-z0-9-_=]+\.?[A-Za-z0-9-_.+/=]*',
        'Database URL': r'(postgresql|mysql|mongodb)://[^\s]+',
        'Bearer Token': r'[bB]earer\s+[A-Za-z0-9\-_=]+\.[A-Za-z0-9\-_=]+\.?[A-Za-z0-9\-_.+/=]*',
    }

    def __init__(
        self,
        config: ConfigManager,
        db: DatabaseManager,
        executor: ToolExecutor,
    ):
        """Initialize advanced scanning tools.

        Args:
            config: Configuration manager
            db: Database manager
            executor: Tool executor used to run external commands (curl)
        """
        self.config = config
        self.db = db
        self.executor = executor

    def _validate_target(self, program_id: str, url: str):
        """Resolve the program and check *url* against its scope.

        Shared by all scan entry points (was previously triplicated).

        Args:
            program_id: Program identifier
            url: Target URL to validate

        Returns:
            ``(error, validator)`` tuple.  ``error`` is ``None`` when the
            target is valid, otherwise a ``{'success': False, ...}`` dict
            that a scan method can return directly.  ``validator`` is the
            program's ScopeValidator (``None`` if the program was not found).
        """
        program = self.config.get_program(program_id)
        if not program:
            return {'success': False, 'error': f"Program '{program_id}' not found"}, None
        validator = ScopeValidator(program)
        is_valid, reason = validator.validate_url(url)
        if not is_valid:
            return {'success': False, 'error': f"Target validation failed: {reason}"}, validator
        return None, validator

    def _classify_cors_response(
        self,
        url: str,
        origin: str,
        acao_value: str,
        allow_credentials: bool,
    ) -> Optional[Dict[str, Any]]:
        """Map an observed Access-Control-Allow-Origin echo to a finding.

        Args:
            url: Target URL that was probed
            origin: Origin header value that was sent
            acao_value: ACAO value returned by the server (lowercased)
            allow_credentials: Whether credentials were allowed

        Returns:
            A vulnerability dict, or ``None`` when the response is benign.
        """
        # 1. Null origin accepted
        if origin == "null" and acao_value == "null":
            return {
                'severity': 'high' if allow_credentials else 'medium',
                'type': 'CORS Null Origin Accepted',
                'origin_tested': origin,
                'acao_returned': acao_value,
                'credentials_allowed': allow_credentials,
                'description': 'Server accepts null origin which can be exploited via sandbox iframe',
                'impact': 'Attacker can make authenticated requests from sandboxed iframe',
            }
        # 2. Pre-domain wildcard (subdomain trust).  Checked BEFORE generic
        # reflection — otherwise this branch is unreachable, since a
        # reflected ".evil.com" origin also satisfies the reflection rule.
        if ".evil.com" in origin and acao_value == origin:
            return {
                'severity': 'medium',
                'type': 'CORS Subdomain Trust',
                'origin_tested': origin,
                'acao_returned': acao_value,
                'credentials_allowed': allow_credentials,
                'description': 'Server trusts any subdomain (pre-domain wildcard)',
                'impact': 'Attacker with subdomain control can make requests',
            }
        # 3. Reflected arbitrary origin
        if acao_value == origin and origin not in ["null", url]:
            return {
                'severity': 'critical' if allow_credentials else 'high',
                'type': 'CORS Arbitrary Origin Reflection',
                'origin_tested': origin,
                'acao_returned': acao_value,
                'credentials_allowed': allow_credentials,
                'description': 'Server reflects arbitrary Origin header',
                'impact': 'Any domain can make cross-origin requests' +
                          (' with credentials' if allow_credentials else ''),
            }
        # 4. Wildcard with credentials (browsers refuse this combination,
        # but it signals a broken CORS configuration).
        if acao_value == "*" and allow_credentials:
            return {
                'severity': 'high',
                'type': 'CORS Wildcard with Credentials',
                'origin_tested': origin,
                'acao_returned': acao_value,
                'credentials_allowed': allow_credentials,
                'description': 'Wildcard origin with credentials (browsers block this)',
                'impact': 'Configuration error, may indicate other CORS issues',
            }
        return None

    async def cors_scan(
        self,
        program_id: str,
        url: str,
        custom_origins: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Scan for CORS misconfigurations.

        Args:
            program_id: Program identifier
            url: Target URL to test
            custom_origins: Optional list of custom origins to test

        Returns:
            Dictionary with CORS vulnerability findings
        """
        error, _ = self._validate_target(program_id, url)
        if error:
            return error
        scan_id = str(uuid.uuid4())
        parsed = urlparse(url)
        # Origins chosen to probe the common misconfiguration classes:
        # null origin, arbitrary reflection, pre- and post-domain wildcards,
        # and overly-trusted localhost.
        test_origins = custom_origins or [
            "null",
            "https://evil.com",
            f"https://evil.{parsed.netloc}",
            f"{parsed.scheme}://evil.com",
            "http://localhost",
            f"{parsed.scheme}://{parsed.netloc}.evil.com",
        ]
        vulnerabilities = []
        try:
            for origin in test_origins:
                # -i keeps response headers in the output so the
                # Access-Control-* headers can be read back.
                args = [
                    "-s",
                    "-i",
                    "-H", f"Origin: {origin}",
                    "-X", "OPTIONS",
                    url
                ]
                result = await self.executor.execute("curl", args, timeout=30)
                if not result.success:
                    continue
                response = result.output.lower()
                if "access-control-allow-origin" not in response:
                    continue
                acao_match = re.search(
                    r'access-control-allow-origin:\s*([^\r\n]+)',
                    response,
                )
                if not acao_match:
                    continue
                acao_value = acao_match.group(1).strip()
                # Tolerate arbitrary whitespace after the colon; the
                # previous exact-string match missed e.g. double spaces.
                allow_credentials = bool(re.search(
                    r'access-control-allow-credentials:\s*true',
                    response,
                ))
                vuln = self._classify_cors_response(
                    url, origin, acao_value, allow_credentials
                )
                if vuln:
                    # Attach an HTML PoC demonstrating the exploit.
                    vuln['proof_of_concept'] = self._generate_cors_poc(
                        url, origin, acao_value, allow_credentials
                    )
                    vulnerabilities.append(vuln)
            if vulnerabilities:
                # Record the finding at the highest severity actually
                # observed (previously medium-only results were saved
                # as "high").
                top_severity = max(
                    (v['severity'] for v in vulnerabilities),
                    key=lambda s: self._SEVERITY_ORDER.get(s, 0),
                )
                await self.db.save_finding(
                    program_id=program_id,
                    scan_id=scan_id,
                    finding_type="cors_misconfiguration",
                    severity=top_severity,
                    title=f"CORS Misconfiguration on {url}",
                    description=f"Found {len(vulnerabilities)} CORS vulnerabilities",
                    evidence=json.dumps(vulnerabilities, indent=2),
                    url=url,
                )
            return {
                'success': True,
                'scan_id': scan_id,
                'url': url,
                'vulnerabilities_found': len(vulnerabilities),
                'vulnerabilities': vulnerabilities,
                'tested_origins': test_origins,
            }
        except Exception as e:
            logger.error(f"Error in CORS scan: {str(e)}")
            return {'success': False, 'error': str(e)}

    def _generate_cors_poc(
        self,
        target_url: str,
        origin: str,
        acao: str,
        credentials: bool
    ) -> str:
        """Generate CORS exploitation PoC HTML.

        Args:
            target_url: Target URL
            origin: Origin that was accepted
            acao: Access-Control-Allow-Origin value
            credentials: Whether credentials are allowed

        Returns:
            HTML PoC code
        """
        poc = f'''<!DOCTYPE html>
<html>
<head>
<title>CORS PoC</title>
</head>
<body>
<h1>CORS Exploitation PoC</h1>
<p>Target: {target_url}</p>
<p>Origin: {origin}</p>
<div id="result"></div>
<script>
fetch('{target_url}', {{
method: 'GET',
credentials: '{('include' if credentials else 'omit')}',
mode: 'cors',
}})
.then(response => response.text())
.then(data => {{
document.getElementById('result').innerHTML =
'<h2>Success! Data retrieved:</h2><pre>' +
data.substring(0, 500) +
'</pre>';
}})
.catch(error => {{
document.getElementById('result').innerHTML =
'<h2>Error:</h2><pre>' + error + '</pre>';
}});
</script>
</body>
</html>'''
        return poc

    async def security_headers_scan(
        self,
        program_id: str,
        url: str,
    ) -> Dict[str, Any]:
        """Scan for missing or weak security headers.

        Args:
            program_id: Program identifier
            url: Target URL to test

        Returns:
            Dictionary with security header analysis
        """
        error, _ = self._validate_target(program_id, url)
        if error:
            return error
        scan_id = str(uuid.uuid4())
        try:
            # -I fetches headers only; -L follows redirects so we grade
            # the final response.
            args = ["-s", "-I", "-L", url]
            result = await self.executor.execute("curl", args, timeout=30)
            if not result.success:
                return {
                    'success': False,
                    'error': 'Failed to fetch headers',
                    'details': result.errors,
                }
            # Parse "Name: value" lines into a lowercase-keyed dict.
            headers = {}
            for line in result.output.split('\n'):
                if ':' in line:
                    key, value = line.split(':', 1)
                    headers[key.strip().lower()] = value.strip()
            issues = []
            recommendations = []
            present_headers = {}
            for header, details in self._SECURITY_HEADERS.items():
                if header in headers:
                    present_headers[header] = headers[header]
                    value = headers[header].lower()
                    # HSTS checks
                    if header == 'strict-transport-security':
                        max_age_match = re.search(r'max-age=(\d+)', value)
                        if max_age_match is None:
                            # Covers both a missing directive and a malformed
                            # (non-numeric) max-age; the original crashed with
                            # AttributeError on the latter.
                            issues.append({
                                'severity': 'medium',
                                'header': header,
                                'issue': 'HSTS missing max-age directive',
                                'current_value': headers[header],
                            })
                        elif int(max_age_match.group(1)) < 31536000:
                            issues.append({
                                'severity': 'low',
                                'header': header,
                                'issue': 'HSTS max-age too short (should be 1 year+)',
                                'current_value': headers[header],
                            })
                        if 'includesubdomains' not in value:
                            recommendations.append({
                                'header': header,
                                'recommendation': 'Add includeSubDomains to HSTS',
                            })
                    # CSP checks
                    if header == 'content-security-policy':
                        if 'unsafe-inline' in value:
                            issues.append({
                                'severity': 'medium',
                                'header': header,
                                'issue': "CSP contains 'unsafe-inline' which weakens protection",
                                'current_value': headers[header],
                            })
                        if 'unsafe-eval' in value:
                            issues.append({
                                'severity': 'medium',
                                'header': header,
                                'issue': "CSP contains 'unsafe-eval' which weakens protection",
                                'current_value': headers[header],
                            })
                        if '*' in value and 'default-src' in value:
                            issues.append({
                                'severity': 'high',
                                'header': header,
                                'issue': "CSP uses wildcard (*) in default-src",
                                'current_value': headers[header],
                            })
                else:
                    # Header absent entirely — report the canned issue.
                    issues.append({
                        'severity': details['severity'],
                        'header': header,
                        'issue': details['description'],
                        'impact': details['impact'],
                        'recommendation': details['recommendation'],
                    })
            # Check for information disclosure headers
            info_disclosure = ['server', 'x-powered-by', 'x-aspnet-version', 'x-aspnetmvc-version']
            disclosed_info = {}
            for header in info_disclosure:
                if header in headers:
                    disclosed_info[header] = headers[header]
                    issues.append({
                        'severity': 'info',
                        'header': header,
                        'issue': f'Information disclosure via {header} header',
                        'current_value': headers[header],
                        'recommendation': f'Remove or obfuscate {header} header',
                    })
            # Overall score is the fraction of expected headers present.
            total_headers = len(self._SECURITY_HEADERS)
            present_count = len(present_headers)
            score = (present_count / total_headers) * 100
            if score >= 90:
                grade = 'A'
            elif score >= 75:
                grade = 'B'
            elif score >= 50:
                grade = 'C'
            elif score >= 25:
                grade = 'D'
            else:
                grade = 'F'
            # Save findings if issues found
            if issues:
                high_severity_issues = [i for i in issues if i['severity'] in ['high', 'critical']]
                overall_severity = 'high' if high_severity_issues else 'medium'
                await self.db.save_finding(
                    program_id=program_id,
                    scan_id=scan_id,
                    finding_type="security_headers",
                    severity=overall_severity,
                    title=f"Security Header Issues on {url}",
                    description=f"Found {len(issues)} security header issues (Grade: {grade})",
                    evidence=json.dumps({
                        'issues': issues,
                        'present_headers': present_headers,
                        'score': score,
                        'grade': grade,
                    }, indent=2),
                    url=url,
                )
            return {
                'success': True,
                'scan_id': scan_id,
                'url': url,
                'score': score,
                'grade': grade,
                'present_headers': present_headers,
                'missing_headers': [h for h in self._SECURITY_HEADERS if h not in headers],
                'issues': issues,
                'recommendations': recommendations,
                'information_disclosure': disclosed_info,
                'total_issues': len(issues),
            }
        except Exception as e:
            logger.error(f"Error in security headers scan: {str(e)}")
            return {'success': False, 'error': str(e)}

    async def secret_scan(
        self,
        program_id: str,
        url: str,
        scan_js_files: bool = True,
    ) -> Dict[str, Any]:
        """Scan for exposed secrets and API keys.

        Args:
            program_id: Program identifier
            url: Target URL to scan
            scan_js_files: Whether to also scan JavaScript files

        Returns:
            Dictionary with discovered secrets
        """
        error, validator = self._validate_target(program_id, url)
        if error:
            return error
        scan_id = str(uuid.uuid4())
        secrets_found = []
        patterns = self._SECRET_PATTERNS
        try:
            # Scan main page
            content = ''
            args = ["-s", "-L", url]
            result = await self.executor.execute("curl", args, timeout=30)
            if result.success:
                content = result.output
                secrets_found.extend(
                    self._scan_content_for_secrets(content, url, patterns)
                )
            # Scan JavaScript files if requested
            if scan_js_files:
                # Find JS files referenced by <script src="..."> tags.
                js_urls = re.findall(
                    r'<script[^>]+src=["\']([^"\']+)["\']',
                    content
                )
                for js_url in js_urls[:10]:  # Limit to 10 JS files
                    # Resolve protocol-relative and root-relative URLs;
                    # skip anything else that is not already absolute.
                    if js_url.startswith('//'):
                        js_url = urlparse(url).scheme + ':' + js_url
                    elif js_url.startswith('/'):
                        parsed = urlparse(url)
                        js_url = f"{parsed.scheme}://{parsed.netloc}{js_url}"
                    elif not js_url.startswith('http'):
                        continue
                    # Only fetch JS files that are themselves in scope.
                    is_valid_js, _ = validator.validate_url(js_url)
                    if not is_valid_js:
                        continue
                    js_result = await self.executor.execute(
                        "curl", ["-s", "-L", js_url], timeout=30
                    )
                    if js_result.success:
                        secrets_found.extend(
                            self._scan_content_for_secrets(
                                js_result.output, js_url, patterns
                            )
                        )
            # Save findings
            if secrets_found:
                await self.db.save_finding(
                    program_id=program_id,
                    scan_id=scan_id,
                    finding_type="secret_exposure",
                    severity="critical",
                    title=f"Exposed Secrets Found on {url}",
                    description=f"Found {len(secrets_found)} potential secrets",
                    evidence=json.dumps(secrets_found, indent=2),
                    url=url,
                )
            return {
                'success': True,
                'scan_id': scan_id,
                'url': url,
                'secrets_found': len(secrets_found),
                'secrets': secrets_found,
                'scanned_js_files': scan_js_files,
            }
        except Exception as e:
            logger.error(f"Error in secret scan: {str(e)}")
            return {'success': False, 'error': str(e)}

    def _scan_content_for_secrets(
        self,
        content: str,
        source_url: str,
        patterns: Dict[str, str]
    ) -> List[Dict[str, Any]]:
        """Scan content for secret patterns.

        Args:
            content: Content to scan
            source_url: URL where content was found
            patterns: Dictionary of secret patterns

        Returns:
            List of found secrets
        """
        secrets = []
        for secret_type, pattern in patterns.items():
            # Patterns are matched case-sensitively: prefixes like AKIA,
            # ghp_ and sk_live_ are case-exact, and forcing IGNORECASE
            # (as before) produced false positives on lowercased text.
            for match in re.finditer(pattern, content):
                secret_value = match.group(0)
                # Capture surrounding context (50 chars each side) to
                # help triage the finding.
                start = max(0, match.start() - 50)
                end = min(len(content), match.end() + 50)
                context = content[start:end]
                secrets.append({
                    'type': secret_type,
                    'value': secret_value[:50] + '...' if len(secret_value) > 50 else secret_value,
                    'full_value': secret_value,
                    'source': source_url,
                    'context': context,
                    'severity': 'critical' if secret_type in [
                        'AWS Access Key', 'AWS Secret Key', 'Private Key', 'Database URL'
                    ] else 'high',
                })
        return secrets