Skip to main content
Glama

MCP Server Code Execution Mode

by elusznik
mcp_server_code_execution_mode.py (48.5 kB)
#!/usr/bin/env python3
"""MCP Server Code Execution Mode bridge backed by a containerised sandbox."""
from __future__ import annotations

import asyncio
import json
import keyword
import logging
import os
import re
import shutil
import sys
import tempfile
import textwrap
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Awaitable, Callable, Dict, List, Optional, Sequence

try:  # Prefer the official encoder when available
    from toon_format import encode as _toon_encode
except ImportError:  # pragma: no cover - fallback for environments without toon
    _toon_encode = None

from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool

logger = logging.getLogger("mcp-server-code-execution-mode")

BRIDGE_NAME = "mcp-server-code-execution-mode"

# Sandbox defaults; every value can be overridden via environment variables.
DEFAULT_IMAGE = os.environ.get("MCP_BRIDGE_IMAGE", "python:3.12-slim")
DEFAULT_RUNTIME = os.environ.get("MCP_BRIDGE_RUNTIME")
DEFAULT_TIMEOUT = int(os.environ.get("MCP_BRIDGE_TIMEOUT", "30"))
MAX_TIMEOUT = int(os.environ.get("MCP_BRIDGE_MAX_TIMEOUT", "120"))
DEFAULT_MEMORY = os.environ.get("MCP_BRIDGE_MEMORY", "512m")
DEFAULT_PIDS = int(os.environ.get("MCP_BRIDGE_PIDS", "128"))
DEFAULT_CPUS = os.environ.get("MCP_BRIDGE_CPUS")
# 65534:65534 is the conventional nobody:nogroup pair, so sandboxed code runs unprivileged.
CONTAINER_USER = os.environ.get("MCP_BRIDGE_CONTAINER_USER", "65534:65534")
DEFAULT_RUNTIME_IDLE_TIMEOUT = int(os.environ.get("MCP_BRIDGE_RUNTIME_IDLE_TIMEOUT", "300"))

# Directories scanned for per-server JSON configuration files.
CONFIG_DIRS = [
    Path.home() / ".config" / "mcp" / "servers",
    Path.home() / "Library" / "Application Support" / "Claude Code" / "mcp" / "servers",
    Path.home() / "Library" / "Application Support" / "Claude" / "mcp" / "servers",
    Path.cwd() / "mcp-servers",
]

# Claude Desktop / Claude Code config files that may define an "mcpServers" map.
CLAUDE_CONFIG_PATHS = [
    Path.home() / ".claude.json",
    Path.home() / "Library" / "Application Support" / "Claude Code" / "claude_code_config.json",
    Path.home() / "Library" / "Application Support" / "Claude" / "claude_code_config.json",
    Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json",
    Path.cwd() / "claude_code_config.json",
    Path.cwd() / "claude_desktop_config.json",
]


class SandboxError(RuntimeError):
    """Raised when the sandbox cannot execute user code."""

    def __init__(self, message: str, *, stdout: str = "", stderr: str = "") -> None:
        super().__init__(message)
        # Captured container output is kept so callers can surface it to the user.
        self.stdout = stdout
        self.stderr = stderr


class SandboxTimeout(SandboxError):
    """Raised when user code exceeds the configured timeout."""


@dataclass
class SandboxResult:
    """Execution result captured from the sandbox."""

    success: bool  # True when the container exited with status 0
    exit_code: int
    stdout: str
    stderr: str


@dataclass
class MCPServerInfo:
    """Configuration for a single MCP server binary."""

    name: str
    command: str
    args: List[str]
    env: Dict[str, str]


def _split_output_lines(stream: Optional[str]) -> List[str]:
    """Return a newline-preserving list for stdout/stderr fields."""
    if not stream:
        return []
    return stream.splitlines()


def _render_toon_block(payload: Dict[str, object]) -> str:
    """Encode a payload in TOON format, falling back to JSON when unavailable."""
    if _toon_encode is not None:
        try:
            body = _toon_encode(payload)
        except Exception:  # pragma: no cover - defensive fallback
            # Encoding failure drops through to the JSON fallback below.
            logger.debug("Failed to encode payload as TOON", exc_info=True)
        else:
            body = body.rstrip()
            return f"```toon\n{body}\n```" if body else "```toon\n```"
    fallback = json.dumps(payload, indent=2, sort_keys=True)
    return f"```json\n{fallback}\n```"


def _build_response_payload(
    *,
    status: str,
    summary: str,
    exit_code: Optional[int] = None,
    stdout: Optional[str] = None,
    stderr: Optional[str] = None,
    servers: Optional[Sequence[str]] = None,
    error: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
) -> Dict[str, object]:
    """Create a structured payload for TOON encoding.

    Optional fields are omitted when unset; stdout/stderr are always present
    (possibly empty) as lists of lines.
    """
    payload: Dict[str, object] = {
        "status": status,
        "summary": summary,
    }
    if exit_code is not None:
        payload["exitCode"] = exit_code
    if servers:
        payload["servers"] = list(servers)
    payload["stdout"] = _split_output_lines(stdout)
    payload["stderr"] = _split_output_lines(stderr)
    if error:
        payload["error"] = error
    if timeout_seconds is not None:
        payload["timeoutSeconds"] = timeout_seconds
    return payload
