Glama
by washyu

decommission_device

Safely remove devices from homelab infrastructure by migrating services to other devices or forcing removal when needed.

Instructions

Safely remove a device from the network infrastructure

Input Schema

Name | Required | Description | Default
device_id | Yes | Database ID of the device to decommission |
migration_plan | No | Plan for migrating services to other devices |
force_removal | No | Force removal without migration (data loss possible) |
validate_only | No | Only validate decommission plan without executing |

Implementation Reference

  • MCP tool handler that wraps the decommission_network_device function. Extracts arguments and returns results in MCP format.
    async def handle_decommission_device(arguments: dict[str, Any]) -> dict[str, Any]:
        """Handle decommission_device tool."""
        result = await decommission_network_device(
            device_id=arguments["device_id"],
            migration_plan=arguments.get("migration_plan"),
            force_removal=arguments.get("force_removal", False),
            validate_only=arguments.get("validate_only", False),
        )
        return {"content": [{"type": "text", "text": result}]}
  • Core business logic for decommissioning a network device. Validates device existence, analyzes dependencies, executes migration plans, stops services, and removes from clusters.
    async def decommission_network_device(
        device_id: int,
        migration_plan: dict[str, Any] | None = None,
        force_removal: bool = False,
        validate_only: bool = False,
    ) -> str:
        """Safely remove a device from the network infrastructure."""
    
        try:
            manager = InfrastructureManager()
    
            # Get device info
            connection_info = await manager.get_device_connection_info(device_id)
            if not connection_info:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Device with ID {device_id} not found in sitemap",
                    }
                )
    
            # Analyze device dependencies
            dependencies = await _analyze_device_dependencies(manager, device_id)
    
            if dependencies["critical_services"] and not migration_plan and not force_removal:
                return json.dumps(
                    {
                        "status": "error",
                        "message": "Device has critical services. Migration plan required.",
                        "critical_services": dependencies["critical_services"],
                        "dependent_devices": dependencies["dependent_devices"],
                    }
                )
    
            if validate_only:
                return json.dumps(
                    {
                        "status": "success",
                        "message": "Decommission plan validated",
                        "dependencies": dependencies,
                        "migration_required": len(dependencies["critical_services"]) > 0,
                        "estimated_migration_time": "30-60 minutes" if migration_plan else "N/A",
                    }
                )
    
            decommission_results = []
    
            # Execute migration plan if provided
            if migration_plan and not force_removal:
                migration_results = await _execute_migration_plan(manager, device_id, migration_plan)
                decommission_results.extend(migration_results)
    
            # Remove device from active service
            async with asyncssh.connect(
                connection_info["hostname"],
                username=connection_info["username"],
                known_hosts=None,
            ) as conn:
                # Stop all services
                stop_result = await _stop_all_device_services(conn)
                decommission_results.append(stop_result)
    
                # Remove from load balancers/clusters
                removal_result = await _remove_from_clusters(conn)
                decommission_results.append(removal_result)
    
            # Update sitemap to mark device as decommissioned
            # Note: This method doesn't exist in NetworkSiteMap, would need to be implemented
            # manager.sitemap.update_device_status(device_id, "decommissioned")
    
            return json.dumps(
                {
                    "status": "success",
                    "message": f"Device {device_id} successfully decommissioned",
                    "device_id": device_id,
                    # Migration only runs when a plan is given and removal is not forced
                    "migration_executed": bool(migration_plan) and not force_removal,
                    "decommission_results": decommission_results,
                    "next_steps": [
                        "Verify migrated services are running on target devices",
                        "Update monitoring and alerting configurations",
                        "Physically remove or repurpose the hardware",
                    ],
                },
                indent=2,
            )
    
        except Exception as e:
            return json.dumps({"status": "error", "message": f"Device decommissioning failed: {str(e)}"})
  • Tool schema definition for decommission_device. Defines required and optional parameters: device_id (required), migration_plan, force_removal, and validate_only.
    "decommission_device": {
        "description": "Safely remove a device from the network infrastructure",
        "inputSchema": {
            "type": "object",
            "properties": {
                "device_id": {
                    "type": "integer",
                    "description": "Database ID of the device to decommission",
                },
                "migration_plan": {
                    "type": "object",
                    "description": "Plan for migrating services to other devices",
                    "properties": {
                        "target_devices": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "description": "Device IDs to migrate services to",
                        },
                        "service_mapping": {
                            "type": "object",
                            "description": "Mapping of services to target devices",
                        },
                    },
                },
                "force_removal": {
                    "type": "boolean",
                    "default": False,
                    "description": "Force removal without migration (data loss possible)",
                },
                "validate_only": {
                    "type": "boolean",
                    "default": False,
                    "description": "Only validate decommission plan without executing",
                },
            },
            "required": ["device_id"],
        },
    },
  • Tool registration mapping the decommission_device name to its handler function handle_decommission_device.
    "decommission_device": handle_decommission_device,
  • Helper function that analyzes device dependencies before decommissioning. Checks for running Docker containers, LXD containers, systemd services, and network listeners to identify critical services.
    async def _analyze_device_dependencies(manager: InfrastructureManager, device_id: int) -> dict[str, Any]:
        """Analyze device dependencies."""
        try:
            connection_info = await manager.get_device_connection_info(device_id)
            if not connection_info:
                return {
                    "critical_services": [],
                    "dependent_devices": [],
                    "error": "Device not found",
                }
    
            critical_services = []
            dependent_devices = []
    
            async with asyncssh.connect(
                connection_info["hostname"],
                username=connection_info["username"],
                known_hosts=None,
            ) as conn:
                # Check for running Docker containers
                docker_result = await conn.run('docker ps --format "{{.Names}}"')
                if docker_result.exit_status == 0 and docker_result.stdout:
                    stdout_text = (
                        docker_result.stdout.decode()
                        if isinstance(docker_result.stdout, bytes)
                        else str(docker_result.stdout)
                    )
                    # splitlines() returns [] for blank output, so container_names is always bound
                    container_names = stdout_text.strip().splitlines()
                    for container_name in container_names:
                        if container_name.strip():
                            # Check if container has exposed ports (likely critical)
                            port_result = await conn.run(f"docker port {container_name}")
                            if port_result.exit_status == 0 and port_result.stdout:
                                stdout_text = (
                                    port_result.stdout.decode()
                                    if isinstance(port_result.stdout, bytes)
                                    else str(port_result.stdout)
                                )
                                if stdout_text.strip():
                                    critical_services.append(
                                        {
                                            "name": container_name,
                                            "type": "docker",
                                            "reason": "Has exposed ports - likely provides external services",
                                            "ports": stdout_text.strip().split("\n"),
                                        }
                                    )
    
                # Check for running LXD containers
                lxd_result = await conn.run("lxc list --format csv -c ns | grep RUNNING")
                if lxd_result.exit_status == 0 and lxd_result.stdout:
                    stdout_text = (
                        lxd_result.stdout.decode() if isinstance(lxd_result.stdout, bytes) else str(lxd_result.stdout)
                    )
                    if stdout_text.strip():
                        for line in stdout_text.strip().split("\n"):
                            if line.strip():
                                container_name = line.split(",")[0]
                                critical_services.append(
                                    {
                                        "name": container_name,
                                        "type": "lxd",
                                        "reason": "Running LXD container",
                                    }
                                )
    
                # Check for critical systemd services
                critical_service_patterns = [
                    "nginx",
                    "apache2",
                    "mysql",
                    "postgresql",
                    "redis",
                    "mongodb",
                    "docker",
                    "k3s",
                    "kubernetes",
                    "prometheus",
                    "grafana",
                ]
    
                for pattern in critical_service_patterns:
                    service_result = await conn.run(f"systemctl is-active {pattern} 2>/dev/null")
                    if service_result.exit_status == 0 and service_result.stdout:
                        stdout_text = (
                            service_result.stdout.decode()
                            if isinstance(service_result.stdout, bytes)
                            else str(service_result.stdout)
                        )
                        if stdout_text.strip() == "active":
                            critical_services.append(
                                {
                                    "name": pattern,
                                    "type": "systemd",
                                    "reason": "Critical infrastructure service",
                                }
                            )
    
                # Check for services listening on network ports
                netstat_result = await conn.run("ss -tlnp 2>/dev/null | grep LISTEN")
                if netstat_result.exit_status == 0 and netstat_result.stdout:
                    listening_ports: list[str] = []
                    stdout_text = (
                        netstat_result.stdout.decode()
                        if isinstance(netstat_result.stdout, bytes)
                        else str(netstat_result.stdout)
                    )
                    for line in stdout_text.strip().split("\n"):
                        if "LISTEN" in line:
                            parts = line.split()
                            if len(parts) >= 4:
                                addr_port = parts[3]
                                if ":" in addr_port:
                                    port = addr_port.split(":")[-1]
                                    if port not in ["22", "53"]:  # Skip SSH and DNS
                                        listening_ports.append(port)
    
                    if listening_ports:
                        critical_services.append(
                            {
                                "name": "network_services",
                                "type": "network",
                                "reason": f"Listening on ports: {', '.join(listening_ports)}",
                                "ports": listening_ports,
                            }
                        )
    
            # Analyze network dependencies (simplified)
            # In a real implementation, this would check the network topology
            # and identify devices that depend on this device for routing, DNS, etc.
            all_devices = manager.sitemap.get_all_devices()
            device_ip = connection_info["hostname"]
    
            for device in all_devices:
                if device.get("id") != device_id:
                    # Check if this device might be a gateway or DNS server for others.
                    # Assumes an IPv4 /24; the trailing dot stops "10.0.1" from matching "10.0.10.x".
                    device_subnet = ".".join(device_ip.split(".")[:-1]) + "."
                    other_ip = device.get("connection_ip", device.get("hostname", ""))
                    if other_ip.startswith(device_subnet):
                        # Devices in same subnet might depend on this device
                        dependent_devices.append(
                            {
                                "device_id": device.get("id"),
                                "hostname": device.get("hostname"),
                                "reason": "Same network subnet - potential dependency",
                            }
                        )
    
            return {
                "critical_services": critical_services,
                "dependent_devices": dependent_devices,
                "analysis_summary": {
                    "total_critical_services": len(critical_services),
                    "total_dependent_devices": len(dependent_devices),
                    "migration_complexity": "high"
                    if len(critical_services) > 3
                    else "medium"
                    if len(critical_services) > 0
                    else "low",
                },
            }
    
        except Exception as e:
            return {"critical_services": [], "dependent_devices": [], "error": str(e)}
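
The same-subnet heuristic in the helper above compares string prefixes. A possible alternative, sketched here with the standard-library ipaddress module, compares real network membership instead. The function name same_subnet and the /24 default prefix are assumptions chosen to match the original heuristic, not part of the project.

```python
import ipaddress


def same_subnet(device_ip: str, other_ip: str, prefix_len: int = 24) -> bool:
    """Return True when other_ip falls inside device_ip's network.

    Hostnames or empty strings cannot be parsed as addresses and
    are treated as "not in the subnet".
    """
    try:
        # strict=False lets a host address stand in for its network.
        network = ipaddress.ip_network(f"{device_ip}/{prefix_len}", strict=False)
        return ipaddress.ip_address(other_ip) in network
    except ValueError:
        return False


print(same_subnet("192.168.1.10", "192.168.1.20"))   # True
print(same_subnet("192.168.1.10", "192.168.10.5"))   # False
print(same_subnet("192.168.1.10", "nas.local"))      # False
```

Unlike a prefix match, this does not confuse 192.168.1.x with 192.168.10.x, and the prefix length can be changed for networks that are not /24.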
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden for behavioral disclosure. It mentions 'safely' but doesn't explain what that entails (e.g., data migration, service downtime, permissions required). The input schema hints at migration and validation options, but the description doesn't elaborate on these behaviors or potential side effects like data loss with force_removal.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that states the core purpose without unnecessary words. It's appropriately sized and front-loaded with the essential information.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a destructive tool with 4 parameters, no annotations, and no output schema, the description is insufficient. It doesn't cover behavioral aspects like what 'safely' means, potential impacts, or response format. The input schema provides parameter details, but the description lacks context about execution flow or results.
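
Although no output schema is published, the handler shown in the Implementation Reference wraps a JSON string in a single text content item. A minimal sketch of how a caller might decode it follows; parse_tool_result and summarize are hypothetical helper names, and the example payload simply mirrors the error branch in the listing above.

```python
import json
from typing import Any


def parse_tool_result(result: dict[str, Any]) -> dict[str, Any]:
    """Decode the JSON string carried in the first text content item."""
    return json.loads(result["content"][0]["text"])


def summarize(payload: dict[str, Any]) -> str:
    """Turn a decoded payload into a one-line summary."""
    message = payload.get("message", "")
    if payload.get("status") == "error":
        # The "critical services" error branch lists what blocked the request.
        blockers = [s.get("name", "?") for s in payload.get("critical_services", [])]
        suffix = f" (critical services: {', '.join(blockers)})" if blockers else ""
        return f"error: {message}{suffix}"
    return f"ok: {message}"


example = {
    "content": [
        {
            "type": "text",
            "text": json.dumps(
                {
                    "status": "error",
                    "message": "Device has critical services. Migration plan required.",
                    "critical_services": [{"name": "nginx", "type": "systemd"}],
                    "dependent_devices": [],
                }
            ),
        }
    ]
}

print(summarize(parse_tool_result(example)))
```

Note that failures are reported inside the JSON body with "status": "error" rather than raised, so a caller has to inspect the decoded status field.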

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so parameters are well-documented in the schema itself. The description adds no additional parameter semantics beyond what's in the schema (e.g., doesn't explain migration_plan structure or validate_only implications). Baseline 3 is appropriate when schema does the heavy lifting.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('safely remove') and target ('device from the network infrastructure'), which distinguishes it from sibling tools like 'remove_server' or 'remove_vm' that target different resources. However, it doesn't explicitly differentiate from all siblings (e.g., 'delete_proxmox_vm'), so it's not a perfect 5.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives like 'remove_server' or 'delete_proxmox_vm'. It mentions 'safely' but doesn't explain what makes it safe or when force_removal might be appropriate. No prerequisites or exclusions are stated.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/washyu/mcp_python_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.