# Resource Allocation API

Core orchestration logic for cortex resource management.

## Overview

The Resource Allocation API provides tools for managing resources across cortex jobs:

- Request and release resources (MCP servers + workers)
- Track allocations with unique IDs
- Monitor cluster capacity
- Automatic TTL/expiry handling
- In-memory allocation tracking

## Quick Start

```python
from allocation_manager import AllocationManager

# Initialize manager
manager = AllocationManager(
    total_cpu=16.0,
    total_memory=32768,  # 32GB
    total_workers=10
)

# Request resources
allocation = manager.request_resources(
    job_id="feature-dev-001",
    mcp_servers=["filesystem", "github", "database"],
    workers=4,
    priority="high",
    ttl_seconds=7200
)

# Use the resources...
allocation_id = allocation['allocation_id']

# Release when done
manager.release_resources(allocation_id)
```

## API Reference

### request_resources()

Reserve resources for a job.

```python
allocation = manager.request_resources(
    job_id="job-001",
    mcp_servers=["filesystem", "github"],
    workers=4,
    priority="high",
    ttl_seconds=7200,
    metadata={"task_type": "feature"}
)
```

**Parameters:**

- `job_id` (str, required): Unique job identifier
- `mcp_servers` (list, required): MCP server names to start
- `workers` (int, optional): Number of workers to provision
- `priority` (str, optional): "low", "normal", "high", or "critical" (default: "normal")
- `ttl_seconds` (int, optional): Time-to-live in seconds (default: 3600)
- `metadata` (dict, optional): Additional metadata

**Returns:**

```python
{
    "allocation_id": "alloc-abc123",
    "status": "active",
    "job_id": "job-001",
    "mcp_servers": [
        {
            "name": "filesystem",
            "endpoint": "http://localhost:9000",
            "status": "running"
        }
    ],
    "workers_allocated": [
        {
            "worker_id": "worker-job-001-000",
            "endpoint": "http://localhost:8000",
            "cpu": 1.0,
            "memory": 2048
        }
    ],
    "resources": {
        "cpu": 4.0,
        "memory": 8192,
        "workers": 4
    },
    "ttl_seconds": 7200,
    "created_at": "2025-12-08T19:00:00Z"
}
```

**Failure Response:**

```python
{
    "allocation_id": "alloc-xyz789",
    "status": "failed",
    "error": "Insufficient workers: requested 10, available 6"
}
```

### release_resources()

Release resources after job completion.

```python
result = manager.release_resources(allocation_id="alloc-abc123")
```

**Parameters:**

- `allocation_id` (str, required): Allocation identifier

**Returns:**

```python
{
    "status": "released",
    "allocation_id": "alloc-abc123",
    "job_id": "job-001",
    "workers_released": 4,
    "cpu_freed": 4.0,
    "memory_freed": 8192,
    "released_at": "2025-12-08T21:00:00Z",
    "duration_seconds": 7200
}
```

### get_capacity()

Get current cluster capacity.

```python
capacity = manager.get_capacity()
```

**Returns:**

```python
{
    "total_cpu": 16.0,
    "total_memory": 32768,
    "total_workers": 10,
    "allocated_cpu": 4.0,
    "allocated_memory": 8192,
    "allocated_workers": 4,
    "available_cpu": 12.0,
    "available_memory": 24576,
    "available_workers": 6,
    "running_mcp_servers": ["filesystem", "github", "database"],
    "active_allocations": 2
}
```

### get_allocation()

Get details of a specific allocation.

```python
details = manager.get_allocation(allocation_id="alloc-abc123")
```

**Returns:**

```python
{
    "allocation_id": "alloc-abc123",
    "job_id": "job-001",
    "state": "active",
    "priority": "high",
    "resources": {
        "cpu_allocated": 4.0,
        "memory_allocated": 8192,
        "workers": 4
    },
    "mcp_servers": [...],
    "workers": [...],
    "timestamps": {
        "created_at": "2025-12-08T19:00:00Z",
        "activated_at": "2025-12-08T19:00:01Z",
        "released_at": None,
        "age_seconds": 3600
    },
    "ttl_seconds": 7200,
    "is_expired": False,
    "metadata": {}
}
```

**Returns `None` if the allocation is not found.**

### list_allocations()

List allocations with optional filtering.

```python
# All allocations
allocations = manager.list_allocations()

# Filter by state
active = manager.list_allocations(state="active")

# Filter by job
job_allocs = manager.list_allocations(job_id="job-001")
```

**Parameters:**

- `state` (str, optional): Filter by state
- `job_id` (str, optional): Filter by job ID

**Returns:**

```python
[
    {
        "allocation_id": "alloc-abc123",
        "job_id": "job-001",
        "state": "active",
        "priority": "high",
        "workers": 4,
        "age_seconds": 3600,
        "is_expired": False
    }
]
```

### cleanup_expired_allocations()

Manually trigger cleanup of expired allocations.

```python
expired = manager.cleanup_expired_allocations()
print(f"Cleaned up: {expired}")
```

**Returns:** List of cleaned-up allocation IDs.

## Data Structures

### Allocation States

| State | Description |
|-------|-------------|
| `pending` | Allocation created but not yet active |
| `active` | Resources allocated and active |
| `releasing` | In process of being released |
| `released` | Resources released |
| `failed` | Allocation failed |

### Priority Levels

| Priority | Description |
|----------|-------------|
| `low` | Best-effort, may be preempted |
| `normal` | Standard priority (default) |
| `high` | Preferred scheduling |
| `critical` | Highest priority, reserved resources |

### Worker Specification

```python
{
    "worker_id": "worker-job-001-000",
    "worker_type": "cortex-worker",
    "cpu": 1.0,
    "memory": 2048,  # MB
    "status": "active",
    "endpoint": "http://localhost:8000"
}
```

### MCP Server Specification

```python
{
    "server_name": "filesystem",
    "endpoint": "http://localhost:9000",
    "status": "running",
    "port": 9000
}
```

## Configuration

### Default Cluster Capacity

```python
AllocationManager(
    total_cpu=16.0,      # 16 cores
    total_memory=32768,  # 32GB
    total_workers=10     # 10 worker slots
)
```

### Default Resource Usage Per Worker

- CPU: 1.0 core
- Memory: 2048 MB (2GB)

### Port Allocation

- MCP servers: 9000-9099 (100 ports)
- Workers: 8000+ (dynamic)

## Usage Patterns

### Check Capacity Before Allocation

```python
capacity = manager.get_capacity()

if capacity['available_workers'] >= 4:
    allocation = manager.request_resources(
        job_id="job-001",
        mcp_servers=["filesystem"],
        workers=4
    )
else:
    print("Insufficient capacity")
```

### Handle Allocation Failures

```python
result = manager.request_resources(
    job_id="job-001",
    mcp_servers=["filesystem"],
    workers=100  # Too many
)

if result['status'] == 'failed':
    print(f"Allocation failed: {result['error']}")
else:
    allocation_id = result['allocation_id']
```

### Monitor Active Allocations

```python
# List all active allocations
active = manager.list_allocations(state="active")

for alloc in active:
    print(f"Job: {alloc['job_id']}")
    print(f"Workers: {alloc['workers']}")
    print(f"Age: {alloc['age_seconds']}s")
    print(f"Expired: {alloc['is_expired']}")
```

### Automatic Expiry Cleanup

The manager automatically cleans up expired allocations:

- Background task runs every 5 minutes (in MCP server)
- Checks all active allocations for TTL expiry
- Automatically releases expired allocations
- Manual trigger: `cleanup_expired_allocations()`

```python
# Manual cleanup
expired = manager.cleanup_expired_allocations()
if expired:
    print(f"Cleaned up {len(expired)} expired allocations")
```

## Integration with Cortex

### Coordinator Master

```python
# Request resources for a development task
allocation = manager.request_resources(
    job_id="task-feature-auth",
    mcp_servers=["filesystem", "github"],
    workers=2,
    priority="normal",
    metadata={
        "master": "development",
        "task_type": "feature_implementation"
    }
)

# Hand off allocation_id to development master
```

### Development Master

```python
# Receive allocation from coordinator
allocation_id = task_data['allocation_id']

# Get allocation details
allocation = manager.get_allocation(allocation_id)
mcp_endpoints = allocation['mcp_servers']
worker_endpoints = allocation['workers']

# Use resources for development work...

# Release when done
manager.release_resources(allocation_id)
```

### Security Master

```python
# High priority security scan
allocation = manager.request_resources(
    job_id="security-scan-001",
    mcp_servers=["filesystem", "github"],
    workers=4,
    priority="high",
    ttl_seconds=1800,  # 30 minutes
    metadata={
        "master": "security",
        "scan_type": "vulnerability_scan"
    }
)
```

## Error Handling

### Insufficient Resources

```python
result = manager.request_resources(
    job_id="job-001",
    mcp_servers=["filesystem"],
    workers=20  # Exceeds capacity
)

if result['status'] == 'failed':
    print(result['error'])
    # "Insufficient workers: requested 20, available 10"
```

### Allocation Not Found

```python
details = manager.get_allocation("nonexistent-id")
if details is None:
    print("Allocation not found")
```

### Release Non-Existent Allocation

```python
result = manager.release_resources("nonexistent-id")
if result['status'] == 'error':
    print(result['error'])
    # "Allocation nonexistent-id not found"
```

## Best Practices

### 1. Check Capacity First

Always check capacity before requesting resources:

```python
capacity = manager.get_capacity()
workers_needed = 4

if capacity['available_workers'] >= workers_needed:
    allocation = manager.request_resources(...)
```

### 2. Set Appropriate TTL

Choose TTL based on expected job duration:

```python
# Short task: 30 minutes
ttl_seconds=1800

# Normal task: 1 hour (default)
ttl_seconds=3600

# Long task: 4 hours
ttl_seconds=14400
```

### 3. Use Priority Correctly

- `low`: Background/non-urgent tasks
- `normal`: Standard tasks (default)
- `high`: Important/time-sensitive tasks
- `critical`: Emergency/critical tasks only

### 4. Always Release Resources

Request the allocation before entering the `try` block, so that `finally` never references an `allocation_id` that was not assigned:

```python
allocation = manager.request_resources(...)

if allocation['status'] != 'failed':
    allocation_id = allocation['allocation_id']
    try:
        pass  # Do work...
    finally:
        # Runs even if the work above raises
        manager.release_resources(allocation_id)
```

### 5. Add Metadata for Tracking

```python
allocation = manager.request_resources(
    job_id="job-001",
    mcp_servers=["filesystem"],
    metadata={
        "master": "development",
        "task_type": "feature_implementation",
        "assigned_to": "dev-worker-001",
        "project": "authentication_system"
    }
)
```

## Performance Considerations

### Resource Limits

- Max workers per allocation: Limited by cluster capacity
- Max concurrent allocations: Unlimited (memory permitting)
- MCP server reuse: Servers are shared across allocations
- Worker isolation: Each worker is dedicated to one allocation

### Memory Usage

In-memory tracking means:

- Fast allocation/release operations
- No database overhead
- State lost on server restart
- Suitable for transient resource management

### Scalability

For production deployment:

- Consider persistent storage (SQLite/PostgreSQL)
- Implement allocation recovery on restart
- Add metrics and monitoring
- Consider distributed allocation management

## MCP Server Integration

The allocation manager is exposed via MCP tools in `server.py`:

### Available MCP Tools

1. `request_resources` - Request resources for a job
2. `release_resources` - Release resources
3. `get_capacity` - Get cluster capacity
4. `get_allocation` - Get allocation details
5. `list_allocations` - List allocations with filtering
6. `cleanup_expired` - Trigger manual cleanup

### Running the MCP Server

```bash
python src/server.py
```

### Using from MCP Client

```python
# Via MCP protocol
result = mcp_client.call_tool("request_resources", {
    "job_id": "job-001",
    "mcp_servers": ["filesystem", "github"],
    "workers": 4,
    "priority": "high",
    "ttl_seconds": 7200
})
```

## Future Enhancements

Planned improvements:

1. **Persistent Storage**: SQLite/PostgreSQL backend
2. **Resource Quotas**: Per-job-type resource limits
3. **Advanced Scheduling**: Bin packing, affinity rules
4. **Preemption**: Low-priority job preemption
5. **Metrics**: Prometheus integration
6. **Health Checks**: MCP server/worker health monitoring
7. **Auto-Scaling**: Dynamic capacity adjustment
8. **Allocation Recovery**: Recover state on restart
9. **Resource Reservation**: Pre-reserve resources
10. **Cost Tracking**: Track resource costs per job

## See Also

- [Main README](README.md) - MCP server lifecycle management
- [Worker Management](src/worker_manager.py) - Worker provisioning and management
- [MCP Server](src/server.py) - MCP server implementation
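
## Appendix: Context Manager Sketch

The "Always Release Resources" practice above pairs every `request_resources()` call with a `release_resources()` call. One way to make that pairing harder to forget is a small context manager. The sketch below is not part of the documented API: `allocated` and `AllocationError` are hypothetical names, and the code assumes only what this document states, namely that `request_resources()` returns a dict with `status`, `allocation_id`, and (on failure) `error`, and that `release_resources()` takes an allocation ID.

```python
from contextlib import contextmanager


class AllocationError(RuntimeError):
    """Raised when the manager reports a failed allocation (hypothetical name)."""


@contextmanager
def allocated(manager, **request_kwargs):
    """Request resources, yield the allocation dict, and always release it.

    `manager` is assumed to expose request_resources() and
    release_resources() as described in the API Reference.
    """
    allocation = manager.request_resources(**request_kwargs)

    if allocation.get("status") == "failed":
        # Nothing was reserved, so there is nothing to release.
        raise AllocationError(allocation.get("error", "allocation failed"))

    try:
        yield allocation
    finally:
        # Runs whether the with-block exits normally or raises.
        manager.release_resources(allocation["allocation_id"])
```

With a helper like this, a job body could be written as `with allocated(manager, job_id="job-001", mcp_servers=["filesystem"], workers=2) as alloc:`, and the release happens on exit even if the body raises. The TTL still applies as a backstop if the process dies before the `finally` clause runs.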
