Skip to main content
Glama
by elusznik
mcp_server_code_execution_mode.py98.8 kB
#!/usr/bin/env python3 """MCP Server Code Execution Mode bridge backed by a containerised sandbox.""" from __future__ import annotations import asyncio import copy import json import keyword import logging import os import re import shlex import shutil import sys try: import tomllib except ImportError: tomllib = None import inspect import io import tempfile import textwrap from asyncio import subprocess as aio_subprocess from contextlib import suppress from dataclasses import dataclass from pathlib import Path from typing import ( Any, Awaitable, Callable, Dict, List, Literal, NamedTuple, Optional, Protocol, Sequence, Tuple, Union, cast, ) import anyio from packaging.version import parse as _parse_version _toon_encode: Optional[Callable[..., str]] = None try: # Prefer the official encoder when available import toon_format as _toon_format _toon_encode = _toon_format.encode except ImportError: # pragma: no cover - fallback for environments without toon _toon_encode = None def _check_pydantic_compatibility() -> None: """Check and warn/abort early for Pydantic/typing incompatibilities. Some older Pydantic versions (or environments that shadow the stdlib ``typing`` module with a PyPI package) can cause runtime failures when used with CPython 3.14. We try a minimal import and version check and fail with an actionable message to help users upgrade. """ try: import importlib typing_mod = importlib.import_module("typing") typing_file = getattr(typing_mod, "__file__", "") or "(built-in)" except Exception: # pragma: no cover - defensive typing_file = "(unknown)" try: import pydantic pyd_version = getattr(pydantic, "__version__", "0") except Exception as exc: # pragma: no cover - this covers TypeError mishaps err_text = str(exc) if "prefer_fwd_module" in err_text or "_eval_type" in err_text: raise RuntimeError( "Pydantic appears incompatible with the current Python/typing\n" "configuration: \n\n" " - This usually happens if an old version of pydantic is installed\n" " or if a PyPI-provided 'typing' package is shadowing the standard\n" " library typing module.\n\n" "Recommended actions:\n" " 1. Upgrade pydantic (e.g. `pip install -U pydantic`).\n" " 2. If you have a 'typing' package installed from PyPI, uninstall it:\n" " `pip uninstall typing` or `pipx uninstall typing`.\n" " 3. Recreate the virtual environment and re-run `uv sync`.\n\n" "For more info, check the platform and installed packages.\n" f"typing module path: {typing_file}\n" f"pydantic import error: {err_text}\n" ) from exc raise try: if _parse_version(pyd_version) < _parse_version( "2.12.0" ) and sys.version_info >= (3, 14): raise RuntimeError( f"Detected pydantic {pyd_version} in a Python 3.14 environment -\n" "please upgrade pydantic to a more recent 2.x release (e.g., `pip install -U pydantic`)." ) except Exception: # pragma: no cover - diagnostic fallback pass _check_pydantic_compatibility() from mcp.client.session import ( # noqa: E402 (import intentionally delayed for compatibility checks) ClientSession, ) from mcp.client.stdio import StdioServerParameters, stdio_client # noqa: E402 from mcp.server import Server # noqa: E402 from mcp.server.stdio import stdio_server # noqa: E402 from mcp.shared.exceptions import McpError # noqa: E402 from mcp.types import ( # noqa: E402 INVALID_PARAMS, CallToolResult, ErrorData, Resource, TextContent, Tool, ) logger = logging.getLogger("mcp-server-code-execution-mode") BRIDGE_NAME = "mcp-server-code-execution-mode" DEFAULT_IMAGE = os.environ.get("MCP_BRIDGE_IMAGE", "python:3.14-slim") DEFAULT_RUNTIME = os.environ.get("MCP_BRIDGE_RUNTIME") DEFAULT_TIMEOUT = int(os.environ.get("MCP_BRIDGE_TIMEOUT", "30")) MAX_TIMEOUT = int(os.environ.get("MCP_BRIDGE_MAX_TIMEOUT", "120")) DEFAULT_MEMORY = os.environ.get("MCP_BRIDGE_MEMORY", "512m") DEFAULT_PIDS = int(os.environ.get("MCP_BRIDGE_PIDS", "128")) DEFAULT_CPUS = os.environ.get("MCP_BRIDGE_CPUS") CONTAINER_USER = os.environ.get("MCP_BRIDGE_CONTAINER_USER", "65534:65534") DEFAULT_RUNTIME_IDLE_TIMEOUT = int( os.environ.get("MCP_BRIDGE_RUNTIME_IDLE_TIMEOUT", "300") ) _ALLOW_SELF_SERVER = os.environ.get( "MCP_BRIDGE_ALLOW_SELF_SERVER", "0" ).strip().lower() in { "1", "true", "yes", } _SELF_SERVER_TOKENS = { BRIDGE_NAME.lower(), "mcp_server_code_execution_mode", "mcp-server-code-execution-mode", } _PODMAN_PULL_PREFIXES: tuple[str, ...] = ( 'Resolved "', "Trying to pull", "Getting image source signatures", "Copying blob", "Copying config", "Extracting", "Writing manifest", "Storing signatures", ) SANDBOX_HELPERS_SUMMARY = ( "Persistent Python Sandbox (state retained between tool calls). " "1. DISCOVER: `runtime.discovered_servers()`, `runtime.search_tool_docs('query')`. " "Use `discovered_servers(detailed=True)` for descriptions. " "2. CALL: `await mcp_server.tool()`. " "3. PERSIST: `save_tool(func)`. " "Run `print(runtime.capability_summary())` for the full manual." ) _NOISE_STREAM_TOKENS = {"()"} CAPABILITY_RESOURCE_URI = "resource://mcp-server-code-execution-mode/capabilities" _CAPABILITY_RESOURCE_NAME = "code-execution-capabilities" _CAPABILITY_RESOURCE_TITLE = "Code Execution Sandbox Helpers" _CAPABILITY_RESOURCE_DESCRIPTION = "Capability overview, helper reference, and sandbox usage notes (call runtime.capability_summary() inside the sandbox for this text)." _CAPABILITY_RESOURCE_TEXT = textwrap.dedent( f""" # Code Execution MCP Capabilities {SANDBOX_HELPERS_SUMMARY} ## Quick usage - Pass `servers=[...]` to mount MCP proxies (`mcp_<alias>` modules). - Import `mcp.runtime as runtime`; call `runtime.capability_summary()` instead of rereading this resource for the same hint. - Prefer the `_sync` helpers first to read cached metadata before issuing RPCs. - Server configs support a `cwd` field to start the host MCP server in a specific working directory. - LLMs should check `runtime.describe_server(name)` or `runtime.list_loaded_server_metadata()` for the server's configured `cwd` before assuming the working directory. If `cwd` is absent, the host starts the server in the bridge process' current directory (i.e., the default working directory). If your workload expects a specific working directory, please configure `cwd` in the server config or run the server in a container that mounts the project directory. Resource URI: {CAPABILITY_RESOURCE_URI} """ ).strip() def _build_capability_resource() -> Resource: return Resource( name=_CAPABILITY_RESOURCE_NAME, title=_CAPABILITY_RESOURCE_TITLE, description=_CAPABILITY_RESOURCE_DESCRIPTION, uri=CAPABILITY_RESOURCE_URI, # type: ignore[arg-type] mimeType="text/markdown", size=len(_CAPABILITY_RESOURCE_TEXT.encode("utf-8")), ) class ConfigSource(NamedTuple): path: Path type: Literal["file", "directory"] format: Literal["json", "toml"] = "json" name: str = "Unknown" CONFIG_SOURCES = [ # Only load configs from the MCPs directory for now. ConfigSource(Path.home() / "MCPs", "directory", name="User MCPs"), ] """Temporarily disabled config sources (kept for reference): ConfigSource( Path.home() / ".config" / "mcp" / "servers", "directory", name="Standard MCP" ), ConfigSource( Path.home() / "Library" / "Application Support" / "Claude Code" / "mcp" / "servers", "directory", name="Claude Code", ), ConfigSource( Path.home() / "Library" / "Application Support" / "Claude" / "mcp" / "servers", "directory", name="Claude Desktop", ), ConfigSource(Path.cwd() / "mcp-servers", "directory", name="Local Project"), ConfigSource(Path.home() / ".claude.json", "file", name="Claude CLI"), ConfigSource( Path.home() / "Library" / "Application Support" / "Claude Code" / "claude_code_config.json", "file", name="Claude Code", ), ConfigSource( Path.home() / "Library" / "Application Support" / "Claude" / "claude_code_config.json", "file", name="Claude Code (Legacy)", ), ConfigSource( Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json", "file", name="Claude Desktop", ), ConfigSource( Path.cwd() / "claude_code_config.json", "file", name="Local Claude Code" ), ConfigSource( Path.cwd() / "claude_desktop_config.json", "file", name="Local Claude Desktop" ), ConfigSource(Path.home() / ".opencode.json", "file", name="OpenCode CLI"), ConfigSource( Path.home() / "Library" / "Application Support" / "OpenCode" / "opencode_config.json", "file", name="OpenCode", ), ConfigSource( Path.home() / "Library" / "Application Support" / "OpenCode" / "opencode_desktop_config.json", "file", name="OpenCode Desktop", ), ConfigSource(Path.cwd() / "opencode_config.json", "file", name="Local OpenCode"), ConfigSource( Path.cwd() / "opencode_desktop_config.json", "file", name="Local OpenCode Desktop", ), ConfigSource(Path.home() / ".gemini" / "settings.json", "file", name="Gemini CLI"), ConfigSource( Path.home() / ".codex" / "config.toml", "file", format="toml", name="OpenAI Codex", ), ConfigSource(Path.home() / ".cursor" / "mcp.json", "file", name="Cursor"), ConfigSource( Path.home() / ".codeium" / "windsurf" / "mcp_config.json", "file", name="Windsurf", ), ConfigSource(Path.cwd() / ".vscode" / "mcp.json", "file", name="VS Code Workspace"), ConfigSource( Path.home() / "Library" / "Application Support" / "Code" / "User" / "settings.json", "file", name="VS Code Global (macOS)", ), ConfigSource( Path.home() / ".config" / "Code" / "User" / "settings.json", "file", name="VS Code Global (Linux)", ), ConfigSource( Path.home() / ".antigravity" / "settings.json", "file", name="Antigravity IDE" ), ConfigSource( Path.home() / ".antigravity" / "mcp.json", "file", name="Antigravity IDE" ), """ class SandboxError(RuntimeError): """Raised when the sandbox cannot execute user code.""" def __init__(self, message: str, *, stdout: str = "", stderr: str = "") -> None: super().__init__(message) self.stdout = stdout self.stderr = stderr class ClientLike(Protocol): async def list_tools( self, ) -> List[Dict[str, object]]: # pragma: no cover - typing only ... async def call_tool( self, name: str, arguments: Dict[str, object] ) -> Dict[str, object]: # pragma: no cover - typing only ... async def stop(self) -> None: # pragma: no cover - typing only ... class SandboxLike(Protocol): async def execute( self, code: str, **kwargs ) -> SandboxResult: # pragma: no cover - typing only ... async def ensure_shared_directory( self, path: Path ) -> None: # pragma: no cover - typing only ... class SandboxTimeout(SandboxError): """Raised when user code exceeds the configured timeout.""" @dataclass class SandboxResult: """Execution result captured from the sandbox.""" success: bool exit_code: int stdout: str stderr: str @dataclass class MCPServerInfo: """Configuration for a single MCP server binary.""" name: str command: str args: List[str] env: Dict[str, str] cwd: Optional[str] = None description: str = "" def _looks_like_self_server( info: Union[MCPServerInfo, Dict[str, Any]], name: Optional[str] = None ) -> bool: """Return True if the config appears to launch this bridge itself.""" if isinstance(info, MCPServerInfo): server_name = info.name.lower() command = info.command args = info.args else: server_name = (name or "").lower() command = str(info.get("command", "")) raw_args = info.get("args", []) args = [str(a) for a in raw_args] if isinstance(raw_args, list) else [] if server_name in _SELF_SERVER_TOKENS: return True command_name = Path(command).name.lower() if command_name in _SELF_SERVER_TOKENS or command_name.endswith( "mcp_server_code_execution_mode.py" ): return True for arg in args: arg_lower = str(arg).lower() arg_name = Path(arg_lower).name if ( arg_lower in _SELF_SERVER_TOKENS or arg_name.lower() in _SELF_SERVER_TOKENS or arg_lower.endswith("mcp_server_code_execution_mode.py") ): return True return False def _split_output_lines(stream: Optional[str]) -> List[str]: """Return a newline-preserving list for stdout/stderr fields.""" if not stream: return [] return stream.splitlines() def _filter_stream_lines(lines: Sequence[str]) -> List[str]: """Drop whitespace-only or noise-only lines to save response tokens.""" filtered: List[str] = [] for line in lines: text = str(line) stripped = text.strip() if not stripped or stripped in _NOISE_STREAM_TOKENS: continue filtered.append(text) return filtered def _render_toon_block(payload: Dict[str, object]) -> str: """Encode a payload in TOON format, falling back to JSON when unavailable.""" if _toon_encode is not None: try: body = _toon_encode(payload) except Exception: # pragma: no cover - defensive fallback logger.debug("Failed to encode payload as TOON", exc_info=True) else: body = body.rstrip() return f"```toon\n{body}\n```" if body else "```toon\n```" fallback = json.dumps(payload, indent=2, sort_keys=True) return f"```json\n{fallback}\n```" def _output_mode() -> str: """Return the configured output mode.""" return os.environ.get("MCP_BRIDGE_OUTPUT_MODE", "compact").strip().lower() def _render_compact_output(payload: Dict[str, object]) -> str: """Render a terse, token-efficient textual summary.""" lines: List[str] = [] stdout_raw = payload.get("stdout", ()) if isinstance(stdout_raw, (list, tuple)): stdout_lines = list(stdout_raw) else: stdout_lines = [] stderr_raw = payload.get("stderr", ()) if isinstance(stderr_raw, (list, tuple)): stderr_lines = list(stderr_raw) else: stderr_lines = [] if stdout_lines: lines.append("\n".join(str(item) for item in stdout_lines)) if stderr_lines: stderr_text = "\n".join(str(item) for item in stderr_lines) lines.append(f"stderr:\n{stderr_text}") status = str(payload.get("status", "")) exit_code = payload.get("exitCode") error = payload.get("error") if not lines and payload.get("summary"): lines.append(str(payload["summary"])) if error and (not lines or status != "error"): lines.append(f"error: {error}") if exit_code not in (None, 0): lines.insert(0, f"exit: {exit_code}") if status and status.lower() not in {"", "success"}: lines.insert(0, f"status: {status}") text = "\n".join(line for line in lines if line).strip() if text: return text if status: return status return str(payload.get("summary", "")).strip() or "success" def _build_compact_structured_payload(payload: Dict[str, object]) -> Dict[str, object]: """Return a trimmed structured representation for compact responses.""" compact: Dict[str, object] = {} status = str(payload.get("status", "")) exit_code = payload.get("exitCode") if status and status.lower() != "success": compact["status"] = status if exit_code not in (None, 0): compact["exitCode"] = exit_code if payload.get("stdout"): compact["stdout"] = payload["stdout"] if payload.get("stderr"): compact["stderr"] = payload["stderr"] if payload.get("servers"): compact["servers"] = payload["servers"] if payload.get("timeoutSeconds"): compact["timeoutSeconds"] = payload["timeoutSeconds"] if payload.get("error"): compact["error"] = payload["error"] summary = payload.get("summary") if summary and (status.lower() != "success" or not compact.get("stdout")): compact["summary"] = summary return compact or { key: payload[key] for key in ("status", "summary") if key in payload } def _build_response_payload( *, status: str, summary: str, exit_code: Optional[int] = None, stdout: Optional[str] = None, stderr: Optional[str] = None, servers: Optional[Sequence[str]] = None, error: Optional[str] = None, timeout_seconds: Optional[int] = None, ) -> Dict[str, object]: """Create a structured payload shared by compact/TOON responses.""" summary_lower = summary.strip().lower() payload: Dict[str, object] = { "status": status, "summary": summary, } if exit_code is not None: payload["exitCode"] = exit_code if servers: payload["servers"] = list(servers) stdout_lines = _filter_stream_lines(_split_output_lines(stdout)) if stdout_lines: payload["stdout"] = stdout_lines stderr_lines = _filter_stream_lines(_split_output_lines(stderr)) if stderr_lines: payload["stderr"] = stderr_lines if error: payload["error"] = error if timeout_seconds is not None: payload["timeoutSeconds"] = timeout_seconds if ( status.lower() == "success" and not payload.get("stdout") and not payload.get("stderr") and summary_lower == "success" ): payload["summary"] = "Success (no output)" return {key: value for key, value in payload.items() if not _is_empty_field(value)} def _is_empty_field(value: object) -> bool: """Return True when a structured field should be omitted.""" if value is None: return True if isinstance(value, (list, tuple, set, dict, str)): return len(value) == 0 return False def _build_tool_response( *, status: str, summary: str, exit_code: Optional[int] = None, stdout: Optional[str] = None, stderr: Optional[str] = None, servers: Optional[Sequence[str]] = None, error: Optional[str] = None, timeout_seconds: Optional[int] = None, ) -> CallToolResult: """Render a tool response in compact text (default) or TOON format.""" payload = _build_response_payload( status=status, summary=summary, exit_code=exit_code, stdout=stdout, stderr=stderr, servers=servers, error=error, timeout_seconds=timeout_seconds, ) status = str(payload.get("status", "error")).lower() is_error = status not in {"success"} mode = _output_mode() if mode == "compact": message = _render_compact_output(payload) structured = _build_compact_structured_payload(payload) return CallToolResult( content=[TextContent(type="text", text=message)], structuredContent=structured, isError=is_error, ) message = _render_toon_block(payload) return CallToolResult( content=[TextContent(type="text", text=message)], structuredContent=payload, isError=is_error, ) def _sanitize_identifier(value: str, *, default: str) -> str: """Convert an arbitrary string into a valid Python identifier.""" cleaned = re.sub(r"[^0-9a-zA-Z_]+", "_", value.strip()) cleaned = cleaned.lower() or default if cleaned[0].isdigit(): cleaned = f"_{cleaned}" if keyword.iskeyword(cleaned): cleaned = f"{cleaned}_" return cleaned class PersistentMCPClient: """Maintain a persistent MCP stdio session.""" def __init__(self, server_info: MCPServerInfo) -> None: self.server_info = server_info self._stdio_cm: Optional[Any] = None self._session: Optional[ClientSession] = None self._forward_task: Optional[asyncio.Task[None]] = None self._captured_stderr: Optional[io.TextIOBase] = None async def start(self) -> None: if self._session: return params = StdioServerParameters( command=self.server_info.command, args=self.server_info.args, env=self.server_info.env or None, cwd=self.server_info.cwd or None, ) # Capture stderr in a real file object for cross-platform compatibility self._captured_stderr = tempfile.TemporaryFile(mode="w+t", encoding="utf-8") # Only pass errlog if stdio_client supports it (tests may patch stdio_client) if "errlog" in inspect.signature(stdio_client).parameters: client_cm = stdio_client(params, errlog=self._captured_stderr) else: client_cm = stdio_client(params) self._stdio_cm = client_cm raw_read_stream, write_stream = await client_cm.__aenter__() # Create a filtered reader stream to hide benign XML/blank-line JSON parse errors filtered_writer, filtered_read = anyio.create_memory_object_stream(0) async def _forward_read() -> None: try: async with filtered_writer: async for item in raw_read_stream: # Filter out JSON parse errors that are likely caused by stray blank lines if isinstance(item, Exception): message = str(item) if ( "Invalid JSON" in message and "EOF while parsing a value" in message and "input_value='\\n'" in message ): # ignore blank line parse errors continue await filtered_writer.send(item) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() # Launch the forwarder task self._forward_task = asyncio.create_task(_forward_read()) session = ClientSession(filtered_read, write_stream) await session.__aenter__() try: await session.initialize() except Exception as exc: # pragma: no cover - initialization failure reporting # Read captured stderr content for diagnostics if present stderr_text = "" if self._captured_stderr is not None: try: self._captured_stderr.seek(0) stderr_text = self._captured_stderr.read() except Exception: stderr_text = "<failed to read captured stderr>" logger.debug( "Client session failed to initialize: %s (stderr=%s)", exc, stderr_text ) # Re-raise for callers to handle; captured stderr is useful for debugging raise self._session = session async def list_tools(self) -> List[Dict[str, object]]: if not self._session: raise SandboxError("MCP client not started") result = await self._session.list_tools() return [ tool.model_dump(by_alias=True, exclude_none=True) for tool in result.tools ] async def call_tool( self, name: str, arguments: Dict[str, object] ) -> Dict[str, object]: if not self._session: raise SandboxError("MCP client not started") call_result = await self._session.call_tool(name=name, arguments=arguments) return call_result.model_dump(by_alias=True, exclude_none=True) async def stop(self) -> None: if self._session: try: await self._session.__aexit__(None, None, None) except* Exception as exc: # pragma: no cover - defensive cleanup logger.debug("MCP session shutdown raised %s", exc, exc_info=True) finally: self._session = None if self._stdio_cm: try: await self._stdio_cm.__aexit__(None, None, None) # type: ignore[union-attr] except* Exception as exc: # pragma: no cover - defensive cleanup logger.debug("MCP stdio shutdown raised %s", exc, exc_info=True) finally: self._stdio_cm = None # Ensure the forwarder task is cancelled if self._forward_task: task = self._forward_task self._forward_task = None task.cancel() with suppress(asyncio.CancelledError): await task if self._captured_stderr is not None: try: self._captured_stderr.close() except Exception: pass self._captured_stderr = None class RootlessContainerSandbox: """Execute Python code in a locked-down container.""" def __init__( self, *, runtime: Optional[str] = None, image: str = DEFAULT_IMAGE, memory_limit: str = DEFAULT_MEMORY, pids_limit: int = DEFAULT_PIDS, cpu_limit: Optional[str] = DEFAULT_CPUS, runtime_idle_timeout: int = DEFAULT_RUNTIME_IDLE_TIMEOUT, ) -> None: self.runtime = detect_runtime(runtime) self.image = image self.memory_limit = memory_limit self.pids_limit = pids_limit self.cpu_limit = cpu_limit self._runtime_check_lock = asyncio.Lock() self.runtime_idle_timeout = max(0, runtime_idle_timeout) self._shutdown_task: Optional[asyncio.Task[None]] = None self._share_lock = asyncio.Lock() self._shared_paths: set[str] = set() self._process: Optional[asyncio.subprocess.Process] = None def _base_cmd(self) -> List[str]: if not self.runtime: raise SandboxError( "No container runtime found. Install podman or rootless docker and set " "MCP_BRIDGE_RUNTIME if multiple runtimes are available." ) cmd: List[str] = [ self.runtime, "run", "--rm", "--interactive", "--network", "none", "--read-only", "--pids-limit", str(self.pids_limit), "--memory", self.memory_limit, "--tmpfs", "/tmp:rw,noexec,nosuid,nodev,size=64m", "--tmpfs", "/workspace:rw,noexec,nosuid,nodev,size=128m", "--workdir", "/workspace", "--env", "HOME=/workspace", "--env", "PYTHONUNBUFFERED=1", "--env", "PYTHONIOENCODING=utf-8", "--env", "PYTHONDONTWRITEBYTECODE=1", "--security-opt", "no-new-privileges", "--cap-drop", "ALL", "--user", CONTAINER_USER, ] if self.cpu_limit: cmd.extend(["--cpus", self.cpu_limit]) return cmd def _render_entrypoint( self, servers_metadata: Sequence[Dict[str, object]], discovered_servers: Dict[str, str], ) -> str: metadata_json = json.dumps(servers_metadata, separators=(",", ":")) discovered_json = json.dumps(discovered_servers, separators=(",", ":")) template = textwrap.dedent( """ import asyncio import inspect import json import sys import traceback import types from contextlib import suppress from pathlib import Path AVAILABLE_SERVERS = json.loads(__METADATA_JSON__) DISCOVERED_SERVERS = json.loads(__DISCOVERED_JSON__) USER_TOOLS_PATH = Path("/projects/user_tools.py") _PENDING_RESPONSES = {} _REQUEST_COUNTER = 0 _EXECUTION_QUEUE = asyncio.Queue() def _send_message(message): sys.__stdout__.write(json.dumps(message, separators=(",", ":")) + "\\n") sys.__stdout__.flush() class _StreamProxy: def __init__(self, kind): self._kind = kind def write(self, data): if not data: return _send_message({"type": self._kind, "data": data}) def flush(self): pass def isatty(self): return False sys.stdout = _StreamProxy("stdout") sys.stderr = _StreamProxy("stderr") async def _stdin_reader(): loop = asyncio.get_running_loop() reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) await loop.connect_read_pipe(lambda: protocol, sys.stdin) while True: line = await reader.readline() if not line: sys.exit(0) try: message = json.loads(line.decode()) except Exception: continue msg_type = message.get("type") if msg_type == "rpc_response": request_id = message.get("id") future = _PENDING_RESPONSES.pop(request_id, None) if future and not future.done(): if message.get("success", True): future.set_result(message.get("payload")) else: future.set_exception(RuntimeError(message.get("error", "RPC error"))) elif msg_type == "execute": await _EXECUTION_QUEUE.put(message.get("code")) # The original try/finally block for transport is removed as per instruction. async def _rpc_call(payload): loop = asyncio.get_running_loop() global _REQUEST_COUNTER _REQUEST_COUNTER += 1 request_id = _REQUEST_COUNTER future = loop.create_future() _PENDING_RESPONSES[request_id] = future _send_message({"type": "rpc_request", "id": request_id, "payload": payload}) return await future def _install_mcp_modules(): mcp_pkg = types.ModuleType("mcp") mcp_pkg.__path__ = [] mcp_pkg.__all__ = ["runtime", "servers"] sys.modules["mcp"] = mcp_pkg runtime_module = types.ModuleType("mcp.runtime") servers_module = types.ModuleType("mcp.servers") servers_module.__path__ = [] sys.modules["mcp.runtime"] = runtime_module sys.modules["mcp.servers"] = servers_module mcp_pkg.runtime = runtime_module mcp_pkg.servers = servers_module # Load user tools if they exist if USER_TOOLS_PATH.exists(): try: import importlib.util spec = importlib.util.spec_from_file_location("user_tools", USER_TOOLS_PATH) if spec and spec.loader: user_tools = importlib.util.module_from_spec(spec) sys.modules["user_tools"] = user_tools spec.loader.exec_module(user_tools) # Export everything from user_tools to global namespace for name, val in vars(user_tools).items(): if not name.startswith("_"): globals()[name] = val except Exception: pass def save_tool(func): '''Saves a function as a persistent tool available in future sessions.''' if not inspect.isfunction(func): raise ValueError("save_tool expects a function") source = inspect.getsource(func) USER_TOOLS_PATH.parent.mkdir(parents=True, exist_ok=True) with open(USER_TOOLS_PATH, "a") as f: f.write("\\n\\n") f.write(source) return f"Tool '{func.__name__}' saved. It will be available in future sessions." runtime_module.save_tool = save_tool globals()["save_tool"] = save_tool class MCPError(RuntimeError): 'Raised when an MCP call fails.' _CAPABILITY_SUMMARY = ( "--- PYTHON SANDBOX MANUAL ---\\n" "1. PHILOSOPHY: You are in a persistent Python environment. Prefer writing code over calling tools when possible.\\n" "2. DISCOVERY: Use `runtime.discovered_servers()` to list servers. " "Use `runtime.discovered_servers(detailed=True)` for descriptions. " "Use `runtime.search_tool_docs('query')` to find tools. " "Don't guess tool names; search first.\\n" "3. PERSISTENCE: You can save your own tools! Define a Python function and call `save_tool(func)`. " "It will be saved to `~/MCPs/user_tools.py` and auto-loaded in future sessions.\\n" "4. HELPERS: `import mcp.runtime as runtime`. Available: list_servers(), list_tools_sync(server), " "query_tool_docs(server), describe_server(name).\\n" "5. PROXIES: Loaded servers are available as `mcp_<alias>` (e.g. `await mcp_filesystem.read_file(...)`)." ) _LOADED_SERVER_NAMES = tuple(server.get("name") for server in AVAILABLE_SERVERS) def _lookup_server(name): for server in AVAILABLE_SERVERS: if server.get("name") == name: return server raise MCPError(f"Server {name!r} is not loaded") def _normalise_detail(value): detail = str(value).lower() if value is not None else "summary" return detail if detail in {"summary", "full"} else "summary" def _format_tool_doc(server_info, tool_info, detail): doc = { "server": server_info.get("name"), "serverAlias": server_info.get("alias"), "tool": tool_info.get("name"), "toolAlias": tool_info.get("alias"), } description = tool_info.get("description") if description: doc["description"] = description if detail == "full" and tool_info.get("input_schema") is not None: doc["inputSchema"] = tool_info.get("input_schema") return doc async def call_tool(server, tool, arguments=None): response = await _rpc_call( { "type": "call_tool", "server": server, "tool": tool, "arguments": arguments or {}, } ) if not response.get("success", True): raise MCPError(response.get("error", "MCP request failed")) return response.get("result") async def list_tools(server): response = await _rpc_call( { "type": "list_tools", "server": server, } ) if not response.get("success", True): raise MCPError(response.get("error", "MCP request failed")) return response.get("tools", []) async def list_servers(): response = await _rpc_call({"type": "list_servers"}) if not response.get("success", True): raise MCPError(response.get("error", "MCP request failed")) return tuple(response.get("servers", ())) def list_servers_sync(): return tuple(name for name in _LOADED_SERVER_NAMES if name) def discovered_servers(detailed=False): if detailed: return tuple({"name": k, "description": v} for k, v in DISCOVERED_SERVERS.items()) return tuple(DISCOVERED_SERVERS.keys()) def describe_server(name): return _lookup_server(name) def list_loaded_server_metadata(): return tuple(AVAILABLE_SERVERS) def list_tools_sync(server=None): if server is None: raise MCPError("list_tools_sync(server) requires a server name") info = _lookup_server(server) tools = info.get("tools", ()) or () return tuple(tools) async def query_tool_docs(server, tool=None, detail="summary"): payload = {"type": "query_tool_docs", "server": server} if tool is not None: payload["tool"] = tool if detail is not None: payload["detail"] = detail response = await _rpc_call(payload) if not response.get("success", True): raise MCPError(response.get("error", "MCP request failed")) docs = response.get("docs", []) if tool is not None and isinstance(docs, list) and len(docs) == 1: return docs[0] return docs async def search_tool_docs(query, *, limit=5, detail="summary"): payload = {"type": "search_tool_docs", "query": query} if limit is not None: payload["limit"] = limit if detail is not None: payload["detail"] = detail response = await _rpc_call(payload) if not response.get("success", True): raise MCPError(response.get("error", "MCP request failed")) return response.get("results", []) def query_tool_docs_sync(server, tool=None, detail="summary"): info = _lookup_server(server) detail_value = _normalise_detail(detail) tools = info.get("tools", ()) or () if tool is None: return [_format_tool_doc(info, tool_info, detail_value) for tool_info in tools] if not isinstance(tool, str): raise MCPError("'tool' must be a string when provided") target = tool.lower() for candidate in tools: alias_value = str(candidate.get("alias", "")).lower() name_value = str(candidate.get("name", "")).lower() if target in {alias_value, name_value}: return [_format_tool_doc(info, candidate, detail_value)] raise MCPError(f"Tool {tool!r} not found for server {server}") def search_tool_docs_sync(query, *, limit=5, detail="summary"): tokens = [token for token in str(query).lower().split() if token] if not tokens: return [] detail_value = _normalise_detail(detail) try: capped = max(1, min(20, int(limit))) except Exception: capped = 5 matches = [] for server_info in AVAILABLE_SERVERS: tools = server_info.get("tools", ()) or () server_keywords = " ".join( filter( None, ( server_info.get("name"), server_info.get("alias"), ), ) ).lower() for tool_info in tools: haystack = " ".join( filter( None, ( server_keywords, tool_info.get("name"), tool_info.get("alias"), tool_info.get("description"), ), ) ).lower() if all(token in haystack for token in tokens): matches.append(_format_tool_doc(server_info, tool_info, detail_value)) if len(matches) >= capped: return matches return matches def capability_summary(): return _CAPABILITY_SUMMARY runtime_module.MCPError = MCPError runtime_module.call_tool = call_tool runtime_module.list_tools = list_tools runtime_module.list_servers = list_servers runtime_module.list_servers_sync = list_servers_sync runtime_module.discovered_servers = discovered_servers runtime_module.describe_server = describe_server runtime_module.list_loaded_server_metadata = list_loaded_server_metadata runtime_module.list_tools_sync = list_tools_sync runtime_module.query_tool_docs = query_tool_docs runtime_module.search_tool_docs = search_tool_docs runtime_module.query_tool_docs_sync = query_tool_docs_sync runtime_module.search_tool_docs_sync = search_tool_docs_sync runtime_module.capability_summary = capability_summary runtime_module.__all__ = [ "MCPError", "call_tool", "list_tools", "list_tools_sync", "list_servers", "list_servers_sync", "discovered_servers", "describe_server", "list_loaded_server_metadata", "query_tool_docs_sync", "query_tool_docs", "search_tool_docs_sync", "search_tool_docs", "capability_summary", ] servers_module.__all__ = [] def _make_tool_callable(server_name, tool_name): async def _invoke(**kwargs): return await call_tool(server_name, tool_name, kwargs) return _invoke for server in AVAILABLE_SERVERS: alias = server["alias"] module_name = f"mcp.servers.{alias}" server_module = types.ModuleType(module_name) server_module.__doc__ = f"MCP server '{server['name']}' wrappers" server_module.__all__ = [] tool_map = {} for tool in server.get("tools", []): tool_alias = tool["alias"] summary = (tool.get("description") or "").strip() or f"MCP tool {tool['name']} from {server['name']}" func = _make_tool_callable(server["name"], tool["name"]) func.__name__ = tool_alias func.__doc__ = summary setattr(server_module, tool_alias, func) server_module.__all__.append(tool_alias) tool_map[tool_alias] = tool server_module.TOOLS = server.get("tools", []) server_module.TOOL_MAP = tool_map setattr(servers_module, alias, server_module) sys.modules[module_name] = server_module servers_module.__all__.append(alias) return runtime_module runtime_module = _install_mcp_modules() import mcp class _MCPProxy: def __init__(self, server_info): self._server_name = server_info["name"] self._tools = {tool["alias"]: tool for tool in server_info.get("tools", [])} async def list_tools(self): response = await _rpc_call( { "type": "list_tools", "server": self._server_name, } ) if not response.get("success", True): raise RuntimeError(response.get("error", "MCP request failed")) return response.get("tools", []) def __getattr__(self, tool_alias): tool = self._tools.get(tool_alias) target = tool.get("name") if tool else tool_alias summary = (tool.get("description") if tool else "") or "" async def _invoke(_target=target, **kwargs): response = await _rpc_call( { "type": "call_tool", "server": self._server_name, "tool": _target, "arguments": kwargs, } ) if not response.get("success", True): raise RuntimeError(response.get("error", "MCP call failed")) return response.get("result") if summary: _invoke.__doc__ = summary _invoke.__name__ = tool_alias return _invoke _GLOBAL_NAMESPACE = {"__name__": "__sandbox__"} _GLOBAL_NAMESPACE.setdefault("mcp", __import__("mcp")) LOADED_MCP_SERVERS = tuple(server["name"] for server in AVAILABLE_SERVERS) mcp_servers = {} for server in AVAILABLE_SERVERS: proxy = _MCPProxy(server) mcp_servers[server["name"]] = proxy _GLOBAL_NAMESPACE[f"mcp_{server['alias']}"] = proxy _GLOBAL_NAMESPACE.setdefault("mcp_servers", {}).update(mcp_servers) _GLOBAL_NAMESPACE["LOADED_MCP_SERVERS"] = LOADED_MCP_SERVERS async def _execute_code(code): try: flags = getattr(__import__("ast"), "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0) compiled = compile(code, "<sandbox>", "exec", flags=flags) result = eval(compiled, _GLOBAL_NAMESPACE, _GLOBAL_NAMESPACE) if inspect.isawaitable(result): await result except SystemExit: raise except BaseException: traceback.print_exc() async def _main_loop(): asyncio.create_task(_stdin_reader()) while True: code = await _EXECUTION_QUEUE.get() await _execute_code(code) _send_message({"type": "execution_done"}) if __name__ == "__main__": try: asyncio.run(_main_loop()) except KeyboardInterrupt: pass """ ).lstrip() return template.replace("__METADATA_JSON__", repr(metadata_json)).replace( "__DISCOVERED_JSON__", repr(discovered_json) ) async def _run_runtime_command(self, *args: str) -> tuple[int, str, str]: process = await asyncio.create_subprocess_exec( self.runtime, *args, stdout=aio_subprocess.PIPE, stderr=aio_subprocess.PIPE, ) stdout_bytes, stderr_bytes = await process.communicate() stdout_text = stdout_bytes.decode(errors="replace") stderr_text = stderr_bytes.decode(errors="replace") assert process.returncode is not None return process.returncode, stdout_text, stderr_text async def _stop_runtime(self) -> None: if not self.runtime: return runtime_name = os.path.basename(self.runtime) if "podman" not in runtime_name: return code, stdout_text, stderr_text = await self._run_runtime_command( "machine", "stop" ) if code != 0: combined = f"{stdout_text}\n{stderr_text}".lower() if "already stopped" in combined or "is not running" in combined: return logger.debug("Failed to stop podman machine: %s", stderr_text.strip()) async def _cancel_runtime_shutdown_timer(self) -> None: if not self._shutdown_task: return task = self._shutdown_task self._shutdown_task = None task.cancel() with suppress(asyncio.CancelledError): await task async def _schedule_runtime_shutdown(self) -> None: if self.runtime_idle_timeout <= 0: return await self._cancel_runtime_shutdown_timer() async def _delayed_shutdown() -> None: try: await asyncio.sleep(self.runtime_idle_timeout) await self._stop_runtime() except asyncio.CancelledError: raise except Exception: # pragma: no cover - diagnostic fallback logger.debug("Runtime shutdown task failed", exc_info=True) self._shutdown_task = asyncio.create_task(_delayed_shutdown()) async def _ensure_runtime_ready(self) -> None: async with self._runtime_check_lock: await self._cancel_runtime_shutdown_timer() if not self.runtime: # We will fail later when trying to run the command, but for now # we can't do any runtime specific checks return runtime_name = os.path.basename(self.runtime) if "podman" not in runtime_name: return for _ in range(3): code, stdout_text, stderr_text = await self._run_runtime_command( "info", "--format", "{{json .}}", ) if code == 0: return combined = f"{stdout_text}\n{stderr_text}".lower() needs_machine = any( phrase in combined for phrase in ( "cannot connect to podman", "podman machine", "run the podman machine", "socket: connect", ) ) if not needs_machine: raise SandboxError( "Container runtime is unavailable", stdout=stdout_text, stderr=stderr_text, ) ( start_code, start_stdout, start_stderr, ) = await self._run_runtime_command("machine", "start") if start_code == 0: continue start_combined = f"{start_stdout}\n{start_stderr}".lower() if ( "does not exist" in start_combined or "no such machine" in start_combined ): ( init_code, init_stdout, init_stderr, ) = await self._run_runtime_command("machine", "init") if init_code != 0: raise SandboxError( "Failed to initialize Podman machine", stdout=init_stdout, stderr=init_stderr, ) # After init, loop will retry info/start sequence continue raise SandboxError( "Failed to start Podman machine", stdout=start_stdout, stderr=start_stderr, ) raise SandboxError( "Unable to prepare Podman runtime", stdout="", stderr="Repeated podman machine start attempts failed", ) async def _ensure_started( self, servers_metadata: Sequence[Dict[str, object]], discovered_servers: Dict[str, str], container_env: Optional[Dict[str, str]], volume_mounts: Optional[Sequence[str]], host_dir: Path, ) -> None: if self._process and self._process.returncode is None: return await self._ensure_runtime_ready() if not self.runtime: raise SandboxError( "No container runtime found. Install podman or rootless docker and set " "MCP_BRIDGE_RUNTIME if multiple runtimes are available." ) entrypoint_path = host_dir / "entrypoint.py" entrypoint_path.write_text( self._render_entrypoint(servers_metadata, discovered_servers) ) entrypoint_target = f"/ipc/{entrypoint_path.name}" cmd = self._base_cmd() if volume_mounts: for mount in volume_mounts: cmd.extend(["--volume", mount]) if container_env: for key, value in container_env.items(): cmd.extend(["--env", f"{key}={value}"]) cmd.extend([self.image, "python3", "-u", entrypoint_target]) self._process = await asyncio.create_subprocess_exec( *cmd, stdin=aio_subprocess.PIPE, stdout=aio_subprocess.PIPE, stderr=aio_subprocess.PIPE, ) async def execute( self, code: str, *, timeout: int = DEFAULT_TIMEOUT, servers_metadata: Sequence[Dict[str, object]] = (), discovered_servers: Dict[str, str] = {}, container_env: Optional[Dict[str, str]] = None, volume_mounts: Optional[Sequence[str]] = None, host_dir: Optional[Path] = None, rpc_handler: Optional[ Callable[[Dict[str, object]], Awaitable[Dict[str, object]]] ] = None, ) -> SandboxResult: if host_dir is None: raise SandboxError("Sandbox host directory is not available") await self._ensure_started( servers_metadata, discovered_servers, container_env, volume_mounts, host_dir, ) process = self._process assert process is not None assert process.stdin is not None assert process.stdout is not None assert process.stderr is not None # Send code execution request request = {"type": "execute", "code": code} try: process.stdin.write(json.dumps(request).encode("utf-8") + b"\n") await process.stdin.drain() except Exception as exc: raise SandboxError(f"Failed to send code to sandbox: {exc}") from exc stdout_chunks: List[str] = [] stderr_chunks: List[str] = [] async def _handle_stdout() -> None: assert process.stdout is not None async for line in process.stdout: try: message = json.loads(line.decode()) except Exception: stderr_chunks.append(line.decode(errors="replace")) continue msg_type = message.get("type") if msg_type == "stdout": stdout_chunks.append(message.get("data", "")) elif msg_type == "stderr": stderr_chunks.append(message.get("data", "")) elif msg_type == "execution_done": break elif msg_type == "rpc_request": if process.stdin is None: continue if rpc_handler is None: response: Dict[str, object] = { "success": False, "error": "RPC handler unavailable", } else: try: payload = message.get("payload", {}) response = await rpc_handler( payload if isinstance(payload, dict) else {} ) except Exception as exc: logger.debug("RPC handler failed", exc_info=True) response = {"success": False, "error": str(exc)} reply: Dict[str, object] = { "type": "rpc_response", "id": message.get("id"), "success": response.get("success", True), "payload": response, } if not reply["success"]: reply["error"] = response.get("error", "RPC error") try: data = ( json.dumps(reply, separators=(",", ":")).encode("utf-8") + b"\n" ) process.stdin.write(data) await process.stdin.drain() except Exception: stderr_chunks.append("Failed to deliver RPC response\n") break else: stderr_chunks.append(json.dumps(message, separators=(",", ":"))) async def _read_stderr() -> None: assert process.stderr is not None while True: # We can't just read until EOF because the process is persistent. # We need to read what's available. # But stderr from python usually comes line by line or buffered. # If we read(4096), it might block if nothing is there? # No, asyncio read blocks until *some* data is available. # But we don't know when to STOP reading for this execution. # The container script redirects sys.stderr to _StreamProxy which sends JSON over stdout. # So REAL stderr should only contain runtime errors or C-level stderr. # We should probably read it continuously in a background task that lives as long as the process? # For now, let's just read line by line. line = await process.stderr.readline() if not line: break stderr_chunks.append(line.decode(errors="replace")) # We can't easily sync stderr reading with execution_done if it's not wrapped. # But our entrypoint redirects sys.stderr to stdout (wrapped). # So process.stderr will only have "hard" errors. # We can just let it accumulate in the background? # But we want to return it with the result. # Let's just assume "hard" stderr is rare and maybe we miss some if it comes after execution_done. # Actually, we can't block on stderr.readline() if there is no stderr. # So we should probably NOT await stderr reading here, or use a non-blocking approach. # But we are in an async function. # Let's spawn a stderr reader task that runs forever (attached to the object?) # Or just ignore raw stderr for now? # The previous implementation read stderr until EOF. # I'll stick to reading stdout loop. Raw stderr will be lost if I don't read it. # I'll add a background task to self that consumes stderr? pass stdout_task = asyncio.create_task(_handle_stdout()) try: await asyncio.wait_for(stdout_task, timeout=timeout) except asyncio.TimeoutError as exc: # We don't kill the process on timeout, we just stop waiting? # Or do we kill it? # If we don't kill it, the loop is still running. # We should probably kill it to clear state? # Or send a "cancel" message? # For now, let's kill it on timeout to be safe. process.kill() await process.wait() raise SandboxTimeout( f"Execution timed out after {timeout}s", stdout="".join(stdout_chunks), stderr="".join(stderr_chunks), ) from exc stdout_text = "".join(stdout_chunks) stderr_text = "".join(stderr_chunks) # We don't check return code because process is still running. return SandboxResult(True, 0, stdout_text, stderr_text) async def _stop_runtime(self) -> None: if self._process: try: self._process.terminate() await self._process.wait() except Exception: pass self._process = None if not self.runtime: return runtime_name = os.path.basename(self.runtime) if "podman" not in runtime_name: return code, stdout_text, stderr_text = await self._run_runtime_command( "machine", "stop" ) if code != 0: combined = f"{stdout_text}\n{stderr_text}".lower() if "already stopped" in combined or "is not running" in combined: return logger.debug("Failed to stop podman machine: %s", stderr_text.strip()) async def ensure_shared_directory(self, path: Path) -> None: resolved = path.expanduser().resolve() resolved.mkdir(parents=True, exist_ok=True) path_str = str(resolved) if path_str in self._shared_paths: return async with self._share_lock: if path_str in self._shared_paths: return shared = True runtime_name = os.path.basename(self.runtime) if self.runtime else "" if "podman" in runtime_name: shared = await self._ensure_podman_volume_shared(resolved) if shared: self._shared_paths.add(path_str) async def _ensure_podman_volume_shared(self, path: Path) -> bool: if not self.runtime: return False share_spec = f"{path}:{path}" try: process = await asyncio.create_subprocess_exec( self.runtime, "machine", "set", "--rootful", "--volume", share_spec, stdout=aio_subprocess.PIPE, stderr=aio_subprocess.PIPE, ) except FileNotFoundError: logger.debug( "Podman binary not found while ensuring volume share for %s", path ) return False stdout_bytes, stderr_bytes = await process.communicate() stderr_text = stderr_bytes.decode(errors="replace") if process.returncode == 0: return True lower = stderr_text.lower() if "already exists" in lower or "would overwrite" in lower: return True if ( "unknown flag: --volume" in lower or "unrecognized option '--volume'" in lower ): if await self._podman_share_already_available(path): logger.info( "Podman runtime already exposes %s; skipping --volume configuration", path, ) return True logger.debug( "Failed to ensure podman shared volume for %s (exit %s): %s", path, process.returncode, stderr_text.strip() or stdout_bytes.decode(errors="replace").strip(), ) return False async def _podman_share_already_available(self, path: Path) -> bool: if not self.runtime: return False quoted = shlex.quote(str(path)) try: process = await asyncio.create_subprocess_exec( self.runtime, "machine", "ssh", f"test -d {quoted}", stdout=aio_subprocess.PIPE, stderr=aio_subprocess.PIPE, ) except FileNotFoundError: return False stdout_bytes, stderr_bytes = await process.communicate() if process.returncode == 0: return True logger.debug( "Podman VM does not see %s (exit %s): %s", path, process.returncode, stderr_bytes.decode(errors="replace").strip() or stdout_bytes.decode(errors="replace").strip(), ) return False def _filter_runtime_stderr(self, text: str) -> str: """Strip known runtime pull chatter so successful runs stay quiet.""" if not text or not self.runtime: return text runtime_name = os.path.basename(self.runtime).lower() if "podman" not in runtime_name: return text filtered_lines: List[str] = [] for line in text.splitlines(): stripped = line.strip() if stripped and any( stripped.startswith(prefix) for prefix in _PODMAN_PULL_PREFIXES ): continue filtered_lines.append(line) return "\n".join(filtered_lines).strip("\n") def detect_runtime(preferred: Optional[str] = None) -> Optional[str]: """Return the first available container runtime, or None if not found.""" candidates: List[Optional[str]] = [] if preferred: candidates.append(preferred) if DEFAULT_RUNTIME and DEFAULT_RUNTIME not in candidates: candidates.append(DEFAULT_RUNTIME) candidates.extend(["podman", "docker"]) for candidate in candidates: if candidate and shutil.which(candidate): return candidate return None class SandboxInvocation: """Context manager that prepares IPC resources for a sandbox invocation.""" def __init__(self, bridge: "MCPBridge", active_servers: Sequence[str]) -> None: self.bridge = bridge self.active_servers = list(dict.fromkeys(active_servers)) self._temp_dir: Optional[tempfile.TemporaryDirectory[str]] = None self.host_dir: Optional[Path] = None self.container_env: Dict[str, str] = {} self.volume_mounts: List[str] = [] self.server_metadata: List[Dict[str, object]] = [] self.allowed_servers: set[str] = set() self.discovered_servers: Dict[str, str] = {} async def __aenter__(self) -> "SandboxInvocation": self.server_metadata = [] for server_name in self.active_servers: metadata = await self.bridge.get_cached_server_metadata(server_name) self.server_metadata.append(metadata) self.allowed_servers = { str(meta.get("name")) for meta in self.server_metadata if isinstance(meta.get("name"), str) } self.discovered_servers = { name: server.description for name, server in self.bridge.servers.items() } state_dir_env = os.environ.get("MCP_BRIDGE_STATE_DIR") if state_dir_env: base_dir = Path(state_dir_env).expanduser() else: base_dir = Path.home() / "MCPs" base_dir = base_dir.resolve() base_dir.mkdir(parents=True, exist_ok=True) # Create user_tools directory user_tools_dir = base_dir / "user_tools" user_tools_dir.mkdir(parents=True, exist_ok=True) ensure_share = getattr(self.bridge.sandbox, "ensure_shared_directory", None) if ensure_share: await ensure_share(base_dir) self._temp_dir = tempfile.TemporaryDirectory( prefix="mcp-bridge-ipc-", dir=str(base_dir) ) host_dir = Path(self._temp_dir.name) os.chmod(host_dir, 0o755) self.host_dir = host_dir self.volume_mounts.append(f"{host_dir}:/ipc:rw") self.volume_mounts.append(f"{user_tools_dir}:/projects:rw") self.container_env["MCP_AVAILABLE_SERVERS"] = json.dumps( self.server_metadata, separators=(",", ":"), ) self.container_env["MCP_DISCOVERED_SERVERS"] = json.dumps( self.discovered_servers, separators=(",", ":"), ) return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: if self._temp_dir: self._temp_dir.cleanup() async def handle_rpc(self, request: Dict[str, object]) -> Dict[str, object]: req_type = request.get("type") if req_type == "list_servers": return { "success": True, "servers": sorted(self.allowed_servers), } if req_type == "query_tool_docs": server = request.get("server") if not isinstance(server, str) or server not in self.allowed_servers: return { "success": False, "error": f"Server {server!r} is not available", } tool = request.get("tool") if tool is not None and not isinstance(tool, str): return { "success": False, "error": "'tool' must be a string when provided", } detail = request.get("detail", "summary") try: docs = await self.bridge.get_tool_docs(server, tool=tool, detail=detail) except SandboxError as exc: return {"success": False, "error": str(exc)} return {"success": True, "docs": docs} if req_type == "search_tool_docs": query = request.get("query") if not isinstance(query, str) or not query.strip(): return { "success": False, "error": "Missing 'query' value", } limit = request.get("limit", 5) if not isinstance(limit, int): return { "success": False, "error": "'limit' must be an integer", } detail = request.get("detail", "summary") try: results = await self.bridge.search_tool_docs( query, allowed_servers=sorted(self.allowed_servers), limit=limit, detail=detail, ) except SandboxError as exc: return {"success": False, "error": str(exc)} return {"success": True, "results": results} if req_type not in {"list_tools", "call_tool"}: return { "success": False, "error": f"Unknown RPC type: {req_type}", } server = request.get("server") if not isinstance(server, str) or server not in self.allowed_servers: return { "success": False, "error": f"Server {server!r} is not available", } client = self.bridge.clients.get(server) if not client: return { "success": False, "error": f"Server {server} is not loaded", } try: if req_type == "list_tools": client_obj = cast(ClientLike, client) tools = await client_obj.list_tools() return {"success": True, "tools": tools} tool_name = request.get("tool") arguments = request.get("arguments", {}) if not isinstance(tool_name, str): return {"success": False, "error": "Missing tool name"} if not isinstance(arguments, dict): return {"success": False, "error": "Arguments must be an object"} arguments = cast(Dict[str, object], arguments) client_obj = cast(ClientLike, client) result = await client_obj.call_tool(tool_name, arguments) return {"success": True, "result": result} except Exception as exc: # pragma: no cover logger.debug("MCP proxy call failed", exc_info=True) return {"success": False, "error": str(exc)} class MCPBridge: """Expose the secure sandbox as an MCP tool with MCP proxying.""" def __init__(self, sandbox: Optional[object] = None) -> None: self.sandbox = sandbox or RootlessContainerSandbox() self.servers: Dict[str, MCPServerInfo] = {} self.clients: Dict[str, object] = {} self.loaded_servers: set[str] = set() self._aliases: Dict[str, str] = {} self._discovered = False self._server_metadata_cache: Dict[str, Dict[str, object]] = {} self._server_docs_cache: Dict[str, Dict[str, object]] = {} self._search_index: List[Dict[str, object]] = [] self._search_index_dirty = False async def discover_servers(self) -> Dict[str, str]: """ Scans all configured sources for MCP server definitions. Returns a dict of server_name -> description. """ discovered: Dict[str, str] = {} # 1. Scan all configured sources for source in CONFIG_SOURCES: if not source.path.exists(): continue try: if source.type == "directory": for config_file in source.path.glob(f"*.{source.format}"): server_configs = self._load_server_config( config_file, source_name=f"{source.name} ({config_file.name})", ) for name, (config, description) in server_configs.items(): if name not in self.servers: info = self._parse_server_config( name, config, description ) if info: self.servers[name] = info discovered[name] = description logger.info( "Found MCP server %s in %s (%s)", name, config_file, source.name, ) elif source.type == "file": server_configs = self._load_server_config( source.path, source_name=source.name ) for name, (config, description) in server_configs.items(): if name not in self.servers: info = self._parse_server_config(name, config, description) if info: self.servers[name] = info discovered[name] = description logger.info( "Found MCP server %s in %s (%s)", name, source.path, source.name, ) except Exception as e: logger.warning( f"Failed to scan source {source.name} ({source.path}): {e}", exc_info=True, ) # 2. Load from environment variable (highest priority for overrides if we implemented that) env_config_path = os.environ.get("MCP_SERVERS_CONFIG") if env_config_path: try: env_server_configs = self._load_server_config( Path(env_config_path), source_name="Environment" ) for name, (config, description) in env_server_configs.items(): if ( name not in self.servers ): # Environment variable configs override or add info = self._parse_server_config(name, config, description) if info: self.servers[name] = info discovered[name] = description logger.info( "Found MCP server %s in %s (Environment)", name, env_config_path, ) except Exception as e: logger.error( f"Failed to load MCP_SERVERS_CONFIG from {env_config_path}: {e}" ) logger.info("Discovered %d MCP servers", len(self.servers)) self._discovered = True return discovered def _load_server_config( self, path: Path, source_name: str = "Config" ) -> Dict[str, Tuple[Dict[str, Any], str]]: """ Loads MCP server configuration from a JSON or TOML file. Returns a dict of server_name -> (server_config, description). """ try: with open(path, "rb") as f: if path.suffix == ".toml": if tomllib: data = tomllib.load(f) else: logger.warning( f"Skipping {path}: tomllib not available (Python < 3.11)" ) return {} else: data = json.load(f) mcp_servers = data.get("mcpServers", {}) file_description = data.get("description", "") result = {} for name, config in mcp_servers.items(): if _looks_like_self_server(config, name=name): if not _ALLOW_SELF_SERVER: logger.info( f"Skipping self-referential server '{name}' in {source_name}" ) continue server_desc = config.get("description", file_description) result[name] = (config, server_desc) return result except (json.JSONDecodeError, OSError) as e: logger.warning(f"Failed to load {source_name} from {path}: {e}") return {} def _parse_server_config( self, name: str, raw: Dict[str, object], description: str ) -> Optional[MCPServerInfo]: command = raw.get("command") if not isinstance(command, str): return None args = raw.get("args", []) if not isinstance(args, list): args = [] env = raw.get("env", {}) if not isinstance(env, dict): env = {} str_env = {str(k): str(v) for k, v in env.items()} str_args = [str(arg) for arg in args] cwd_raw = raw.get("cwd") cwd_str: Optional[str] = None if isinstance(cwd_raw, (str, Path)): cwd_str = str(cwd_raw) return MCPServerInfo( name=name, command=command, args=str_args, env=str_env, cwd=cwd_str, description=description, ) async def load_server(self, server_name: str) -> None: if server_name in self.loaded_servers: return info = self.servers.get(server_name) if not info: raise SandboxError(f"Unknown MCP server: {server_name}") # Validate cwd if provided - warn, but do not fail startup if info.cwd: try: path = Path(info.cwd) if not path.exists(): logger.warning( "Configured cwd for MCP server %s does not exist: %s", server_name, info.cwd, ) except Exception: logger.debug( "Failed to check cwd for server %s: %s", server_name, info.cwd, exc_info=True, ) client = PersistentMCPClient(info) await client.start() self.clients[server_name] = client self.loaded_servers.add(server_name) logger.info("Loaded MCP server %s", server_name) self._server_metadata_cache.pop(server_name, None) self._server_docs_cache.pop(server_name, None) self._search_index_dirty = True def _alias_for(self, name: str) -> str: if name in self._aliases: return self._aliases[name] base = re.sub(r"[^a-z0-9_]+", "_", name.lower()) or "server" if base[0].isdigit(): base = f"_{base}" alias = base suffix = 1 used = set(self._aliases.values()) while alias in used: suffix += 1 alias = f"{base}_{suffix}" self._aliases[name] = alias return alias async def _ensure_server_metadata(self, server_name: str) -> None: if server_name in self._server_metadata_cache: return client = self.clients.get(server_name) if not client: raise SandboxError(f"Server {server_name} is not loaded") client_obj = cast(ClientLike, client) tool_specs = await client_obj.list_tools() alias = self._alias_for(server_name) alias_counts: Dict[str, int] = {} tools: List[Dict[str, object]] = [] doc_entries: List[Dict[str, object]] = [] identifier_index: Dict[str, Dict[str, object]] = {} for spec in tool_specs: raw_name = str(spec.get("name") or "tool") base_alias = _sanitize_identifier(raw_name, default="tool") alias_counts[base_alias] = alias_counts.get(base_alias, 0) + 1 count = alias_counts[base_alias] tool_alias = base_alias if count == 1 else f"{base_alias}_{count}" input_schema = spec.get("input_schema") or spec.get("inputSchema") description = str(spec.get("description") or "").strip() tool_payload = { "name": raw_name, "alias": tool_alias, "description": description, "input_schema": input_schema, } tools.append(tool_payload) keywords = " ".join( filter( None, { server_name, alias, raw_name, tool_alias, description, }, ) ).lower() doc_entry = { "name": raw_name, "alias": tool_alias, "description": description, "input_schema": input_schema, "keywords": keywords, } doc_entries.append(doc_entry) identifier_index[tool_alias.lower()] = doc_entry identifier_index[raw_name.lower()] = doc_entry server_obj = self.servers.get(server_name) cwd_value = ( str(server_obj.cwd) if server_obj and getattr(server_obj, "cwd", None) else None ) metadata = { "name": server_name, "alias": alias, "tools": tools, "cwd": cwd_value, } self._server_metadata_cache[server_name] = cast(Dict[str, object], metadata) self._server_docs_cache[server_name] = cast( Dict[str, object], { "name": server_name, "alias": alias, "tools": doc_entries, "identifier_index": identifier_index, }, ) self._search_index_dirty = True async def get_cached_server_metadata(self, server_name: str) -> Dict[str, object]: await self._ensure_server_metadata(server_name) return copy.deepcopy(self._server_metadata_cache[server_name]) @staticmethod def _normalise_detail(value: object) -> str: detail = str(value).lower() if value is not None else "summary" return detail if detail in {"summary", "full"} else "summary" @staticmethod def _format_tool_doc( server_name: str, server_alias: str, info: Dict[str, object], detail: str, ) -> Dict[str, object]: doc: Dict[str, object] = { "server": server_name, "serverAlias": server_alias, "tool": info.get("name"), "toolAlias": info.get("alias"), } description = info.get("description") if description: doc["description"] = description if detail == "full" and info.get("input_schema") is not None: doc["inputSchema"] = info.get("input_schema") return doc async def get_tool_docs( self, server_name: str, *, tool: Optional[str] = None, detail: object = "summary", ) -> List[Dict[str, object]]: await self._ensure_server_metadata(server_name) cache_entry = self._server_docs_cache.get(server_name) if not cache_entry: raise SandboxError(f"Documentation unavailable for server {server_name}") detail_value = self._normalise_detail(detail) server_alias = str(cache_entry.get("alias", "")) docs: List[Dict[str, object]] = [] if tool is not None: if not isinstance(tool, str): raise SandboxError("'tool' must be a string when provided") identifier_map_raw = cache_entry.get("identifier_index", {}) identifier_map: Dict[str, Dict[str, object]] = {} if isinstance(identifier_map_raw, dict): identifier_map = cast(Dict[str, Dict[str, object]], identifier_map_raw) match = identifier_map.get(tool.lower()) if not match: raise SandboxError(f"Tool {tool!r} not found for server {server_name}") docs.append( self._format_tool_doc( server_name, server_alias, cast(Dict[str, object], match), detail_value, ) ) return docs tools_raw = cache_entry.get("tools", []) if not isinstance(tools_raw, (list, tuple)): tools_raw = [] for info_raw in tools_raw: info = cast(Dict[str, object], info_raw) docs.append( self._format_tool_doc(server_name, server_alias, info, detail_value) ) return docs def _ensure_search_index(self) -> None: if not self._search_index_dirty: return entries: List[Dict[str, object]] = [] for server_name, cache_entry in self._server_docs_cache.items(): server_alias = str(cache_entry.get("alias", "")) tools_raw = cache_entry.get("tools", []) if not isinstance(tools_raw, (list, tuple)): continue for info_raw in tools_raw: info = cast(Dict[str, object], info_raw) entries.append( { "server": server_name, "server_alias": server_alias, "info": info, "keywords": str(info.get("keywords", "")), } ) self._search_index = entries self._search_index_dirty = False async def search_tool_docs( self, query: str, *, allowed_servers: Sequence[str], limit: int = 5, detail: object = "summary", ) -> List[Dict[str, object]]: if not query.strip(): return [] for server_name in allowed_servers: await self._ensure_server_metadata(server_name) self._ensure_search_index() tokens = [token for token in query.lower().split() if token] if not tokens: return [] detail_value = self._normalise_detail(detail) allowed = set(allowed_servers) matches: List[Dict[str, object]] = [] for entry in self._search_index: if entry.get("server") not in allowed: continue keywords = str(entry.get("keywords", "")) if all(token in keywords for token in tokens): info_raw = entry.get("info", {}) info = cast(Dict[str, object], info_raw) matches.append( self._format_tool_doc( str(entry.get("server")), str(entry.get("server_alias", "")), info, detail_value, ) ) capped = max(1, min(20, limit)) return matches[:capped] async def execute_code( self, code: str, servers: Optional[Sequence[str]] = None, timeout: int = DEFAULT_TIMEOUT, ) -> SandboxResult: await self.discover_servers() request_timeout = max(1, min(MAX_TIMEOUT, timeout)) requested_servers = list(dict.fromkeys(servers or [])) for server_name in requested_servers: await self.load_server(server_name) async with SandboxInvocation(self, requested_servers) as invocation: sandbox_obj = cast(SandboxLike, self.sandbox) result = await sandbox_obj.execute( code, timeout=request_timeout, servers_metadata=invocation.server_metadata, discovered_servers=invocation.discovered_servers, container_env=invocation.container_env, volume_mounts=invocation.volume_mounts, host_dir=invocation.host_dir, rpc_handler=invocation.handle_rpc, ) if not result.success: raise SandboxError( f"Sandbox exited with code {result.exit_code}", stdout=result.stdout, stderr=result.stderr, ) return result bridge = MCPBridge() app = Server(BRIDGE_NAME) # Monkey-patch MCP server _handle_message to ignore benign JSON parse exceptions try: from mcp.server.lowlevel.server import Server as LowLevelServer _orig_handle_message = LowLevelServer._handle_message async def _patched_handle_message( self, message, session, lifespan_context, raise_exceptions: bool = False, ): # Ignore parse exceptions produced by pydantic when a blank newline is sent try: if isinstance(message, Exception): txt = str(message) if ( "Invalid JSON: EOF while parsing a value" in txt and "input_value='\\n'" in txt ): return except Exception: pass return await _orig_handle_message( self, message, session, lifespan_context, raise_exceptions ) LowLevelServer._handle_message = _patched_handle_message except Exception: # If the library structure changes or import fails, don't hard-fail; just skip the monkey-patch. pass @app.list_tools() async def list_tools() -> List[Tool]: return [ Tool( name="run_python", description=( "The Code Execution MCP engine. Executes Python code in a stateful, persistent rootless sandbox environment " "similar to a Jupyter notebook. Variables, functions, and imports are preserved across calls. " "Use this tool for general code execution, data analysis, or when the user asks to 'run code'. " "Supports loading additional MCP servers via the 'servers' array." ), inputSchema={ "type": "object", "properties": { "code": { "type": "string", "description": ( "Python source code to execute. Call runtime.capability_summary() inside the sandbox for this digest. " f"{SANDBOX_HELPERS_SUMMARY}" ), }, "servers": { "type": "array", "items": {"type": "string"}, "description": ( "Optional list of MCP servers to make available as mcp_<name> proxies" ), }, "timeout": { "type": "integer", "minimum": 1, "maximum": MAX_TIMEOUT, "default": DEFAULT_TIMEOUT, "description": "Execution timeout in seconds", }, }, "required": ["code"], }, ) ] @app.list_resources() async def list_resources() -> List[Resource]: return [_build_capability_resource()] @app.read_resource() async def read_resource(uri: str) -> str: uri_str = str(uri) if uri_str != CAPABILITY_RESOURCE_URI: raise McpError( ErrorData( code=INVALID_PARAMS, message=f"Unknown resource: {uri_str}", ) ) return _CAPABILITY_RESOURCE_TEXT @app.call_tool() async def call_tool(name: str, arguments: Dict[str, object]) -> CallToolResult: if name != "run_python": return _build_tool_response( status="error", summary=f"Unknown tool: {name}", error=f"Unknown tool: {name}", ) code = arguments.get("code") if not isinstance(code, str) or not code.strip(): return _build_tool_response( status="validation_error", summary="Missing 'code' argument", error="Missing 'code' argument", ) servers = arguments.get("servers", []) if not isinstance(servers, list): return _build_tool_response( status="validation_error", summary="'servers' must be a list", error="'servers' must be a list", ) server_list = [str(server) for server in servers] timeout_value = arguments.get("timeout", DEFAULT_TIMEOUT) if not isinstance(timeout_value, int): return _build_tool_response( status="validation_error", summary="'timeout' must be an integer", error="'timeout' must be an integer", ) timeout_value = max(1, min(MAX_TIMEOUT, timeout_value)) try: result = await bridge.execute_code(code, server_list, timeout_value) summary = "Success" if not result.stdout and not result.stderr: summary = "Success (no output)" return _build_tool_response( status="success", summary=summary, exit_code=result.exit_code, stdout=result.stdout, stderr=result.stderr, servers=server_list, ) except SandboxTimeout as exc: summary = f"Timeout: execution exceeded {timeout_value}s" return _build_tool_response( status="timeout", summary=summary, stdout=exc.stdout, stderr=exc.stderr, servers=server_list, error=str(exc), timeout_seconds=timeout_value, ) except SandboxError as exc: summary = f"Sandbox error: {exc}" return _build_tool_response( status="error", summary=summary, stdout=exc.stdout, stderr=exc.stderr, servers=server_list, error=str(exc), ) except Exception as exc: # pragma: no cover logger.error("Unexpected failure", exc_info=True) return _build_tool_response( status="error", summary="Unexpected failure", error=str(exc), ) async def main() -> None: logging.basicConfig(level=os.environ.get("MCP_BRIDGE_LOG_LEVEL", "INFO")) try: async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) except Exception: logging.exception("Fatal error in main loop") raise if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: sys.exit(0)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/elusznik/mcp-server-code-execution-mode'

If you have feedback or need assistance with the MCP directory API, please join our Discord server