# Cortex Resource Manager

> Part of the [Cortex Ecosystem](https://github.com/ry-ops/cortex) - Multi-agent AI system for autonomous repository management

An MCP (Model Context Protocol) server for managing resource allocation, MCP server lifecycle, and Kubernetes workers in the Cortex automation system.

**Repository**: [ry-ops/cortex-resource-manager](https://github.com/ry-ops/cortex-resource-manager)
**Main Cortex Repository**: [ry-ops/cortex](https://github.com/ry-ops/cortex)

## Features

### Resource Allocation (Core Orchestration)

- Request resources for jobs (MCP servers + workers)
- Release resources after job completion
- Track allocations with unique IDs
- Get current cluster capacity
- Query allocation details
- Automatic TTL/expiry handling
- In-memory allocation tracking

### MCP Server Lifecycle Management

- List all registered MCP servers with status
- Get detailed status of individual MCP servers
- Start MCP servers (scale from 0 to 1)
- Stop MCP servers (scale to 0)
- Scale MCP servers horizontally (0-10 replicas)
- Automatic health checking and readiness waiting
- Graceful and forced shutdown options

### Worker Management

- List Kubernetes workers (permanent and burst) with filtering
- Provision burst workers with configurable TTL and size
- Drain workers gracefully before destruction
- Destroy burst workers safely, with protection for permanent workers
- Get detailed worker information, including resources and status
- Integration with Talos MCP and Proxmox MCP for VM provisioning

## Overview

The Cortex Resource Manager provides 16 tools organized into 3 categories. This MCP server is part of Cortex's infrastructure division, enabling dynamic resource allocation across the multi-divisional organization.

See the [Cortex Holdings Structure](https://github.com/ry-ops/cortex/blob/main/coordination/divisions/README.md) for more information about how Cortex operates as a multi-divisional automation system.

### Tool Categories

1. **Resource Allocation (5 tools)**: Core orchestration API for managing Cortex job resources
   - `request_resources` - Request MCP servers and workers for a job
   - `release_resources` - Release allocated resources
   - `get_allocation` - Query allocation details
   - `get_capacity` - Check cluster capacity
   - `list_allocations` - List all active allocations
2. **MCP Server Lifecycle (5 tools)**: Manage MCP server deployments in Kubernetes
   - `list_mcp_servers` - List all MCP servers with status
   - `get_mcp_status` - Get detailed server status
   - `start_mcp` - Start an MCP server (scale to 1)
   - `stop_mcp` - Stop an MCP server (scale to 0)
   - `scale_mcp` - Scale an MCP server horizontally (0-10 replicas)
3. **Worker Management (6 tools)**: Manage Kubernetes workers (permanent and burst)
   - `list_workers` - List all workers with filtering
   - `provision_workers` - Create burst workers with TTL
   - `drain_worker` - Gracefully drain a worker
   - `destroy_worker` - Safely destroy burst workers
   - `get_worker_details` - Get detailed worker information
   - `get_worker_capacity` - Check worker resource capacity

## Installation

```bash
# Install from PyPI (when published)
pip install cortex-resource-manager

# Or install from source
git clone https://github.com/ry-ops/cortex-resource-manager.git
cd cortex-resource-manager
pip install -r requirements.txt
pip install -e .
```

## Requirements

- Python 3.8+
- Kubernetes cluster access
- A properly configured kubeconfig or in-cluster service account

## Usage

### Resource Allocation Tools

The core orchestration API for Cortex job management:

```python
from allocation_manager import AllocationManager

# Create manager
manager = AllocationManager(
    total_cpu=16.0,
    total_memory=32768,  # 32GB
    total_workers=10
)

# Request resources for a job
allocation = manager.request_resources(
    job_id="feature-dev-001",
    mcp_servers=["filesystem", "github", "database"],
    workers=4,
    priority="high",
    ttl_seconds=7200,
    metadata={"task_type": "feature_implementation"}
)

print(f"Allocation ID: {allocation['allocation_id']}")
print(f"MCP Servers: {allocation['mcp_servers']}")
print(f"Workers: {allocation['workers_allocated']}")

# Check cluster capacity
capacity = manager.get_capacity()
print(f"Available workers: {capacity['available_workers']}")
print(f"Available CPU: {capacity['available_cpu']}")

# Get allocation details
details = manager.get_allocation(allocation['allocation_id'])
print(f"State: {details['state']}")
print(f"Age: {details['timestamps']['age_seconds']}s")

# Release resources when done
result = manager.release_resources(allocation['allocation_id'])
print(f"Released {result['workers_released']} workers")
```

### MCP Server Lifecycle (Convenience Functions)

```python
from resource_manager_mcp_server import (
    list_mcp_servers,
    get_mcp_status,
    start_mcp,
    stop_mcp,
    scale_mcp
)

# List all MCP servers
servers = list_mcp_servers()
for server in servers:
    print(f"Server: {server['name']}, Status: {server['status']}, Replicas: {server['replicas']}")

# Get detailed status
status = get_mcp_status("example-mcp-server")
print(f"Status: {status['status']}")
print(f"Ready: {status['ready_replicas']}/{status['replicas']}")
print(f"Endpoints: {status['endpoints']}")

# Start a server (wait for ready)
result = start_mcp("example-mcp-server", wait_ready=True)
print(f"Started: {result['name']}, Status: {result['status']}")
# Scale a server
result = scale_mcp("example-mcp-server", replicas=3)
print(f"Scaled to {result['replicas']} replicas")

# Stop a server (graceful shutdown)
result = stop_mcp("example-mcp-server")
print(f"Stopped: {result['name']}")

# Force stop (immediate termination)
result = stop_mcp("example-mcp-server", force=True)
```

### Advanced Usage (Manager Class)

```python
from resource_manager_mcp_server import MCPLifecycleManager

# Create manager instance
manager = MCPLifecycleManager(
    namespace="production",
    kubeconfig_path="/path/to/kubeconfig"
)

# List servers with a custom label selector
servers = manager.list_mcp_servers(
    label_selector="app.kubernetes.io/component=mcp-server,environment=prod"
)

# Start a server without waiting
status = manager.start_mcp("my-mcp-server", wait_ready=False)

# Scale with a custom timeout
status = manager.scale_mcp(
    "my-mcp-server",
    replicas=5,
    wait_ready=True,
    timeout=600  # 10 minutes
)
```

## API Reference

### list_mcp_servers()

List all registered MCP servers.

**Parameters:**
- `namespace` (str): Kubernetes namespace (default: "default")
- `label_selector` (str): Label selector to filter deployments (default: "app.kubernetes.io/component=mcp-server")

**Returns:** List of dictionaries with:
- `name`: Server name
- `status`: Current status ("running", "stopped", "scaling", "pending")
- `replicas`: Desired replica count
- `ready_replicas`: Number of ready replicas
- `endpoints`: List of service endpoints

### get_mcp_status(name)

Get detailed status of one MCP server.
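The returned `status` field uses the Deployment Status values documented in this README ("running", "stopped", "scaling", "pending"). As a rough illustration of how such a status could be derived from replica counts — a hypothetical helper, not the server's actual logic:

```python
def derive_status(replicas: int, ready_replicas: int) -> str:
    """Map replica counts to a coarse status string (illustrative only)."""
    if replicas == 0:
        return "stopped"   # scaled to zero
    if ready_replicas == replicas:
        return "running"   # all replicas ready and available
    if ready_replicas == 0:
        return "pending"   # waiting for the first replica to become ready
    return "scaling"       # partially ready while replicas are added or removed
```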
**Parameters:**
- `name` (str): MCP server name
- `namespace` (str): Kubernetes namespace (default: "default")

**Returns:** Dictionary with:
- `name`: Server name
- `status`: Current status
- `replicas`: Desired replica count
- `ready_replicas`: Number of ready replicas
- `available_replicas`: Number of available replicas
- `updated_replicas`: Number of updated replicas
- `endpoints`: List of service endpoints
- `last_activity`: Timestamp of the last deployment update
- `conditions`: List of deployment conditions

**Raises:**
- `ValueError`: If the server is not found

### start_mcp(name, wait_ready=True)

Start an MCP server by scaling from 0 to 1 replica.

**Parameters:**
- `name` (str): MCP server name
- `wait_ready` (bool): Wait for the server to be ready (default: True)
- `timeout` (int): Maximum wait time in seconds (default: 300)
- `namespace` (str): Kubernetes namespace (default: "default")

**Returns:** Dictionary with server status after starting

**Raises:**
- `ValueError`: If the server is not found
- `TimeoutError`: If wait_ready=True and the server does not become ready

### stop_mcp(name, force=False)

Stop an MCP server by scaling to 0 replicas.

**Parameters:**
- `name` (str): MCP server name
- `force` (bool): Force immediate termination (default: False)
- `namespace` (str): Kubernetes namespace (default: "default")

**Returns:** Dictionary with server status after stopping

**Raises:**
- `ValueError`: If the server is not found

### scale_mcp(name, replicas)

Scale an MCP server horizontally.
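The documented replica range is 0-10, and out-of-range values raise `ValueError`. A minimal sketch of that validation as a standalone helper (hypothetical code, not the server's actual implementation):

```python
MIN_REPLICAS, MAX_REPLICAS = 0, 10  # documented bounds for scale_mcp

def validate_replicas(replicas: int) -> int:
    """Reject replica counts outside the documented 0-10 range."""
    if not MIN_REPLICAS <= replicas <= MAX_REPLICAS:
        raise ValueError(
            f"replicas must be between {MIN_REPLICAS} and {MAX_REPLICAS}, got {replicas}"
        )
    return replicas
```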
**Parameters:**
- `name` (str): MCP server name
- `replicas` (int): Desired replica count (0-10)
- `wait_ready` (bool): Wait for all replicas to be ready (default: False)
- `timeout` (int): Maximum wait time in seconds (default: 300)
- `namespace` (str): Kubernetes namespace (default: "default")

**Returns:** Dictionary with server status after scaling

**Raises:**
- `ValueError`: If the server is not found or the replica count is invalid

### Worker Management Tools

#### list_workers(type_filter=None)

List all Kubernetes workers with their status, type, and resources.

**Parameters:**
- `type_filter` (str, optional): Filter by worker type ("permanent" or "burst")

**Returns:** List of dictionaries with:
- `name`: Worker node name
- `status`: Worker status ("ready", "busy", "draining", "not_ready")
- `type`: Worker type ("permanent" or "burst")
- `resources`: Resource capacity and allocatable amounts
- `labels`: Node labels
- `annotations`: Node annotations
- `created`: Node creation timestamp
- `ttl_expires` (burst workers only): TTL expiration timestamp

**Example:**
```python
from worker_manager import WorkerManager

manager = WorkerManager()

# List all workers
all_workers = manager.list_workers()
print(f"Total workers: {len(all_workers)}")

# List only burst workers
burst_workers = manager.list_workers(type_filter="burst")
print(f"Burst workers: {len(burst_workers)}")

# List only permanent workers
permanent_workers = manager.list_workers(type_filter="permanent")
print(f"Permanent workers: {len(permanent_workers)}")
```

#### provision_workers(count, ttl, size="medium")

Create burst workers by provisioning VMs and joining them to the Kubernetes cluster.
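The `size` parameter selects one of three fixed VM shapes. As a sketch, the documented sizes can be pictured as a lookup table (hypothetical names and structure; the real mapping lives inside the server):

```python
# Documented worker sizes mapped to VM specs (illustrative lookup table).
WORKER_SIZES = {
    "small":  {"cpu": 2, "memory_gb": 4,  "disk_gb": 50},
    "medium": {"cpu": 4, "memory_gb": 8,  "disk_gb": 100},
    "large":  {"cpu": 8, "memory_gb": 16, "disk_gb": 200},
}

def vm_spec(size: str) -> dict:
    """Resolve a size name to its VM spec, rejecting unknown sizes."""
    try:
        return WORKER_SIZES[size]
    except KeyError:
        raise ValueError(
            f"unknown size {size!r}; expected one of {sorted(WORKER_SIZES)}"
        ) from None
```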
**Parameters:**
- `count` (int): Number of workers to provision (1-10)
- `ttl` (int): Time-to-live in hours (1-168, i.e. at most one week)
- `size` (str): Worker size ("small", "medium", or "large")
  - small: 2 CPU, 4GB RAM, 50GB disk
  - medium: 4 CPU, 8GB RAM, 100GB disk
  - large: 8 CPU, 16GB RAM, 200GB disk

**Returns:** List of provisioned worker information dictionaries

**Raises:**
- `WorkerManagerError`: If provisioning fails or parameters are invalid

**Example:**
```python
# Provision 3 medium burst workers with a 24-hour TTL
workers = manager.provision_workers(count=3, ttl=24, size="medium")

for worker in workers:
    print(f"Provisioned: {worker['name']}")
    print(f"  Status: {worker['status']}")
    print(f"  TTL: {worker['ttl_hours']} hours")
    print(f"  Resources: {worker['resources']}")
```

**Note:** This function integrates with the Talos MCP or Proxmox MCP servers to create VMs. The VMs are automatically joined to the Kubernetes cluster and labeled as burst workers.

#### drain_worker(worker_id)

Gracefully drain a worker node by moving all pods to other nodes and marking it unschedulable.

**Parameters:**
- `worker_id` (str): Worker node name to drain

**Returns:** Dictionary with drain operation status:
- `worker_id`: Worker node name
- `status`: Operation status ("draining")
- `message`: Status message
- `output`: kubectl drain command output

**Raises:**
- `WorkerManagerError`: If the worker is not found or the drain fails

**Example:**
```python
# Drain a worker before destroying it
result = manager.drain_worker("burst-worker-1234567890-0")
print(f"Status: {result['status']}")
print(f"Message: {result['message']}")
```

**Note:** This operation may take several minutes as pods are gracefully terminated and rescheduled to other nodes. DaemonSets are ignored, and pods with emptyDir volumes are deleted.

#### destroy_worker(worker_id, force=False)

Destroy a burst worker by removing it from the cluster and deleting the VM.
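The key safety rule (only burst workers may be destroyed, and only after draining unless forced) amounts to a guard clause. A hypothetical sketch, not the server's actual implementation; `WorkerManagerError` here is a local stand-in for the exception named in this README:

```python
class WorkerManagerError(Exception):
    """Stand-in for the error type raised by the worker manager."""

def ensure_destroyable(worker_type: str, drained: bool, force: bool = False) -> None:
    """Guard clause mirroring the documented destroy_worker safety checks."""
    if worker_type != "burst":
        # SAFETY VIOLATION: permanent workers must never be destroyed.
        raise WorkerManagerError(f"refusing to destroy {worker_type} worker")
    if not drained and not force:
        raise WorkerManagerError("worker must be drained first (or pass force=True)")
```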
**Parameters:**
- `worker_id` (str): Worker node name to destroy
- `force` (bool): Force destroy without draining first (not recommended; default: False)

**Returns:** Dictionary with destroy operation status:
- `worker_id`: Worker node name
- `status`: Operation status ("destroyed" or "partial_destroy")
- `message`: Status message
- `removed_from_cluster`: Whether the node was removed from the cluster
- `vm_deleted`: Whether the VM was deleted
- `error` (if failed): Error message

**Raises:**
- `WorkerManagerError`: If the worker is permanent (SAFETY VIOLATION), not found, or not drained

**SAFETY FEATURES:**
- Only burst workers can be destroyed - attempting to destroy a permanent worker raises an error
- Requires the worker to be drained first unless force=True
- Protected worker patterns prevent accidental deletion

**Example:**
```python
# Safe workflow: drain, then destroy
worker_id = "burst-worker-1234567890-0"

# Step 1: Drain the worker
drain_result = manager.drain_worker(worker_id)
print(f"Drained: {drain_result['status']}")

# Step 2: Destroy the worker
destroy_result = manager.destroy_worker(worker_id)
print(f"Destroyed: {destroy_result['status']}")
print(f"Cluster removal: {destroy_result['removed_from_cluster']}")
print(f"VM deletion: {destroy_result['vm_deleted']}")

# Force destroy (not recommended - skips the drain)
# destroy_result = manager.destroy_worker(worker_id, force=True)
```

**WARNING:** Never destroy permanent workers! The system prevents this, but always verify the worker type before destroying.

#### get_worker_details(worker_id)

Get detailed information about a specific worker.
**Parameters:**
- `worker_id` (str): Worker node name

**Returns:** Dictionary with detailed worker information:
- `name`: Worker node name
- `status`: Worker status
- `type`: Worker type
- `resources`: Capacity and allocatable resources
- `labels`: All node labels
- `annotations`: All node annotations
- `created`: Creation timestamp
- `conditions`: Node conditions (Ready, MemoryPressure, DiskPressure, etc.)
- `addresses`: Node IP addresses
- `ttl_expires` (burst workers only): TTL expiration timestamp

**Raises:**
- `WorkerManagerError`: If the worker is not found

**Example:**
```python
# Get detailed information about a worker
details = manager.get_worker_details("burst-worker-1234567890-0")

print(f"Worker: {details['name']}")
print(f"Type: {details['type']}")
print(f"Status: {details['status']}")

# Check resources
resources = details['resources']
print(f"CPU Capacity: {resources['capacity']['cpu']}")
print(f"Memory Capacity: {resources['capacity']['memory']}")

# Check conditions
for condition in details['conditions']:
    print(f"{condition['type']}: {condition['status']}")
```

## Kubernetes Setup

### Required Labels

MCP server deployments must carry the label:

```yaml
labels:
  app.kubernetes.io/component: mcp-server
```

### Example Deployment

See `config/example-mcp-deployment.yaml` for a complete example. Key requirements:

1. Deployment with the `app.kubernetes.io/component: mcp-server` label
2. Service with a matching selector
3. Health and readiness probes configured
4. Appropriate resource limits

### RBAC Permissions

The service account needs these permissions:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: mcp-lifecycle-manager
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "patch", "update"]
- apiGroups: [""]
  resources: ["services", "pods"]
  verbs: ["get", "list", "delete"]
```

## Error Handling

All functions raise appropriate exceptions:

- `ValueError`: Invalid input parameters or resource not found
- `ApiException`: Kubernetes API errors
- `TimeoutError`: Operations that exceed their timeout limits

Example error handling:

```python
from kubernetes.client.rest import ApiException

try:
    status = get_mcp_status("non-existent-server")
except ValueError as e:
    print(f"Server not found: {e}")
except ApiException as e:
    print(f"Kubernetes API error: {e.reason}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

## Status Values

### Deployment Status

- `running`: All replicas are ready and available
- `stopped`: Scaled to 0 replicas
- `scaling`: Replicas are being added or removed
- `pending`: Waiting for replicas to become ready

## Development

### Running Tests

```bash
# Install test dependencies
pip install pytest pytest-mock

# Run tests
pytest tests/
```

### Project Structure

```
resource-manager-mcp-server/
├── src/
│   └── resource_manager_mcp_server/
│       └── __init__.py              # Main implementation
├── config/
│   └── example-mcp-deployment.yaml  # Example K8s config
├── requirements.txt                 # Python dependencies
└── README.md                        # This file
```

## License

MIT License

## Contributing

Contributions welcome! Please submit pull requests or open issues.
