Skip to main content
Glama
profile_transformer.py53.6 kB
"""Handler for applying Adversary/Defender profiles to range configurations. This handler transforms existing configurations by: - Adversary Profile: Injecting realistic vulnerabilities for red team training - Defender Profile: Adding monitoring, logging, and detection capabilities """ from typing import Any import copy import yaml from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) class ProfileTransformerHandler: """Handler for applying security profiles to configurations.""" def __init__(self, client: LudusAPIClient) -> None: """Initialize the profile transformer handler.""" self.client = client async def apply_adversary_profile( self, config: dict[str, Any], threat_level: str = "medium", target_vms: list[str] | None = None, include_documentation: bool = True, ) -> dict[str, Any]: """Apply adversary profile to inject realistic vulnerabilities. Args: config: Original Ludus configuration threat_level: Vulnerability level (low, medium, high) target_vms: Specific VMs to target (None = all) include_documentation: Include educational docs about vulnerabilities Returns: Dictionary with modified config and vulnerability documentation """ logger.info(f"Applying adversary profile (threat_level={threat_level})") modified_config = copy.deepcopy(config) injections = [] vms = modified_config.get("ludus", []) # Determine vulnerability sets based on threat level vuln_sets = self._get_vulnerability_sets(threat_level) for vm in vms: vm_name = vm.get("vm_name", "") hostname = vm.get("hostname", "") # Skip if not in target list if target_vms and hostname not in target_vms: continue # Determine VM type vm_type = self._determine_vm_type(vm) # Apply vulnerabilities based on VM type if vm_type == "domain_controller": vm_injections = self._inject_dc_vulnerabilities(vm, vuln_sets, threat_level) injections.extend(vm_injections) elif vm_type == "workstation": vm_injections = self._inject_workstation_vulnerabilities(vm, vuln_sets, threat_level) injections.extend(vm_injections) elif vm_type == "server": vm_injections = self._inject_server_vulnerabilities(vm, vuln_sets, threat_level) injections.extend(vm_injections) # Generate documentation documentation = "" if include_documentation: documentation = self._generate_adversary_documentation(injections, threat_level) result = { "status": "success", "profile_type": "adversary", "threat_level": threat_level, "modified_config": modified_config, "vulnerability_injections": injections, "vulnerabilities_count": len(injections), "affected_vms": list(set(inj["vm_name"] for inj in injections)), "documentation": documentation, "warnings": [ "⚠️ These vulnerabilities are for EDUCATIONAL PURPOSES ONLY", "⚠️ Deploy in isolated environments only", "⚠️ Do NOT expose these systems to the internet", ], "next_steps": [ "1. Review the vulnerability injections and documentation", "2. Understand each vulnerability before deploying", "3. Deploy with: deploy_range(config=modified_config)", "4. Practice exploitation in controlled environment", ], } return result async def apply_defender_profile( self, config: dict[str, Any], monitoring_level: str = "comprehensive", siem_type: str = "wazuh", detection_focus: list[str] | None = None, ) -> dict[str, Any]: """Apply defender profile to add monitoring and detection capabilities. Args: config: Original Ludus configuration monitoring_level: Level of monitoring (basic, comprehensive, advanced) siem_type: SIEM to deploy (wazuh, splunk, elastic) detection_focus: Specific attack types to focus detection on Returns: Dictionary with modified config and monitoring documentation """ logger.info(f"Applying defender profile (level={monitoring_level}, siem={siem_type})") modified_config = copy.deepcopy(config) enhancements = [] vms = modified_config.get("ludus", []) # Get monitoring capabilities based on level monitoring_caps = self._get_monitoring_capabilities(monitoring_level) for vm in vms: vm_name = vm.get("vm_name", "") hostname = vm.get("hostname", "") template = vm.get("template", "") # Determine OS type is_windows = "win" in template.lower() is_linux = any(x in template.lower() for x in ["ubuntu", "debian", "centos", "rhel", "kali"]) # Apply monitoring based on OS if is_windows: vm_enhancements = self._add_windows_monitoring( vm, monitoring_caps, monitoring_level, siem_type ) enhancements.extend(vm_enhancements) elif is_linux: vm_enhancements = self._add_linux_monitoring( vm, monitoring_caps, monitoring_level, siem_type ) enhancements.extend(vm_enhancements) # Add SIEM server if not present siem_added = self._ensure_siem_server(modified_config, siem_type, monitoring_level) # Add detection rules detection_rules = self._add_detection_rules(detection_focus or [], siem_type) # Generate documentation documentation = self._generate_defender_documentation( enhancements, siem_type, monitoring_level, detection_rules ) result = { "status": "success", "profile_type": "defender", "monitoring_level": monitoring_level, "siem_type": siem_type, "modified_config": modified_config, "monitoring_enhancements": enhancements, "enhancements_count": len(enhancements), "siem_added": siem_added, "detection_rules": detection_rules, "affected_vms": list(set(enh["vm_name"] for enh in enhancements)), "documentation": documentation, "next_steps": [ "1. Review the monitoring enhancements", f"2. Ensure {siem_type.title()} SIEM is properly configured", "3. Deploy with: deploy_range(config=modified_config)", "4. Configure detection rules and alerting", "5. Practice threat hunting and incident response", ], } return result def _determine_vm_type(self, vm: dict[str, Any]) -> str: """Determine the type of VM.""" if vm.get("domain", {}).get("role") == "primary-dc": return "domain_controller" elif "workstation" in vm.get("template", "").lower() or "win10" in vm.get("template", "").lower(): return "workstation" elif "server" in vm.get("template", "").lower(): return "server" return "other" def _get_vulnerability_sets(self, threat_level: str) -> dict[str, list[str]]: """Get vulnerability sets based on threat level.""" vuln_sets = { "low": { "ad": ["weak_passwords", "open_shares"], "windows": ["outdated_software", "weak_local_admin"], "network": ["open_smb", "rdp_exposed"], }, "medium": { "ad": ["weak_passwords", "open_shares", "kerberoasting", "asrep_roasting", "weak_gpo"], "windows": ["outdated_software", "weak_local_admin", "unquoted_service_paths", "weak_acls"], "network": ["open_smb", "rdp_exposed", "llmnr_poisoning", "weak_network_segmentation"], }, "high": { "ad": [ "weak_passwords", "open_shares", "kerberoasting", "asrep_roasting", "weak_gpo", "unconstrained_delegation", "dcsync_rights", "forest_trusts", "certificate_services_vulns", "zerologon_vulnerable" ], "windows": [ "outdated_software", "weak_local_admin", "unquoted_service_paths", "weak_acls", "alwaysinstallelevated", "cached_credentials", "autologon_creds" ], "network": [ "open_smb", "rdp_exposed", "llmnr_poisoning", "weak_network_segmentation", "dns_zone_transfers", "ipv6_enabled", "ntlm_relay_possible" ], }, "critical": { "ad": [ "weak_passwords", "open_shares", "kerberoasting", "asrep_roasting", "weak_gpo", "unconstrained_delegation", "dcsync_rights", "forest_trusts", "certificate_services_vulns", "zerologon_vulnerable", "golden_ticket", "silver_ticket", "skeleton_key", "dcshadow", "krbtgt_backdoor" ], "windows": [ "outdated_software", "weak_local_admin", "unquoted_service_paths", "weak_acls", "alwaysinstallelevated", "cached_credentials", "autologon_creds", "dll_hijacking", "com_hijacking", "print_spooler_vulns", "smbghost", "petitpotam", "registry_autoruns", "scheduled_task_persistence" ], "network": [ "open_smb", "rdp_exposed", "llmnr_poisoning", "weak_network_segmentation", "dns_zone_transfers", "ipv6_enabled", "ntlm_relay_possible", "smb_signing_disabled", "ldap_signing_disabled", "responder_poisoning" ], "persistence": [ "wmi_subscriptions", "registry_run_keys", "startup_folder", "service_creation", "dll_sideloading", "bits_jobs" ], "exfiltration": [ "dns_tunneling", "cloud_storage_abuse", "email_exfil", "ftp_exfil" ], }, } return vuln_sets.get(threat_level, vuln_sets["medium"]) def _inject_dc_vulnerabilities( self, vm: dict[str, Any], vuln_sets: dict[str, list[str]], threat_level: str ) -> list[dict[str, Any]]: """Inject vulnerabilities into a domain controller.""" injections = [] vm_name = vm.get("vm_name", "") ad_vulns = vuln_sets.get("ad", []) # Initialize roles if not present if "roles" not in vm: vm["roles"] = [] if "weak_passwords" in ad_vulns: vm["roles"].append("ludus_ad_weak_passwords") injections.append({ "vm_name": vm_name, "vulnerability_type": "weak_passwords", "category": "Active Directory", "severity": "high", "ansible_role": "ludus_ad_weak_passwords", "description": "Domain users have weak, easily guessable passwords", "exploitation": "Password spraying, brute force attacks", "cve": None, }) if "kerberoasting" in ad_vulns: vm["roles"].append("ludus_ad_kerberoast") injections.append({ "vm_name": vm_name, "vulnerability_type": "kerberoasting", "category": "Active Directory", "severity": "high", "ansible_role": "ludus_ad_kerberoast", "description": "Service accounts with SPNs have weak passwords", "exploitation": "Kerberoasting attack to crack service account passwords", "cve": None, }) if "asrep_roasting" in ad_vulns: vm["roles"].append("ludus_ad_asreproast") injections.append({ "vm_name": vm_name, "vulnerability_type": "asrep_roasting", "category": "Active Directory", "severity": "medium", "ansible_role": "ludus_ad_asreproast", "description": "User accounts with 'Do not require Kerberos preauthentication' enabled", "exploitation": "AS-REP roasting to obtain crackable hashes", "cve": None, }) if "unconstrained_delegation" in ad_vulns: vm["roles"].append("ludus_ad_unconstrained_delegation") injections.append({ "vm_name": vm_name, "vulnerability_type": "unconstrained_delegation", "category": "Active Directory", "severity": "critical", "ansible_role": "ludus_ad_unconstrained_delegation", "description": "Computer accounts configured with unconstrained delegation", "exploitation": "Abuse unconstrained delegation to compromise domain", "cve": None, }) return injections def _inject_workstation_vulnerabilities( self, vm: dict[str, Any], vuln_sets: dict[str, list[str]], threat_level: str ) -> list[dict[str, Any]]: """Inject vulnerabilities into a workstation.""" injections = [] vm_name = vm.get("vm_name", "") windows_vulns = vuln_sets.get("windows", []) if "roles" not in vm: vm["roles"] = [] if "weak_local_admin" in windows_vulns: vm["roles"].append("ludus_weak_local_admin") injections.append({ "vm_name": vm_name, "vulnerability_type": "weak_local_admin", "category": "Windows Security", "severity": "high", "ansible_role": "ludus_weak_local_admin", "description": "Local administrator account with weak password", "exploitation": "Local privilege escalation, lateral movement", "cve": None, }) if "unquoted_service_paths" in windows_vulns: vm["roles"].append("ludus_unquoted_service_paths") injections.append({ "vm_name": vm_name, "vulnerability_type": "unquoted_service_paths", "category": "Windows Security", "severity": "medium", "ansible_role": "ludus_unquoted_service_paths", "description": "Windows services with unquoted executable paths", "exploitation": "Privilege escalation via DLL hijacking", "cve": None, }) return injections def _inject_server_vulnerabilities( self, vm: dict[str, Any], vuln_sets: dict[str, list[str]], threat_level: str ) -> list[dict[str, Any]]: """Inject vulnerabilities into a server.""" injections = [] vm_name = vm.get("vm_name", "") if "roles" not in vm: vm["roles"] = [] # Add common server vulnerabilities vm["roles"].append("ludus_open_shares") injections.append({ "vm_name": vm_name, "vulnerability_type": "open_shares", "category": "Network Security", "severity": "medium", "ansible_role": "ludus_open_shares", "description": "SMB shares with weak or no authentication", "exploitation": "Unauthorized file access, credential harvesting", "cve": None, }) return injections def _get_monitoring_capabilities(self, monitoring_level: str) -> dict[str, list[str]]: """Get monitoring capabilities based on level.""" capabilities = { "basic": { "windows": ["siem_agent", "event_forwarding"], "linux": ["siem_agent", "syslog_forwarding"], "network": [], }, "comprehensive": { "windows": ["siem_agent", "event_forwarding", "sysmon", "powershell_logging"], "linux": ["siem_agent", "syslog_forwarding", "auditd", "osquery"], "network": ["netflow", "packet_capture"], }, "advanced": { "windows": [ "siem_agent", "event_forwarding", "sysmon", "powershell_logging", "edr", "process_monitoring", "file_integrity_monitoring" ], "linux": [ "siem_agent", "syslog_forwarding", "auditd", "osquery", "edr", "process_monitoring", "file_integrity_monitoring" ], "network": ["netflow", "packet_capture", "zeek", "suricata"], }, "elite": { "windows": [ "siem_agent", "event_forwarding", "sysmon", "powershell_logging", "edr", "process_monitoring", "file_integrity_monitoring", "threat_hunting_tools", "memory_forensics", "behavioral_analytics", "automated_response", "deception_technology" ], "linux": [ "siem_agent", "syslog_forwarding", "auditd", "osquery", "edr", "process_monitoring", "file_integrity_monitoring", "threat_hunting_tools", "memory_forensics", "behavioral_analytics", "automated_response", "honeypot_integration" ], "network": [ "netflow", "packet_capture", "zeek", "suricata", "network_taps", "ssl_inspection", "dns_analytics", "threat_intel_feeds", "automated_blocking" ], "threat_intelligence": [ "misp_integration", "threat_feed_aggregation", "ioc_matching", "threat_actor_tracking", "ttp_correlation" ], "hunting": [ "sigma_rules", "yara_rules", "custom_hunting_queries", "baseline_deviation", "peer_group_analysis" ], }, } return capabilities.get(monitoring_level, capabilities["comprehensive"]) def _add_windows_monitoring( self, vm: dict[str, Any], monitoring_caps: dict, level: str, siem_type: str ) -> list[dict[str, Any]]: """Add Windows monitoring capabilities.""" enhancements = [] vm_name = vm.get("vm_name", "") windows_caps = monitoring_caps.get("windows", []) if "roles" not in vm: vm["roles"] = [] if "siem_agent" in windows_caps: agent_role = f"ludus_{siem_type}_agent" vm["roles"].append(agent_role) enhancements.append({ "vm_name": vm_name, "enhancement_type": "siem_agent", "category": "Logging", "ansible_role": agent_role, "description": f"{siem_type.title()} agent for centralized logging", "capabilities": ["Event log forwarding", "Real-time alerting"], }) if "sysmon" in windows_caps: vm["roles"].append("ludus_sysmon") enhancements.append({ "vm_name": vm_name, "enhancement_type": "sysmon", "category": "Advanced Logging", "ansible_role": "ludus_sysmon", "description": "Sysmon for detailed process and network logging", "capabilities": ["Process creation", "Network connections", "File creation"], }) if "powershell_logging" in windows_caps: vm["roles"].append("ludus_powershell_logging") enhancements.append({ "vm_name": vm_name, "enhancement_type": "powershell_logging", "category": "Advanced Logging", "ansible_role": "ludus_powershell_logging", "description": "Enhanced PowerShell script block and transcription logging", "capabilities": ["Script block logging", "Transcription", "Module logging"], }) return enhancements def _add_linux_monitoring( self, vm: dict[str, Any], monitoring_caps: dict, level: str, siem_type: str ) -> list[dict[str, Any]]: """Add Linux monitoring capabilities.""" enhancements = [] vm_name = vm.get("vm_name", "") linux_caps = monitoring_caps.get("linux", []) if "roles" not in vm: vm["roles"] = [] if "siem_agent" in linux_caps: agent_role = f"ludus_{siem_type}_agent" vm["roles"].append(agent_role) enhancements.append({ "vm_name": vm_name, "enhancement_type": "siem_agent", "category": "Logging", "ansible_role": agent_role, "description": f"{siem_type.title()} agent for centralized logging", "capabilities": ["Syslog forwarding", "Real-time alerting"], }) if "auditd" in linux_caps: vm["roles"].append("ludus_auditd") enhancements.append({ "vm_name": vm_name, "enhancement_type": "auditd", "category": "Advanced Logging", "ansible_role": "ludus_auditd", "description": "Linux audit framework for system call monitoring", "capabilities": ["File access", "System calls", "User activity"], }) return enhancements def _ensure_siem_server( self, config: dict[str, Any], siem_type: str, monitoring_level: str ) -> bool: """Ensure SIEM server is present in configuration.""" vms = config.get("ludus", []) # Check if SIEM already exists siem_exists = any(siem_type.lower() in vm.get("vm_name", "").lower() for vm in vms) if not siem_exists: # Add SIEM server siem_vm = { "vm_name": f"{{{{ range_id }}}}-{siem_type}", "hostname": siem_type.upper(), "template": "ubuntu-22-x64-server-template", "vlan": 10, "ip_last_octet": 100, "ram_gb": 8 if monitoring_level == "advanced" else 4, "cpus": 4 if monitoring_level == "advanced" else 2, "roles": [f"ludus_{siem_type}_server"], } vms.append(siem_vm) return True return False def _add_detection_rules( self, detection_focus: list[str], siem_type: str ) -> list[dict[str, Any]]: """Add detection rules based on focus areas.""" rules = [] common_rules = [ { "rule_name": "Kerberoasting Detection", "category": "Active Directory", "description": "Detects TGS requests for service accounts", "severity": "high", "indicators": ["Event ID 4769", "RC4 encryption", "Service accounts"], }, { "rule_name": "Lateral Movement Detection", "category": "Network", "description": "Detects PsExec and WMI-based lateral movement", "severity": "high", "indicators": ["Event ID 7045", "Service installation", "Remote execution"], }, { "rule_name": "Credential Dumping", "category": "Credential Access", "description": "Detects LSASS memory access and credential dumping", "severity": "critical", "indicators": ["LSASS process access", "Mimikatz indicators", "SAM access"], }, { "rule_name": "DCSync Attack Detection", "category": "Active Directory", "description": "Detects DCSync replication requests from non-DC machines", "severity": "critical", "indicators": ["Event ID 4662", "Replicating Directory Changes", "Non-DC source"], }, { "rule_name": "Golden Ticket Detection", "category": "Persistence", "description": "Detects anomalous Kerberos TGT characteristics", "severity": "critical", "indicators": ["Unusual ticket lifetime", "Forged PAC", "KRBTGT usage"], }, { "rule_name": "Pass-the-Hash Detection", "category": "Lateral Movement", "description": "Detects NTLM authentication from unusual sources", "severity": "high", "indicators": ["Event ID 4624 Type 3", "NTLM authentication", "Unusual source"], }, { "rule_name": "Ransomware Behavior", "category": "Impact", "description": "Detects rapid file encryption and shadow copy deletion", "severity": "critical", "indicators": ["Mass file modification", "vssadmin delete", "bcdedit /set"], }, { "rule_name": "Suspicious PowerShell", "category": "Execution", "description": "Detects encoded commands and download cradles", "severity": "high", "indicators": ["EncodedCommand", "DownloadString", "IEX", "Bypass"], }, { "rule_name": "Persistence via Registry", "category": "Persistence", "description": "Detects registry run key modifications", "severity": "medium", "indicators": ["HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run"], }, { "rule_name": "DNS Tunneling", "category": "Exfiltration", "description": "Detects anomalous DNS query patterns", "severity": "high", "indicators": ["High DNS volume", "Long subdomains", "Unusual TXT queries"], }, ] rules.extend(common_rules) return rules def get_apt_profile_configuration(self, apt_profile: str) -> dict[str, Any]: """Get configuration for specific APT profile.""" apt_configs = { "apt28": { "name": "APT28 (Fancy Bear)", "description": "Russian state-sponsored group focusing on credential theft and lateral movement", "threat_level": "critical", "focus_areas": ["credential_access", "lateral_movement", "persistence"], "techniques": [ "password_spraying", "kerberoasting", "golden_ticket", "dcsync", "rdp_hijacking", "wmi_lateral_movement" ], "tools": ["Mimikatz", "BloodHound", "Rubeus", "Impacket"], "persistence": ["wmi_subscriptions", "registry_run_keys", "service_creation"], }, "apt29": { "name": "APT29 (Cozy Bear)", "description": "Advanced persistent threat with focus on stealth and cloud exploitation", "threat_level": "critical", "focus_areas": ["persistence", "stealth", "cloud_exploitation"], "techniques": [ "dll_sideloading", "com_hijacking", "wmi_subscriptions", "cloud_token_theft", "oauth_abuse" ], "tools": ["Custom malware", "PowerShell Empire", "Cloud APIs"], "persistence": ["dll_hijacking", "scheduled_tasks", "bits_jobs"], }, "apt41": { "name": "APT41 (Double Dragon)", "description": "Dual espionage and financially motivated attacks", "threat_level": "high", "focus_areas": ["initial_access", "privilege_escalation", "exfiltration"], "techniques": [ "supply_chain_compromise", "web_shell", "credential_dumping", "data_staging", "cloud_storage_exfil" ], "tools": ["Custom RATs", "Web shells", "Credential dumpers"], "persistence": ["web_shells", "registry_autoruns", "service_creation"], }, "fin7": { "name": "FIN7 (Carbanak)", "description": "Financially motivated cybercrime group", "threat_level": "high", "focus_areas": ["credential_access", "lateral_movement", "exfiltration"], "techniques": [ "phishing", "password_theft", "pos_malware", "database_exfil", "payment_card_theft" ], "tools": ["Carbanak", "Cobalt Strike", "PowerShell", "Mimikatz"], "persistence": ["scheduled_tasks", "registry_run_keys"], }, "lazarus": { "name": "Lazarus Group", "description": "North Korean APT with destructive and financial motivations", "threat_level": "critical", "focus_areas": ["impact", "exfiltration", "credential_access"], "techniques": [ "disk_wiper", "ransomware", "cryptocurrency_theft", "swift_network_compromise", "destructive_malware" ], "tools": ["WannaCry", "Custom wipers", "Crypto miners"], "persistence": ["bootkit", "mbr_modification", "service_creation"], }, } return apt_configs.get(apt_profile, {}) def apply_threat_hunting_profile( self, config: dict[str, Any], hunting_profile: str, siem_type: str = "wazuh" ) -> dict[str, Any]: """Apply threat hunting profile for proactive detection.""" hunting_configs = { "hunter_lite": { "capabilities": ["scheduled_searches", "basic_queries", "known_ioc_matching"], "tools": ["sigma_rules", "yara_basic"], "automation_level": "low", "hunting_frequency": "weekly", }, "hunter_advanced": { "capabilities": [ "behavioral_analytics", "anomaly_detection", "baseline_deviation", "peer_group_analysis", "threat_intel_enrichment" ], "tools": ["sigma_rules", "yara_advanced", "osquery", "velociraptor"], "automation_level": "medium", "hunting_frequency": "daily", }, "hunter_elite": { "capabilities": [ "automated_hunting", "ml_anomaly_detection", "ttp_correlation", "threat_actor_tracking", "custom_detection_logic", "hypothesis_driven_hunting" ], "tools": [ "sigma_rules", "yara_advanced", "osquery", "velociraptor", "misp", "opencti", "jupyter_notebooks", "elastic_ml" ], "automation_level": "high", "hunting_frequency": "continuous", }, } hunting_config = hunting_configs.get(hunting_profile, hunting_configs["hunter_lite"]) # Add hunting capabilities to the configuration modified_config = copy.deepcopy(config) vms = modified_config.get("ludus", []) # Add threat hunting VM if elite level if hunting_profile == "hunter_elite": hunting_vm = { "vm_name": "{{{{ range_id }}}}-threat-hunting", "hostname": "HUNTER", "template": "ubuntu-22-x64-server-template", "vlan": 10, "ip_last_octet": 110, "ram_gb": 16, "cpus": 8, "roles": [ "ludus_velociraptor", "ludus_jupyter_hunting", "ludus_misp", "ludus_opencti", ], } vms.append(hunting_vm) return { "status": "success", "profile_type": "threat_hunting", "hunting_level": hunting_profile, "modified_config": modified_config, "capabilities": hunting_config["capabilities"], "tools": hunting_config["tools"], "automation_level": hunting_config["automation_level"], "hunting_frequency": hunting_config["hunting_frequency"], } def apply_incident_response_profile( self, config: dict[str, Any], ir_profile: str ) -> dict[str, Any]: """Apply incident response profile for IR readiness.""" ir_configs = { "ir_preparation": { "capabilities": [ "centralized_logging", "baseline_configuration", "ir_tools_preinstalled", "network_diagrams", "asset_inventory" ], "tools": ["velociraptor", "ftk_imager", "volatility", "wireshark"], "playbooks": ["malware_infection", "data_breach", "ransomware"], }, "ir_detection": { "capabilities": [ "real_time_alerting", "correlation_rules", "automated_playbooks", "threat_intel_integration", "incident_ticketing" ], "tools": [ "velociraptor", "ftk_imager", "volatility", "wireshark", "suricata", "zeek", "theHive", "cortex" ], "playbooks": [ "malware_infection", "data_breach", "ransomware", "apt_intrusion", "insider_threat", "ddos_attack" ], }, "ir_containment": { "capabilities": [ "network_segmentation", "automated_isolation", "forensic_imaging", "memory_acquisition", "timeline_generation", "ioc_extraction" ], "tools": [ "velociraptor", "ftk_imager", "volatility", "wireshark", "suricata", "zeek", "theHive", "cortex", "graylog", "plaso", "timeline_explorer" ], "playbooks": [ "malware_infection", "data_breach", "ransomware", "apt_intrusion", "insider_threat", "ddos_attack", "business_email_compromise", "supply_chain_attack" ], }, "ir_active_breach": { "capabilities": [ "full_packet_capture", "memory_forensics", "malware_analysis", "threat_hunting", "ioc_pivoting", "attribution_analysis", "evidence_preservation", "chain_of_custody" ], "tools": [ "velociraptor", "ftk_imager", "volatility", "wireshark", "suricata", "zeek", "theHive", "cortex", "graylog", "plaso", "timeline_explorer", "remnux", "flare_vm", "cuckoo_sandbox", "yara", "ghidra", "ida_pro" ], "playbooks": [ "malware_infection", "data_breach", "ransomware", "apt_intrusion", "insider_threat", "ddos_attack", "business_email_compromise", "supply_chain_attack", "cryptojacking", "web_defacement" ], }, } ir_config = ir_configs.get(ir_profile, ir_configs["ir_preparation"]) modified_config = copy.deepcopy(config) vms = modified_config.get("ludus", []) # Add IR server for advanced profiles if ir_profile in ["ir_containment", "ir_active_breach"]: ir_vm = { "vm_name": "{{{{ range_id }}}}-ir-workstation", "hostname": "IR-ANALYST", "template": "ubuntu-22-x64-desktop-template", "vlan": 10, "ip_last_octet": 120, "ram_gb": 32, "cpus": 8, "roles": [ "ludus_forensic_tools", "ludus_thehive", "ludus_cortex", "ludus_volatility", "ludus_remnux" if ir_profile == "ir_active_breach" else None, ], } ir_vm["roles"] = [r for r in ir_vm["roles"] if r] # Remove None vms.append(ir_vm) return { "status": "success", "profile_type": "incident_response", "ir_level": ir_profile, "modified_config": modified_config, "capabilities": ir_config["capabilities"], "tools": ir_config["tools"], "playbooks": ir_config["playbooks"], } def apply_malware_analysis_profile( self, config: dict[str, Any], malware_profile: str ) -> dict[str, Any]: """Apply malware analysis profile for reverse engineering training. Args: config: Original Ludus configuration malware_profile: Level of malware analysis capabilities Returns: Dictionary with modified config and malware analysis setup """ malware_configs = { "malware_basic": { "capabilities": [ "static_analysis", "strings_extraction", "file_identification", "hash_analysis", "peid_detection", "basic_disassembly" ], "tools": [ "pestudio", "detect_it_easy", "exeinfo_pe", "strings", "file_command", "md5deep", "ssdeep", "7zip" ], "vm_resources": {"ram_gb": 8, "cpus": 4}, "automation_level": "manual", "sample_types": ["executables", "scripts", "documents"], }, "malware_intermediate": { "capabilities": [ "static_analysis", "strings_extraction", "file_identification", "hash_analysis", "peid_detection", "advanced_disassembly", "dynamic_analysis", "behavioral_monitoring", "api_monitoring", "network_monitoring", "sandbox_analysis" ], "tools": [ # Static analysis "pestudio", "detect_it_easy", "exeinfo_pe", "strings", "file_command", "md5deep", "ssdeep", "7zip", "pe_bear", # Dynamic analysis "procmon", "process_hacker", "wireshark", "fakenet_ng", "regshot", "autoruns", "process_explorer" ], "vm_resources": {"ram_gb": 16, "cpus": 4}, "automation_level": "semi-automated", "sample_types": ["executables", "scripts", "documents", "mobile_apps"], }, "malware_advanced": { "capabilities": [ "static_analysis", "strings_extraction", "file_identification", "hash_analysis", "peid_detection", "advanced_disassembly", "dynamic_analysis", "behavioral_monitoring", "api_monitoring", "network_monitoring", "sandbox_analysis", "debugger_analysis", "unpacking", "deobfuscation", "code_emulation", "yara_rules", "kernel_debugging", "memory_forensics" ], "tools": [ # Static analysis "pestudio", "detect_it_easy", "exeinfo_pe", "strings", "ida_free", "ghidra", "binary_ninja", "radare2", "pe_bear", "capa", "floss", "retdec", "yara", # Dynamic analysis "procmon", "process_hacker", "wireshark", "fakenet_ng", "x64dbg", "windbg", "ollydbg", "immunity_debugger", "scylla", "de4dot", "upx", "volatility", # Network "inetsim", "burp_suite", "mitmproxy" ], "vm_resources": {"ram_gb": 32, "cpus": 8}, "automation_level": "automated", "sample_types": ["executables", "scripts", "documents", "mobile_apps", "firmware"], }, "malware_expert": { "capabilities": [ # All advanced capabilities plus: "automated_detonation", "multi_os_analysis", "exploit_analysis", "shellcode_analysis", "rootkit_analysis", "bootkit_analysis", "cloud_malware_analysis", "apt_malware_analysis", "threat_intel_integration", "ioc_extraction", "yara_rule_generation", "signature_generation" ], "tools": [ # All advanced tools plus: "cuckoo_sandbox", "cape_sandbox", "joe_sandbox", "vmray", "ida_pro", "binary_ninja_pro", "hopper", "x64dbg_advanced", "volatility3", "rekall", "mandiant_redline", "axiom", "misp", "opencti", "malware_bazaar", "virustotal_api", "remnux_full", "flare_vm_full", "android_analysis_tools" ], "infrastructure": [ "remnux_vm", "flare_vm", "android_analysis_vm", "linux_analysis_vm", "network_simulation", "isolated_network" ], "vm_resources": {"ram_gb": 64, "cpus": 12}, "automation_level": "fully_automated", "sample_types": [ "executables", "scripts", "documents", "mobile_apps", "firmware", "exploits", "rootkits", "bootkits" ], }, } malware_config = malware_configs.get(malware_profile, malware_configs["malware_basic"]) modified_config = copy.deepcopy(config) vms = modified_config.get("ludus", []) # Determine VMs to add based on profile level if malware_profile == "malware_basic": # Single Windows analysis VM analysis_vm = { "vm_name": "{{{{ range_id }}}}-malware-analysis", "hostname": "MALWARE-LAB", "template": "win10-22h2-x64-enterprise-template", "vlan": 99, # Isolated VLAN "ip_last_octet": 10, "ram_gb": malware_config["vm_resources"]["ram_gb"], "cpus": malware_config["vm_resources"]["cpus"], "roles": [ "ludus_malware_basic_tools", "ludus_isolated_network", ], } vms.append(analysis_vm) elif malware_profile == "malware_intermediate": # Windows analysis VM + REMnux windows_vm = { "vm_name": "{{{{ range_id }}}}-malware-win", "hostname": "MALWARE-WIN", "template": "win10-22h2-x64-enterprise-template", "vlan": 99, "ip_last_octet": 10, "ram_gb": 8, "cpus": 4, "roles": [ "ludus_malware_intermediate_tools", "ludus_dynamic_analysis", "ludus_isolated_network", ], } remnux_vm = { "vm_name": "{{{{ range_id }}}}-remnux", "hostname": "REMNUX", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 20, "ram_gb": 8, "cpus": 4, "roles": [ "ludus_remnux_basic", "ludus_isolated_network", ], } vms.extend([windows_vm, remnux_vm]) elif malware_profile == "malware_advanced": # Full analysis lab: Windows, Linux, REMnux, Network sim windows_vm = { "vm_name": "{{{{ range_id }}}}-malware-win", "hostname": "MALWARE-WIN", "template": "win10-22h2-x64-enterprise-template", "vlan": 99, "ip_last_octet": 10, "ram_gb": 16, "cpus": 8, "roles": [ "ludus_flare_vm", "ludus_advanced_debugging", "ludus_isolated_network", ], } remnux_vm = { "vm_name": "{{{{ range_id }}}}-remnux", "hostname": "REMNUX", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 20, "ram_gb": 16, "cpus": 4, "roles": [ "ludus_remnux_full", "ludus_isolated_network", ], } network_sim = { "vm_name": "{{{{ range_id }}}}-inetsim", "hostname": "INETSIM", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 1, "ram_gb": 4, "cpus": 2, "roles": [ "ludus_inetsim", "ludus_fakenet", ], } vms.extend([windows_vm, remnux_vm, network_sim]) elif malware_profile == "malware_expert": # Complete malware lab: Multiple OS analysis VMs + Sandbox + TI flare_vm = { "vm_name": "{{{{ range_id }}}}-flare-vm", "hostname": "FLARE-VM", "template": "win10-22h2-x64-enterprise-template", "vlan": 99, "ip_last_octet": 10, "ram_gb": 24, "cpus": 8, "roles": [ "ludus_flare_vm_full", "ludus_advanced_debugging", "ludus_isolated_network", ], } remnux_vm = { "vm_name": "{{{{ range_id }}}}-remnux", "hostname": "REMNUX", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 20, "ram_gb": 16, "cpus": 4, "roles": [ "ludus_remnux_full", "ludus_isolated_network", ], } sandbox_vm = { "vm_name": "{{{{ range_id }}}}-cuckoo", "hostname": "CUCKOO", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 30, "ram_gb": 16, "cpus": 8, "roles": [ "ludus_cuckoo_sandbox", "ludus_cape_sandbox", ], } network_sim = { "vm_name": "{{{{ range_id }}}}-inetsim", "hostname": "INETSIM", "template": "ubuntu-22-x64-server-template", "vlan": 99, "ip_last_octet": 1, "ram_gb": 4, "cpus": 2, "roles": [ "ludus_inetsim", "ludus_fakenet", ], } threat_intel_vm = { "vm_name": "{{{{ range_id }}}}-threat-intel", "hostname": "THREAT-INTEL", "template": "ubuntu-22-x64-server-template", "vlan": 10, # Management network "ip_last_octet": 150, "ram_gb": 8, "cpus": 4, "roles": [ "ludus_misp", "ludus_malware_bazaar", ], } vms.extend([flare_vm, remnux_vm, sandbox_vm, network_sim, threat_intel_vm]) return { "status": "success", "profile_type": "malware_analysis", "malware_level": malware_profile, "modified_config": modified_config, "capabilities": malware_config["capabilities"], "tools": malware_config["tools"], "vm_resources": malware_config["vm_resources"], "automation_level": malware_config["automation_level"], "sample_types": malware_config["sample_types"], "infrastructure": malware_config.get("infrastructure", []), "next_steps": [ "1. Review the malware analysis setup", f"2. Verify isolated network (VLAN 99) configuration", "3. Deploy with: deploy_range(config=modified_config)", "4. Configure INetSim for internet simulation", "5. Test sample detonation in isolated environment", "6. Never connect analysis VMs to production network", ], "warnings": [ "⚠️ Analysis VMs are in ISOLATED NETWORK (VLAN 99)", "⚠️ Never connect these systems to production networks", "⚠️ Use only for malware analysis training", "⚠️ Ensure proper snapshot/restore procedures", "⚠️ Handle malware samples with extreme caution", ], } def _generate_adversary_documentation( self, injections: list[dict[str, Any]], threat_level: str ) -> str: """Generate educational documentation for adversary profile.""" if not injections: return "No vulnerabilities were injected." doc = f"""# Adversary Profile Documentation ## Threat Level: {threat_level.upper()} This configuration has been enhanced with {len(injections)} realistic vulnerabilities for red team training. ## ⚠️ WARNING - These vulnerabilities are for EDUCATIONAL PURPOSES ONLY - Deploy ONLY in isolated lab environments - NEVER expose these systems to production networks or the internet ## Injected Vulnerabilities """ for idx, inj in enumerate(injections, 1): doc += f"""### {idx}. {inj['vulnerability_type'].replace('_', ' ').title()} - **VM**: {inj['vm_name']} - **Category**: {inj['category']} - **Severity**: {inj['severity'].upper()} - **Description**: {inj['description']} - **Exploitation**: {inj['exploitation']} - **Ansible Role**: `{inj['ansible_role']}` """ doc += """## Exploitation Practice Use these vulnerabilities to practice: 1. Reconnaissance and enumeration 2. Vulnerability identification 3. Exploitation techniques 4. Post-exploitation and pivoting 5. Privilege escalation 6. Lateral movement ## Remediation After exploitation practice, remediate vulnerabilities to practice blue team skills: - Apply security patches - Strengthen password policies - Configure proper ACLs and permissions - Implement network segmentation - Enable security logging and monitoring """ return doc def _generate_defender_documentation( self, enhancements: list[dict[str, Any]], siem_type: str, monitoring_level: str, detection_rules: list[dict[str, Any]], ) -> str: """Generate documentation for defender profile.""" if not enhancements: return "No monitoring enhancements were added." doc = f"""# Defender Profile Documentation ## Monitoring Level: {monitoring_level.upper()} ## SIEM: {siem_type.title()} This configuration has been enhanced with {len(enhancements)} monitoring capabilities. ## Monitoring Enhancements """ for idx, enh in enumerate(enhancements, 1): doc += f"""### {idx}. {enh['enhancement_type'].replace('_', ' ').title()} - **VM**: {enh['vm_name']} - **Category**: {enh['category']} - **Description**: {enh['description']} - **Capabilities**: {', '.join(enh['capabilities'])} - **Ansible Role**: `{enh['ansible_role']}` """ doc += f"""## Detection Rules {len(detection_rules)} detection rules have been configured: """ for idx, rule in enumerate(detection_rules, 1): doc += f"""### {idx}. {rule['rule_name']} - **Category**: {rule['category']} - **Severity**: {rule['severity'].upper()} - **Description**: {rule['description']} - **Indicators**: {', '.join(rule['indicators'])} """ doc += f"""## {siem_type.title()} Configuration After deployment: 1. Access {siem_type.title()} dashboard 2. Verify agent connectivity from all endpoints 3. Configure alerting thresholds 4. Create custom detection rules 5. Set up dashboards for SOC operations ## Blue Team Practice Use this environment to practice: 1. Log analysis and correlation 2. Threat hunting 3. Incident detection and response 4. Security monitoring 5. Alert triage and investigation 6. Forensic analysis """ return doc

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server