Skip to main content
Glama
vms.py16.3 kB
""" TrueNAS Legacy VM management tools (bhyve-based) Provides tools for managing legacy bhyve VMs on TrueNAS SCALE. Uses the /api/v2.0/vm endpoints. NOTE: For new deployments, consider using Incus VMs (/api/v2.0/virt/instance) instead. Legacy VMs are maintained for backward compatibility. """ import asyncio from typing import Any, Dict, List, Optional from .base import BaseTool, tool_handler class LegacyVMTools(BaseTool): """Tools for managing TrueNAS legacy bhyve VMs""" # Timeout for VM operations VM_OPERATION_TIMEOUT = 120 # seconds POLL_INTERVAL = 5 # seconds def get_tool_definitions(self) -> list: """Get tool definitions for legacy VM management""" return [ ("list_legacy_vms", self.list_legacy_vms, "List all legacy bhyve VMs", {"limit": {"type": "integer", "required": False, "description": "Max items to return (default: 100, max: 500)"}, "offset": {"type": "integer", "required": False, "description": "Items to skip for pagination"}}), ("get_legacy_vm", self.get_legacy_vm, "Get detailed information about a legacy VM", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM"}, "include_raw": {"type": "boolean", "required": False, "description": "Include full API response for debugging (default: false)"}}), ("start_legacy_vm", self.start_legacy_vm, "Start a legacy VM", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM to start"}}), ("stop_legacy_vm", self.stop_legacy_vm, "Stop a legacy VM", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM to stop"}, "force": {"type": "boolean", "required": False, "description": "Force stop (poweroff) without graceful shutdown"}}), ("restart_legacy_vm", self.restart_legacy_vm, "Restart a legacy VM", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM to restart"}}), ("update_legacy_vm", self.update_legacy_vm, "Update legacy VM configuration", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM to update"}, "name": {"type": "string", "required": False, "description": "New VM name"}, "vcpus": {"type": "integer", "required": False, "description": "Number of virtual CPUs"}, "memory": {"type": "integer", "required": False, "description": "Memory in MB"}, "autostart": {"type": "boolean", "required": False, "description": "Start on boot"}}), ("get_legacy_vm_status", self.get_legacy_vm_status, "Get the runtime status of a legacy VM", {"vm_id": {"type": "integer", "required": True, "description": "Numeric ID of the VM"}}), ] @tool_handler async def list_legacy_vms( self, limit: int = BaseTool.DEFAULT_LIMIT, offset: int = 0 ) -> Dict[str, Any]: """ List all legacy bhyve VMs Args: limit: Maximum number of items to return (default: 100, max: 500) offset: Number of items to skip for pagination Returns: Dictionary containing list of VMs with their status """ await self.ensure_initialized() vms = await self.client.get("/vm") vm_list = [] for vm in vms: # Get status for each VM vm_id = vm.get("id") status = await self._get_vm_status(vm_id) vm_info = { "id": vm_id, "name": vm.get("name"), "description": vm.get("description"), "vcpus": vm.get("vcpus", 1), "memory_mb": vm.get("memory", 0), "autostart": vm.get("autostart", False), "status": status, "bootloader": vm.get("bootloader"), } vm_list.append(vm_info) # Count by status (before pagination) status_counts = {} for vm in vm_list: status = vm["status"] status_counts[status] = status_counts.get(status, 0) + 1 total_vms = len(vm_list) # Apply pagination paginated_vms, pagination = self.apply_pagination(vm_list, limit, offset) return { "success": True, "vms": paginated_vms, "metadata": { "total_vms": total_vms, "status_counts": status_counts, }, "pagination": pagination } @tool_handler async def get_legacy_vm( self, vm_id: int, include_raw: bool = False ) -> Dict[str, Any]: """ Get detailed information about a legacy VM Args: vm_id: Numeric ID of the VM include_raw: Include full API response for debugging (default: false) Returns: Dictionary containing VM details """ await self.ensure_initialized() try: vm = await self.client.get(f"/vm/id/{vm_id}") except Exception: return { "success": False, "error": f"VM with ID {vm_id} not found" } # Get runtime status status = await self._get_vm_status(vm_id) # Parse device information devices = [] for device in vm.get("devices", []): devices.append({ "id": device.get("id"), "type": device.get("dtype"), "order": device.get("order", 1000), "attributes": device.get("attributes", {}), }) result = { "success": True, "vm": { "id": vm.get("id"), "name": vm.get("name"), "description": vm.get("description"), "vcpus": vm.get("vcpus"), "memory_mb": vm.get("memory"), "min_memory": vm.get("min_memory"), "autostart": vm.get("autostart"), "bootloader": vm.get("bootloader"), "time": vm.get("time"), "shutdown_timeout": vm.get("shutdown_timeout"), "cpu_mode": vm.get("cpu_mode"), "cpu_model": vm.get("cpu_model"), "status": status, "devices": devices, } } if include_raw: result["raw"] = vm return result @tool_handler async def start_legacy_vm(self, vm_id: int) -> Dict[str, Any]: """ Start a legacy VM Args: vm_id: Numeric ID of the VM to start Returns: Dictionary containing operation result """ await self.ensure_initialized() # Check current status status = await self._get_vm_status(vm_id) if status == "RUNNING": return { "success": True, "message": f"VM {vm_id} is already running", "status": status } # Start the VM try: result = await self.client.post(f"/vm/id/{vm_id}/start") except Exception as e: return { "success": False, "error": f"Failed to start VM {vm_id}: {str(e)}" } # Poll for running state final_status = await self._wait_for_vm_status(vm_id, "RUNNING") return { "success": final_status == "RUNNING", "vm_id": vm_id, "status": final_status, "message": ( f"VM {vm_id} started successfully" if final_status == "RUNNING" else f"VM {vm_id} may still be starting (current status: {final_status})" ) } @tool_handler async def stop_legacy_vm( self, vm_id: int, force: bool = False ) -> Dict[str, Any]: """ Stop a legacy VM Args: vm_id: Numeric ID of the VM to stop force: Force stop (poweroff) without graceful shutdown Returns: Dictionary containing operation result """ await self.ensure_initialized() # Check current status status = await self._get_vm_status(vm_id) if status == "STOPPED": return { "success": True, "message": f"VM {vm_id} is already stopped", "status": status } # Stop the VM endpoint = f"/vm/id/{vm_id}/stop" body = {} if force: body["force"] = True try: result = await self.client.post(endpoint, body if body else None) except Exception as e: return { "success": False, "error": f"Failed to stop VM {vm_id}: {str(e)}" } # Poll for stopped state final_status = await self._wait_for_vm_status(vm_id, "STOPPED") return { "success": final_status == "STOPPED", "vm_id": vm_id, "status": final_status, "message": ( f"VM {vm_id} stopped successfully" if final_status == "STOPPED" else f"VM {vm_id} may still be stopping (current status: {final_status})" ) } @tool_handler async def restart_legacy_vm(self, vm_id: int) -> Dict[str, Any]: """ Restart a legacy VM Args: vm_id: Numeric ID of the VM to restart Returns: Dictionary containing operation result """ await self.ensure_initialized() # Check if VM exists try: vm = await self.client.get(f"/vm/id/{vm_id}") except Exception: return { "success": False, "error": f"VM with ID {vm_id} not found" } initial_status = await self._get_vm_status(vm_id) # Stop if running if initial_status == "RUNNING": try: await self.client.post(f"/vm/id/{vm_id}/stop") await self._wait_for_vm_status(vm_id, "STOPPED") except Exception as e: return { "success": False, "error": f"Failed to stop VM {vm_id}: {str(e)}" } # Start the VM try: await self.client.post(f"/vm/id/{vm_id}/start") except Exception as e: return { "success": False, "error": f"Failed to start VM {vm_id}: {str(e)}" } final_status = await self._wait_for_vm_status(vm_id, "RUNNING") return { "success": final_status == "RUNNING", "vm_id": vm_id, "status": final_status, "message": ( f"VM {vm_id} restarted successfully" if final_status == "RUNNING" else f"VM {vm_id} restart may still be in progress" ) } @tool_handler async def update_legacy_vm( self, vm_id: int, name: Optional[str] = None, vcpus: Optional[int] = None, memory: Optional[int] = None, autostart: Optional[bool] = None ) -> Dict[str, Any]: """ Update legacy VM configuration NOTE: Some changes may require the VM to be stopped first. Args: vm_id: Numeric ID of the VM to update name: New VM name vcpus: Number of virtual CPUs memory: Memory in MB autostart: Start on boot Returns: Dictionary containing operation result """ await self.ensure_initialized() # Check if VM exists try: vm = await self.client.get(f"/vm/id/{vm_id}") except Exception: return { "success": False, "error": f"VM with ID {vm_id} not found" } was_running = await self._get_vm_status(vm_id) == "RUNNING" # Build update body - only include provided fields update_body: Dict[str, Any] = {} if name is not None: update_body["name"] = name if vcpus is not None: update_body["vcpus"] = vcpus if memory is not None: update_body["memory"] = memory if autostart is not None: update_body["autostart"] = autostart if not update_body: return { "success": False, "error": "No update parameters provided" } # Update the VM try: result = await self.client.put(f"/vm/id/{vm_id}", update_body) except Exception as e: return { "success": False, "error": f"Failed to update VM {vm_id}: {str(e)}" } # Get updated VM info try: updated_vm = await self.client.get(f"/vm/id/{vm_id}") except Exception: updated_vm = {} return { "success": True, "vm_id": vm_id, "message": f"VM {vm_id} configuration updated", "updated_values": update_body, "current_config": { "name": updated_vm.get("name"), "vcpus": updated_vm.get("vcpus"), "memory_mb": updated_vm.get("memory"), "autostart": updated_vm.get("autostart"), }, "note": ( "VM may need restart for CPU/memory changes to take effect" if was_running and (vcpus or memory) else None ) } @tool_handler async def get_legacy_vm_status(self, vm_id: int) -> Dict[str, Any]: """ Get the runtime status of a legacy VM Args: vm_id: Numeric ID of the VM Returns: Dictionary containing VM status """ await self.ensure_initialized() status = await self._get_vm_status(vm_id) return { "success": True, "vm_id": vm_id, "status": status } async def _get_vm_status(self, vm_id: int) -> str: """ Get the status of a VM Args: vm_id: Numeric ID of the VM Returns: Status string (RUNNING, STOPPED, etc.) """ try: result = await self.client.get(f"/vm/id/{vm_id}/status") if isinstance(result, dict): return result.get("state", "UNKNOWN") return str(result) if result else "UNKNOWN" except Exception as e: self.logger.warning(f"Failed to get VM {vm_id} status: {e}") return "UNKNOWN" async def _wait_for_vm_status( self, vm_id: int, target_status: str, timeout: Optional[int] = None ) -> str: """ Wait for a VM to reach a target status Args: vm_id: Numeric ID of the VM target_status: Status to wait for (e.g., "RUNNING", "STOPPED") timeout: Optional timeout in seconds Returns: Final status of the VM """ timeout = timeout or self.VM_OPERATION_TIMEOUT max_attempts = timeout // self.POLL_INTERVAL for _ in range(max_attempts): status = await self._get_vm_status(vm_id) if status == target_status: return status # If we hit an error, return immediately if status == "ERROR": return status await asyncio.sleep(self.POLL_INTERVAL) # Return last known status return await self._get_vm_status(vm_id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server