Skip to main content
Glama
washyu
by washyu

decommission_device

Safely remove devices from homelab infrastructure by migrating services to other devices or forcing removal when needed.

Instructions

Safely remove a device from the network infrastructure

Input Schema

Table | JSON Schema
Name | Required | Description | Default
device_id | Yes | Database ID of the device to decommission |
migration_plan | No | Plan for migrating services to other devices |
force_removal | No | Force removal without migration (data loss possible) |
validate_only | No | Only validate decommission plan without executing |

Implementation Reference

  • MCP tool handler that wraps the decommission_network_device function. Extracts arguments and returns results in MCP format.
    async def handle_decommission_device(arguments: dict[str, Any]) -> dict[str, Any]:
        """Run the ``decommission_device`` tool and wrap its output for MCP.

        Args:
            arguments: Tool arguments. ``device_id`` is mandatory; the optional
                keys ``migration_plan``, ``force_removal`` and ``validate_only``
                fall back to ``None``, ``False`` and ``False`` respectively.

        Returns:
            A dict with a single ``content`` list holding one ``text`` item whose
            text is the string returned by ``decommission_network_device``.

        Raises:
            KeyError: If ``device_id`` is absent from ``arguments``.
        """
        # Pull every argument out up front so the call below reads cleanly.
        target_id = arguments["device_id"]
        plan = arguments.get("migration_plan")
        force = arguments.get("force_removal", False)
        dry_run = arguments.get("validate_only", False)

        report = await decommission_network_device(
            device_id=target_id,
            migration_plan=plan,
            force_removal=force,
            validate_only=dry_run,
        )

        text_item = {"type": "text", "text": report}
        return {"content": [text_item]}
  • Core business logic for decommissioning a network device. Validates device existence, analyzes dependencies, executes migration plans, stops services, and removes from clusters.
    async def decommission_network_device(
        device_id: int,
        migration_plan: dict[str, Any] | None = None,
        force_removal: bool = False,
        validate_only: bool = False,
    ) -> str:
        """Safely remove a device from the network infrastructure.

        The device is looked up, its dependencies are analyzed, an optional
        migration plan is executed, and finally its services are stopped and it is
        removed from clusters over SSH.

        Args:
            device_id: Database ID of the device to decommission.
            migration_plan: Plan for migrating services to other devices. It is
                only executed when it is non-empty and ``force_removal`` is False.
            force_removal: Proceed without a migration plan even if critical
                services were found, or if dependency analysis failed.
            validate_only: Return the validation report without executing anything.

        Returns:
            A JSON string. ``status`` is ``"success"`` or ``"error"``; errors carry
            a ``message`` and never raise to the caller.
        """
        try:
            manager = InfrastructureManager()

            # Get device info
            connection_info = await manager.get_device_connection_info(device_id)
            if not connection_info:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Device with ID {device_id} not found in sitemap",
                    }
                )

            # Analyze device dependencies
            dependencies = await _analyze_device_dependencies(manager, device_id)

            # A failed analysis reports empty service lists plus an "error" key.
            # Treating that as "nothing critical here" would silently bypass the
            # migration-plan safeguard below, so refuse unless explicitly forced.
            analysis_error = dependencies.get("error")
            if analysis_error and not force_removal:
                return json.dumps(
                    {
                        "status": "error",
                        "message": (
                            f"Dependency analysis failed: {analysis_error}. "
                            "Resolve the problem or use force_removal to proceed."
                        ),
                    }
                )

            if dependencies["critical_services"] and not migration_plan and not force_removal:
                return json.dumps(
                    {
                        "status": "error",
                        "message": "Device has critical services. Migration plan required.",
                        "critical_services": dependencies["critical_services"],
                        "dependent_devices": dependencies["dependent_devices"],
                    }
                )

            if validate_only:
                return json.dumps(
                    {
                        "status": "success",
                        "message": "Decommission plan validated",
                        "dependencies": dependencies,
                        "migration_required": len(dependencies["critical_services"]) > 0,
                        "estimated_migration_time": "30-60 minutes" if migration_plan else "N/A",
                    }
                )

            decommission_results = []

            # Execute migration plan if provided. Track whether it actually ran so
            # the final report does not claim a migration that was skipped by
            # force_removal or by an empty plan.
            migration_executed = False
            if migration_plan and not force_removal:
                migration_results = await _execute_migration_plan(manager, device_id, migration_plan)
                decommission_results.extend(migration_results)
                migration_executed = True

            # Remove device from active service
            # NOTE(review): known_hosts=None turns off SSH host key checking in
            # asyncssh — confirm this is acceptable for the target network.
            async with asyncssh.connect(
                connection_info["hostname"],
                username=connection_info["username"],
                known_hosts=None,
            ) as conn:
                # Stop all services
                stop_result = await _stop_all_device_services(conn)
                decommission_results.append(stop_result)

                # Remove from load balancers/clusters
                removal_result = await _remove_from_clusters(conn)
                decommission_results.append(removal_result)

            # Update sitemap to mark device as decommissioned
            # Note: This method doesn't exist in NetworkSiteMap, would need to be implemented
            # manager.sitemap.update_device_status(device_id, "decommissioned")

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Device {device_id} successfully decommissioned",
                    "device_id": device_id,
                    "migration_executed": migration_executed,
                    "decommission_results": decommission_results,
                    "next_steps": [
                        "Verify migrated services are running on target devices",
                        "Update monitoring and alerting configurations",
                        "Physically remove or repurpose the hardware",
                    ],
                },
                indent=2,
            )

        except Exception as e:
            # Tool boundary: report every failure as a JSON error instead of raising.
            return json.dumps({"status": "error", "message": f"Device decommissioning failed: {str(e)}"})
  • Tool schema definition for decommission_device. Defines required and optional parameters: device_id (required), migration_plan, force_removal, and validate_only.
    "decommission_device": {
        "description": "Safely remove a device from the network infrastructure",
        "inputSchema": {
            "type": "object",
            "properties": {
                "device_id": {
                    "type": "integer",
                    "description": "Database ID of the device to decommission",
                },
                "migration_plan": {
                    "type": "object",
                    "description": "Plan for migrating services to other devices",
                    "properties": {
                        "target_devices": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "description": "Device IDs to migrate services to",
                        },
                        "service_mapping": {
                            "type": "object",
                            "description": "Mapping of services to target devices",
                        },
                    },
                },
                "force_removal": {
                    "type": "boolean",
                    "default": False,
                    "description": "Force removal without migration (data loss possible)",
                },
                "validate_only": {
                    "type": "boolean",
                    "default": False,
                    "description": "Only validate decommission plan without executing",
                },
            },
            "required": ["device_id"],
        },
  • Tool registration mapping the decommission_device name to its handler function handle_decommission_device.
    "decommission_device": handle_decommission_device,
  • Helper function that analyzes device dependencies before decommissioning. Checks for running Docker containers, LXD containers, systemd services, and network listeners to identify critical services.
    async def _analyze_device_dependencies(manager: InfrastructureManager, device_id: int) -> dict[str, Any]:
        """Analyze device dependencies.

        Connects to the device over SSH and collects services that look critical:
        Docker containers with published ports, running LXD containers, a fixed
        list of active systemd units, and listening TCP ports other than 22 and 53.
        Other devices from the sitemap that share the device's address prefix are
        reported as potential dependents.

        Args:
            manager: Infrastructure manager used to look up connection info and
                the list of all known devices.
            device_id: Database ID of the device being analyzed.

        Returns:
            A dict with ``critical_services`` and ``dependent_devices`` lists. On
            success it also has ``analysis_summary``; on failure (device not found
            or any exception) both lists are empty and ``error`` holds the reason.
        """
        import shlex

        def _as_text(raw: Any) -> str:
            """Return command output as ``str`` whether it arrived as bytes or text."""
            return raw.decode() if isinstance(raw, bytes) else str(raw)

        try:
            connection_info = await manager.get_device_connection_info(device_id)
            if not connection_info:
                return {
                    "critical_services": [],
                    "dependent_devices": [],
                    "error": "Device not found",
                }

            critical_services = []
            dependent_devices = []

            # NOTE(review): known_hosts=None turns off SSH host key checking in
            # asyncssh — confirm this is acceptable for the target network.
            async with asyncssh.connect(
                connection_info["hostname"],
                username=connection_info["username"],
                known_hosts=None,
            ) as conn:
                # Check for running Docker containers
                docker_result = await conn.run('docker ps --format "{{.Names}}"')
                if docker_result.exit_status == 0 and docker_result.stdout:
                    # splitlines() on blank output yields nothing, so the loop
                    # below is always safe to run (no unbound name on empty
                    # or whitespace-only output).
                    container_names = [
                        name.strip() for name in _as_text(docker_result.stdout).splitlines() if name.strip()
                    ]
                    for container_name in container_names:
                        # Check if container has exposed ports (likely critical).
                        # Quote the name: it comes from remote output and is
                        # interpolated into a shell command line.
                        port_result = await conn.run(f"docker port {shlex.quote(container_name)}")
                        if port_result.exit_status != 0 or not port_result.stdout:
                            continue
                        port_text = _as_text(port_result.stdout).strip()
                        if port_text:
                            critical_services.append(
                                {
                                    "name": container_name,
                                    "type": "docker",
                                    "reason": "Has exposed ports - likely provides external services",
                                    "ports": port_text.split("\n"),
                                }
                            )

                # Check for running LXD containers
                lxd_result = await conn.run("lxc list --format csv -c ns | grep RUNNING")
                if lxd_result.exit_status == 0 and lxd_result.stdout:
                    for line in _as_text(lxd_result.stdout).strip().split("\n"):
                        if line.strip():
                            # CSV columns are name,state; keep the name only.
                            critical_services.append(
                                {
                                    "name": line.split(",")[0],
                                    "type": "lxd",
                                    "reason": "Running LXD container",
                                }
                            )

                # Check for critical systemd services
                critical_service_patterns = [
                    "nginx",
                    "apache2",
                    "mysql",
                    "postgresql",
                    "redis",
                    "mongodb",
                    "docker",
                    "k3s",
                    "kubernetes",
                    "prometheus",
                    "grafana",
                ]

                for pattern in critical_service_patterns:
                    service_result = await conn.run(f"systemctl is-active {pattern} 2>/dev/null")
                    if service_result.exit_status == 0 and service_result.stdout:
                        if _as_text(service_result.stdout).strip() == "active":
                            critical_services.append(
                                {
                                    "name": pattern,
                                    "type": "systemd",
                                    "reason": "Critical infrastructure service",
                                }
                            )

                # Check for services listening on network ports
                netstat_result = await conn.run("ss -tlnp 2>/dev/null | grep LISTEN")
                if netstat_result.exit_status == 0 and netstat_result.stdout:
                    listening_ports: list[str] = []
                    for line in _as_text(netstat_result.stdout).strip().split("\n"):
                        if "LISTEN" not in line:
                            continue
                        parts = line.split()
                        if len(parts) < 4:
                            continue
                        # Fourth column is taken as the local "address:port" pair.
                        addr_port = parts[3]
                        if ":" in addr_port:
                            port = addr_port.split(":")[-1]
                            if port not in ["22", "53"]:  # Skip SSH and DNS
                                listening_ports.append(port)

                    if listening_ports:
                        critical_services.append(
                            {
                                "name": "network_services",
                                "type": "network",
                                "reason": f"Listening on ports: {', '.join(listening_ports)}",
                                "ports": listening_ports,
                            }
                        )

            # Analyze network dependencies (simplified)
            # In a real implementation, this would check the network topology
            # and identify devices that depend on this device for routing, DNS, etc.
            all_devices = manager.sitemap.get_all_devices()
            device_ip = connection_info["hostname"]

            # Everything before the last dot, computed once outside the loop.
            device_subnet = ".".join(device_ip.split(".")[:-1])
            # The trailing dot keeps "192.168.1" from matching "192.168.10.x";
            # an empty prefix (dotless hostname) must match nothing, not everything.
            subnet_prefix = f"{device_subnet}." if device_subnet else ""

            for device in all_devices:
                if device.get("id") == device_id:
                    continue
                # Check if this device might be a gateway or DNS server for others
                other_ip = device.get("connection_ip") or device.get("hostname") or ""
                if subnet_prefix and other_ip.startswith(subnet_prefix):
                    # Devices in same subnet might depend on this device
                    dependent_devices.append(
                        {
                            "device_id": device.get("id"),
                            "hostname": device.get("hostname"),
                            "reason": "Same network subnet - potential dependency",
                        }
                    )

            service_count = len(critical_services)
            if service_count > 3:
                migration_complexity = "high"
            elif service_count > 0:
                migration_complexity = "medium"
            else:
                migration_complexity = "low"

            return {
                "critical_services": critical_services,
                "dependent_devices": dependent_devices,
                "analysis_summary": {
                    "total_critical_services": service_count,
                    "total_dependent_devices": len(dependent_devices),
                    "migration_complexity": migration_complexity,
                },
            }

        except Exception as e:
            # Best-effort analysis: surface the failure in the result instead of raising.
            return {"critical_services": [], "dependent_devices": [], "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/washyu/mcp_python_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.