Skip to main content
Glama
network_analysis.py17.7 kB
"""Handler for network analysis and troubleshooting operations.""" from datetime import datetime from typing import Any import json from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) class NetworkAnalysisHandler: """Handler for network analysis and troubleshooting.""" def __init__(self, client: LudusAPIClient) -> None: """Initialize the network analysis handler.""" self.client = client async def test_network_connectivity( self, source_vm: str | None = None, target_vm: str | None = None, test_type: str = "comprehensive", user_id: str | None = None ) -> dict[str, Any]: """ Test network connectivity between VMs. Args: source_vm: Source VM name (if None, tests from all VMs) target_vm: Target VM name (if None, tests to all VMs) test_type: Type of test (ping, comprehensive, port_scan) user_id: Optional user ID (admin only) Returns: Dictionary with connectivity test results """ try: range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) if not vms: return { "status": "error", "error": "No VMs found in range" } # Get VM names vm_names = [vm.get("name") for vm in vms if vm.get("name")] # Determine source and target VMs sources = [source_vm] if source_vm else vm_names targets = [target_vm] if target_vm else vm_names connectivity_tests = [] for src in sources: for tgt in targets: if src == tgt: continue # Find VM IP addresses src_vm = next((vm for vm in vms if vm.get("name") == src), None) tgt_vm = next((vm for vm in vms if vm.get("name") == tgt), None) if not src_vm or not tgt_vm: continue src_ip = self._get_vm_ip(src_vm) tgt_ip = self._get_vm_ip(tgt_vm) test_result = { "source": { "name": src, "ip": src_ip, "network": self._get_vm_network(src_vm) }, "target": { "name": tgt, "ip": tgt_ip, "network": self._get_vm_network(tgt_vm) }, "test_type": test_type, "timestamp": datetime.now().isoformat(), "results": self._simulate_connectivity_test( src_vm, tgt_vm, test_type ) } connectivity_tests.append(test_result) # Calculate connectivity summary total_tests = len(connectivity_tests) reachable = sum( 1 for test in connectivity_tests if test["results"].get("reachable", False) ) result = { "status": "success", "timestamp": datetime.now().isoformat(), "test_configuration": { "source_vm": source_vm or "all", "target_vm": target_vm or "all", "test_type": test_type }, "summary": { "total_tests": total_tests, "reachable": reachable, "unreachable": total_tests - reachable, "success_rate": round((reachable / total_tests * 100), 2) if total_tests > 0 else 0 }, "tests": connectivity_tests, "note": "This is a connectivity analysis based on network configuration. For actual ping tests, use Ansible playbooks or SSH commands." } logger.info(f"Network connectivity test: {reachable}/{total_tests} reachable") return result except Exception as e: logger.error(f"Error testing network connectivity: {e}") return { "status": "error", "error": str(e) } def _get_vm_ip(self, vm: dict) -> str: """Extract VM IP address.""" # Try to get IP from various possible fields if "ip" in vm: return vm["ip"] if "ipAddress" in vm: return vm["ipAddress"] if "network" in vm and isinstance(vm["network"], dict): return vm["network"].get("ip", "unknown") return "unknown" def _get_vm_network(self, vm: dict) -> str: """Extract VM network name.""" if "network" in vm: if isinstance(vm["network"], str): return vm["network"] if isinstance(vm["network"], dict): return vm["network"].get("name", "unknown") return "unknown" def _simulate_connectivity_test( self, src_vm: dict, tgt_vm: dict, test_type: str ) -> dict[str, Any]: """Simulate connectivity test based on network configuration.""" src_network = self._get_vm_network(src_vm) tgt_network = self._get_vm_network(tgt_vm) # Same network = reachable same_network = src_network == tgt_network result = { "reachable": same_network, "same_network": same_network, "latency_ms": 1.5 if same_network else "N/A", "packet_loss": "0%" if same_network else "100%" } if test_type == "comprehensive": result["icmp_ping"] = same_network result["tcp_connectivity"] = same_network result["routing"] = "direct" if same_network else "requires_routing" return result async def get_network_topology(self, user_id: str | None = None) -> dict[str, Any]: """ Generate network topology visualization data. Args: user_id: Optional user ID (admin only) Returns: Dictionary with network topology data """ try: range_info = await self.client.get_range(user_id) range_config = await self.client.get_range_config(user_id) vms = range_info.get("VMs", []) networks = range_config.get("ludus", {}).get("network", []) # Build network topology topology = { "networks": [], "vms": [], "connections": [] } # Process networks for net in networks: network_info = { "name": net.get("name", "unknown"), "vlan": net.get("vlan"), "subnet": net.get("subnet", "unknown"), "gateway": net.get("gateway"), "type": net.get("type", "internal") } topology["networks"].append(network_info) # Process VMs and their connections for vm in vms: vm_info = { "name": vm.get("name", "unknown"), "ip": self._get_vm_ip(vm), "network": self._get_vm_network(vm), "status": vm.get("status", "unknown"), "template": vm.get("template", "unknown") } topology["vms"].append(vm_info) # Create connection to network topology["connections"].append({ "from": vm_info["name"], "to": vm_info["network"], "type": "vm_to_network" }) # Generate visualization data result = { "status": "success", "timestamp": datetime.now().isoformat(), "topology": topology, "statistics": { "total_networks": len(topology["networks"]), "total_vms": len(topology["vms"]), "total_connections": len(topology["connections"]) }, "mermaid_diagram": self._generate_mermaid_diagram(topology), "dot_graph": self._generate_dot_graph(topology) } logger.info(f"Generated network topology: {len(topology['networks'])} networks, {len(topology['vms'])} VMs") return result except Exception as e: logger.error(f"Error generating network topology: {e}") return { "status": "error", "error": str(e) } def _generate_mermaid_diagram(self, topology: dict) -> str: """Generate Mermaid diagram syntax for topology.""" lines = ["graph TD"] # Add networks for net in topology["networks"]: net_id = net["name"].replace("-", "_").replace(" ", "_") lines.append(f' {net_id}["{net["name"]}<br/>{net["subnet"]}"]') lines.append(f' style {net_id} fill:#e1f5ff') # Add VMs for vm in topology["vms"]: vm_id = vm["name"].replace("-", "_").replace(" ", "_") lines.append(f' {vm_id}["{vm["name"]}<br/>{vm["ip"]}"]') # Add connections for conn in topology["connections"]: from_id = conn["from"].replace("-", "_").replace(" ", "_") to_id = conn["to"].replace("-", "_").replace(" ", "_") lines.append(f' {from_id} --> {to_id}') return "\n".join(lines) def _generate_dot_graph(self, topology: dict) -> str: """Generate Graphviz DOT format for topology.""" lines = ["digraph network_topology {"] lines.append(' rankdir=TB;') lines.append(' node [shape=box];') # Add networks for net in topology["networks"]: net_id = net["name"].replace("-", "_").replace(" ", "_") lines.append(f' {net_id} [label="{net["name"]}\\n{net["subnet"]}" style=filled fillcolor=lightblue];') # Add VMs for vm in topology["vms"]: vm_id = vm["name"].replace("-", "_").replace(" ", "_") lines.append(f' {vm_id} [label="{vm["name"]}\\n{vm["ip"]}"];') # Add connections for conn in topology["connections"]: from_id = conn["from"].replace("-", "_").replace(" ", "_") to_id = conn["to"].replace("-", "_").replace(" ", "_") lines.append(f' {from_id} -> {to_id};') lines.append("}") return "\n".join(lines) async def diagnose_network_issues(self, user_id: str | None = None) -> dict[str, Any]: """ Automated network troubleshooting. Args: user_id: Optional user ID (admin only) Returns: Dictionary with diagnostic results """ try: range_info = await self.client.get_range(user_id) range_config = await self.client.get_range_config(user_id) logs = await self.client.get_range_logs(user_id) vms = range_info.get("VMs", []) networks = range_config.get("ludus", {}).get("network", []) issues = [] warnings = [] recommendations = [] # Check for VMs without IPs vms_without_ip = [ vm for vm in vms if self._get_vm_ip(vm) == "unknown" ] if vms_without_ip: issues.append({ "severity": "high", "type": "missing_ip", "message": f"{len(vms_without_ip)} VMs without IP addresses", "affected_vms": [vm.get("name") for vm in vms_without_ip] }) recommendations.append("Deploy or redeploy range to assign IP addresses") # Check for network isolation network_groups = {} for vm in vms: net = self._get_vm_network(vm) if net not in network_groups: network_groups[net] = [] network_groups[net].append(vm.get("name")) if len(network_groups) > 1: warnings.append({ "severity": "info", "type": "network_segmentation", "message": f"Range has {len(network_groups)} separate networks", "networks": network_groups }) recommendations.append("Verify network segmentation is intentional") # Check for stopped VMs stopped_vms = [vm for vm in vms if vm.get("status") != "running"] if stopped_vms: warnings.append({ "severity": "medium", "type": "stopped_vms", "message": f"{len(stopped_vms)} VMs are not running", "affected_vms": [vm.get("name") for vm in stopped_vms] }) recommendations.append("Use ludus.power_on_range to start all VMs") # Check logs for network errors if logs: log_lines = logs.lower() if "network" in log_lines and ("error" in log_lines or "failed" in log_lines): issues.append({ "severity": "high", "type": "network_errors_in_logs", "message": "Network-related errors found in deployment logs" }) recommendations.append("Review deployment logs for network configuration errors") # Calculate health status health_status = "healthy" if issues: health_status = "critical" elif warnings: health_status = "warning" result = { "status": "success", "timestamp": datetime.now().isoformat(), "health_status": health_status, "summary": { "total_vms": len(vms), "total_networks": len(networks), "issues_found": len(issues), "warnings_found": len(warnings) }, "issues": issues, "warnings": warnings, "recommendations": recommendations, "network_statistics": { "network_count": len(network_groups), "vms_per_network": { net: len(vms_list) for net, vms_list in network_groups.items() } } } logger.info(f"Network diagnostics: {health_status}, {len(issues)} issues, {len(warnings)} warnings") return result except Exception as e: logger.error(f"Error diagnosing network issues: {e}") return { "status": "error", "error": str(e) } async def capture_network_traffic( self, vm_name: str, duration_seconds: int = 60, filter_expression: str | None = None, user_id: str | None = None ) -> dict[str, Any]: """ Initiate packet capture on specific VMs. Args: vm_name: Name of the VM to capture traffic on duration_seconds: Duration of capture in seconds filter_expression: BPF filter expression (e.g., "tcp port 80") user_id: Optional user ID (admin only) Returns: Dictionary with capture configuration """ try: range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) # Find target VM target_vm = next((vm for vm in vms if vm.get("name") == vm_name), None) if not target_vm: return { "status": "error", "error": f"VM not found: {vm_name}" } capture_config = { "status": "success", "capture_id": f"capture-{vm_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}", "vm_name": vm_name, "vm_ip": self._get_vm_ip(target_vm), "duration_seconds": duration_seconds, "filter_expression": filter_expression or "all traffic", "start_time": datetime.now().isoformat(), "estimated_end_time": (datetime.now()).isoformat(), "configuration": { "interface": "auto-detect", "snaplen": 65535, "promiscuous_mode": True, "buffer_size_mb": 100 }, "implementation_notes": { "ansible_playbook": f"Use Ansible to run tcpdump on {vm_name}", "command": f"tcpdump -i any -w /tmp/capture.pcap {'-f ' + filter_expression if filter_expression else ''}", "retrieval": "Use ansible.builtin.fetch to retrieve the pcap file" } } logger.info(f"Configured packet capture for {vm_name}: {duration_seconds}s") return capture_config except Exception as e: logger.error(f"Error configuring packet capture: {e}") return { "status": "error", "error": str(e) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server