Skip to main content
Glama
visualization.py16.8 kB
"""ASCII visualization utilities for Ludus ranges.""" from typing import Any def generate_ascii_topology(config: dict) -> str: """ Generate ASCII art visualization of range topology. Args: config: Ludus range configuration dictionary Returns: Formatted ASCII art string showing the range topology """ lines = [] # Extract range info range_name = config.get("name", "Ludus Range") range_desc = config.get("description", "") # Header header = f" Range: {range_name} " if range_desc: header += f"\n {range_desc} " border_len = max(len(line) for line in header.split('\n')) + 4 lines.append("╔" + "═" * border_len + "╗") for line in header.split('\n'): lines.append("║ " + line.ljust(border_len - 2) + "║") lines.append("╚" + "═" * border_len + "╝") lines.append("") # Extract ludus config - can be a list or dict ludus_config = config.get("ludus", []) # Handle both formats: list of VMs or dict with networks/vms if isinstance(ludus_config, list): # Direct list of VMs vms = ludus_config networks = [] elif isinstance(ludus_config, dict): # Dict format with networks and vms networks = ludus_config.get("networks", []) vms = ludus_config.get("vms", []) else: networks = [] vms = [] # Group VMs by network vms_by_network = {} for vm in vms: network = vm.get("network", "unknown") if network not in vms_by_network: vms_by_network[network] = [] vms_by_network[network].append(vm) # Display networks and VMs for network in networks: net_name = network.get("name", "unknown") net_cidr = network.get("cidr", "") net_desc = network.get("description", "") lines.append(f"📡 Network: {net_name} ({net_cidr})") if net_desc: lines.append(f" {net_desc}") # VMs in this network network_vms = vms_by_network.get(net_name, []) if network_vms: for i, vm in enumerate(network_vms): is_last = i == len(network_vms) - 1 prefix = " └─" if is_last else " ├─" vm_name = vm.get("hostname", vm.get("name", "unknown")) vm_template = vm.get("template", "unknown") vm_role = vm.get("role", "") # Icon based on role or template icon = get_vm_icon(vm_template, vm_role) lines.append(f"{prefix} {icon} {vm_name} ({vm_template})") # Role info if vm_role: role_prefix = " " if is_last else " │ " lines.append(f"{role_prefix}└─ Role: {vm_role}") # Roles (Ansible) vm_roles = vm.get("roles", []) if vm_roles: role_prefix = " " if is_last else " │ " lines.append(f"{role_prefix}└─ Ansible: {', '.join(vm_roles)}") else: lines.append(" └─ (no VMs)") lines.append("") # VMs not assigned to networks unassigned_vms = [vm for vm in vms if vm.get("network", "unknown") not in [n.get("name") for n in networks]] if unassigned_vms: lines.append("📡 Network: unassigned") for i, vm in enumerate(unassigned_vms): is_last = i == len(unassigned_vms) - 1 prefix = " └─" if is_last else " ├─" vm_name = vm.get("hostname", vm.get("name", "unknown")) vm_template = vm.get("template", "unknown") icon = get_vm_icon(vm_template, "") lines.append(f"{prefix} {icon} {vm_name} ({vm_template})") lines.append("") # Summary lines.append("═" * 65) lines.append(f"Total VMs: {len(vms)}") # Estimate deployment time estimated_time = estimate_deployment_time(len(vms)) lines.append(f"Estimated Deploy Time: {estimated_time}") # Resource estimates total_memory, total_disk = estimate_resources(vms) lines.append(f"Estimated Memory: {total_memory}GB") lines.append(f"Estimated Disk: {total_disk}GB") lines.append("═" * 65) return "\n".join(lines) def get_vm_icon(template: str, role: str) -> str: """Get emoji icon for VM based on template and role.""" template_lower = template.lower() role_lower = role.lower() # Role-based icons if "domain controller" in role_lower or "dc" in role_lower: return "🏰" if "siem" in role_lower or "wazuh" in role_lower or "splunk" in role_lower: return "🛡️" if "attacker" in role_lower or "red team" in role_lower: return "🐉" if "firewall" in role_lower or "router" in role_lower: return "🔥" if "web" in role_lower or "apache" in role_lower or "nginx" in role_lower: return "🌐" if "database" in role_lower or "sql" in role_lower or "mysql" in role_lower: return "🗄️" # Template-based icons if "windows-server" in template_lower or "win-server" in template_lower: return "🖥️" if "windows-11" in template_lower or "win11" in template_lower: return "💻" if "windows-10" in template_lower or "win10" in template_lower: return "💻" if "kali" in template_lower: return "🐉" if "ubuntu" in template_lower or "debian" in template_lower: return "🐧" if "centos" in template_lower or "rocky" in template_lower or "rhel" in template_lower: return "🎩" # Default return "🖥️" def estimate_deployment_time(vm_count: int) -> str: """Estimate deployment time based on VM count.""" if vm_count <= 2: return "8-12 minutes" elif vm_count <= 5: return "15-20 minutes" elif vm_count <= 10: return "25-35 minutes" else: return f"{vm_count * 3}-{vm_count * 4} minutes" def estimate_resources(vms: list[dict]) -> tuple[int, int]: """ Estimate total memory and disk requirements. Args: vms: List of VM configurations Returns: Tuple of (total_memory_gb, total_disk_gb) """ total_memory = 0 total_disk = 0 for vm in vms: # Memory (convert MB to GB) memory_mb = vm.get("ram_mb", vm.get("memory", 0)) if memory_mb == 0: # Estimate based on template template = vm.get("template", "").lower() if "server" in template: memory_mb = 4096 # 4GB for servers elif "windows" in template: memory_mb = 4096 # 4GB for Windows desktops else: memory_mb = 2048 # 2GB for Linux total_memory += memory_mb / 1024 # Convert to GB # Disk disk_gb = vm.get("disk_size_gb", vm.get("disk", 0)) if disk_gb == 0: # Estimate based on template template = vm.get("template", "").lower() if "windows" in template: disk_gb = 60 # 60GB for Windows else: disk_gb = 30 # 30GB for Linux total_disk += disk_gb return (int(total_memory), int(total_disk)) def format_scenario_preview( scenario_key: str, config: dict, siem_type: str = "wazuh", resource_profile: str = "recommended" ) -> str: """ Format a complete scenario preview with detailed information. Args: scenario_key: Scenario identifier config: Ludus range configuration siem_type: SIEM type being used resource_profile: Resource profile (minimal, recommended, maximum) Returns: Formatted preview string """ lines = [] # Title lines.append("=" * 80) lines.append(f" SCENARIO PREVIEW: {scenario_key.upper()}") lines.append("=" * 80) lines.append("") # Configuration summary lines.append("[CONFIGURATION]") lines.append("-" * 80) if siem_type and siem_type != "none": lines.append(f" SIEM Platform: {siem_type.title()}") else: lines.append(f" SIEM Platform: None") lines.append(f" Resource Profile: {resource_profile.title()}") lines.append("") # Extract VMs vms = config.get("ludus", []) if not isinstance(vms, list): vms = [] # Group VMs by VLAN vms_by_vlan = {} for vm in vms: vlan = vm.get("vlan", "unknown") if vlan not in vms_by_vlan: vms_by_vlan[vlan] = [] vms_by_vlan[vlan].append(vm) # VM Details lines.append("[VIRTUAL MACHINES]") lines.append("-" * 80) lines.append(f" Total VMs: {len(vms)}") lines.append("") # Show VMs grouped by VLAN for vlan in sorted(vms_by_vlan.keys(), key=lambda x: x if isinstance(x, int) else 0): vlan_vms = vms_by_vlan[vlan] lines.append(f" VLAN {vlan} ({len(vlan_vms)} VM{'s' if len(vlan_vms) != 1 else ''}):") for vm in vlan_vms: vm_name = vm.get("hostname", vm.get("vm_name", "unknown")) template = vm.get("template", "unknown") ram_gb = vm.get("ram_gb", 0) cpus = vm.get("cpus", 0) ip_octet = vm.get("ip_last_octet", "?") ip_base = f"192.168.{vlan}.{ip_octet}" if isinstance(vlan, int) else "N/A" # Get roles roles = vm.get("roles", []) if not roles and "ansible_roles" in vm: ansible_roles = vm.get("ansible_roles", []) if isinstance(ansible_roles, list): roles = [r.get("name") if isinstance(r, dict) else r for r in ansible_roles] lines.append(f" • {vm_name}") lines.append(f" Template: {template}") lines.append(f" Resources: {ram_gb}GB RAM, {cpus} CPUs") lines.append(f" IP: {ip_base}") if roles: lines.append(f" Roles: {', '.join(roles[:3])}") if len(roles) > 3: lines.append(f" + {len(roles) - 3} more") lines.append("") # Network Rules network_rules = config.get("network", {}).get("rules", []) if network_rules: lines.append("[NETWORK RULES]") lines.append("-" * 80) lines.append(f" Total Rules: {len(network_rules)}") for rule in network_rules[:5]: # Show first 5 name = rule.get("name", "unnamed") src = rule.get("vlan_src", "?") dst = rule.get("vlan_dst", "?") action = rule.get("action", "ACCEPT") lines.append(f" • {name}: VLAN {src} -> VLAN {dst} ({action})") if len(network_rules) > 5: lines.append(f" ... and {len(network_rules) - 5} more rules") lines.append("") # Resource Summary total_ram = sum(vm.get("ram_gb", 0) for vm in vms) total_cpus = sum(vm.get("cpus", 0) for vm in vms) estimated_time = estimate_deployment_time(len(vms)) estimated_memory_gb, estimated_disk_gb = estimate_resources(vms) lines.append("[RESOURCE SUMMARY]") lines.append("-" * 80) lines.append(f" Total RAM: {total_ram}GB") lines.append(f" Total CPUs: {total_cpus}") lines.append(f" Estimated Disk: {estimated_disk_gb}GB") lines.append(f" Estimated Deployment Time: {estimated_time}") lines.append("") # Deployment Commands lines.append("[DEPLOYMENT]") lines.append("-" * 80) lines.append(" To deploy this scenario:") lines.append(f" ludus.deploy_scenario(scenario_key='{scenario_key}', siem_type='{siem_type}')") lines.append("") lines.append(" Or use smart deploy for auto-monitoring:") lines.append(f" ludus.smart_deploy(scenario_key='{scenario_key}', siem_type='{siem_type}')") lines.append("") lines.append("=" * 80) return "\n".join(lines) def format_deployment_status(status_data: dict) -> str: """ Format deployment status with visual indicators. Args: status_data: Deployment status dictionary Returns: Formatted status string """ state = status_data.get("rangeState", "UNKNOWN") vm_count = status_data.get("numberOfVMs", 0) # Status indicator status_indicator = { "SUCCESS": "[OK]", "DEPLOYING": "[...]", "FAILED": "[ERROR]", "DELETED": "[DELETED]", "UNKNOWN": "[?]" } indicator = status_indicator.get(state, "[?]") # Build status line status_line = f"{indicator} Range Status: {state}" if vm_count > 0: status_line += f" | VMs: {vm_count}" # Add timing info if available if "lastDeployment" in status_data: last_deploy = status_data["lastDeployment"] status_line += f" | Deployed: {last_deploy}" return status_line def format_progress_bar(percentage: float, width: int = 40) -> str: """ Generate ASCII progress bar. Args: percentage: Progress percentage (0-100) width: Width of progress bar in characters Returns: Formatted progress bar string """ filled = int(width * percentage / 100) empty = width - filled bar = "█" * filled + "░" * empty return f"[{bar}] {percentage:.0f}%" def format_vm_list(vms: list[dict]) -> str: """ Format VM list with status indicators. Args: vms: List of VM dictionaries Returns: Formatted VM list string """ if not vms: return "No VMs found" lines = [] lines.append("Virtual Machines:") lines.append("-" * 60) for vm in vms: name = vm.get("hostname", vm.get("name", "unknown")) template = vm.get("template", "unknown") network = vm.get("network", "unknown") status = vm.get("status", "unknown") # Status icon status_icon = "[OK]" if status == "running" else "[STOPPED]" if status == "stopped" else "[?]" lines.append(f"{status_icon} {name}") lines.append(f" Template: {template}") lines.append(f" Network: {network}") if status: lines.append(f" Status: {status}") lines.append("") return "\n".join(lines) def format_network_list(networks: list[dict]) -> str: """ Format network list. Args: networks: List of network dictionaries Returns: Formatted network list string """ if not networks: return "No networks found" lines = [] lines.append("Networks:") lines.append("-" * 60) for net in networks: name = net.get("name", "unknown") cidr = net.get("cidr", "unknown") vlan = net.get("vlan", "") lines.append(f"📡 {name} ({cidr})") if vlan: lines.append(f" VLAN: {vlan}") lines.append("") return "\n".join(lines) def generate_deployment_summary(range_data: dict, logs_preview: str = "") -> str: """ Generate comprehensive deployment summary. Args: range_data: Range information dictionary logs_preview: Optional logs preview Returns: Formatted summary string """ lines = [] # Header lines.append("╔" + "═" * 63 + "╗") lines.append("║" + " Deployment Summary".ljust(63) + "║") lines.append("╚" + "═" * 63 + "╝") lines.append("") # Status state = range_data.get("rangeState", "UNKNOWN") status_indicator = { "SUCCESS": "[OK]", "DEPLOYING": "[...]", "FAILED": "[ERROR]", "DELETED": "[DELETED]", "UNKNOWN": "[?]" } indicator = status_indicator.get(state, "[?]") lines.append(f"Status: {indicator} {state}") lines.append(f"VMs: {range_data.get('numberOfVMs', 0)}") lines.append("") # Logs preview if logs_preview: lines.append("Recent Logs:") lines.append("-" * 60) lines.append(logs_preview) lines.append("") # Next steps based on state lines.append("Next Steps:") if state == "DEPLOYING": lines.append(" • Wait for deployment to complete (check status periodically)") lines.append(" • Use: ludus.get_deployment_status") lines.append(" • Check health: ludus.check_deployment_health") elif state == "SUCCESS": lines.append(" • Get access info: ludus.get_range_sshconfig") lines.append(" • Get RDP configs: ludus.get_range_rdpconfigs") lines.append(" • Start testing: ludus.start_testing") elif state == "FAILED": lines.append(" • Check logs: ludus.get_full_logs") lines.append(" • Check health: ludus.check_deployment_health") lines.append(" • Get recovery help: ludus.recover_deployment") return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server