worker_manager.py (16.8 kB)
""" Worker Management Module for Resource Manager MCP Server Provides tools for managing Kubernetes workers including: - Listing workers with filtering - Provisioning burst workers - Draining workers gracefully - Destroying burst workers safely """ import json import subprocess import time from typing import Dict, List, Optional, Any from datetime import datetime, timedelta from enum import Enum class WorkerType(Enum): """Worker type classification""" PERMANENT = "permanent" BURST = "burst" class WorkerStatus(Enum): """Worker status states""" READY = "ready" BUSY = "busy" DRAINING = "draining" NOT_READY = "not_ready" class WorkerSize(Enum): """Worker VM size configurations""" SMALL = "small" MEDIUM = "medium" LARGE = "large" # Worker size specifications WORKER_SIZES = { "small": { "cpu": 2, "memory_gb": 4, "disk_gb": 50 }, "medium": { "cpu": 4, "memory_gb": 8, "disk_gb": 100 }, "large": { "cpu": 8, "memory_gb": 16, "disk_gb": 200 } } class WorkerManagerError(Exception): """Base exception for worker manager errors""" pass class WorkerManager: """Manages Kubernetes workers and their lifecycle""" def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the worker manager Args: config: Configuration dictionary with MCP server endpoints """ self.config = config or {} self.talos_mcp_endpoint = self.config.get("talos_mcp_endpoint") self.proxmox_mcp_endpoint = self.config.get("proxmox_mcp_endpoint") self.kubectl_context = self.config.get("kubectl_context") def _run_kubectl(self, args: List[str]) -> Dict[str, Any]: """ Run kubectl command and return parsed JSON output Args: args: kubectl command arguments Returns: Parsed JSON output Raises: WorkerManagerError: If kubectl command fails """ cmd = ["kubectl"] if self.kubectl_context: cmd.extend(["--context", self.kubectl_context]) cmd.extend(args) try: result = subprocess.run( cmd, capture_output=True, text=True, check=True ) return json.loads(result.stdout) if result.stdout else {} except subprocess.CalledProcessError as e: raise WorkerManagerError(f"kubectl command failed: {e.stderr}") except json.JSONDecodeError as e: raise WorkerManagerError(f"Failed to parse kubectl output: {e}") def _get_node_type(self, node: Dict[str, Any]) -> WorkerType: """ Determine if a node is permanent or burst Args: node: Node object from kubectl Returns: WorkerType enum value """ labels = node.get("metadata", {}).get("labels", {}) # Check for burst worker label if labels.get("worker-type") == "burst": return WorkerType.BURST # Check for TTL annotation (burst workers have TTL) annotations = node.get("metadata", {}).get("annotations", {}) if "worker-ttl" in annotations: return WorkerType.BURST return WorkerType.PERMANENT def _get_node_status(self, node: Dict[str, Any]) -> WorkerStatus: """ Determine node status Args: node: Node object from kubectl Returns: WorkerStatus enum value """ # Check if node is schedulable spec = node.get("spec", {}) if spec.get("unschedulable", False): return WorkerStatus.DRAINING # Check node conditions conditions = node.get("status", {}).get("conditions", []) for condition in conditions: if condition.get("type") == "Ready": if condition.get("status") == "True": # Check if node is busy (has pods) # This is a simplified check - could be enhanced return WorkerStatus.READY else: return WorkerStatus.NOT_READY return WorkerStatus.NOT_READY def _get_node_resources(self, node: Dict[str, Any]) -> Dict[str, Any]: """ Extract node resource information Args: node: Node object from kubectl Returns: Dictionary with resource information """ 
capacity = node.get("status", {}).get("capacity", {}) allocatable = node.get("status", {}).get("allocatable", {}) return { "capacity": { "cpu": capacity.get("cpu", "0"), "memory": capacity.get("memory", "0"), "pods": capacity.get("pods", "0") }, "allocatable": { "cpu": allocatable.get("cpu", "0"), "memory": allocatable.get("memory", "0"), "pods": allocatable.get("pods", "0") } } def list_workers(self, type_filter: Optional[str] = None) -> List[Dict[str, Any]]: """ List all Kubernetes workers Args: type_filter: Optional filter by worker type ("permanent" or "burst") Returns: List of worker information dictionaries Raises: WorkerManagerError: If listing workers fails """ # Get all nodes nodes_data = self._run_kubectl(["get", "nodes", "-o", "json"]) nodes = nodes_data.get("items", []) workers = [] for node in nodes: node_name = node.get("metadata", {}).get("name", "unknown") worker_type = self._get_node_type(node) # Apply type filter if specified if type_filter and worker_type.value != type_filter: continue worker_info = { "name": node_name, "status": self._get_node_status(node).value, "type": worker_type.value, "resources": self._get_node_resources(node), "labels": node.get("metadata", {}).get("labels", {}), "annotations": node.get("metadata", {}).get("annotations", {}), "created": node.get("metadata", {}).get("creationTimestamp", "unknown") } # Add TTL info for burst workers if worker_type == WorkerType.BURST: ttl = node.get("metadata", {}).get("annotations", {}).get("worker-ttl") if ttl: worker_info["ttl_expires"] = ttl workers.append(worker_info) return workers def _call_mcp_server(self, server: str, method: str, params: Dict[str, Any]) -> Dict[str, Any]: """ Call an MCP server method Args: server: Server name ("talos" or "proxmox") method: Method name to call params: Parameters to pass Returns: Response from MCP server Raises: WorkerManagerError: If MCP call fails """ # This is a placeholder - actual implementation would use MCP protocol # For now, we'll use subprocess to call the MCP server CLI endpoint = None if server == "talos": endpoint = self.talos_mcp_endpoint elif server == "proxmox": endpoint = self.proxmox_mcp_endpoint else: raise WorkerManagerError(f"Unknown MCP server: {server}") if not endpoint: raise WorkerManagerError(f"No endpoint configured for {server} MCP server") # Placeholder implementation raise NotImplementedError( f"MCP server integration not yet implemented. " f"Would call {server} MCP server at {endpoint} with method {method}" ) def provision_workers( self, count: int, ttl: int, size: str = "medium" ) -> List[Dict[str, Any]]: """ Create burst workers Args: count: Number of workers to create ttl: Time-to-live in hours size: Worker size ("small", "medium", or "large") Returns: List of provisioned worker information Raises: WorkerManagerError: If provisioning fails """ # Validate inputs if count < 1 or count > 10: raise WorkerManagerError("Worker count must be between 1 and 10") if ttl < 1 or ttl > 168: # Max 1 week raise WorkerManagerError("TTL must be between 1 and 168 hours") if size not in WORKER_SIZES: raise WorkerManagerError(f"Invalid size. 
Must be one of: {list(WORKER_SIZES.keys())}") size_spec = WORKER_SIZES[size] ttl_expiry = datetime.utcnow() + timedelta(hours=ttl) provisioned_workers = [] for i in range(count): worker_name = f"burst-worker-{int(time.time())}-{i}" try: # Step 1: Create VM via Talos or Proxmox MCP # This is a placeholder - actual implementation would call MCP server vm_params = { "name": worker_name, "cpu": size_spec["cpu"], "memory_gb": size_spec["memory_gb"], "disk_gb": size_spec["disk_gb"], "labels": { "worker-type": "burst", "provisioned-by": "resource-manager-mcp" } } # Placeholder for MCP call # vm_info = self._call_mcp_server("talos", "create_vm", vm_params) # Step 2: Join to Kubernetes cluster # This would involve getting the join token and running kubeadm on the new VM # For now, we'll document the expected process worker_info = { "name": worker_name, "status": "provisioning", "type": WorkerType.BURST.value, "size": size, "resources": size_spec, "ttl_hours": ttl, "ttl_expires": ttl_expiry.isoformat(), "created_at": datetime.utcnow().isoformat() } provisioned_workers.append(worker_info) # In real implementation, we would: # 1. Call Talos/Proxmox MCP to create VM # 2. Wait for VM to boot # 3. Get kubeadm join token # 4. Run kubeadm join on new node # 5. Label the node with worker-type=burst # 6. Annotate with TTL information # 7. Set up automatic cleanup job except Exception as e: raise WorkerManagerError( f"Failed to provision worker {worker_name}: {str(e)}" ) return provisioned_workers def drain_worker(self, worker_id: str) -> Dict[str, Any]: """ Gracefully drain a worker Args: worker_id: Worker node name Returns: Drain operation status Raises: WorkerManagerError: If drain fails """ # Verify worker exists try: node = self._run_kubectl(["get", "node", worker_id, "-o", "json"]) except WorkerManagerError: raise WorkerManagerError(f"Worker {worker_id} not found") # Drain the node try: # Use kubectl drain with proper flags cmd = ["kubectl"] if self.kubectl_context: cmd.extend(["--context", self.kubectl_context]) cmd.extend([ "drain", worker_id, "--ignore-daemonsets", # Ignore DaemonSets "--delete-emptydir-data", # Delete pods with emptyDir volumes "--force", # Force deletion of standalone pods "--grace-period=300" # 5 minute grace period ]) result = subprocess.run( cmd, capture_output=True, text=True, check=True ) return { "worker_id": worker_id, "status": "draining", "message": "Worker drain initiated successfully", "output": result.stdout } except subprocess.CalledProcessError as e: raise WorkerManagerError(f"Failed to drain worker {worker_id}: {e.stderr}") def destroy_worker(self, worker_id: str, force: bool = False) -> Dict[str, Any]: """ Destroy a burst worker Args: worker_id: Worker node name force: Force destroy even if not drained (use with caution) Returns: Destroy operation status Raises: WorkerManagerError: If destroy fails or worker is permanent """ # Get worker information try: node = self._run_kubectl(["get", "node", worker_id, "-o", "json"]) except WorkerManagerError: raise WorkerManagerError(f"Worker {worker_id} not found") # SAFETY CHECK: Verify this is a burst worker worker_type = self._get_node_type(node) if worker_type != WorkerType.BURST: raise WorkerManagerError( f"SAFETY VIOLATION: Cannot destroy permanent worker {worker_id}. " f"Only burst workers can be destroyed." ) # Check if worker is drained (unless force is True) if not force: spec = node.get("spec", {}) if not spec.get("unschedulable", False): raise WorkerManagerError( f"Worker {worker_id} is not drained. 
" f"Run drain_worker first or use force=True (not recommended)" ) # Step 1: Remove from Kubernetes cluster try: self._run_kubectl(["delete", "node", worker_id]) except WorkerManagerError as e: raise WorkerManagerError( f"Failed to remove worker {worker_id} from cluster: {str(e)}" ) # Step 2: Destroy the VM via Talos/Proxmox MCP try: # Placeholder for MCP call # vm_delete_result = self._call_mcp_server( # "talos", # "delete_vm", # {"name": worker_id} # ) return { "worker_id": worker_id, "status": "destroyed", "message": f"Burst worker {worker_id} successfully destroyed", "removed_from_cluster": True, "vm_deleted": True # Would be actual result from MCP } except Exception as e: # Node removed from cluster but VM deletion failed return { "worker_id": worker_id, "status": "partial_destroy", "message": f"Worker removed from cluster but VM deletion failed: {str(e)}", "removed_from_cluster": True, "vm_deleted": False, "error": str(e) } def get_worker_details(self, worker_id: str) -> Dict[str, Any]: """ Get detailed information about a specific worker Args: worker_id: Worker node name Returns: Detailed worker information Raises: WorkerManagerError: If worker not found """ try: node = self._run_kubectl(["get", "node", worker_id, "-o", "json"]) except WorkerManagerError: raise WorkerManagerError(f"Worker {worker_id} not found") worker_type = self._get_node_type(node) details = { "name": worker_id, "status": self._get_node_status(node).value, "type": worker_type.value, "resources": self._get_node_resources(node), "labels": node.get("metadata", {}).get("labels", {}), "annotations": node.get("metadata", {}).get("annotations", {}), "created": node.get("metadata", {}).get("creationTimestamp", "unknown"), "conditions": node.get("status", {}).get("conditions", []), "addresses": node.get("status", {}).get("addresses", []) } # Add TTL info for burst workers if worker_type == WorkerType.BURST: ttl = node.get("metadata", {}).get("annotations", {}).get("worker-ttl") if ttl: details["ttl_expires"] = ttl return details
