"""Cloud security scanning tools."""
import asyncio
import json
import uuid
import re
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse
import logging
from ..config import ConfigManager
from ..storage.database import DatabaseManager
from ..utils.executor import ToolExecutor
from ..utils.validators import ScopeValidator
logger = logging.getLogger(__name__)
class CloudSecurityTools:
"""Cloud security scanning tools."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
executor: ToolExecutor,
):
"""Initialize cloud security tools.
Args:
config: Configuration manager
db: Database manager
executor: Tool executor
"""
self.config = config
self.db = db
self.executor = executor
async def s3_scanner(
self,
program_id: str,
target: str,
check_permissions: bool = True,
) -> Dict[str, Any]:
"""Scan for S3 bucket misconfigurations.
Args:
program_id: Program identifier
target: Domain or bucket name to scan
check_permissions: Whether to check bucket permissions
Returns:
Dictionary with S3 bucket findings
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
scan_id = str(uuid.uuid4())
buckets_found = []
try:
# Generate potential bucket names
if target.startswith('http'):
parsed = urlparse(target)
domain = parsed.netloc
else:
domain = target
# Remove www and common prefixes
domain_clean = domain.replace('www.', '').replace('dev.', '').replace('staging.', '')
domain_parts = domain_clean.split('.')
company_name = domain_parts[0]
# Generate bucket name permutations
bucket_names = [
company_name,
f"{company_name}-assets",
f"{company_name}-uploads",
f"{company_name}-static",
f"{company_name}-backup",
f"{company_name}-backups",
f"{company_name}-data",
f"{company_name}-files",
f"{company_name}-images",
f"{company_name}-prod",
f"{company_name}-production",
f"{company_name}-staging",
f"{company_name}-dev",
f"{company_name}-test",
domain_clean.replace('.', '-'),
domain_clean.replace('.', ''),
]
# Check each bucket
for bucket_name in bucket_names:
bucket_url = f"https://{bucket_name}.s3.amazonaws.com/"
# Test bucket existence
args = ["-s", "-I", "-X", "HEAD", bucket_url]
result = await self.executor.execute("curl", args, timeout=10)
if result.success:
response = result.output.lower()
# Bucket exists if we don't get NoSuchBucket
if 'nosuchbucket' not in response and '404' not in response:
bucket_info = {
'bucket_name': bucket_name,
'url': bucket_url,
'exists': True,
'accessible': False,
'listable': False,
'writable': False,
'severity': 'info',
}
if check_permissions:
# Test read permissions (list)
list_result = await self.executor.execute(
"curl", ["-s", bucket_url], timeout=10
)
if list_result.success and '<ListBucketResult' in list_result.output:
bucket_info['listable'] = True
bucket_info['accessible'] = True
bucket_info['severity'] = 'high'
# Extract file count
files = re.findall(r'<Key>([^<]+)</Key>', list_result.output)
bucket_info['file_count'] = len(files)
bucket_info['sample_files'] = files[:10]
# Test write permissions
test_file = f"test-{uuid.uuid4().hex[:8]}.txt"
write_result = await self.executor.execute(
"curl",
[
"-s",
"-X", "PUT",
"-d", "test",
f"{bucket_url}{test_file}"
],
timeout=10
)
if write_result.success and '200' in write_result.output:
bucket_info['writable'] = True
bucket_info['severity'] = 'critical'
# Clean up test file
await self.executor.execute(
"curl",
["-s", "-X", "DELETE", f"{bucket_url}{test_file}"],
timeout=10
)
buckets_found.append(bucket_info)
# Save findings
if buckets_found:
high_severity = [b for b in buckets_found if b['severity'] in ['high', 'critical']]
overall_severity = 'critical' if any(b['writable'] for b in buckets_found) else \
'high' if high_severity else 'medium'
await self.db.save_finding(
program_id=program_id,
scan_id=scan_id,
finding_type="s3_misconfiguration",
severity=overall_severity,
title=f"S3 Bucket Exposure for {target}",
description=f"Found {len(buckets_found)} S3 buckets, {len(high_severity)} with public access",
evidence=json.dumps(buckets_found, indent=2),
url=target,
)
return {
'success': True,
'scan_id': scan_id,
'target': target,
'buckets_found': len(buckets_found),
'buckets': buckets_found,
'listable_buckets': [b for b in buckets_found if b.get('listable')],
'writable_buckets': [b for b in buckets_found if b.get('writable')],
}
except Exception as e:
logger.error(f"Error in S3 scan: {str(e)}")
return {'success': False, 'error': str(e)}
async def jwt_analyzer(
self,
program_id: str,
token: str,
url: Optional[str] = None,
) -> Dict[str, Any]:
"""Analyze JWT token for security issues.
Args:
program_id: Program identifier
token: JWT token to analyze
url: Optional URL where token was found
Returns:
Dictionary with JWT analysis results
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
scan_id = str(uuid.uuid4())
vulnerabilities = []
try:
# Parse JWT
parts = token.split('.')
if len(parts) != 3:
return {'success': False, 'error': 'Invalid JWT format (should have 3 parts)'}
header_b64, payload_b64, signature = parts
# Decode header and payload
import base64
# Add padding if needed
header_b64 += '=' * (4 - len(header_b64) % 4)
payload_b64 += '=' * (4 - len(payload_b64) % 4)
try:
header = json.loads(base64.urlsafe_b64decode(header_b64))
payload = json.loads(base64.urlsafe_b64decode(payload_b64))
except Exception as e:
return {'success': False, 'error': f'Failed to decode JWT: {str(e)}'}
# Check for vulnerabilities
# 1. None algorithm
if header.get('alg', '').lower() == 'none':
vulnerabilities.append({
'severity': 'critical',
'type': 'JWT None Algorithm',
'description': 'Token uses "none" algorithm - signature not required',
'impact': 'Attacker can create arbitrary tokens without secret',
'exploit': 'Remove signature and set alg to "none"',
})
# 2. Weak algorithm
weak_algs = ['HS256', 'HS384', 'HS512']
if header.get('alg') in weak_algs:
vulnerabilities.append({
'severity': 'medium',
'type': 'JWT Weak Algorithm',
'description': f'Token uses symmetric algorithm {header.get("alg")}',
'impact': 'Vulnerable to brute force attacks on weak secrets',
'recommendation': 'Use asymmetric algorithms (RS256, ES256)',
})
# 3. Algorithm confusion (RS256 → HS256)
if header.get('alg') == 'RS256':
vulnerabilities.append({
'severity': 'high',
'type': 'Potential Algorithm Confusion',
'description': 'RS256 algorithm may be vulnerable to HS256 confusion',
'impact': 'If server accepts HS256, attacker can sign with public key',
'exploit': 'Change alg to HS256 and sign with RS256 public key',
})
# 4. Missing expiration
if 'exp' not in payload:
vulnerabilities.append({
'severity': 'medium',
'type': 'JWT Missing Expiration',
'description': 'Token has no expiration time (exp claim)',
'impact': 'Token valid indefinitely, cannot be revoked',
'recommendation': 'Add exp claim with reasonable timeout',
})
# 5. Long expiration
import time
if 'exp' in payload:
exp_time = payload['exp']
current_time = int(time.time())
time_diff = exp_time - current_time
if time_diff > 86400 * 30: # 30 days
vulnerabilities.append({
'severity': 'low',
'type': 'JWT Long Expiration',
'description': f'Token valid for {time_diff // 86400} days',
'impact': 'Long-lived tokens increase exposure window',
'recommendation': 'Use shorter expiration (1 hour recommended)',
})
# 6. Sensitive data in payload
sensitive_keys = ['password', 'secret', 'token', 'api_key', 'private_key']
for key in payload.keys():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
vulnerabilities.append({
'severity': 'high',
'type': 'JWT Sensitive Data Exposure',
'description': f'Token contains sensitive claim: {key}',
'impact': 'Sensitive data exposed in unencrypted JWT payload',
'recommendation': 'Remove sensitive data from JWT or use JWE',
})
# 7. Missing standard claims
if 'aud' not in payload:
vulnerabilities.append({
'severity': 'info',
'type': 'JWT Missing Audience',
'description': 'Token missing audience (aud) claim',
'impact': 'Token not tied to specific audience, may be misused',
'recommendation': 'Add aud claim to restrict token usage',
})
# Save findings if critical/high
if any(v['severity'] in ['critical', 'high'] for v in vulnerabilities):
overall_severity = 'critical' if any(v['severity'] == 'critical' for v in vulnerabilities) else 'high'
await self.db.save_finding(
program_id=program_id,
scan_id=scan_id,
finding_type="jwt_vulnerability",
severity=overall_severity,
title="JWT Security Issues Detected",
description=f"Found {len(vulnerabilities)} JWT security issues",
evidence=json.dumps({
'header': header,
'payload': payload,
'vulnerabilities': vulnerabilities,
}, indent=2),
url=url or "N/A",
)
return {
'success': True,
'scan_id': scan_id,
'header': header,
'payload': payload,
'algorithm': header.get('alg'),
'vulnerabilities_found': len(vulnerabilities),
'vulnerabilities': vulnerabilities,
'is_expired': payload.get('exp', 0) < int(time.time()) if 'exp' in payload else None,
}
except Exception as e:
logger.error(f"Error in JWT analysis: {str(e)}")
return {'success': False, 'error': str(e)}