payload["servers"] = list(servers) payload["stdout"] = _split_output_lines(stdout) payload["stderr"] = _split_output_lines(stderr) if error: payload["error"] = error if timeout_seconds is not None: payload["timeoutSeconds"] = timeout_seconds return payload def _build_tool_response(**kwargs: object) -> List[Dict[str, object]]: """Render a TOON (or JSON fallback) message for tool responses.""" payload = _build_response_payload(**kwargs) message = _render_toon_block(payload) return [{"content": [{"type": "text", "text": message}]}] def _sanitize_identifier(value: str, *, default: str) -> str: """Convert an arbitrary string into a valid Python identifier.""" cleaned = re.sub(r"[^0-9a-zA-Z_]+", "_", value.strip()) cleaned = cleaned.lower() or default if cleaned[0].isdigit(): cleaned = f"_{cleaned}" if keyword.iskeyword(cleaned): cleaned = f"{cleaned}_" return cleaned class PersistentMCPClient: """Maintain a persistent MCP stdio session.""" def __init__(self, server_info: MCPServerInfo) -> None: self.server_info = server_info self._stdio_cm = None self._session: Optional[ClientSession] = None async def start(self) -> None: if self._session: return params = StdioServerParameters( command=self.server_info.command, args=self.server_info.args, env=self.server_info.env or None, ) client_cm = stdio_client(params) self._stdio_cm = client_cm read_stream, write_stream = await client_cm.__aenter__() session = ClientSession(read_stream, write_stream) await session.__aenter__() await session.initialize() self._session = session async def list_tools(self) -> List[Dict[str, object]]: if not self._session: raise SandboxError("MCP client not started") result = await self._session.list_tools() return [tool.model_dump(by_alias=True, exclude_none=True) for tool in result.tools] async def call_tool(self, name: str, arguments: Dict[str, object]) -> Dict[str, object]: if not self._session: raise SandboxError("MCP client not started") call_result = await self._session.call_tool(name=name, 
class RootlessContainerSandbox:
    """Execute Python code in a locked-down container."""

    def __init__(
        self,
        *,
        runtime: Optional[str] = None,
        image: str = DEFAULT_IMAGE,
        memory_limit: str = DEFAULT_MEMORY,
        pids_limit: int = DEFAULT_PIDS,
        cpu_limit: Optional[str] = DEFAULT_CPUS,
        runtime_idle_timeout: int = DEFAULT_RUNTIME_IDLE_TIMEOUT,
    ) -> None:
        # Resolve "podman"/"docker" (or the explicit override) to a usable binary.
        self.runtime = detect_runtime(runtime)
        self.image = image
        self.memory_limit = memory_limit
        self.pids_limit = pids_limit
        self.cpu_limit = cpu_limit
        # Serialises runtime readiness checks across concurrent executions.
        self._runtime_check_lock = asyncio.Lock()
        self.runtime_idle_timeout = max(0, runtime_idle_timeout)
        self._shutdown_task: Optional[asyncio.Task[None]] = None
        self._share_lock = asyncio.Lock()
        # Host paths already shared into the podman machine (cache).
        self._shared_paths: set[str] = set()

    def _base_cmd(self) -> List[str]:
        """Build the hardened `run` invocation: no network, read-only root,
        dropped capabilities, tmpfs-only writable mounts, unprivileged user."""
        cmd: List[str] = [
            self.runtime,
            "run",
            "--rm",
            "--interactive",
            "--network", "none",
            "--read-only",
            "--pids-limit", str(self.pids_limit),
            "--memory", self.memory_limit,
            "--tmpfs", "/tmp:rw,noexec,nosuid,nodev,size=64m",
            "--tmpfs", "/workspace:rw,noexec,nosuid,nodev,size=128m",
            "--workdir", "/workspace",
            "--env", "HOME=/workspace",
            "--env", "PYTHONUNBUFFERED=1",
            "--env", "PYTHONIOENCODING=utf-8",
            "--env", "PYTHONDONTWRITEBYTECODE=1",
            "--security-opt", "no-new-privileges",
            "--cap-drop", "ALL",
            "--user", CONTAINER_USER,
        ]
        if self.cpu_limit:
            cmd.extend(["--cpus", self.cpu_limit])
        return cmd

    def _render_entrypoint(
        self,
        code: str,
        server_metadata: Sequence[Dict[str, object]],
        discovered_servers: Sequence[str],
    ) -> str:
        """Render the in-container entrypoint script.

        The template below runs inside the sandbox: it proxies stdout/stderr as
        JSON messages over the real stdout, services RPC responses arriving on
        stdin, installs fake `mcp.runtime` / `mcp.servers.*` modules backed by
        RPC, and finally executes the user code (top-level await allowed).
        The three placeholder tokens are substituted with repr()'d literals.
        """
        metadata_json = json.dumps(server_metadata, separators=(",", ":"))
        discovered_json = json.dumps(list(discovered_servers), separators=(",", ":"))
        # NOTE(review): the exact line layout of this template was reconstructed
        # from a whitespace-mangled copy of the file — confirm against upstream.
        template = textwrap.dedent(
            """
            import asyncio
            import inspect
            import json
            import sys
            import traceback
            import types
            from contextlib import suppress

            AVAILABLE_SERVERS = json.loads(__METADATA_JSON__)
            DISCOVERED_SERVERS = json.loads(__DISCOVERED_JSON__)
            CODE = __CODE_LITERAL__

            _PENDING_RESPONSES = {}
            _REQUEST_COUNTER = 0
            _READER_TASK = None

            def _send_message(message):
                sys.__stdout__.write(json.dumps(message, separators=(",", ":")) + "\\n")
                sys.__stdout__.flush()

            class _StreamProxy:
                def __init__(self, kind):
                    self._kind = kind

                def write(self, data):
                    if not data:
                        return
                    _send_message({"type": self._kind, "data": data})

                def flush(self):
                    pass

                def isatty(self):
                    return False

            sys.stdout = _StreamProxy("stdout")
            sys.stderr = _StreamProxy("stderr")

            async def _stdin_reader():
                loop = asyncio.get_running_loop()
                reader = asyncio.StreamReader()
                protocol = asyncio.StreamReaderProtocol(reader)
                transport = None
                try:
                    transport, _ = await loop.connect_read_pipe(lambda: protocol, sys.stdin)
                    while True:
                        line = await reader.readline()
                        if not line:
                            break
                        try:
                            message = json.loads(line.decode())
                        except Exception:
                            continue
                        if message.get("type") != "rpc_response":
                            continue
                        request_id = message.get("id")
                        future = _PENDING_RESPONSES.pop(request_id, None)
                        if future and not future.done():
                            if message.get("success", True):
                                future.set_result(message.get("payload"))
                            else:
                                future.set_exception(RuntimeError(message.get("error", "RPC error")))
                finally:
                    if transport is not None:
                        transport.close()
                    for future in list(_PENDING_RESPONSES.values()):
                        if not future.done():
                            future.set_exception(RuntimeError("RPC channel closed"))

            async def _ensure_reader():
                global _READER_TASK
                if _READER_TASK is None:
                    _READER_TASK = asyncio.create_task(_stdin_reader())

            async def _rpc_call(payload):
                await _ensure_reader()
                loop = asyncio.get_running_loop()
                global _REQUEST_COUNTER
                _REQUEST_COUNTER += 1
                request_id = _REQUEST_COUNTER
                future = loop.create_future()
                _PENDING_RESPONSES[request_id] = future
                _send_message({"type": "rpc_request", "id": request_id, "payload": payload})
                return await future

            def _install_mcp_modules():
                mcp_pkg = types.ModuleType("mcp")
                mcp_pkg.__path__ = []
                mcp_pkg.__all__ = ["runtime", "servers"]
                sys.modules["mcp"] = mcp_pkg
                runtime_module = types.ModuleType("mcp.runtime")
                servers_module = types.ModuleType("mcp.servers")
                servers_module.__path__ = []
                sys.modules["mcp.runtime"] = runtime_module
                sys.modules["mcp.servers"] = servers_module
                mcp_pkg.runtime = runtime_module
                mcp_pkg.servers = servers_module

                class MCPError(RuntimeError):
                    'Raised when an MCP call fails.'

                async def call_tool(server, tool, arguments=None):
                    response = await _rpc_call(
                        {
                            "type": "call_tool",
                            "server": server,
                            "tool": tool,
                            "arguments": arguments or {},
                        }
                    )
                    if not response.get("success", True):
                        raise MCPError(response.get("error", "MCP request failed"))
                    return response.get("result")

                async def list_tools(server):
                    response = await _rpc_call(
                        {
                            "type": "list_tools",
                            "server": server,
                        }
                    )
                    if not response.get("success", True):
                        raise MCPError(response.get("error", "MCP request failed"))
                    return response.get("tools", [])

                async def list_servers():
                    response = await _rpc_call({"type": "list_servers"})
                    if not response.get("success", True):
                        raise MCPError(response.get("error", "MCP request failed"))
                    return tuple(response.get("servers", ()))

                def discovered_servers():
                    return tuple(DISCOVERED_SERVERS)

                def describe_server(name):
                    for server in AVAILABLE_SERVERS:
                        if server.get("name") == name:
                            return server
                    raise MCPError(f"Server {name!r} is not loaded")

                def list_loaded_server_metadata():
                    return tuple(AVAILABLE_SERVERS)

                runtime_module.MCPError = MCPError
                runtime_module.call_tool = call_tool
                runtime_module.list_tools = list_tools
                runtime_module.list_servers = list_servers
                runtime_module.discovered_servers = discovered_servers
                runtime_module.describe_server = describe_server
                runtime_module.list_loaded_server_metadata = list_loaded_server_metadata
                runtime_module.__all__ = [
                    "MCPError",
                    "call_tool",
                    "list_tools",
                    "list_servers",
                    "discovered_servers",
                    "describe_server",
                    "list_loaded_server_metadata",
                ]
                servers_module.__all__ = []

                def _make_tool_callable(server_name, tool_name):
                    async def _invoke(**kwargs):
                        return await call_tool(server_name, tool_name, kwargs)
                    return _invoke

                for server in AVAILABLE_SERVERS:
                    alias = server["alias"]
                    module_name = f"mcp.servers.{alias}"
                    server_module = types.ModuleType(module_name)
                    server_module.__doc__ = f"MCP server '{server['name']}' wrappers"
                    server_module.__all__ = []
                    tool_map = {}
                    for tool in server.get("tools", []):
                        tool_alias = tool["alias"]
                        summary = (tool.get("description") or "").strip() or f"MCP tool {tool['name']} from {server['name']}"
                        func = _make_tool_callable(server["name"], tool["name"])
                        func.__name__ = tool_alias
                        func.__doc__ = summary
                        setattr(server_module, tool_alias, func)
                        server_module.__all__.append(tool_alias)
                        tool_map[tool_alias] = tool
                    server_module.TOOLS = server.get("tools", [])
                    server_module.TOOL_MAP = tool_map
                    setattr(servers_module, alias, server_module)
                    sys.modules[module_name] = server_module
                    servers_module.__all__.append(alias)
                return runtime_module

            runtime_module = _install_mcp_modules()

            class _MCPProxy:
                def __init__(self, server_info):
                    self._server_name = server_info["name"]
                    self._tools = {tool["alias"]: tool for tool in server_info.get("tools", [])}

                async def list_tools(self):
                    response = await _rpc_call(
                        {
                            "type": "list_tools",
                            "server": self._server_name,
                        }
                    )
                    if not response.get("success", True):
                        raise RuntimeError(response.get("error", "MCP request failed"))
                    return response.get("tools", [])

                def __getattr__(self, tool_alias):
                    tool = self._tools.get(tool_alias)
                    target = tool.get("name") if tool else tool_alias
                    summary = (tool.get("description") if tool else "") or ""

                    async def _invoke(_target=target, **kwargs):
                        response = await _rpc_call(
                            {
                                "type": "call_tool",
                                "server": self._server_name,
                                "tool": _target,
                                "arguments": kwargs,
                            }
                        )
                        if not response.get("success", True):
                            raise RuntimeError(response.get("error", "MCP call failed"))
                        return response.get("result")

                    if summary:
                        _invoke.__doc__ = summary
                    _invoke.__name__ = tool_alias
                    return _invoke

            _SANDBOX_GLOBALS = globals()
            LOADED_MCP_SERVERS = tuple(server["name"] for server in AVAILABLE_SERVERS)
            mcp_servers = {}
            for server in AVAILABLE_SERVERS:
                proxy = _MCPProxy(server)
                mcp_servers[server["name"]] = proxy
                _SANDBOX_GLOBALS[f"mcp_{server['alias']}"] = proxy
            _SANDBOX_GLOBALS.setdefault("mcp_servers", {}).update(mcp_servers)
            alias_map = {server["name"]: server["alias"] for server in AVAILABLE_SERVERS}

            async def _execute():
                await _ensure_reader()
                namespace = {"__name__": "__sandbox__"}
                namespace["mcp_servers"] = mcp_servers
                namespace["LOADED_MCP_SERVERS"] = LOADED_MCP_SERVERS
                for server_name, proxy in mcp_servers.items():
                    namespace[f"mcp_{alias_map[server_name]}"] = proxy
                flags = getattr(__import__("ast"), "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0)
                compiled = compile(CODE, "<sandbox>", "exec", flags=flags)
                result = eval(compiled, namespace, namespace)
                if inspect.isawaitable(result):
                    await result
                if _READER_TASK:
                    _READER_TASK.cancel()
                    with suppress(asyncio.CancelledError):
                        await _READER_TASK

            try:
                asyncio.run(_execute())
            except SystemExit:
                raise
            except Exception:
                traceback.print_exc()
                sys.exit(1)
            """
        ).lstrip()
        return (
            template.replace("__METADATA_JSON__", repr(metadata_json))
            .replace("__DISCOVERED_JSON__", repr(discovered_json))
            .replace("__CODE_LITERAL__", repr(code))
        )

    async def _run_runtime_command(self, *args: str) -> tuple[int, str, str]:
        """Run `<runtime> <args...>` and return (exit_code, stdout, stderr)."""
        process = await asyncio.create_subprocess_exec(
            self.runtime,
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout_bytes, stderr_bytes = await process.communicate()
        stdout_text = stdout_bytes.decode(errors="replace")
        stderr_text = stderr_bytes.decode(errors="replace")
        return process.returncode, stdout_text, stderr_text
    async def _stop_runtime(self) -> None:
        """Stop the podman machine; no-op for non-podman runtimes."""
        runtime_name = os.path.basename(self.runtime)
        if "podman" not in runtime_name:
            return
        code, stdout_text, stderr_text = await self._run_runtime_command("machine", "stop")
        if code != 0:
            combined = f"{stdout_text}\n{stderr_text}".lower()
            # Already-stopped machines are not an error worth logging.
            if "already stopped" in combined or "is not running" in combined:
                return
            logger.debug("Failed to stop podman machine: %s", stderr_text.strip())

    async def _cancel_runtime_shutdown_timer(self) -> None:
        """Cancel a pending idle-shutdown task, if one is scheduled."""
        if not self._shutdown_task:
            return
        task = self._shutdown_task
        self._shutdown_task = None
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task

    async def _schedule_runtime_shutdown(self) -> None:
        """Arm a timer that stops the runtime after the idle timeout elapses."""
        if self.runtime_idle_timeout <= 0:
            return
        # Replace any previously armed timer so the window restarts from now.
        await self._cancel_runtime_shutdown_timer()

        async def _delayed_shutdown() -> None:
            try:
                await asyncio.sleep(self.runtime_idle_timeout)
                await self._stop_runtime()
            except asyncio.CancelledError:
                raise
            except Exception:  # pragma: no cover - diagnostic fallback
                logger.debug("Runtime shutdown task failed", exc_info=True)

        self._shutdown_task = asyncio.create_task(_delayed_shutdown())

    async def _ensure_runtime_ready(self) -> None:
        """Make sure the runtime can run containers, starting/initialising the
        podman machine when needed. Raises SandboxError on failure."""
        async with self._runtime_check_lock:
            # A fresh execution is starting; don't let the idle timer fire.
            await self._cancel_runtime_shutdown_timer()
            runtime_name = os.path.basename(self.runtime)
            if "podman" not in runtime_name:
                return
            for _ in range(3):
                code, stdout_text, stderr_text = await self._run_runtime_command(
                    "info",
                    "--format",
                    "{{json .}}",
                )
                if code == 0:
                    return
                combined = f"{stdout_text}\n{stderr_text}".lower()
                # Only these messages indicate a stopped/missing machine.
                needs_machine = any(
                    phrase in combined
                    for phrase in (
                        "cannot connect to podman",
                        "podman machine",
                        "run the podman machine",
                        "socket: connect",
                    )
                )
                if not needs_machine:
                    raise SandboxError(
                        "Container runtime is unavailable",
                        stdout=stdout_text,
                        stderr=stderr_text,
                    )
                start_code, start_stdout, start_stderr = await self._run_runtime_command("machine", "start")
                if start_code == 0:
                    continue
                start_combined = f"{start_stdout}\n{start_stderr}".lower()
                if "does not exist" in start_combined or "no such machine" in start_combined:
                    init_code, init_stdout, init_stderr = await self._run_runtime_command("machine", "init")
                    if init_code != 0:
                        raise SandboxError(
                            "Failed to initialize Podman machine",
                            stdout=init_stdout,
                            stderr=init_stderr,
                        )
                    # After init, loop will retry info/start sequence
                    continue
                raise SandboxError(
                    "Failed to start Podman machine",
                    stdout=start_stdout,
                    stderr=start_stderr,
                )
            raise SandboxError(
                "Unable to prepare Podman runtime",
                stdout="",
                stderr="Repeated podman machine start attempts failed",
            )
start_combined = f"{start_stdout}\n{start_stderr}".lower() if "does not exist" in start_combined or "no such machine" in start_combined: init_code, init_stdout, init_stderr = await self._run_runtime_command("machine", "init") if init_code != 0: raise SandboxError( "Failed to initialize Podman machine", stdout=init_stdout, stderr=init_stderr, ) # After init, loop will retry info/start sequence continue raise SandboxError( "Failed to start Podman machine", stdout=start_stdout, stderr=start_stderr, ) raise SandboxError( "Unable to prepare Podman runtime", stdout="", stderr="Repeated podman machine start attempts failed", ) async def execute( self, code: str, *, timeout: int = DEFAULT_TIMEOUT, servers_metadata: Sequence[Dict[str, object]] = (), discovered_servers: Sequence[str] = (), container_env: Optional[Dict[str, str]] = None, volume_mounts: Optional[Sequence[str]] = None, host_dir: Optional[Path] = None, rpc_handler: Optional[Callable[[Dict[str, object]], Awaitable[Dict[str, object]]]] = None, ) -> SandboxResult: await self._ensure_runtime_ready() if host_dir is None: raise SandboxError("Sandbox host directory is not available") entrypoint_path = host_dir / "entrypoint.py" entrypoint_path.write_text(self._render_entrypoint(code, servers_metadata, discovered_servers)) entrypoint_target = f"/ipc/{entrypoint_path.name}" stdout_chunks: List[str] = [] stderr_chunks: List[str] = [] cmd = self._base_cmd() if volume_mounts: for mount in volume_mounts: cmd.extend(["--volume", mount]) if container_env: for key, value in container_env.items(): cmd.extend(["--env", f"{key}={value}"]) cmd.extend([self.image, "python3", "-u", entrypoint_target]) process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) async def _handle_stdout() -> None: if not process.stdout: return while True: line = await process.stdout.readline() if not line: break try: message = json.loads(line.decode()) except 
Exception: stderr_chunks.append(line.decode(errors="replace")) continue msg_type = message.get("type") if msg_type == "stdout": stdout_chunks.append(message.get("data", "")) elif msg_type == "stderr": stderr_chunks.append(message.get("data", "")) elif msg_type == "rpc_request": if process.stdin is None: continue if rpc_handler is None: response: Dict[str, object] = {"success": False, "error": "RPC handler unavailable"} else: try: payload = message.get("payload", {}) response = await rpc_handler(payload if isinstance(payload, dict) else {}) except Exception as exc: logger.debug("RPC handler failed", exc_info=True) response = {"success": False, "error": str(exc)} reply: Dict[str, object] = { "type": "rpc_response", "id": message.get("id"), "success": response.get("success", True), "payload": response, } if not reply["success"]: reply["error"] = response.get("error", "RPC error") try: data = json.dumps(reply, separators=(",", ":")).encode("utf-8") + b"\n" process.stdin.write(data) await process.stdin.drain() except Exception: stderr_chunks.append("Failed to deliver RPC response\n") break else: stderr_chunks.append(json.dumps(message, separators=(",", ":"))) async def _read_stderr() -> None: if not process.stderr: return while True: chunk = await process.stderr.read(4096) if not chunk: break stderr_chunks.append(chunk.decode(errors="replace")) stdout_task = asyncio.create_task(_handle_stdout()) stderr_task = asyncio.create_task(_read_stderr()) try: await asyncio.wait_for(process.wait(), timeout=timeout) except asyncio.TimeoutExpired as exc: process.kill() await process.wait() stdout_task.cancel() stderr_task.cancel() with suppress(asyncio.CancelledError): await stdout_task with suppress(asyncio.CancelledError): await stderr_task raise SandboxTimeout( f"Execution timed out after {timeout}s", stdout="".join(stdout_chunks), stderr="".join(stderr_chunks), ) from exc finally: if process.stdin: process.stdin.close() with suppress(Exception): await process.stdin.wait_closed() 
await stdout_task await stderr_task stdout_text = "".join(stdout_chunks) stderr_text = "".join(stderr_chunks) try: return SandboxResult(process.returncode == 0, process.returncode, stdout_text, stderr_text) finally: await self._schedule_runtime_shutdown() async def ensure_shared_directory(self, path: Path) -> None: resolved = path.expanduser().resolve() resolved.mkdir(parents=True, exist_ok=True) path_str = str(resolved) if path_str in self._shared_paths: return async with self._share_lock: if path_str in self._shared_paths: return shared = True runtime_name = os.path.basename(self.runtime) if "podman" in runtime_name: shared = await self._ensure_podman_volume_shared(resolved) if shared: self._shared_paths.add(path_str) async def _ensure_podman_volume_shared(self, path: Path) -> bool: share_spec = f"{path}:{path}" try: process = await asyncio.create_subprocess_exec( self.runtime, "machine", "set", "--rootful", "--now", "--volume", share_spec, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError: logger.debug("Podman binary not found while ensuring volume share for %s", path) return False stdout_bytes, stderr_bytes = await process.communicate() stderr_text = stderr_bytes.decode(errors="replace") if process.returncode == 0: return True lower = stderr_text.lower() if "already exists" in lower or "would overwrite" in lower: return True logger.debug( "Failed to ensure podman shared volume for %s (exit %s): %s", path, process.returncode, stderr_text.strip() or stdout_bytes.decode(errors="replace").strip(), ) return False def detect_runtime(preferred: Optional[str] = None) -> str: """Return the first available container runtime.""" candidates: List[Optional[str]] = [] if preferred: candidates.append(preferred) if DEFAULT_RUNTIME and DEFAULT_RUNTIME not in candidates: candidates.append(DEFAULT_RUNTIME) candidates.extend(["podman", "docker"]) for candidate in candidates: if candidate and shutil.which(candidate): return candidate raise 
class SandboxInvocation:
    """Context manager that prepares IPC resources for a sandbox invocation."""

    def __init__(self, bridge: "MCPBridge", active_servers: Sequence[str]) -> None:
        self.bridge = bridge
        # dict.fromkeys de-duplicates while preserving request order.
        self.active_servers = list(dict.fromkeys(active_servers))
        self._temp_dir: Optional[tempfile.TemporaryDirectory[str]] = None
        self.host_dir: Optional[Path] = None
        self.container_env: Dict[str, str] = {}
        self.volume_mounts: List[str] = []
        self.server_metadata: List[Dict[str, object]] = []
        # RPC requests are only honoured for servers named in this set.
        self.allowed_servers: set[str] = set()
        self.discovered_servers: List[str] = []

    async def __aenter__(self) -> "SandboxInvocation":
        """Collect tool metadata for each active server and stage the /ipc dir."""
        self.server_metadata = []
        for server_name in self.active_servers:
            alias = self.bridge._alias_for(server_name)
            client = self.bridge.clients.get(server_name)
            if not client:
                raise SandboxError(f"MCP server {server_name} is not loaded")
            tool_specs = await client.list_tools()
            # Sanitized tool aliases can collide; suffix duplicates with _2, _3, ...
            alias_counts: Dict[str, int] = {}
            tools: List[Dict[str, object]] = []
            for spec in tool_specs:
                raw_name = spec.get("name") or "tool"
                base_alias = _sanitize_identifier(str(raw_name), default="tool")
                alias_counts[base_alias] = alias_counts.get(base_alias, 0) + 1
                count = alias_counts[base_alias]
                tool_alias = base_alias if count == 1 else f"{base_alias}_{count}"
                # Accept both snake_case and camelCase schema keys.
                input_schema = spec.get("input_schema") or spec.get("inputSchema")
                description = spec.get("description") or ""
                tools.append(
                    {
                        "name": raw_name,
                        "alias": tool_alias,
                        "description": description,
                        "input_schema": input_schema,
                    }
                )
            self.server_metadata.append(
                {
                    "name": server_name,
                    "alias": alias,
                    "tools": tools,
                }
            )
        self.allowed_servers = {meta["name"] for meta in self.server_metadata}
        self.discovered_servers = sorted(self.bridge.servers.keys())
        state_dir_env = os.environ.get("MCP_BRIDGE_STATE_DIR")
        if state_dir_env:
            base_dir = Path(state_dir_env).expanduser()
        else:
            base_dir = Path.cwd() / ".mcp-bridge"
        base_dir = base_dir.resolve()
        base_dir.mkdir(parents=True, exist_ok=True)
        # Podman machine setups need the state dir shared into the VM first.
        ensure_share = getattr(self.bridge.sandbox, "ensure_shared_directory", None)
        if ensure_share:
            await ensure_share(base_dir)
        self._temp_dir = tempfile.TemporaryDirectory(prefix="mcp-bridge-ipc-", dir=str(base_dir))
        host_dir = Path(self._temp_dir.name)
        # World-readable so the unprivileged container user can read /ipc.
        os.chmod(host_dir, 0o755)
        self.host_dir = host_dir
        self.volume_mounts.append(f"{host_dir}:/ipc:rw")
        self.container_env["MCP_AVAILABLE_SERVERS"] = json.dumps(
            self.server_metadata,
            separators=(",", ":"),
        )
        self.container_env["MCP_DISCOVERED_SERVERS"] = json.dumps(
            self.discovered_servers,
            separators=(",", ":"),
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Removes the staged entrypoint and any files the invocation created.
        if self._temp_dir:
            self._temp_dir.cleanup()

    async def handle_rpc(self, request: Dict[str, object]) -> Dict[str, object]:
        """Service an RPC request from the sandbox; only allowed servers pass."""
        req_type = request.get("type")
        if req_type == "list_servers":
            return {
                "success": True,
                "servers": sorted(self.allowed_servers),
            }
        if req_type not in {"list_tools", "call_tool"}:
            return {
                "success": False,
                "error": f"Unknown RPC type: {req_type}",
            }
        server = request.get("server")
        if not isinstance(server, str) or server not in self.allowed_servers:
            return {
                "success": False,
                "error": f"Server {server!r} is not available",
            }
        client = self.bridge.clients.get(server)
        if not client:
            return {
                "success": False,
                "error": f"Server {server} is not loaded",
            }
        try:
            if req_type == "list_tools":
                tools = await client.list_tools()
                return {"success": True, "tools": tools}
            tool_name = request.get("tool")
            arguments = request.get("arguments", {})
            if not isinstance(tool_name, str):
                return {"success": False, "error": "Missing tool name"}
            if not isinstance(arguments, dict):
                return {"success": False, "error": "Arguments must be an object"}
            result = await client.call_tool(tool_name, arguments)
            return {"success": True, "result": result}
        except Exception as exc:  # pragma: no cover
            logger.debug("MCP proxy call failed", exc_info=True)
            return {"success": False, "error": str(exc)}
class MCPBridge:
    """Expose the secure sandbox as an MCP tool with MCP proxying."""

    def __init__(self, sandbox: Optional[RootlessContainerSandbox] = None) -> None:
        self.sandbox = sandbox or RootlessContainerSandbox()
        self.servers: Dict[str, MCPServerInfo] = {}
        self.clients: Dict[str, PersistentMCPClient] = {}
        self.loaded_servers: set[str] = set()
        self._aliases: Dict[str, str] = {}
        self._discovered = False

    async def discover_servers(self) -> None:
        """Scan Claude config files and config dirs for MCP server definitions.

        Runs once per bridge lifetime; Claude config files take precedence over
        per-server JSON files (first definition of a name wins).
        """
        if self._discovered:
            return
        self._discovered = True
        for config_path in CLAUDE_CONFIG_PATHS:
            if not config_path.exists():
                continue
            try:
                with config_path.open() as fh:
                    config = json.load(fh)
                for name, value in config.get("mcpServers", {}).items():
                    info = self._parse_server_config(name, value)
                    if info:
                        self.servers[name] = info
                        logger.info("Found MCP server %s in %s", name, config_path)
            except Exception as exc:  # pragma: no cover
                logger.warning("Failed to read %s: %s", config_path, exc)
        for config_dir in CONFIG_DIRS:
            if not config_dir.exists():
                continue
            for config_file in config_dir.glob("*.json"):
                try:
                    with config_file.open() as fh:
                        config = json.load(fh)
                    for name, value in config.get("mcpServers", {}).items():
                        if name in self.servers:
                            continue
                        info = self._parse_server_config(name, value)
                        if info:
                            self.servers[name] = info
                            logger.info("Found MCP server %s in %s", name, config_file)
                except Exception as exc:  # pragma: no cover
                    logger.warning("Failed to read %s: %s", config_file, exc)
        logger.info("Discovered %d MCP servers", len(self.servers))

    def _parse_server_config(self, name: str, raw: Dict[str, object]) -> Optional[MCPServerInfo]:
        """Build an MCPServerInfo from one config entry; None if malformed."""
        command = raw.get("command")
        if not isinstance(command, str):
            return None
        args = raw.get("args", [])
        if not isinstance(args, list):
            args = []
        env = raw.get("env", {})
        if not isinstance(env, dict):
            env = {}
        # Coerce everything to strings for subprocess compatibility.
        str_env = {str(k): str(v) for k, v in env.items()}
        str_args = [str(arg) for arg in args]
        return MCPServerInfo(name=name, command=command, args=str_args, env=str_env)

    async def load_server(self, server_name: str) -> None:
        """Start a persistent client for *server_name* (idempotent)."""
        if server_name in self.loaded_servers:
            return
        info = self.servers.get(server_name)
        if not info:
            raise SandboxError(f"Unknown MCP server: {server_name}")
        client = PersistentMCPClient(info)
        await client.start()
        self.clients[server_name] = client
        self.loaded_servers.add(server_name)
        logger.info("Loaded MCP server %s", server_name)

    def _alias_for(self, name: str) -> str:
        """Return a stable, unique Python-identifier alias for a server name."""
        if name in self._aliases:
            return self._aliases[name]
        base = re.sub(r"[^a-z0-9_]+", "_", name.lower()) or "server"
        if base[0].isdigit():
            base = f"_{base}"
        alias = base
        suffix = 1
        used = set(self._aliases.values())
        while alias in used:
            suffix += 1
            alias = f"{base}_{suffix}"
        self._aliases[name] = alias
        return alias

    async def execute_code(
        self,
        code: str,
        servers: Optional[Sequence[str]] = None,
        timeout: int = DEFAULT_TIMEOUT,
    ) -> SandboxResult:
        """Load the requested servers, run *code* in the sandbox, return the result.

        Raises SandboxError when the sandbox exits non-zero (stdout/stderr are
        attached to the exception).
        """
        await self.discover_servers()
        # Clamp to [1, MAX_TIMEOUT] regardless of what the caller asked for.
        request_timeout = max(1, min(MAX_TIMEOUT, timeout))
        requested_servers = list(dict.fromkeys(servers or []))
        for server_name in requested_servers:
            await self.load_server(server_name)
        async with SandboxInvocation(self, requested_servers) as invocation:
            result = await self.sandbox.execute(
                code,
                timeout=request_timeout,
                servers_metadata=invocation.server_metadata,
                discovered_servers=invocation.discovered_servers,
                container_env=invocation.container_env,
                volume_mounts=invocation.volume_mounts,
                host_dir=invocation.host_dir,
                rpc_handler=invocation.handle_rpc,
            )
            if not result.success:
                raise SandboxError(
                    f"Sandbox exited with code {result.exit_code}",
                    stdout=result.stdout,
                    stderr=result.stderr,
                )
            return result


# Module-level singletons wired into the MCP server decorators below.
bridge = MCPBridge()
app = Server(BRIDGE_NAME)


@app.list_tools()
async def list_tools() -> List[Tool]:
    """Advertise the single `run_python` tool and its input schema."""
    return [
        Tool(
            name="run_python",
            description=(
                "Execute Python code inside a rootless container sandbox. "
                "Use the optional 'servers' array to load MCP servers for this execution."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python source code to execute",
                    },
                    "servers": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": (
                            "Optional list of MCP servers to make available as mcp_<name> proxies"
                        ),
                    },
                    "timeout": {
                        "type": "integer",
                        "minimum": 1,
                        "maximum": MAX_TIMEOUT,
                        "default": DEFAULT_TIMEOUT,
                        "description": "Execution timeout in seconds",
                    },
                },
                "required": ["code"],
            },
        )
    ]
