Skip to main content
Glama
dynamic_server_loader.py17 kB
""" Dynamic MCP Server Loader Load and call MCP servers dynamically at runtime without requiring Claude Code restart. Servers can be installed, loaded, and called on-the-fly. """ import json import subprocess import sys import threading from pathlib import Path from typing import Any, Dict, List, Optional import time class DynamicServerLoader: """Load and manage MCP servers dynamically at runtime.""" def __init__(self, config_file: Optional[Path] = None): """ Initialize the dynamic server loader. Args: config_file: Path to .mcp.json (default: ./.mcp.json) """ self.config_file = config_file or Path.cwd() / ".mcp.json" self.server_processes = {} # server_name -> subprocess self.server_tools = {} # server_name -> {tool_name: tool_schema} self.lock = threading.RLock() # Reentrant lock to allow nested acquisition def _load_config(self) -> Dict: """Load .mcp.json configuration.""" if self.config_file.exists(): with open(self.config_file, 'r') as f: return json.load(f) return {"mcpServers": {}} def get_available_servers(self) -> List[str]: """Get list of all configured server names.""" config = self._load_config() return list(config.get("mcpServers", {}).keys()) def is_server_loaded(self, server_name: str) -> bool: """Check if a server is currently loaded.""" with self.lock: return server_name in self.server_processes def load_server(self, server_name: str) -> Dict: """ Load an MCP server dynamically and discover its tools. Args: server_name: Name of the server to load Returns: Dictionary with server info and available tools """ with self.lock: # Check if already loaded if server_name in self.server_processes: return { "success": True, "message": f"Server '{server_name}' already loaded", "tools": list(self.server_tools.get(server_name, {}).keys()) } # Load config config = self._load_config() server_config = config.get("mcpServers", {}).get(server_name) if not server_config: return { "success": False, "error": f"Server '{server_name}' not found in .mcp.json" } try: # Start the server process command = server_config.get("command") args = server_config.get("args", []) env = {**server_config.get("env", {})} # Start server in background # Use line buffering and redirect stderr to avoid blocking process = subprocess.Popen( [command] + args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, # Discard stderr to avoid blocking env={**subprocess.os.environ, **env}, text=True, bufsize=1 # Line buffered ) # Give server time to start time.sleep(0.5) # Check if process started successfully if process.poll() is not None: stderr = process.stderr.read() return { "success": False, "error": f"Server failed to start: {stderr}" } # Discover tools using MCP protocol tools = self._discover_tools(process) # Store process and tools self.server_processes[server_name] = process self.server_tools[server_name] = tools return { "success": True, "message": f"Server '{server_name}' loaded successfully", "server_name": server_name, "tools": list(tools.keys()), "tool_count": len(tools) } except Exception as e: return { "success": False, "error": f"Failed to load server: {str(e)}" } def _initialize_server(self, process: subprocess.Popen) -> bool: """ Initialize MCP server with proper handshake. Args: process: Server subprocess Returns: True if initialization successful, False otherwise """ try: # Step 1: Send initialize request init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-03-26", "capabilities": { "tools": {}, "resources": {}, "prompts": {} }, "clientInfo": { "name": "multiverse-proxy", "version": "1.0.0" } } } process.stdin.write(json.dumps(init_request) + "\n") process.stdin.flush() # Step 2: Read initialize response response_line = process.stdout.readline() response = json.loads(response_line) if "error" in response: print(f"Initialization error: {response['error']}") return False # Step 3: Send initialized notification # NOTE: Notifications do NOT get responses - don't try to read one! initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized" } process.stdin.write(json.dumps(initialized_notification) + "\n") process.stdin.flush() # Small delay to let server process the notification time.sleep(0.1) return True except Exception as e: print(f"Failed to initialize server: {e}") return False def _discover_tools(self, process: subprocess.Popen) -> Dict[str, Dict]: """ Discover available tools from an MCP server using the MCP protocol. Args: process: Server subprocess Returns: Dictionary mapping tool names to their schemas """ try: # Initialize server first if not self._initialize_server(process): return {} # Now send list_tools request request = { "jsonrpc": "2.0", "method": "tools/list", "id": 2 } process.stdin.write(json.dumps(request) + "\n") process.stdin.flush() # Read response response_line = process.stdout.readline() response = json.loads(response_line) if "result" in response and "tools" in response["result"]: tools = {} for tool in response["result"]["tools"]: tool_name = tool.get("name") tools[tool_name] = { "description": tool.get("description", ""), "inputSchema": tool.get("inputSchema", {}) } return tools return {} except Exception as e: print(f"Warning: Could not discover tools: {e}") return {} def call_tool( self, server_name: str, tool_name: str, parameters: Optional[Dict[str, Any]] = None ) -> Dict: """ Call a tool on a dynamically loaded MCP server. Args: server_name: Name of the server tool_name: Name of the tool to call parameters: Tool parameters Returns: Tool execution result """ with self.lock: # Load server if not already loaded if server_name not in self.server_processes: load_result = self.load_server(server_name) if not load_result.get("success"): return load_result # Get process process = self.server_processes.get(server_name) if not process: return { "success": False, "error": f"Server '{server_name}' not loaded" } # Check if process is still alive if process.poll() is not None: # Process died, remove from cache del self.server_processes[server_name] if server_name in self.server_tools: del self.server_tools[server_name] return { "success": False, "error": f"Server '{server_name}' process terminated" } try: # Call tool using MCP JSON-RPC protocol # Use incrementing ID to avoid conflicts call_id = int(time.time() * 1000) # millisecond timestamp as ID request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": tool_name, "arguments": parameters or {} }, "id": call_id } process.stdin.write(json.dumps(request) + "\n") process.stdin.flush() # Read response with timeout using select import select ready, _, _ = select.select([process.stdout], [], [], 5.0) if not ready: return { "success": False, "error": "Tool call timed out after 5 seconds" } response_line = process.stdout.readline() if not response_line: return { "success": False, "error": "No response from server" } response = json.loads(response_line) if "result" in response: # MCP tools return: {"result": {"content": [{"type": "text", "text": "..."}]}} # Extract the actual content for easier use result = response["result"] # If result has content array, extract text from first content item if isinstance(result, dict) and "content" in result: content_items = result.get("content", []) if content_items and len(content_items) > 0: first_item = content_items[0] if isinstance(first_item, dict) and "text" in first_item: # Try to parse as JSON if it looks like JSON text = first_item["text"] try: parsed = json.loads(text) return parsed # Return the parsed JSON directly except: return { "success": True, "result": text # Return as plain text } # Fallback: return raw result return { "success": True, "result": result } elif "error" in response: return { "success": False, "error": response["error"].get("message", "Unknown error") } else: return { "success": False, "error": "Invalid response from server" } except Exception as e: return { "success": False, "error": f"Failed to call tool: {str(e)}" } def unload_server(self, server_name: str) -> Dict: """ Unload a dynamically loaded server. Args: server_name: Name of the server to unload Returns: Unload status """ with self.lock: if server_name not in self.server_processes: return { "success": False, "error": f"Server '{server_name}' not loaded" } try: process = self.server_processes[server_name] process.terminate() process.wait(timeout=5) del self.server_processes[server_name] if server_name in self.server_tools: del self.server_tools[server_name] return { "success": True, "message": f"Server '{server_name}' unloaded successfully" } except Exception as e: return { "success": False, "error": f"Failed to unload server: {str(e)}" } def reload_server(self, server_name: str) -> Dict: """ Reload a server (unload then load). Args: server_name: Name of the server to reload Returns: Reload status """ # Unload if loaded if self.is_server_loaded(server_name): unload_result = self.unload_server(server_name) if not unload_result.get("success"): return unload_result # Load return self.load_server(server_name) def get_loaded_servers(self) -> Dict: """ Get information about all loaded servers. Returns: Dictionary with loaded server information """ with self.lock: servers = [] for server_name, process in self.server_processes.items(): tools = self.server_tools.get(server_name, {}) servers.append({ "name": server_name, "status": "running" if process.poll() is None else "terminated", "tools": list(tools.keys()), "tool_count": len(tools) }) return { "success": True, "servers": servers, "count": len(servers) } def refresh_tools(self, server_name: str) -> Dict: """ Refresh the tool list for a loaded server. Args: server_name: Name of the server Returns: Refreshed tool list """ with self.lock: if server_name not in self.server_processes: return { "success": False, "error": f"Server '{server_name}' not loaded" } process = self.server_processes[server_name] if process.poll() is not None: return { "success": False, "error": f"Server '{server_name}' process terminated" } try: tools = self._discover_tools(process) self.server_tools[server_name] = tools return { "success": True, "server_name": server_name, "tools": list(tools.keys()), "tool_count": len(tools) } except Exception as e: return { "success": False, "error": f"Failed to refresh tools: {str(e)}" } def cleanup(self): """Cleanup all loaded servers.""" with self.lock: for server_name in list(self.server_processes.keys()): try: process = self.server_processes[server_name] process.terminate() process.wait(timeout=5) except: pass self.server_processes.clear() self.server_tools.clear() def __del__(self): """Cleanup on destruction.""" self.cleanup() # Global instance _loader = None def get_loader() -> DynamicServerLoader: """Get the global dynamic server loader instance.""" global _loader if _loader is None: _loader = DynamicServerLoader() return _loader

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lizTheDeveloper/mcp_proxy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server