server.py (18.4 kB)
""" Resource Manager MCP Server Exposes resource allocation tools via MCP protocol. """ import asyncio import json from typing import Any, Optional from mcp.server.models import InitializationOptions from mcp.server import NotificationOptions, Server from mcp.server.stdio import stdio_server from mcp.types import ( Tool, TextContent, ImageContent, EmbeddedResource, ) from allocation_manager import AllocationManager from worker_manager import WorkerManager, WorkerManagerError class ResourceManagerServer: """MCP Server for resource management""" def __init__(self): self.server = Server("resource-manager") self.allocation_manager = AllocationManager() self.worker_manager = WorkerManager() self._setup_handlers() # Background task for cleanup self.cleanup_task: Optional[asyncio.Task] = None def _setup_handlers(self): """Setup MCP protocol handlers""" @self.server.list_tools() async def handle_list_tools() -> list[Tool]: """List available resource management tools""" return [ Tool( name="request_resources", description=( "Reserve resources for a job. Starts required MCP servers, " "provisions workers if requested, and tracks allocation with unique ID. " "Returns allocation details including endpoints and worker information." ), inputSchema={ "type": "object", "properties": { "job_id": { "type": "string", "description": "Unique job identifier" }, "mcp_servers": { "type": "array", "items": {"type": "string"}, "description": "List of MCP server names to start (e.g., ['filesystem', 'github', 'database'])" }, "workers": { "type": "integer", "description": "Number of workers to provision (optional)", "minimum": 0 }, "priority": { "type": "string", "enum": ["low", "normal", "high", "critical"], "description": "Job priority level", "default": "normal" }, "ttl_seconds": { "type": "integer", "description": "Time-to-live for allocation in seconds", "default": 3600, "minimum": 60 }, "metadata": { "type": "object", "description": "Additional metadata for the allocation" } }, "required": ["job_id", "mcp_servers"] } ), Tool( name="release_resources", description=( "Release resources after job completion. Scales down MCP servers " "(or marks for idle timeout), queues burst workers for destruction, " "and updates allocation status." ), inputSchema={ "type": "object", "properties": { "allocation_id": { "type": "string", "description": "Allocation identifier to release" } }, "required": ["allocation_id"] } ), Tool( name="get_capacity", description=( "Return current cluster capacity including available CPU, memory, " "workers, running MCP servers, and active allocations." ), inputSchema={ "type": "object", "properties": {} } ), Tool( name="get_allocation", description=( "Get details of a specific allocation including status, resources, " "age, and job information." ), inputSchema={ "type": "object", "properties": { "allocation_id": { "type": "string", "description": "Allocation identifier to query" } }, "required": ["allocation_id"] } ), Tool( name="list_allocations", description=( "List allocations with optional filtering by state or job_id. " "Returns summary information for each allocation." ), inputSchema={ "type": "object", "properties": { "state": { "type": "string", "enum": ["pending", "active", "releasing", "released", "failed"], "description": "Filter by allocation state (optional)" }, "job_id": { "type": "string", "description": "Filter by job ID (optional)" } } } ), Tool( name="cleanup_expired", description=( "Manually trigger cleanup of expired allocations. " "Returns list of cleaned up allocation IDs." 
), inputSchema={ "type": "object", "properties": {} } ), Tool( name="list_workers", description=( "List all Kubernetes workers with their status, type, and resources. " "Optionally filter by worker type (permanent or burst)." ), inputSchema={ "type": "object", "properties": { "type_filter": { "type": "string", "enum": ["permanent", "burst"], "description": "Filter workers by type (optional)" } } } ), Tool( name="provision_workers", description=( "Create burst workers by provisioning VMs and joining them to the Kubernetes cluster. " "Burst workers are temporary and will be automatically cleaned up after TTL expires. " "Uses Talos or Proxmox MCP to create VMs." ), inputSchema={ "type": "object", "properties": { "count": { "type": "integer", "minimum": 1, "maximum": 10, "description": "Number of workers to provision (1-10)" }, "ttl": { "type": "integer", "minimum": 1, "maximum": 168, "description": "Time-to-live in hours (1-168, max 1 week)" }, "size": { "type": "string", "enum": ["small", "medium", "large"], "default": "medium", "description": "Worker size (small: 2 CPU/4GB, medium: 4 CPU/8GB, large: 8 CPU/16GB)" } }, "required": ["count", "ttl"] } ), Tool( name="drain_worker", description=( "Gracefully drain a worker node by moving all pods to other nodes and marking it as unschedulable. " "This should be done before destroying a worker to ensure no service disruption." ), inputSchema={ "type": "object", "properties": { "worker_id": { "type": "string", "description": "Worker node name to drain" } }, "required": ["worker_id"] } ), Tool( name="destroy_worker", description=( "Destroy a burst worker by removing it from the cluster and deleting the VM. " "SAFETY: Only burst workers can be destroyed - permanent workers are protected. " "Worker should be drained first unless force=True (not recommended)." ), inputSchema={ "type": "object", "properties": { "worker_id": { "type": "string", "description": "Worker node name to destroy" }, "force": { "type": "boolean", "default": False, "description": "Force destroy without draining first (use with caution)" } }, "required": ["worker_id"] } ), Tool( name="get_worker_details", description=( "Get detailed information about a specific worker including status, " "resources, labels, annotations, and conditions." 
), inputSchema={ "type": "object", "properties": { "worker_id": { "type": "string", "description": "Worker node name" } }, "required": ["worker_id"] } ) ] @self.server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool execution""" try: if name == "request_resources": result = self.allocation_manager.request_resources( job_id=arguments["job_id"], mcp_servers=arguments["mcp_servers"], workers=arguments.get("workers"), priority=arguments.get("priority", "normal"), ttl_seconds=arguments.get("ttl_seconds", 3600), metadata=arguments.get("metadata") ) elif name == "release_resources": result = self.allocation_manager.release_resources( allocation_id=arguments["allocation_id"] ) elif name == "get_capacity": result = self.allocation_manager.get_capacity() elif name == "get_allocation": result = self.allocation_manager.get_allocation( allocation_id=arguments["allocation_id"] ) if result is None: result = { "error": f"Allocation {arguments['allocation_id']} not found" } elif name == "list_allocations": result = self.allocation_manager.list_allocations( state=arguments.get("state"), job_id=arguments.get("job_id") ) elif name == "cleanup_expired": expired_ids = self.allocation_manager.cleanup_expired_allocations() result = { "cleaned_up": expired_ids, "count": len(expired_ids) } elif name == "list_workers": type_filter = arguments.get("type_filter") workers = self.worker_manager.list_workers(type_filter=type_filter) result = { "workers": workers, "count": len(workers), "filter": type_filter or "none" } elif name == "provision_workers": count = arguments.get("count") ttl = arguments.get("ttl") size = arguments.get("size", "medium") workers = self.worker_manager.provision_workers( count=count, ttl=ttl, size=size ) result = { "provisioned_workers": workers, "count": len(workers), "size": size, "ttl_hours": ttl } elif name == "drain_worker": worker_id = arguments.get("worker_id") result = self.worker_manager.drain_worker(worker_id) elif name == "destroy_worker": worker_id = arguments.get("worker_id") force = arguments.get("force", False) result = self.worker_manager.destroy_worker( worker_id=worker_id, force=force ) elif name == "get_worker_details": worker_id = arguments.get("worker_id") result = self.worker_manager.get_worker_details(worker_id) else: result = {"error": f"Unknown tool: {name}"} return [ TextContent( type="text", text=json.dumps(result, indent=2) ) ] except WorkerManagerError as e: return [ TextContent( type="text", text=json.dumps({ "error": str(e), "error_type": "WorkerManagerError", "tool": name, "arguments": arguments }, indent=2) ) ] except Exception as e: return [ TextContent( type="text", text=json.dumps({ "error": str(e), "error_type": type(e).__name__, "tool": name, "arguments": arguments }, indent=2) ) ] async def _periodic_cleanup(self): """Periodically clean up expired allocations""" while True: try: await asyncio.sleep(300) # Every 5 minutes expired = self.allocation_manager.cleanup_expired_allocations() if expired: print(f"Cleaned up {len(expired)} expired allocations: {expired}") except asyncio.CancelledError: break except Exception as e: print(f"Error in periodic cleanup: {e}") async def run(self): """Run the MCP server""" # Start background cleanup task self.cleanup_task = asyncio.create_task(self._periodic_cleanup()) try: async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="resource-manager", server_version="0.1.0", 
capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={} ) ) ) finally: # Cancel cleanup task if self.cleanup_task: self.cleanup_task.cancel() try: await self.cleanup_task except asyncio.CancelledError: pass async def main(): """Main entry point""" server = ResourceManagerServer() await server.run() if __name__ == "__main__": asyncio.run(main())
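
For quick local testing, the sketch below shows one way to exercise these tools from an MCP client. It is a minimal example and not part of the repository: it assumes the client side of the official mcp Python SDK (ClientSession, StdioServerParameters, stdio_client), that the server is started as "python server.py", and hypothetical arguments such as the job-123 job ID passed to request_resources.

# Minimal client sketch (not part of this repo): launches server.py over stdio
# and calls two of the tools registered above. The launch command and all
# sample arguments are assumptions for illustration.
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def demo():
    # Spawn server.py as a subprocess and communicate over stdin/stdout
    params = StdioServerParameters(command="python", args=["server.py"])

    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # Discover the tools registered by handle_list_tools()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # Reserve resources for a hypothetical job
            result = await session.call_tool(
                "request_resources",
                arguments={
                    "job_id": "job-123",  # hypothetical job identifier
                    "mcp_servers": ["filesystem"],
                    "workers": 2,
                    "priority": "high",
                },
            )
            # handle_call_tool() returns TextContent whose text is JSON,
            # so parse it back out to inspect the allocation details
            print(json.dumps(json.loads(result.content[0].text), indent=2))


if __name__ == "__main__":
    asyncio.run(demo())

Because every tool response is serialized with json.dumps in handle_call_tool, a client always receives a single TextContent item and can recover the structured result by parsing its text field, as shown above.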
