"""Phase 2 advanced testing tools - GraphQL, SSRF, IDOR, JS Analysis, Wayback."""
import asyncio
import json
import uuid
import re
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, urljoin
import logging
from ..config import ConfigManager
from ..storage.database import DatabaseManager
from ..storage.cache import CacheManager
from ..utils.executor import ToolExecutor
from ..utils.validators import ScopeValidator
from ..utils.parser import OutputParser
logger = logging.getLogger(__name__)
class Phase2Tools:
"""Phase 2 advanced testing tools."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
cache: CacheManager,
executor: ToolExecutor,
):
"""Initialize Phase 2 tools.
Args:
config: Configuration manager
db: Database manager
cache: Cache manager
executor: Tool executor
"""
self.config = config
self.db = db
self.cache = cache
self.executor = executor
self.parser = OutputParser()
async def graphql_scanner(
self,
program_id: str,
url: str,
test_auth: bool = True,
) -> Dict[str, Any]:
"""Scan GraphQL endpoint for security issues.
Args:
program_id: Program identifier
url: GraphQL endpoint URL
            test_auth: Whether to test authentication/authorization (currently accepted but not used by this scan)
Returns:
Dictionary with GraphQL findings
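        Example:
            Illustrative only: "tools" stands for an initialized Phase2Tools
            instance, and the program id and endpoint below are placeholders.
                result = await tools.graphql_scanner(
                    "example-program",
                    "https://api.example.com/graphql",
                )
                if result.get("introspection_enabled"):
                    print("Introspection is exposed on the target")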
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_url(url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
findings = []
try:
# 1. Test for introspection
introspection_query = {
"query": "{ __schema { queryType { name } } }"
}
args = [
"-s",
"-X", "POST",
"-H", "Content-Type: application/json",
"-d", json.dumps(introspection_query),
url
]
result = await self.executor.execute("curl", args, timeout=30)
if result.success:
try:
response = json.loads(result.output)
                    # Introspection is enabled when the probe returns schema data
                    # (some servers respond with {"data": null, "errors": [...]}).
                    data = response.get('data') if isinstance(response, dict) else None
                    if isinstance(data, dict) and '__schema' in data:
findings.append({
'severity': 'medium',
'type': 'GraphQL Introspection Enabled',
'description': 'GraphQL introspection is publicly accessible',
'impact': 'Exposes entire API schema to attackers',
'recommendation': 'Disable introspection in production',
})
# Get full schema
full_introspection = {
"query": """
{
__schema {
types {
name
fields {
name
type { name }
}
}
queryType { name }
mutationType { name }
}
}
"""
}
schema_result = await self.executor.execute(
"curl",
[
"-s", "-X", "POST",
"-H", "Content-Type: application/json",
"-d", json.dumps(full_introspection),
url
],
timeout=30
)
if schema_result.success:
schema_data = json.loads(schema_result.output)
findings[-1]['schema'] = schema_data
except json.JSONDecodeError:
pass
# 2. Test for batching attacks
batch_query = [
{"query": "{ __typename }"},
{"query": "{ __typename }"},
{"query": "{ __typename }"},
]
batch_result = await self.executor.execute(
"curl",
[
"-s", "-X", "POST",
"-H", "Content-Type: application/json",
"-d", json.dumps(batch_query),
url
],
timeout=30
)
            if batch_result.success:
                try:
                    batch_response = json.loads(batch_result.output)
                    # A JSON array with one result per operation means the server executed the batch.
                    if isinstance(batch_response, list) and len(batch_response) == len(batch_query):
                        findings.append({
                            'severity': 'medium',
                            'type': 'GraphQL Batch Query Enabled',
                            'description': 'Server accepts batched GraphQL queries',
                            'impact': 'Can be abused for DoS or rate limit bypass',
                            'recommendation': 'Implement batch query limits',
                        })
                except json.JSONDecodeError:
                    pass
# 3. Test for depth/complexity issues
deep_query = {
"query": """
{
a: __schema {
b: types {
c: fields {
d: type {
e: ofType {
f: ofType {
name
}
}
}
}
}
}
}
"""
}
depth_result = await self.executor.execute(
"curl",
[
"-s", "-X", "POST",
"-H", "Content-Type: application/json",
"-d", json.dumps(deep_query),
url
],
timeout=30
)
            if depth_result.success:
                try:
                    depth_response = json.loads(depth_result.output)
                    # Only flag when the deeply nested query actually resolved without errors.
                    if isinstance(depth_response, dict) and depth_response.get('data') and not depth_response.get('errors'):
                        findings.append({
                            'severity': 'medium',
                            'type': 'GraphQL No Depth Limit',
                            'description': 'Server accepts deeply nested queries',
                            'impact': 'Vulnerable to DoS via query depth attacks',
                            'recommendation': 'Implement query depth limiting',
                        })
                except json.JSONDecodeError:
                    pass
# Save findings
if findings:
await self.db.save_finding(
program_id=program_id,
scan_id=scan_id,
finding_type="graphql_vulnerabilities",
severity="high",
title=f"GraphQL Security Issues on {url}",
description=f"Found {len(findings)} GraphQL security issues",
evidence=json.dumps(findings, indent=2),
url=url,
)
return {
'success': True,
'scan_id': scan_id,
'url': url,
'findings': findings,
'introspection_enabled': any(f['type'] == 'GraphQL Introspection Enabled' for f in findings),
}
except Exception as e:
logger.error(f"Error in GraphQL scan: {str(e)}")
return {'success': False, 'error': str(e)}
async def js_analyzer(
self,
program_id: str,
url: str,
extract_endpoints: bool = True,
extract_secrets: bool = True,
) -> Dict[str, Any]:
"""Analyze JavaScript files for endpoints and secrets.
Args:
program_id: Program identifier
url: URL to analyze (can be JS file or page)
extract_endpoints: Whether to extract API endpoints
extract_secrets: Whether to extract secrets
Returns:
Dictionary with extracted data
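        Example:
            Illustrative only: "tools" stands for an initialized Phase2Tools
            instance, and the program id and URL below are placeholders.
                result = await tools.js_analyzer(
                    "example-program",
                    "https://app.example.com/",
                )
                for item in result.get("endpoints", []):
                    print(item["endpoint"], "<-", item["source"])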
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_url(url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
try:
# Fetch content
args = ["-s", "-L", url]
result = await self.executor.execute("curl", args, timeout=30)
if not result.success:
return {'success': False, 'error': 'Failed to fetch content'}
content = result.output
# Find JS files if this is an HTML page
js_files = []
if '<script' in content or '<html' in content.lower():
js_urls = re.findall(r'<script[^>]+src=["\']([^"\']+)["\']', content)
for js_url in js_urls[:20]: # Limit to 20 files
if js_url.startswith('//'):
js_url = urlparse(url).scheme + ':' + js_url
elif js_url.startswith('/'):
parsed = urlparse(url)
js_url = f"{parsed.scheme}://{parsed.netloc}{js_url}"
elif not js_url.startswith('http'):
js_url = urljoin(url, js_url)
# Validate
is_valid_js, _ = validator.validate_url(js_url)
if is_valid_js:
js_files.append(js_url)
else:
js_files = [url] # Already a JS file
            all_endpoints = []
            all_secrets = []
            seen_endpoints = set()  # Tracks endpoint strings so duplicates are reported once
# Analyze each JS file
for js_file in js_files:
js_result = await self.executor.execute(
"curl", ["-s", "-L", js_file], timeout=30
)
if not js_result.success:
continue
js_content = js_result.output
if extract_endpoints:
# Extract API endpoints
endpoint_patterns = [
r'["\']/(api|v1|v2|v3|graphql|rest)/[a-zA-Z0-9/_-]+["\']',
r'https?://[a-zA-Z0-9.-]+/[a-zA-Z0-9/_-]+',
r'["\'][/][a-zA-Z0-9/_-]+\.(json|xml|txt)["\']',
]
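                    # These regexes are heuristics: relative paths, template literals,
                    # and concatenated strings may be missed, and some matches will
                    # not be real endpoints. Treat the results as leads to verify.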
for pattern in endpoint_patterns:
matches = re.finditer(pattern, js_content)
for match in matches:
                            endpoint = match.group(0).strip('"\'')
                            # all_endpoints holds dicts, so deduplicate on the
                            # endpoint string rather than the list itself.
                            if endpoint not in seen_endpoints:
                                seen_endpoints.add(endpoint)
                                all_endpoints.append({
                                    'endpoint': endpoint,
                                    'source': js_file,
                                })
if extract_secrets:
# Extract potential secrets
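                    # Heuristic patterns only. The generic Base64 rule in particular
                    # matches many benign strings in minified bundles, so every hit
                    # needs manual verification before reporting.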
secret_patterns = {
'API Key': r'[aA][pP][iI][_-]?[kK][eE][yY]["\']?\s*[:=]\s*["\'][a-zA-Z0-9_-]{20,}["\']',
'Token': r'[tT][oO][kK][eE][nN]["\']?\s*[:=]\s*["\'][a-zA-Z0-9_-]{20,}["\']',
'AWS Key': r'AKIA[0-9A-Z]{16}',
'Base64': r'["\'][A-Za-z0-9+/]{40,}={0,2}["\']',
}
for secret_type, pattern in secret_patterns.items():
matches = re.finditer(pattern, js_content)
for match in matches:
all_secrets.append({
'type': secret_type,
'value': match.group(0)[:100],
'source': js_file,
})
# Save findings if significant
if all_endpoints or all_secrets:
await self.db.save_finding(
program_id=program_id,
scan_id=scan_id,
finding_type="js_analysis",
severity="medium",
title=f"JavaScript Analysis for {url}",
description=f"Extracted {len(all_endpoints)} endpoints and {len(all_secrets)} potential secrets",
evidence=json.dumps({
'endpoints': all_endpoints,
'secrets': all_secrets,
}, indent=2),
url=url,
)
return {
'success': True,
'scan_id': scan_id,
'url': url,
'js_files_analyzed': len(js_files),
'endpoints_found': len(all_endpoints),
'endpoints': all_endpoints,
'secrets_found': len(all_secrets),
'secrets': all_secrets,
}
except Exception as e:
logger.error(f"Error in JS analysis: {str(e)}")
return {'success': False, 'error': str(e)}
async def wayback_analyzer(
self,
program_id: str,
domain: str,
extract_params: bool = True,
) -> Dict[str, Any]:
"""Enhanced Wayback Machine analysis.
Args:
program_id: Program identifier
domain: Domain to analyze
extract_params: Whether to extract parameters
Returns:
Dictionary with wayback analysis
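        Example:
            Illustrative only: "tools" stands for an initialized Phase2Tools
            instance, and the program id and domain below are placeholders.
                result = await tools.wayback_analyzer("example-program", "example.com")
                if result["success"]:
                    print(result["unique_parameters"], "historical parameters found")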
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
scan_id = str(uuid.uuid4())
# Check cache
cache_key = f"wayback_{domain}"
cached = self.cache.get(cache_key)
if cached:
return cached
try:
# Use waybackurls if available
args = [domain]
result = await self.executor.execute("waybackurls", args, timeout=180)
if not result.success:
return {'success': False, 'error': 'waybackurls not available or failed'}
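            # waybackurls prints one archived URL per line on stdout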
            # Drop empty lines so total_urls reflects real results
            urls = [u for u in result.output.strip().split('\n') if u]
            # Containers for extracted parameters and notable historical endpoints
            parameters = set()
            old_endpoints = []
            interesting_keywords = ['admin', 'api', 'dev', 'test', 'debug', 'backup', 'old', 'v1', 'v2']
for url in urls:
if not url:
continue
# Extract parameters
if extract_params and '?' in url:
query = url.split('?', 1)[1]
params = query.split('&')
for param in params:
if '=' in param:
param_name = param.split('=')[0]
parameters.add(param_name)
                # Find old/interesting endpoints
                if any(keyword in url.lower() for keyword in interesting_keywords):
old_endpoints.append(url)
result_data = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'total_urls': len(urls),
'unique_parameters': len(parameters),
'parameters': sorted(list(parameters)),
'old_endpoints': old_endpoints[:50], # Limit
'sample_urls': urls[:100], # Limit
}
# Cache for 7 days
self.cache.set(cache_key, result_data, ttl=604800)
return result_data
except Exception as e:
logger.error(f"Error in wayback analysis: {str(e)}")
return {'success': False, 'error': str(e)}