import logging
from typing import Dict, Any
# Module-level logger registered under the fixed name "ChainReactor"
# (a literal string, not the module's __name__).
logger = logging.getLogger("ChainReactor")
class ChainReactor:
    """
    The 'P1 Maker'.

    Suggests attack chains that build on an initial finding, with the aim of
    escalating Low/Medium findings to High/Critical. The chains are static
    playbooks keyed by vulnerability type; this class only returns
    descriptive text and does not execute anything.
    """

    # Closing advice attached to every response, whether or not a chain matched.
    _ADVICE = "Always demonstrate IMPACT. A pop-up alert(1) is info/low. Stealing a session token is Critical."

    # Playbooks keyed by normalised finding type. Each entry is a
    # (name, impact, steps) tuple. Tuples keep the shared table immutable;
    # suggest_chains() builds fresh dicts/lists from it on every call.
    _PLAYBOOKS = {
        "xss": (
            (
                "Account Takeover via XSS",
                "Critical (P1)",
                (
                    "1. Verify XSS execution context (console.log).",
                    "2. Inject payload to read 'document.cookie' or 'localStorage'.",
                    "3. If HttpOnly is set, use XHR/Fetch to perform actions as victim (CSRF via XSS).",
                    "4. Target: Change Password or Email endpoints.",
                ),
            ),
            (
                "Phishing via HTML Injection (App Appearance)",
                "High (P2)",
                (
                    "1. Inject realistic login form overlay.",
                    "2. Send credentials to attacker server.",
                ),
            ),
        ),
        "open_redirect": (
            (
                "OAuth Token Theft (ATO)",
                "Critical (P1)",
                (
                    "1. Find OAuth login flow (Sign in with Google/SSO).",
                    "2. Manipulate 'redirect_uri' to point to the Vulnerable Open Redirect Endpoint.",
                    "3. Append attacker domain to the Open Redirect.",
                    "4. If successful, the OAuth token leaks to attacker server.",
                ),
            ),
            (
                "SSRF via Redirect",
                "High (P2)",
                (
                    "1. Point redirect to internal service (e.g., 127.0.0.1:8080 or metadata).",
                    "2. If server-side client follows redirects, you may access internal network.",
                ),
            ),
        ),
        "lfi": (
            (
                "RCE via Log Poisoning",
                "Critical (P1)",
                (
                    "1. Inject PHP payload into User-Agent or URL.",
                    "2. Trigger error or access to ensure it's logged (/var/log/apache2/access.log).",
                    "3. Include the log file via LFI to execute payload.",
                ),
            ),
            (
                "RCE via /proc/self/environ",
                "Critical (P1)",
                (
                    "1. Inject payload into User-Agent.",
                    "2. Include /proc/self/environ via LFI.",
                ),
            ),
        ),
        "ssrf": (
            (
                "Cloud Account Takeover",
                "Critical (P1)",
                (
                    "1. Target Cloud Metadata (169.254.169.254).",
                    "2. Extract IAM Role Credentials (AWS) or Service Account Token (GCP).",
                    "3. Use CLI to authenticate and dump data/infrastructure.",
                ),
            ),
        ),
    }

    @staticmethod
    def _normalise(finding_type: str) -> str:
        """Return the playbook lookup key for a user-supplied finding type.

        Lower-cases the value, treats hyphens as spaces, and joins the
        whitespace-separated words with underscores, so "XSS", " xss ",
        "open-redirect" and "Open Redirect" all resolve to a table key.
        """
        # str.split() with no argument also drops leading/trailing whitespace
        # and collapses runs of whitespace between words.
        return "_".join(finding_type.replace("-", " ").lower().split())

    async def suggest_chains(self, finding_type: str, context: str) -> Dict[str, Any]:
        """
        Returns actionable attack chains for a given vulnerability type.

        Args:
            finding_type: Vulnerability type, e.g. "xss", "open_redirect",
                "lfi" or "ssrf". Matching ignores case, surrounding
                whitespace, and hyphen/space versus underscore separators.
            context: Accepted for interface compatibility; it is not
                consulted when selecting chains.

        Returns:
            A dict with three keys:
              - "initial_finding": ``finding_type`` exactly as passed in.
              - "potential_chains": list of dicts with "name", "impact" and
                "steps" (a list of strings); empty when the type is unknown.
              - "advice": a fixed reminder about demonstrating impact.

        Raises:
            AttributeError: if ``finding_type`` is not a string.
        """
        playbook = self._PLAYBOOKS.get(self._normalise(finding_type), ())

        # Build new dicts and lists on every call so a caller that mutates the
        # result cannot affect the shared table or later responses.
        chains = [
            {"name": name, "impact": impact, "steps": list(steps)}
            for name, impact, steps in playbook
        ]

        return {
            "initial_finding": finding_type,
            "potential_chains": chains,
            "advice": self._ADVICE,
        }