Skip to main content
Glama
profile_transformation.py25 kB
"""Adversary/Defender profile transformation tools for Ludus MCP server. This module provides tools to transform range configurations by: - Adversary Profile: Injecting realistic vulnerabilities for red team training - Defender Profile: Adding monitoring and detection capabilities for blue team training """ from typing import Any from fastmcp import FastMCP from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.server.handlers.profile_transformer import ProfileTransformerHandler from ludus_mcp.server.tools.utils import format_tool_response from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) def create_profile_transformation_tools(client: LudusAPIClient) -> FastMCP: """Create adversary/defender profile transformation tools. Args: client: Ludus API client Returns: FastMCP instance with profile transformation tools registered """ mcp = FastMCP("Adversary/Defender Profiles") # ==================== ADVERSARY PROFILE ==================== @mcp.tool() async def apply_adversary_profile( config: dict[str, Any], threat_level: str = "medium", target_vms: list[str] | None = None, include_documentation: bool = True, ) -> dict: """Apply adversary profile to inject realistic vulnerabilities for red team training. This tool transforms an existing Ludus configuration by injecting educational vulnerabilities that simulate real-world security weaknesses. Perfect for: - Red team training and practice - Security testing scenarios - Learning exploitation techniques - Purple team exercises Args: config: The Ludus range configuration to transform (dict with 'ludus' key) threat_level: Level of vulnerabilities to inject - "low": Basic weaknesses (weak passwords, open shares) - "medium": AD attacks (Kerberoasting, AS-REP roasting, weak GPO) - "high": Advanced attacks (unconstrained delegation, certificate vulns) target_vms: Optional list of specific VM hostnames to target (None = all VMs) include_documentation: Generate educational docs explaining vulnerabilities Returns: Dictionary containing: - status: "success" - profile_type: "adversary" - threat_level: The applied threat level - modified_config: Transformed Ludus configuration - vulnerability_injections: List of injected vulnerabilities - vulnerabilities_count: Total number of vulnerabilities - affected_vms: List of VMs that were modified - documentation: Educational documentation (if requested) - warnings: Important safety warnings - next_steps: What to do next Examples: # Apply medium-level vulnerabilities to all VMs result = await apply_adversary_profile( config=my_range_config, threat_level="medium" ) # Apply high-level vulnerabilities only to domain controllers result = await apply_adversary_profile( config=my_range_config, threat_level="high", target_vms=["DC01", "DC02"] ) # Apply low-level vulnerabilities without documentation result = await apply_adversary_profile( config=my_range_config, threat_level="low", include_documentation=False ) Vulnerability Categories: Active Directory: - Weak domain passwords - Kerberoasting opportunities - AS-REP roasting - Unconstrained delegation - DCSyn rights misconfigurations - Certificate Services vulnerabilities Windows Security: - Weak local administrator passwords - Unquoted service paths - Weak ACLs and permissions - AlwaysInstallElevated - Cached credentials Network Security: - Open SMB shares - Exposed RDP services - LLMNR poisoning opportunities - Weak network segmentation Notes: ⚠️ These vulnerabilities are for EDUCATIONAL PURPOSES ONLY ⚠️ Deploy ONLY in isolated lab environments ⚠️ NEVER expose these systems to production or the internet The modified configuration uses Ansible roles to implement vulnerabilities: - ludus_ad_weak_passwords - ludus_ad_kerberoast - ludus_ad_asreproast - ludus_ad_unconstrained_delegation - ludus_weak_local_admin - ludus_unquoted_service_paths - ludus_open_shares """ handler = ProfileTransformerHandler(client) result = await handler.apply_adversary_profile( config=config, threat_level=threat_level, target_vms=target_vms, include_documentation=include_documentation, ) return format_tool_response(result) # ==================== DEFENDER PROFILE ==================== @mcp.tool() async def apply_defender_profile( config: dict[str, Any], monitoring_level: str = "comprehensive", siem_type: str = "wazuh", detection_focus: list[str] | None = None, ) -> dict: """Apply defender profile to add monitoring and detection capabilities for blue team training. This tool enhances an existing Ludus configuration with comprehensive security monitoring, logging, and detection capabilities. Perfect for: - Blue team training and SOC practice - SIEM deployment and configuration - Threat hunting exercises - Purple team defensive exercises - Incident detection and response training Args: config: The Ludus range configuration to enhance (dict with 'ludus' key) monitoring_level: Level of monitoring to deploy - "basic": SIEM agent + event forwarding - "comprehensive": + Sysmon/Auditd + PowerShell logging - "advanced": + EDR, process monitoring, FIM, network monitoring siem_type: SIEM platform to deploy - "wazuh": Wazuh (open source SIEM/XDR) - "splunk": Splunk Enterprise - "elastic": Elastic Stack (ELK) detection_focus: Optional list of specific attack types to focus detection on (e.g., ["kerberos_attacks", "lateral_movement", "credential_access"]) Returns: Dictionary containing: - status: "success" - profile_type: "defender" - monitoring_level: The applied monitoring level - siem_type: The SIEM platform used - modified_config: Enhanced Ludus configuration - monitoring_enhancements: List of added monitoring capabilities - enhancements_count: Total number of enhancements - siem_added: Whether SIEM server was added - detection_rules: List of detection rules configured - affected_vms: List of VMs that were enhanced - documentation: Implementation and usage documentation - next_steps: What to do after deployment Examples: # Add comprehensive Wazuh monitoring to all VMs result = await apply_defender_profile( config=my_range_config, monitoring_level="comprehensive", siem_type="wazuh" ) # Add advanced monitoring with Splunk result = await apply_defender_profile( config=my_range_config, monitoring_level="advanced", siem_type="splunk" ) # Focus detection on Kerberos attacks and lateral movement result = await apply_defender_profile( config=my_range_config, monitoring_level="comprehensive", siem_type="wazuh", detection_focus=["kerberos_attacks", "lateral_movement"] ) Monitoring Capabilities: Windows Monitoring: - SIEM agent (Wazuh/Splunk/Elastic) - Event log forwarding - Sysmon (process, network, file monitoring) - PowerShell script block logging - EDR capabilities (advanced level) - Process monitoring (advanced level) - File integrity monitoring (advanced level) Linux Monitoring: - SIEM agent (Wazuh/Splunk/Elastic) - Syslog forwarding - Auditd (system call auditing) - OSQuery (endpoint visibility) - EDR capabilities (advanced level) - Process monitoring (advanced level) - File integrity monitoring (advanced level) Network Monitoring: - NetFlow collection - Packet capture - Zeek network security monitor (advanced level) - Suricata IDS/IPS (advanced level) Detection Rules: - Kerberoasting detection (Event ID 4769) - Lateral movement detection (PsExec, WMI) - Credential dumping (LSASS access, Mimikatz) - PowerShell obfuscation - Suspicious process execution - And more based on detection_focus Notes: - SIEM server will be automatically added if not present - All VMs will have monitoring agents installed - Detection rules are pre-configured but customizable - Dashboard and alerting must be configured post-deployment """ handler = ProfileTransformerHandler(client) result = await handler.apply_defender_profile( config=config, monitoring_level=monitoring_level, siem_type=siem_type, detection_focus=detection_focus, ) return format_tool_response(result) # ==================== PROFILE UTILITIES ==================== @mcp.tool() async def list_available_profiles() -> dict: """List all available adversary and defender profiles with descriptions. Returns detailed information about all available security profiles, their capabilities, and use cases. Returns: Dictionary containing: - adversary_profiles: Available threat levels and capabilities - defender_profiles: Available monitoring levels and capabilities - siem_types: Supported SIEM platforms - use_cases: Common use cases for each profile combination Example: profiles = await list_available_profiles() print(profiles['adversary_profiles']['medium']['capabilities']) """ profiles = { "status": "success", "adversary_profiles": { "low": { "name": "Low Threat Level", "description": "Basic security weaknesses suitable for beginners", "capabilities": [ "Weak passwords", "Open SMB shares", "Outdated software", "Weak local admin passwords", "Open RDP exposure", ], "use_cases": [ "Beginner red team training", "Basic vulnerability scanning", "Introduction to exploitation", ], }, "medium": { "name": "Medium Threat Level", "description": "Realistic AD and Windows misconfigurations", "capabilities": [ "All low-level vulnerabilities", "Kerberoasting opportunities", "AS-REP roasting", "Weak GPO configurations", "Unquoted service paths", "Weak ACLs", "LLMNR poisoning", ], "use_cases": [ "Intermediate red team training", "Active Directory exploitation practice", "Lateral movement exercises", "Purple team assessments", ], }, "high": { "name": "High Threat Level", "description": "Advanced attack paths and complex vulnerabilities", "capabilities": [ "All medium-level vulnerabilities", "Unconstrained delegation", "DCSyn rights misconfigurations", "Forest trust weaknesses", "Certificate Services vulnerabilities", "Zerologon-vulnerable configurations", "AlwaysInstallElevated", "Cached credentials", "DNS zone transfers", "NTLM relay opportunities", ], "use_cases": [ "Advanced red team training", "Complex AD attack path practice", "Enterprise penetration testing simulation", "Advanced purple team exercises", ], }, }, "defender_profiles": { "basic": { "name": "Basic Monitoring", "description": "Fundamental logging and SIEM integration", "capabilities": { "windows": ["SIEM agent", "Event log forwarding"], "linux": ["SIEM agent", "Syslog forwarding"], "network": [], }, "use_cases": [ "Basic SOC operations", "Log aggregation", "Simple alerting", ], }, "comprehensive": { "name": "Comprehensive Monitoring", "description": "Advanced logging with detailed visibility", "capabilities": { "windows": [ "SIEM agent", "Event log forwarding", "Sysmon", "PowerShell logging", ], "linux": [ "SIEM agent", "Syslog forwarding", "Auditd", "OSQuery", ], "network": ["NetFlow", "Packet capture"], }, "use_cases": [ "SOC analyst training", "Threat hunting", "Incident detection", "Purple team defense", ], }, "advanced": { "name": "Advanced Monitoring", "description": "Enterprise-grade SOC stack with comprehensive visibility", "capabilities": { "windows": [ "All comprehensive capabilities", "EDR", "Process monitoring", "File integrity monitoring", ], "linux": [ "All comprehensive capabilities", "EDR", "Process monitoring", "File integrity monitoring", ], "network": [ "All comprehensive capabilities", "Zeek", "Suricata IDS/IPS", ], }, "use_cases": [ "Advanced SOC operations", "Enterprise threat hunting", "Complex incident response", "Full purple team exercises", ], }, }, "siem_types": { "wazuh": { "name": "Wazuh", "description": "Open-source SIEM and XDR platform", "cost": "Free / Open Source", "features": [ "Host-based intrusion detection", "Log analysis", "File integrity monitoring", "Vulnerability detection", "Compliance monitoring", ], }, "splunk": { "name": "Splunk Enterprise", "description": "Enterprise SIEM and data analytics platform", "cost": "Commercial / Free trial", "features": [ "Advanced log analysis", "Machine learning", "Custom dashboards", "Extensive app ecosystem", "Advanced correlation", ], }, "elastic": { "name": "Elastic Stack (ELK)", "description": "Elasticsearch, Logstash, Kibana stack", "cost": "Open Source / Commercial features", "features": [ "Full-text search", "Real-time analysis", "Custom visualizations", "Scalable architecture", "API-driven", ], }, }, "purple_team_workflow": { "description": "Combine adversary and defender profiles for complete purple team training", "steps": [ "1. Generate base configuration with generate_config_from_description()", "2. Apply adversary profile to inject vulnerabilities", "3. Apply defender profile to add monitoring", "4. Deploy the purple team range", "5. Practice: Red team attacks → Blue team detects → Improve defenses", ], }, } return format_tool_response(profiles) @mcp.tool() async def preview_profile_changes( config: dict[str, Any], profile_type: str, profile_level: str, ) -> dict: """Preview changes that would be made by applying a profile without modifying the config. This is a dry-run tool that shows what would happen if you applied a profile, without actually modifying the configuration. Useful for understanding the impact before committing to changes. Args: config: The Ludus range configuration to analyze profile_type: Type of profile to preview ("adversary" or "defender") profile_level: Level to preview (threat level for adversary, monitoring level for defender) Returns: Dictionary containing: - status: "success" - profile_type: The profile type previewed - profile_level: The level previewed - affected_vms: List of VMs that would be modified - changes_summary: Summary of changes - ansible_roles_added: List of Ansible roles that would be added - estimated_impact: Estimated resource and complexity impact - recommendations: Recommendations based on the preview Examples: # Preview medium adversary profile preview = await preview_profile_changes( config=my_config, profile_type="adversary", profile_level="medium" ) # Preview advanced defender profile preview = await preview_profile_changes( config=my_config, profile_type="defender", profile_level="advanced" ) """ vms = config.get("ludus", []) vm_names = [vm.get("hostname", "Unknown") for vm in vms] if profile_type == "adversary": changes = { "low": ["Weak passwords on all domain-joined systems", "Open SMB shares"], "medium": [ "Weak passwords", "Kerberoasting on service accounts", "AS-REP roasting opportunities", "Unquoted service paths on workstations", ], "high": [ "All medium-level vulnerabilities", "Unconstrained delegation on specific computers", "DCSyn rights for specific users", "Advanced AD misconfigurations", ], } roles = { "low": ["ludus_ad_weak_passwords", "ludus_open_shares"], "medium": [ "ludus_ad_weak_passwords", "ludus_ad_kerberoast", "ludus_ad_asreproast", "ludus_unquoted_service_paths", ], "high": [ "ludus_ad_weak_passwords", "ludus_ad_kerberoast", "ludus_ad_asreproast", "ludus_ad_unconstrained_delegation", "ludus_unquoted_service_paths", "ludus_weak_local_admin", ], } else: # defender changes = { "basic": ["SIEM agent on all VMs", "Event log forwarding"], "comprehensive": [ "SIEM agents", "Sysmon on Windows VMs", "Auditd on Linux VMs", "PowerShell logging", ], "advanced": [ "All comprehensive monitoring", "EDR on all endpoints", "File integrity monitoring", "Network flow monitoring", "Zeek and Suricata", ], } roles = { "basic": ["ludus_wazuh_agent"], "comprehensive": [ "ludus_wazuh_agent", "ludus_sysmon", "ludus_auditd", "ludus_powershell_logging", ], "advanced": [ "ludus_wazuh_agent", "ludus_sysmon", "ludus_auditd", "ludus_powershell_logging", "ludus_edr", "ludus_fim", ], } result = { "status": "success", "profile_type": profile_type, "profile_level": profile_level, "affected_vms": vm_names, "vm_count": len(vms), "changes_summary": changes.get(profile_level, []), "ansible_roles_added": roles.get(profile_level, []), "estimated_impact": { "resource_increase": "low" if profile_level in ["low", "basic"] else "medium" if profile_level in ["medium", "comprehensive"] else "high", "deployment_time_increase": "5-10 minutes" if profile_level in ["low", "basic"] else "10-20 minutes" if profile_level in ["medium", "comprehensive"] else "20-30 minutes", "complexity": profile_level, }, "recommendations": [ f"This will modify all {len(vms)} VMs in your configuration", f"Estimated additional deployment time: {5 if profile_level in ['low', 'basic'] else 15 if profile_level in ['medium', 'comprehensive'] else 25} minutes", "Review the changes_summary and ansible_roles_added before applying", f"Use apply_{profile_type}_profile() to apply these changes", ], } return format_tool_response(result) return mcp

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server