Skip to main content
Glama
allocation_manager.py (18.2 kB)
""" Resource Allocation Manager Handles resource allocation, tracking, and lifecycle management for cortex jobs. """ import uuid import time from dataclasses import dataclass, field, asdict from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from enum import Enum class AllocationState(str, Enum): """Allocation lifecycle states""" PENDING = "pending" ACTIVE = "active" RELEASING = "releasing" RELEASED = "released" FAILED = "failed" class Priority(str, Enum): """Job priority levels""" LOW = "low" NORMAL = "normal" HIGH = "high" CRITICAL = "critical" @dataclass class WorkerSpec: """Worker specification""" worker_id: str worker_type: str cpu: float memory: int # MB status: str = "pending" endpoint: Optional[str] = None @dataclass class MCPServerSpec: """MCP Server specification""" server_name: str endpoint: Optional[str] = None status: str = "pending" port: Optional[int] = None @dataclass class ResourceAllocation: """Resource allocation record""" allocation_id: str job_id: str state: AllocationState priority: Priority # Requested resources mcp_servers: List[str] workers_requested: Optional[int] = None # Allocated resources mcp_server_specs: List[MCPServerSpec] = field(default_factory=list) workers_allocated: List[WorkerSpec] = field(default_factory=list) # Resource usage cpu_allocated: float = 0.0 memory_allocated: int = 0 # MB # Timestamps created_at: datetime = field(default_factory=datetime.utcnow) activated_at: Optional[datetime] = None released_at: Optional[datetime] = None ttl_seconds: int = 3600 # 1 hour default # Metadata metadata: Dict[str, Any] = field(default_factory=dict) def is_expired(self) -> bool: """Check if allocation has exceeded TTL""" if self.state in [AllocationState.RELEASED, AllocationState.FAILED]: return False reference_time = self.activated_at or self.created_at expiry_time = reference_time + timedelta(seconds=self.ttl_seconds) return datetime.utcnow() > expiry_time def age_seconds(self) -> float: """Get 
allocation age in seconds""" return (datetime.utcnow() - self.created_at).total_seconds() def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" data = asdict(self) # Convert enums to strings data['state'] = self.state.value data['priority'] = self.priority.value # Convert datetime objects data['created_at'] = self.created_at.isoformat() data['activated_at'] = self.activated_at.isoformat() if self.activated_at else None data['released_at'] = self.released_at.isoformat() if self.released_at else None return data @dataclass class ClusterCapacity: """Cluster resource capacity""" total_cpu: float total_memory: int # MB total_workers: int allocated_cpu: float = 0.0 allocated_memory: int = 0 allocated_workers: int = 0 running_mcp_servers: List[str] = field(default_factory=list) active_allocations: int = 0 @property def available_cpu(self) -> float: return self.total_cpu - self.allocated_cpu @property def available_memory(self) -> int: return self.total_memory - self.allocated_memory @property def available_workers(self) -> int: return self.total_workers - self.allocated_workers def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { 'total_cpu': self.total_cpu, 'total_memory': self.total_memory, 'total_workers': self.total_workers, 'allocated_cpu': self.allocated_cpu, 'allocated_memory': self.allocated_memory, 'allocated_workers': self.allocated_workers, 'available_cpu': self.available_cpu, 'available_memory': self.available_memory, 'available_workers': self.available_workers, 'running_mcp_servers': self.running_mcp_servers, 'active_allocations': self.active_allocations } class AllocationManager: """Manages resource allocations and cluster capacity""" def __init__( self, total_cpu: float = 16.0, total_memory: int = 32768, # 32GB total_workers: int = 10 ): self.capacity = ClusterCapacity( total_cpu=total_cpu, total_memory=total_memory, total_workers=total_workers ) self.allocations: Dict[str, ResourceAllocation] = {} 
self.mcp_server_registry: Dict[str, MCPServerSpec] = {} # Configuration self.worker_cpu = 1.0 # CPU per worker self.worker_memory = 2048 # MB per worker self.mcp_server_ports = list(range(9000, 9100)) # Port pool self.next_port_idx = 0 def _allocate_port(self) -> int: """Allocate next available port""" port = self.mcp_server_ports[self.next_port_idx] self.next_port_idx = (self.next_port_idx + 1) % len(self.mcp_server_ports) return port def _start_mcp_server(self, server_name: str) -> MCPServerSpec: """Start or reuse MCP server""" # Check if server already running if server_name in self.mcp_server_registry: spec = self.mcp_server_registry[server_name] if spec.status == "running": return spec # Allocate new server port = self._allocate_port() spec = MCPServerSpec( server_name=server_name, endpoint=f"http://localhost:{port}", status="running", port=port ) self.mcp_server_registry[server_name] = spec # Add to capacity tracking if server_name not in self.capacity.running_mcp_servers: self.capacity.running_mcp_servers.append(server_name) return spec def _provision_workers(self, count: int, job_id: str) -> List[WorkerSpec]: """Provision workers for job""" workers = [] for i in range(count): worker_id = f"worker-{job_id}-{i:03d}" worker = WorkerSpec( worker_id=worker_id, worker_type="cortex-worker", cpu=self.worker_cpu, memory=self.worker_memory, status="active", endpoint=f"http://localhost:{8000 + len(self.allocations) * 10 + i}" ) workers.append(worker) return workers def _check_capacity(self, workers_count: int) -> tuple[bool, Optional[str]]: """Check if resources are available""" if workers_count > self.capacity.available_workers: return False, f"Insufficient workers: requested {workers_count}, available {self.capacity.available_workers}" cpu_needed = workers_count * self.worker_cpu if cpu_needed > self.capacity.available_cpu: return False, f"Insufficient CPU: needed {cpu_needed}, available {self.capacity.available_cpu}" memory_needed = workers_count * 
self.worker_memory if memory_needed > self.capacity.available_memory: return False, f"Insufficient memory: needed {memory_needed}MB, available {self.capacity.available_memory}MB" return True, None def request_resources( self, job_id: str, mcp_servers: List[str], workers: Optional[int] = None, priority: str = "normal", ttl_seconds: int = 3600, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Request resources for a job. Args: job_id: Unique job identifier mcp_servers: List of MCP server names to start workers: Number of workers to provision (optional) priority: Job priority (low, normal, high, critical) ttl_seconds: Time-to-live for allocation metadata: Additional metadata Returns: Dict with allocation details """ allocation_id = f"alloc-{uuid.uuid4().hex[:12]}" # Validate priority try: priority_enum = Priority(priority) except ValueError: priority_enum = Priority.NORMAL # Create allocation allocation = ResourceAllocation( allocation_id=allocation_id, job_id=job_id, state=AllocationState.PENDING, priority=priority_enum, mcp_servers=mcp_servers, workers_requested=workers, ttl_seconds=ttl_seconds, metadata=metadata or {} ) try: # Check capacity if workers requested if workers and workers > 0: can_allocate, error_msg = self._check_capacity(workers) if not can_allocate: allocation.state = AllocationState.FAILED allocation.metadata['error'] = error_msg self.allocations[allocation_id] = allocation return { 'allocation_id': allocation_id, 'status': 'failed', 'error': error_msg } # Start MCP servers for server_name in mcp_servers: server_spec = self._start_mcp_server(server_name) allocation.mcp_server_specs.append(server_spec) # Provision workers if requested if workers and workers > 0: worker_specs = self._provision_workers(workers, job_id) allocation.workers_allocated = worker_specs allocation.cpu_allocated = workers * self.worker_cpu allocation.memory_allocated = workers * self.worker_memory # Update capacity self.capacity.allocated_workers += workers 
self.capacity.allocated_cpu += allocation.cpu_allocated self.capacity.allocated_memory += allocation.memory_allocated # Activate allocation allocation.state = AllocationState.ACTIVE allocation.activated_at = datetime.utcnow() self.capacity.active_allocations += 1 # Store allocation self.allocations[allocation_id] = allocation return { 'allocation_id': allocation_id, 'status': 'active', 'job_id': job_id, 'mcp_servers': [ { 'name': spec.server_name, 'endpoint': spec.endpoint, 'status': spec.status } for spec in allocation.mcp_server_specs ], 'workers_allocated': [ { 'worker_id': w.worker_id, 'endpoint': w.endpoint, 'cpu': w.cpu, 'memory': w.memory } for w in allocation.workers_allocated ], 'resources': { 'cpu': allocation.cpu_allocated, 'memory': allocation.memory_allocated, 'workers': len(allocation.workers_allocated) }, 'ttl_seconds': ttl_seconds, 'created_at': allocation.created_at.isoformat() } except Exception as e: allocation.state = AllocationState.FAILED allocation.metadata['error'] = str(e) self.allocations[allocation_id] = allocation return { 'allocation_id': allocation_id, 'status': 'failed', 'error': str(e) } def release_resources(self, allocation_id: str) -> Dict[str, Any]: """ Release resources for an allocation. 
Args: allocation_id: Allocation identifier Returns: Dict with release status """ if allocation_id not in self.allocations: return { 'status': 'error', 'error': f'Allocation {allocation_id} not found' } allocation = self.allocations[allocation_id] if allocation.state in [AllocationState.RELEASED, AllocationState.RELEASING]: return { 'status': 'already_released', 'allocation_id': allocation_id, 'released_at': allocation.released_at.isoformat() if allocation.released_at else None } # Mark as releasing allocation.state = AllocationState.RELEASING try: # Release workers workers_released = len(allocation.workers_allocated) if workers_released > 0: self.capacity.allocated_workers -= workers_released self.capacity.allocated_cpu -= allocation.cpu_allocated self.capacity.allocated_memory -= allocation.memory_allocated # Mark workers for destruction (simulate queuing) for worker in allocation.workers_allocated: worker.status = "destroying" # MCP servers remain running (idle timeout will handle them) # In production, you'd mark them for scale-down after idle period # Mark as released allocation.state = AllocationState.RELEASED allocation.released_at = datetime.utcnow() self.capacity.active_allocations -= 1 return { 'status': 'released', 'allocation_id': allocation_id, 'job_id': allocation.job_id, 'workers_released': workers_released, 'cpu_freed': allocation.cpu_allocated, 'memory_freed': allocation.memory_allocated, 'released_at': allocation.released_at.isoformat(), 'duration_seconds': allocation.age_seconds() } except Exception as e: allocation.state = AllocationState.FAILED allocation.metadata['release_error'] = str(e) return { 'status': 'error', 'error': str(e) } def get_capacity(self) -> Dict[str, Any]: """ Get current cluster capacity and utilization. Returns: Dict with capacity information """ return self.capacity.to_dict() def get_allocation(self, allocation_id: str) -> Optional[Dict[str, Any]]: """ Get details of a specific allocation. 
Args: allocation_id: Allocation identifier Returns: Dict with allocation details or None if not found """ if allocation_id not in self.allocations: return None allocation = self.allocations[allocation_id] return { 'allocation_id': allocation.allocation_id, 'job_id': allocation.job_id, 'state': allocation.state.value, 'priority': allocation.priority.value, 'resources': { 'cpu_allocated': allocation.cpu_allocated, 'memory_allocated': allocation.memory_allocated, 'workers': len(allocation.workers_allocated) }, 'mcp_servers': [ { 'name': spec.server_name, 'endpoint': spec.endpoint, 'status': spec.status } for spec in allocation.mcp_server_specs ], 'workers': [ { 'worker_id': w.worker_id, 'endpoint': w.endpoint, 'status': w.status, 'cpu': w.cpu, 'memory': w.memory } for w in allocation.workers_allocated ], 'timestamps': { 'created_at': allocation.created_at.isoformat(), 'activated_at': allocation.activated_at.isoformat() if allocation.activated_at else None, 'released_at': allocation.released_at.isoformat() if allocation.released_at else None, 'age_seconds': allocation.age_seconds() }, 'ttl_seconds': allocation.ttl_seconds, 'is_expired': allocation.is_expired(), 'metadata': allocation.metadata } def cleanup_expired_allocations(self) -> List[str]: """ Clean up expired allocations. Returns: List of cleaned up allocation IDs """ expired = [] for allocation_id, allocation in list(self.allocations.items()): if allocation.is_expired() and allocation.state == AllocationState.ACTIVE: self.release_resources(allocation_id) expired.append(allocation_id) return expired def list_allocations( self, state: Optional[str] = None, job_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ List allocations with optional filtering. 
Args: state: Filter by allocation state job_id: Filter by job ID Returns: List of allocation summaries """ results = [] for allocation_id, allocation in self.allocations.items(): # Apply filters if state and allocation.state.value != state: continue if job_id and allocation.job_id != job_id: continue results.append({ 'allocation_id': allocation_id, 'job_id': allocation.job_id, 'state': allocation.state.value, 'priority': allocation.priority.value, 'workers': len(allocation.workers_allocated), 'age_seconds': allocation.age_seconds(), 'is_expired': allocation.is_expired() }) return results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ry-ops/cortex-resource-manager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.