decommission_device
Safely remove devices from homelab infrastructure by migrating services to other devices or forcing removal when needed.
Instructions
Safely remove a device from the network infrastructure
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| device_id | Yes | Database ID of the device to decommission | |
| migration_plan | No | Plan for migrating services to other devices | |
| force_removal | No | Force removal without migration (data loss possible) | false |
| validate_only | No | Only validate decommission plan without executing | false |
Implementation Reference
- MCP tool handler that wraps the decommission_network_device function. Extracts arguments and returns results in MCP format.
async def handle_decommission_device(arguments: dict[str, Any]) -> dict[str, Any]:
    """Run the ``decommission_device`` tool and wrap its output for MCP.

    Args:
        arguments: Tool arguments. ``device_id`` must be present (a missing
            key raises ``KeyError``); ``migration_plan`` falls back to
            ``None`` and ``force_removal`` / ``validate_only`` fall back to
            ``False`` when absent.

    Returns:
        A dict with a single ``content`` list holding one text item whose
        ``text`` is the string returned by ``decommission_network_device``.
    """
    call_kwargs = {
        "device_id": arguments["device_id"],
        "migration_plan": arguments.get("migration_plan"),
        "force_removal": arguments.get("force_removal", False),
        "validate_only": arguments.get("validate_only", False),
    }
    report = await decommission_network_device(**call_kwargs)

    text_item = {"type": "text", "text": report}
    return {"content": [text_item]}


# - Core business logic for decommissioning a network device. Validates device existence, analyzes dependencies, executes migration plans, stops services, and removes from clusters.
async def decommission_network_device(
    device_id: int,
    migration_plan: dict[str, Any] | None = None,
    force_removal: bool = False,
    validate_only: bool = False,
) -> str:
    """Safely remove a device from the network infrastructure.

    Looks up the device's connection info, runs the dependency analysis,
    optionally executes a migration plan, then connects to the device over
    SSH to stop its services and remove it from clusters.

    Args:
        device_id: ID used to look up the device's connection info.
        migration_plan: Optional plan passed to ``_execute_migration_plan``.
            It is not executed when ``force_removal`` is true.
        force_removal: Proceed without migrating, even if critical services
            were found or the dependency analysis failed.
        validate_only: Return the dependency analysis without touching the
            device. Note that the critical-services check runs first, so a
            device with critical services and neither a plan nor
            ``force_removal`` still yields an error payload.

    Returns:
        A JSON string with ``status`` set to ``"success"`` or ``"error"``.
        Any exception raised along the way is caught and reported as an
        error payload rather than propagated.
    """
    try:
        manager = InfrastructureManager()

        connection_info = await manager.get_device_connection_info(device_id)
        if not connection_info:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Device with ID {device_id} not found in sitemap",
                }
            )

        dependencies = await _analyze_device_dependencies(manager, device_id)

        # The analyzer reports its own failures through an "error" key
        # together with an empty service list. Without this guard a failed
        # analysis is indistinguishable from "nothing critical is running"
        # and the device would be torn down with no plan and no explicit
        # force flag.
        analysis_error = dependencies.get("error")
        if analysis_error and not migration_plan and not force_removal:
            return json.dumps(
                {
                    "status": "error",
                    "message": (
                        f"Dependency analysis failed: {analysis_error}. "
                        "Provide a migration plan or set force_removal to proceed."
                    ),
                }
            )

        if dependencies["critical_services"] and not migration_plan and not force_removal:
            return json.dumps(
                {
                    "status": "error",
                    "message": "Device has critical services. Migration plan required.",
                    "critical_services": dependencies["critical_services"],
                    "dependent_devices": dependencies["dependent_devices"],
                }
            )

        if validate_only:
            return json.dumps(
                {
                    "status": "success",
                    "message": "Decommission plan validated",
                    "dependencies": dependencies,
                    "migration_required": len(dependencies["critical_services"]) > 0,
                    "estimated_migration_time": "30-60 minutes" if migration_plan else "N/A",
                }
            )

        # Single source of truth for "did we migrate?", reused in the report
        # below so it cannot disagree with what actually ran.
        run_migration = bool(migration_plan) and not force_removal

        decommission_results = []
        if run_migration:
            migration_results = await _execute_migration_plan(manager, device_id, migration_plan)
            decommission_results.extend(migration_results)

        # NOTE(security): known_hosts=None disables SSH host key
        # verification. Left unchanged to keep existing setups working, but
        # it should be replaced by a real known_hosts source.
        async with asyncssh.connect(
            connection_info["hostname"],
            username=connection_info["username"],
            known_hosts=None,
        ) as conn:
            decommission_results.append(await _stop_all_device_services(conn))
            decommission_results.append(await _remove_from_clusters(conn))

        # TODO: mark the device as decommissioned in the sitemap. The
        # original note says NetworkSiteMap has no update_device_status
        # method yet, so the sitemap still lists the device after this call.

        return json.dumps(
            {
                "status": "success",
                "message": f"Device {device_id} successfully decommissioned",
                "device_id": device_id,
                "migration_executed": run_migration,
                "decommission_results": decommission_results,
                "next_steps": [
                    "Verify migrated services are running on target devices",
                    "Update monitoring and alerting configurations",
                    "Physically remove or repurpose the hardware",
                ],
            },
            indent=2,
        )
    except Exception as e:
        # Top-level tool boundary: report the failure as JSON instead of
        # letting it escape to the MCP handler.
        return json.dumps({"status": "error", "message": f"Device decommissioning failed: {e}"})


