Skip to main content
Glama
custom_scenarios.py15.5 kB
"""Custom user-defined scenarios - build and save your own lab configurations.""" import json import os from pathlib import Path from typing import Any, Optional from .base import BaseScenarioBuilder class CustomScenarioBuilder(BaseScenarioBuilder): """Builder for custom user-defined scenarios.""" def __init__( self, range_id: str = "{{ range_id }}", siem_type: str = "wazuh", resource_profile: str = "minimal", ): """Initialize custom scenario builder.""" super().__init__(range_id, siem_type, resource_profile) self.metadata = { "name": "Custom Scenario", "description": "User-defined custom scenario", "author": "user", "created_at": None, "updated_at": None, "tags": [], } def set_metadata( self, name: str, description: str, author: str = "user", tags: Optional[list[str]] = None, ) -> "CustomScenarioBuilder": """Set scenario metadata. Args: name: Scenario name description: Scenario description author: Author name tags: Optional tags for categorization Returns: Self for method chaining """ import datetime self.metadata["name"] = name self.metadata["description"] = description self.metadata["author"] = author self.metadata["tags"] = tags or [] self.metadata["updated_at"] = datetime.datetime.now().isoformat() if self.metadata["created_at"] is None: self.metadata["created_at"] = self.metadata["updated_at"] return self def add_domain_controller( self, hostname: str, domain: str, vlan: int = 10, ip_last_octet: int = 10, ram_gb: Optional[int] = None, cpus: Optional[int] = None, ) -> "CustomScenarioBuilder": """Add a domain controller to the scenario. Args: hostname: Hostname for the DC domain: Domain FQDN (e.g., 'corp.local') vlan: VLAN number ip_last_octet: Last octet of IP address ram_gb: Optional RAM override (uses profile default if not set) cpus: Optional CPU override (uses profile default if not set) Returns: Self for method chaining """ dc_ram, dc_cpus = self.get_resources("dc") self.add_vm( vm_name=f"{self.range_id}-dc-{hostname.lower()}", hostname=hostname, template="win2022-server-x64-template", vlan=vlan, ip_last_octet=ip_last_octet, ram_gb=ram_gb or dc_ram, cpus=cpus or dc_cpus, windows={"sysprep": False}, domain={"fqdn": domain, "role": "primary-dc"}, ) return self def add_workstation( self, hostname: str, domain: Optional[str] = None, vlan: int = 10, ip_last_octet: int = 20, ram_gb: Optional[int] = None, cpus: Optional[int] = None, packages: Optional[list[str]] = None, ) -> "CustomScenarioBuilder": """Add a Windows 11 workstation to the scenario. Args: hostname: Hostname for the workstation domain: Optional domain to join vlan: VLAN number ip_last_octet: Last octet of IP address ram_gb: Optional RAM override cpus: Optional CPU override packages: Optional Chocolatey packages to install Returns: Self for method chaining """ ws_ram, ws_cpus = self.get_resources("workstation") vm_config = { "vm_name": f"{self.range_id}-{hostname.lower()}", "hostname": hostname, "template": "win11-22h2-x64-enterprise-template", "vlan": vlan, "ip_last_octet": ip_last_octet, "ram_gb": ram_gb or ws_ram, "cpus": cpus or ws_cpus, } if packages: vm_config["windows"] = { "chocolatey_packages": packages, "chocolatey_ignore_checksums": True, } if domain: vm_config["domain"] = {"fqdn": domain, "role": "member"} self.add_vm(**vm_config) return self def add_server( self, hostname: str, server_type: str = "fileserver", domain: Optional[str] = None, vlan: int = 10, ip_last_octet: int = 15, ram_gb: Optional[int] = None, cpus: Optional[int] = None, ) -> "CustomScenarioBuilder": """Add a Windows Server to the scenario. Args: hostname: Hostname for the server server_type: Type of server (fileserver, sql, exchange, web) domain: Optional domain to join vlan: VLAN number ip_last_octet: Last octet of IP address ram_gb: Optional RAM override cpus: Optional CPU override Returns: Self for method chaining """ # Choose appropriate resource profile based on server type if server_type == "sql": default_ram, default_cpus = self.get_resources("sql_server") elif server_type == "exchange": default_ram, default_cpus = self.get_resources("exchange_server") else: default_ram, default_cpus = self.get_resources("server") vm_config = { "vm_name": f"{self.range_id}-{server_type}-{hostname.lower()}", "hostname": hostname, "template": "win2022-server-x64-template", "vlan": vlan, "ip_last_octet": ip_last_octet, "ram_gb": ram_gb or default_ram, "cpus": cpus or default_cpus, "windows": {"sysprep": False}, } if domain: vm_config["domain"] = {"fqdn": domain, "role": "member"} self.add_vm(**vm_config) return self def add_linux_server( self, hostname: str, vlan: int = 10, ip_last_octet: int = 50, ram_gb: Optional[int] = None, cpus: Optional[int] = None, template: str = "ubuntu-22.04-x64-server-template", ) -> "CustomScenarioBuilder": """Add a Linux server to the scenario. Args: hostname: Hostname for the server vlan: VLAN number ip_last_octet: Last octet of IP address ram_gb: Optional RAM override cpus: Optional CPU override template: Linux template to use Returns: Self for method chaining """ srv_ram, srv_cpus = self.get_resources("linux_server") self.add_vm( vm_name=f"{self.range_id}-{hostname.lower()}", hostname=hostname, template=template, vlan=vlan, ip_last_octet=ip_last_octet, ram_gb=ram_gb or srv_ram, cpus=cpus or srv_cpus, linux=True, ) return self def add_kali_attacker( self, hostname: str = "KALI", vlan: int = 99, ip_last_octet: int = 10, ram_gb: Optional[int] = None, cpus: Optional[int] = None, ) -> "CustomScenarioBuilder": """Add a Kali Linux attacker to the scenario. Args: hostname: Hostname for the attacker vlan: VLAN number (default: 99 for attacker network) ip_last_octet: Last octet of IP address ram_gb: Optional RAM override cpus: Optional CPU override Returns: Self for method chaining """ kali_ram, kali_cpus = self.get_resources("kali") self.add_vm( vm_name=f"{self.range_id}-kali-{hostname.lower()}", hostname=hostname, template="kali-x64-desktop-template", vlan=vlan, ip_last_octet=ip_last_octet, ram_gb=ram_gb or kali_ram, cpus=cpus or kali_cpus, linux=True, testing={"snapshot": False, "block_internet": False}, ) return self def allow_communication( self, name: str, from_vlan: int, to_vlan: int, protocol: str = "all", ports: Any = "all", ) -> "CustomScenarioBuilder": """Add a network rule to allow communication between VLANs. Args: name: Rule name from_vlan: Source VLAN to_vlan: Destination VLAN protocol: Protocol (tcp, udp, all) ports: Port(s) to allow (single int, list of ints, or "all") Returns: Self for method chaining """ self.add_network_rule( name=name, vlan_src=from_vlan, vlan_dst=to_vlan, protocol=protocol, ports=ports, action="ACCEPT", ) return self def add_monitoring( self, vlan: int = 10, ip_last_octet: int = 100, include_agents: bool = True, ) -> "CustomScenarioBuilder": """Add SIEM monitoring to the scenario. Args: vlan: VLAN for SIEM server ip_last_octet: Last octet of IP address include_agents: Whether to add SIEM agents to all VMs Returns: Self for method chaining """ if self.siem_type != "none": self.add_siem_server(vlan=vlan, ip_last_octet=ip_last_octet) if include_agents: self.add_siem_agents_to_all_vms() return self def to_dict_with_metadata(self) -> dict[str, Any]: """Get configuration with metadata as dictionary. Returns: Dictionary containing both config and metadata """ return { "metadata": self.metadata, "config": self.config.copy(), } class CustomScenarioManager: """Manager for custom user-defined scenarios.""" def __init__(self, storage_dir: Optional[Path] = None): """Initialize custom scenario manager. Args: storage_dir: Directory to store custom scenarios (default: ~/.ludus/custom-scenarios) """ if storage_dir is None: storage_dir = Path.home() / ".ludus" / "custom-scenarios" self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) def save_scenario( self, scenario_id: str, builder: CustomScenarioBuilder ) -> dict[str, Any]: """Save a custom scenario to disk. Args: scenario_id: Unique identifier for the scenario builder: CustomScenarioBuilder instance Returns: Dictionary with save status and file path """ import datetime # Update timestamp builder.metadata["updated_at"] = datetime.datetime.now().isoformat() # Save to JSON file file_path = self.storage_dir / f"{scenario_id}.json" data = builder.to_dict_with_metadata() with open(file_path, "w") as f: json.dump(data, f, indent=2) return { "status": "saved", "scenario_id": scenario_id, "file_path": str(file_path), "name": builder.metadata["name"], } def load_scenario( self, scenario_id: str, resource_profile: str = "minimal" ) -> CustomScenarioBuilder: """Load a custom scenario from disk. Args: scenario_id: Unique identifier for the scenario resource_profile: Resource profile to use Returns: CustomScenarioBuilder instance Raises: FileNotFoundError: If scenario doesn't exist """ file_path = self.storage_dir / f"{scenario_id}.json" if not file_path.exists(): raise FileNotFoundError(f"Custom scenario '{scenario_id}' not found") with open(file_path, "r") as f: data = json.load(f) # Create builder and restore configuration builder = CustomScenarioBuilder(resource_profile=resource_profile) builder.metadata = data.get("metadata", {}) builder.config = data.get("config", {"ludus": [], "network": {"inter_vlan_default": "REJECT", "rules": []}}) return builder def list_scenarios(self) -> dict[str, dict[str, Any]]: """List all saved custom scenarios. Returns: Dictionary mapping scenario IDs to their metadata """ scenarios = {} for file_path in self.storage_dir.glob("*.json"): scenario_id = file_path.stem try: with open(file_path, "r") as f: data = json.load(f) scenarios[scenario_id] = data.get("metadata", {}) except Exception as e: # Skip corrupted files continue return scenarios def delete_scenario(self, scenario_id: str) -> dict[str, str]: """Delete a custom scenario. Args: scenario_id: Unique identifier for the scenario Returns: Dictionary with deletion status Raises: FileNotFoundError: If scenario doesn't exist """ file_path = self.storage_dir / f"{scenario_id}.json" if not file_path.exists(): raise FileNotFoundError(f"Custom scenario '{scenario_id}' not found") file_path.unlink() return { "status": "deleted", "scenario_id": scenario_id, } def export_scenario(self, scenario_id: str, export_path: Path) -> dict[str, str]: """Export a custom scenario to a specific location. Args: scenario_id: Unique identifier for the scenario export_path: Path to export the scenario to Returns: Dictionary with export status """ import shutil source_path = self.storage_dir / f"{scenario_id}.json" if not source_path.exists(): raise FileNotFoundError(f"Custom scenario '{scenario_id}' not found") shutil.copy(source_path, export_path) return { "status": "exported", "scenario_id": scenario_id, "export_path": str(export_path), } def import_scenario( self, scenario_id: str, import_path: Path ) -> dict[str, str]: """Import a custom scenario from a file. Args: scenario_id: Unique identifier for the scenario import_path: Path to import the scenario from Returns: Dictionary with import status """ import shutil if not Path(import_path).exists(): raise FileNotFoundError(f"Import file not found: {import_path}") # Validate JSON structure with open(import_path, "r") as f: data = json.load(f) if "metadata" not in data or "config" not in data: raise ValueError("Invalid scenario file format") # Copy to storage directory dest_path = self.storage_dir / f"{scenario_id}.json" shutil.copy(import_path, dest_path) return { "status": "imported", "scenario_id": scenario_id, "name": data.get("metadata", {}).get("name", "Unknown"), }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server