@app.call_tool()
async def call_tool(name: str, arguments: Dict[str, object]) -> List[Dict[str, object]]:
    """Handle a tool invocation from the MCP client.

    Validates the `run_python` arguments, delegates to the bridge, and maps
    every outcome (success, timeout, sandbox error, unexpected failure) to a
    structured TOON/JSON response.
    """
    if name != "run_python":
        return _build_tool_response(
            status="error",
            summary=f"Unknown tool: {name}",
            error=f"Unknown tool: {name}",
        )
    code = arguments.get("code")
    if not isinstance(code, str) or not code.strip():
        return _build_tool_response(
            status="validation_error",
            summary="Missing 'code' argument",
            error="Missing 'code' argument",
        )
    servers = arguments.get("servers", [])
    if not isinstance(servers, list):
        return _build_tool_response(
            status="validation_error",
            summary="'servers' must be a list",
            error="'servers' must be a list",
        )
    server_list = [str(server) for server in servers]
    timeout_value = arguments.get("timeout", DEFAULT_TIMEOUT)
    # BUG FIX: bool is a subclass of int, so JSON true/false previously passed
    # this check and was silently clamped to a 1-second timeout; reject it.
    if not isinstance(timeout_value, int) or isinstance(timeout_value, bool):
        return _build_tool_response(
            status="validation_error",
            summary="'timeout' must be an integer",
            error="'timeout' must be an integer",
        )
    # Clamp to [1, MAX_TIMEOUT], mirroring the schema advertised by list_tools.
    timeout_value = max(1, min(MAX_TIMEOUT, timeout_value))
    try:
        result = await bridge.execute_code(code, server_list, timeout_value)
        summary = "Success"
        if not result.stdout and not result.stderr:
            summary = "Success (no output)"
        return _build_tool_response(
            status="success",
            summary=summary,
            exit_code=result.exit_code,
            stdout=result.stdout,
            stderr=result.stderr,
            servers=server_list,
        )
    except SandboxTimeout as exc:
        summary = f"Timeout: execution exceeded {timeout_value}s"
        return _build_tool_response(
            status="timeout",
            summary=summary,
            stdout=exc.stdout,
            stderr=exc.stderr,
            servers=server_list,
            error=str(exc),
            timeout_seconds=timeout_value,
        )
    except SandboxError as exc:
        summary = f"Sandbox error: {exc}"
        return _build_tool_response(
            status="error",
            summary=summary,
            stdout=exc.stdout,
            stderr=exc.stderr,
            servers=server_list,
            error=str(exc),
        )
    except Exception as exc:  # pragma: no cover
        logger.error("Unexpected failure", exc_info=True)
        return _build_tool_response(
            status="error",
            summary="Unexpected failure",
            error=str(exc),
        )


async def main() -> None:
    """Run the bridge as a stdio MCP server until the client disconnects."""
    logging.basicConfig(level=os.environ.get("MCP_BRIDGE_LOG_LEVEL", "INFO"))
    # BUG FIX: mcp.server.stdio.stdio_server() accepts only optional stream
    # overrides; passing the Server instance (`stdio_server(app)`) hands it a
    # non-stream object and breaks startup. The Server is driven via app.run().
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Clean exit on Ctrl-C instead of a traceback.
        sys.exit(0)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/elusznik/mcp-server-code-execution-mode'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.