# - Tool schema definition for decommission_device. Defines required and optional parameters: device_id (required), migration_plan, force_removal, and validate_only.
"decommission_device": { "description": "Safely remove a device from the network infrastructure", "inputSchema": { "type": "object", "properties": { "device_id": { "type": "integer", "description": "Database ID of the device to decommission", }, "migration_plan": { "type": "object", "description": "Plan for migrating services to other devices", "properties": { "target_devices": { "type": "array", "items": {"type": "integer"}, "description": "Device IDs to migrate services to", }, "service_mapping": { "type": "object", "description": "Mapping of services to target devices", }, }, }, "force_removal": { "type": "boolean", "default": False, "description": "Force removal without migration (data loss possible)", }, "validate_only": { "type": "boolean", "default": False, "description": "Only validate decommission plan without executing", }, }, "required": ["device_id"], }, - src/homelab_mcp/tool_handlers/__init__.py:92-92 (registration)Tool registration mapping the decommission_device name to its handler function handle_decommission_device.
"decommission_device": handle_decommission_device, - Helper function that analyzes device dependencies before decommissioning. Checks for running Docker containers, LXD containers, systemd services, and network listeners to identify critical services.
def _command_output(result: Any) -> str:
    """Return the stripped stdout of a successful command, or ``""``.

    ``result`` is expected to expose ``exit_status`` and ``stdout`` (bytes or
    str). A non-zero exit status or empty stdout yields the empty string.
    """
    if result.exit_status != 0 or not result.stdout:
        return ""
    raw = result.stdout
    text = raw.decode() if isinstance(raw, bytes) else str(raw)
    return text.strip()


def _parse_listening_ports(ss_output: str, ignored_ports: tuple[str, ...] = ("22", "53")) -> list[str]:
    """Extract unique local ports from ``ss -tlnp`` style output.

    The port is taken from the fourth whitespace-separated column of every
    line containing ``LISTEN``. Ports in ``ignored_ports`` (SSH and DNS by
    default) are dropped. A port bound on several addresses (e.g. IPv4 and
    IPv6) is reported once, in first-seen order.
    """
    seen: dict[str, None] = {}
    for line in ss_output.splitlines():
        if "LISTEN" not in line:
            continue
        fields = line.split()
        if len(fields) < 4 or ":" not in fields[3]:
            continue
        port = fields[3].rsplit(":", 1)[-1]
        if port not in ignored_ports:
            seen.setdefault(port)
    return list(seen)


def _find_subnet_neighbors(
    all_devices: Any,
    device_id: int,
    device_ip: Any,
    prefix_length: int = 24,
) -> list[dict[str, Any]]:
    """List other devices whose address lies in the same IPv4 subnet.

    The subnet is ``device_ip`` masked to ``prefix_length`` bits (/24 keeps
    the original "same first three octets" heuristic). Parsing real
    addresses instead of comparing string prefixes prevents ``192.168.1.x``
    from matching ``192.168.10.x``. If ``device_ip`` is not an IPv4 address
    (e.g. a DNS name) no subnet can be derived and the result is empty;
    devices whose own address is missing or not IPv4 are skipped.
    """
    import ipaddress

    try:
        network = ipaddress.IPv4Network((str(device_ip), prefix_length), strict=False)
    except ValueError:
        return []

    neighbors: list[dict[str, Any]] = []
    for device in all_devices:
        if device.get("id") == device_id:
            continue
        # Fall back to the hostname when connection_ip is absent or empty.
        candidate = device.get("connection_ip") or device.get("hostname") or ""
        try:
            same_subnet = ipaddress.IPv4Address(str(candidate)) in network
        except ValueError:
            continue
        if same_subnet:
            neighbors.append(
                {
                    "device_id": device.get("id"),
                    "hostname": device.get("hostname"),
                    "reason": "Same network subnet - potential dependency",
                }
            )
    return neighbors


async def _analyze_device_dependencies(manager: InfrastructureManager, device_id: int) -> dict[str, Any]:
    """Analyze device dependencies.

    Connects to the device over SSH and collects "critical services":
    Docker containers that report published ports, running LXD containers,
    active systemd units from a fixed list, and TCP listeners other than
    SSH/DNS. Then lists other sitemap devices in the same /24 as potential
    dependents (a simplified heuristic, not a real topology analysis).

    Args:
        manager: Provides ``get_device_connection_info`` and
            ``sitemap.get_all_devices``.
        device_id: ID of the device to inspect.

    Returns:
        A dict with ``critical_services``, ``dependent_devices`` and
        ``analysis_summary``. On any failure (including an unknown device)
        both lists are empty and an ``error`` key describes the problem;
        exceptions are never propagated.
    """
    import shlex

    try:
        connection_info = await manager.get_device_connection_info(device_id)
        if not connection_info:
            return {
                "critical_services": [],
                "dependent_devices": [],
                "error": "Device not found",
            }

        critical_services: list[dict[str, Any]] = []

        # NOTE(security): known_hosts=None disables SSH host key
        # verification; kept as-is to avoid breaking existing setups.
        async with asyncssh.connect(
            connection_info["hostname"],
            username=connection_info["username"],
            known_hosts=None,
        ) as conn:
            # Docker: a container with published ports is assumed to serve
            # external clients.
            docker_names = _command_output(await conn.run('docker ps --format "{{.Names}}"'))
            for raw_name in docker_names.split("\n"):
                container_name = raw_name.strip()
                if not container_name:
                    continue
                # Quote the name: it comes from remote command output and is
                # interpolated into a shell command line.
                port_text = _command_output(await conn.run(f"docker port {shlex.quote(container_name)}"))
                if port_text:
                    critical_services.append(
                        {
                            "name": container_name,
                            "type": "docker",
                            "reason": "Has exposed ports - likely provides external services",
                            "ports": port_text.split("\n"),
                        }
                    )

            # LXD: every running container counts as critical.
            lxd_text = _command_output(await conn.run("lxc list --format csv -c ns | grep RUNNING"))
            for line in lxd_text.split("\n"):
                if line.strip():
                    critical_services.append(
                        {
                            "name": line.split(",")[0],
                            "type": "lxd",
                            "reason": "Running LXD container",
                        }
                    )

            # systemd: well-known infrastructure units.
            critical_service_patterns = (
                "nginx",
                "apache2",
                "mysql",
                "postgresql",
                "redis",
                "mongodb",
                "docker",
                "k3s",
                "kubernetes",
                "prometheus",
                "grafana",
            )
            for pattern in critical_service_patterns:
                state = _command_output(await conn.run(f"systemctl is-active {pattern} 2>/dev/null"))
                if state == "active":
                    critical_services.append(
                        {
                            "name": pattern,
                            "type": "systemd",
                            "reason": "Critical infrastructure service",
                        }
                    )

            # Anything else listening on TCP.
            ss_text = _command_output(await conn.run("ss -tlnp 2>/dev/null | grep LISTEN"))
            listening_ports = _parse_listening_ports(ss_text)
            if listening_ports:
                critical_services.append(
                    {
                        "name": "network_services",
                        "type": "network",
                        "reason": f"Listening on ports: {', '.join(listening_ports)}",
                        "ports": listening_ports,
                    }
                )

        # Network dependencies (simplified): a real implementation would
        # inspect routing/DNS topology instead of subnet membership.
        dependent_devices = _find_subnet_neighbors(
            manager.sitemap.get_all_devices(),
            device_id,
            connection_info["hostname"],
        )

        service_count = len(critical_services)
        if service_count > 3:
            migration_complexity = "high"
        elif service_count > 0:
            migration_complexity = "medium"
        else:
            migration_complexity = "low"

        return {
            "critical_services": critical_services,
            "dependent_devices": dependent_devices,
            "analysis_summary": {
                "total_critical_services": service_count,
                "total_dependent_devices": len(dependent_devices),
                "migration_complexity": migration_complexity,
            },
        }
    except Exception as e:
        # Best-effort analysis: report the failure in-band so the caller can
        # decide how to proceed.
        return {"critical_services": [], "dependent_devices": [], "error": str(e)}