create_infrastructure_backup
Create backups of homelab infrastructure state to protect configurations, services, and data from loss or corruption.
Instructions
Create a backup of current infrastructure state
Input Schema
Parameters (table; full JSON Schema under Implementation Reference)
| Name | Required | Description | Default |
|---|---|---|---|
| backup_scope | No | Scope of the backup | full |
| device_ids | No | Specific device IDs to backup (for partial/device_specific) | |
| include_data | No | Include application data in backup | false |
| backup_name | No | Name for the backup (auto-generated if not provided) | |
Implementation Reference
- Tool handler function that wraps the create_infrastructure_backup implementation. Extracts arguments and calls the underlying CRUD function, returning results in MCP tool response format.
async def handle_create_infrastructure_backup( arguments: dict[str, Any], ) -> dict[str, Any]: """Handle create_infrastructure_backup tool.""" result = await create_infrastructure_backup( backup_scope=arguments.get("backup_scope", "full"), device_ids=arguments.get("device_ids"), include_data=arguments.get("include_data", False), backup_name=arguments.get("backup_name"), ) return {"content": [{"type": "text", "text": result}]} - Main implementation of create_infrastructure_backup. Creates backup IDs, determines scope (full/partial/device_specific), backs up devices, network topology, saves to /tmp, and returns backup details including backup_id, path, size, and device count.
async def create_infrastructure_backup( backup_scope: str = "full", device_ids: list[int] | None = None, include_data: bool = False, backup_name: str | None = None, ) -> str: """Create a backup of current infrastructure state.""" try: manager = InfrastructureManager() # Generate backup ID backup_id = backup_name or f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" backup_data = { "backup_id": backup_id, "created_at": datetime.now().isoformat(), "scope": backup_scope, "include_data": include_data, "devices": {}, "network_topology": {}, "services": {}, } # Determine which devices to backup if backup_scope == "full": devices = manager.sitemap.get_all_devices() target_device_ids = [device["id"] for device in devices] elif device_ids: target_device_ids = device_ids else: return json.dumps( { "status": "error", "message": "Device IDs required for partial or device-specific backup", } ) # Backup each device for device_id in target_device_ids: device_backup = await _backup_device(manager, device_id, include_data) if isinstance(backup_data["devices"], dict): backup_data["devices"][str(device_id)] = device_backup # Backup network topology backup_data["network_topology"] = await _backup_network_topology(manager) # Save backup (in a real implementation, this would go to persistent storage) backup_path = f"/tmp/infrastructure_backup_{backup_id}.json" with open(backup_path, "w") as f: json.dump(backup_data, f, indent=2) return json.dumps( { "status": "success", "message": "Infrastructure backup created successfully", "backup_id": backup_id, "backup_path": backup_path, "scope": backup_scope, "devices_backed_up": len(target_device_ids), "backup_size_mb": round(len(json.dumps(backup_data)) / 1024 / 1024, 2), "include_data": include_data, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": f"Infrastructure backup failed: {str(e)}"}) - Input schema definition for create_infrastructure_backup tool. 
Defines parameters: backup_scope (full/partial/device_specific), device_ids (array of integers), include_data (boolean), and backup_name (string, optional auto-generated).
"create_infrastructure_backup": { "description": "Create a backup of current infrastructure state", "inputSchema": { "type": "object", "properties": { "backup_scope": { "type": "string", "enum": ["full", "partial", "device_specific"], "default": "full", "description": "Scope of the backup", }, "device_ids": { "type": "array", "items": {"type": "integer"}, "description": "Specific device IDs to backup (for partial/device_specific)", }, "include_data": { "type": "boolean", "default": False, "description": "Include application data in backup", }, "backup_name": { "type": "string", "description": "Name for the backup (auto-generated if not provided)", }, }, "required": [], }, }, - src/homelab_mcp/tool_handlers/__init__.py:95-95 (registration)Tool registration mapping create_infrastructure_backup tool name to its handler function handle_create_infrastructure_backup in the TOOL_HANDLERS registry dictionary.
"create_infrastructure_backup": handle_create_infrastructure_backup, - Helper function _backup_device that backs up a single device via SSH. Collects Docker containers, LXD containers, systemd services, network config (interfaces, firewall, DNS), and system config (crontab, SSH). Optionally exports container data when include_data is True.
async def _backup_device(manager: InfrastructureManager, device_id: int, include_data: bool) -> dict[str, Any]: """Backup a single device.""" try: connection_info = await manager.get_device_connection_info(device_id) if not connection_info: return { "device_id": device_id, "backed_up": False, "error": "Device not found", } backup_data = { "device_id": device_id, "connection_info": connection_info, "services": {}, "system_config": {}, "network_config": {}, "backed_up_at": datetime.now().isoformat(), } async with asyncssh.connect( connection_info["hostname"], username=connection_info["username"], known_hosts=None, ) as conn: # Backup Docker containers docker_result = await conn.run('docker ps -a --format "{{.Names}}"') if docker_result.exit_status == 0 and docker_result.stdout: stdout_text = ( docker_result.stdout.decode() if isinstance(docker_result.stdout, bytes) else str(docker_result.stdout) ) if stdout_text.strip(): container_names = stdout_text.strip().split("\n") for container_name in container_names: if container_name.strip(): inspect_result = await conn.run(f"docker inspect {container_name}") if inspect_result.exit_status == 0: stdout_text = ( inspect_result.stdout.decode() if isinstance(inspect_result.stdout, bytes) else str(inspect_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][container_name] = { "type": "docker", "config": stdout_text, "backed_up": True, } if include_data: # Export container data export_result = await conn.run( f"docker export {container_name} | gzip > /tmp/backup_{container_name}.tar.gz" ) if ( isinstance(backup_data["services"], dict) and container_name in backup_data["services"] ): backup_data["services"][container_name]["data_backup"] = ( export_result.exit_status == 0 ) # Backup LXD containers lxd_result = await conn.run("lxc list --format csv -c n") if lxd_result.exit_status == 0 and lxd_result.stdout: stdout_text = ( lxd_result.stdout.decode() if isinstance(lxd_result.stdout, bytes) else 
str(lxd_result.stdout) ) if stdout_text.strip(): container_names = stdout_text.strip().split("\n") for container_name in container_names: if container_name.strip(): info_result = await conn.run(f"lxc config show {container_name}") if info_result.exit_status == 0: stdout_text = ( info_result.stdout.decode() if isinstance(info_result.stdout, bytes) else str(info_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][container_name] = { "type": "lxd", "config": stdout_text, "backed_up": True, } if include_data: # Export LXD container export_result = await conn.run( f"lxc export {container_name} /tmp/backup_{container_name}.tar.gz" ) if ( isinstance(backup_data["services"], dict) and container_name in backup_data["services"] ): backup_data["services"][container_name]["data_backup"] = ( export_result.exit_status == 0 ) # Backup systemd services systemd_result = await conn.run( "systemctl list-units --type=service --state=loaded --no-pager --plain | grep -v LOAD" ) if systemd_result.exit_status == 0 and systemd_result.stdout: stdout_text = ( systemd_result.stdout.decode() if isinstance(systemd_result.stdout, bytes) else str(systemd_result.stdout) ) service_lines = stdout_text.strip().split("\n") for line in service_lines: if line.strip(): parts = line.split() if not parts: continue service_name = parts[0] if not service_name.endswith(".service"): continue service_file_result = await conn.run(f"systemctl cat {service_name}") if service_file_result.exit_status == 0 and service_file_result.stdout: stdout_text = ( service_file_result.stdout.decode() if isinstance(service_file_result.stdout, bytes) else str(service_file_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][service_name] = { "type": "systemd", "config": stdout_text, "backed_up": True, } # Backup network configuration network_configs = {} # Network interfaces interfaces_result = await conn.run( 'cat /etc/netplan/*.yaml 2>/dev/null || cat 
/etc/network/interfaces 2>/dev/null || echo "No network config found"' ) if interfaces_result.exit_status == 0: network_configs["interfaces"] = interfaces_result.stdout # Firewall rules ufw_result = await conn.run('sudo ufw status numbered 2>/dev/null || echo "UFW not available"') if ufw_result.exit_status == 0: network_configs["firewall"] = ufw_result.stdout # DNS configuration dns_result = await conn.run("cat /etc/resolv.conf") if dns_result.exit_status == 0: network_configs["dns"] = dns_result.stdout backup_data["network_config"] = network_configs # Backup system configuration system_configs = {} # Crontab cron_result = await conn.run('crontab -l 2>/dev/null || echo "No crontab"') if cron_result.exit_status == 0: system_configs["crontab"] = cron_result.stdout # SSH configuration ssh_result = await conn.run("sudo cat /etc/ssh/sshd_config") if ssh_result.exit_status == 0: system_configs["ssh"] = ssh_result.stdout backup_data["system_config"] = system_configs return backup_data except Exception as e: return {"device_id": device_id, "backed_up": False, "error": str(e)}