Skip to main content
Glama
mcp_client.py10.3 kB
"""Functional helpers for managing the MCP server subprocess.""" from __future__ import annotations import json import logging import select import subprocess from typing import Dict, Iterable, Optional DEFAULT_TIMEOUT_SECONDS = 5.0 LOGGER = logging.getLogger(__name__) def dumps_json(payload: Dict[str, object]) -> str: """Serialize dictionaries to JSON with stable formatting.""" return json.dumps(payload, indent=2, sort_keys=True) def loads_json(content: str) -> Dict[str, object]: """Parse JSON content into a dictionary.""" data = json.loads(content) if not isinstance(data, dict): raise ValueError("Configuration JSON must deserialize into a dictionary") return data def _normalize_timeout(value: Optional[object]) -> float: if value is None: return DEFAULT_TIMEOUT_SECONDS if isinstance(value, (int, float)): return float(value) message = "Timeout must be numeric" raise ValueError(message) def _validate_config(config: Dict[str, object]) -> Dict[str, object]: if not isinstance(config, dict): raise ValueError("Configuration must be provided as a dictionary") server_path = config.get("path") node_path = config.get("node_path") if not server_path: raise ValueError("Configuration requires 'path'") if not node_path: raise ValueError("Configuration requires 'node_path'") normalized: Dict[str, object] = {} normalized["server_path"] = str(server_path) normalized["node_path"] = str(node_path) normalized["timeout"] = _normalize_timeout(config.get("timeout_seconds")) process = config.get("process") normalized["process"] = process normalized["active_contexts"] = 0 return normalized def create_client(config: Dict[str, object]) -> Dict[str, object]: """Create a client dictionary from an in-memory configuration.""" validated = _validate_config(config) client: Dict[str, object] = {} for key, value in validated.items(): client[key] = value return client def create_client_from_file(config_path: str) -> Dict[str, object]: """Load configuration from a JSON file and create a client.""" with open(config_path, "r", encoding="utf-8") as handle: content = handle.read() config = loads_json(content) return create_client(config) def popen_launch(executable: str, args: Iterable[str]) -> subprocess.Popen: """Launch the MCP server process using subprocess.Popen.""" arguments_list = [] for item in args: arguments_list.append(item) return subprocess.Popen( # nosec B603 [executable] + arguments_list, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) def _ensure_not_running(client: Dict[str, object]) -> None: process = client.get("process") if process is not None: if process.poll() is None: raise RuntimeError("MCP client already running") client["process"] = None def start_client(client: Dict[str, object]) -> bool: """Start the MCP server subprocess for the given client.""" _ensure_not_running(client) executable = client["node_path"] args = [client["server_path"]] LOGGER.info("Starting MCP server", extra={"executable": executable, "args": args}) try: process = popen_launch(executable, args) except OSError as error: LOGGER.error("Failed to start MCP server", exc_info=True) message = f"Failed to start MCP server: {error}" raise RuntimeError(message) from error client["process"] = process return True def _wait_for_process(process: subprocess.Popen, timeout: float) -> None: try: process.wait(timeout=timeout) except TimeoutError: raise except subprocess.TimeoutExpired: raise TimeoutError() def stop_client(client: Dict[str, object]) -> bool: """Stop the MCP server subprocess if it is running.""" process = client.get("process") if process is None: LOGGER.debug("Stop requested but no active process") return False timeout = float(client.get("timeout", DEFAULT_TIMEOUT_SECONDS)) try: LOGGER.info("Terminating MCP server process") process.terminate() except Exception: LOGGER.warning("Failed to terminate process cleanly", exc_info=True) client["process"] = None return True try: _wait_for_process(process, timeout) except TimeoutError: LOGGER.warning("Process did not exit before timeout", extra={"timeout": timeout}) client["process"] = None return True client["process"] = None return True def create_request_id_generator(): """Return a callable that generates unique JSON-RPC request identifiers.""" state = {"value": 0} def _next_id(): state["value"] += 1 return str(state["value"]) return _next_id def build_json_rpc_request(method: str, params: Dict[str, object], generator) -> Dict[str, object]: """Construct a JSON-RPC 2.0 request dictionary.""" if generator is None: generator = create_request_id_generator() request_id = generator() request: Dict[str, object] = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params, } return request def send_json_rpc_request(client: Dict[str, object], request: Dict[str, object]) -> None: """Serialize and send a JSON-RPC request via the client's subprocess stdin.""" process = client.get("process") if process is None or not hasattr(process, "stdin"): raise RuntimeError("MCP client process is not running") LOGGER.debug( "Sending JSON-RPC request", extra={"method": request.get("method"), "id": request.get("id")}, ) serialized = dumps_json(request) + "\n" process.stdin.write(serialized) process.stdin.flush() def read_json_rpc_response(client: Dict[str, object]) -> Dict[str, object]: """Read and deserialize a JSON-RPC response from the client's subprocess stdout.""" process = client.get("process") if process is None or not hasattr(process, "stdout"): raise RuntimeError("MCP client process is not running") timeout = float(client.get("timeout", DEFAULT_TIMEOUT_SECONDS)) while True: line = _readline_with_timeout(process.stdout, timeout) if line == "": raise RuntimeError("No response received from MCP server") stripped = line.strip() if stripped: try: response = json.loads(stripped) except json.JSONDecodeError as error: LOGGER.error("Failed to decode MCP response", extra={"payload": stripped}) raise RuntimeError("Invalid JSON-RPC response format") from error if not isinstance(response, dict): raise RuntimeError("Invalid JSON-RPC response format") LOGGER.debug( "Received JSON-RPC response", extra={"keys": list(response.keys())}, ) return response def _get_or_create_request_id_generator(client: Dict[str, object]): generator = client.get("_request_id_generator") if generator is None: generator = create_request_id_generator() client["_request_id_generator"] = generator return generator def invoke_tool(client: Dict[str, object], method: str, params: Dict[str, object]) -> Dict[str, object]: """Send a JSON-RPC request and return the result field from the response.""" generator = _get_or_create_request_id_generator(client) request = build_json_rpc_request(method, params, generator) send_json_rpc_request(client, request) response = read_json_rpc_response(client) if "error" in response: error = response["error"] message = "MCP server returned an error" if isinstance(error, dict) and "message" in error: message = str(error.get("message")) LOGGER.error("MCP server returned error", extra={"message": message}) raise RuntimeError(message) result = response.get("result") if result is None: raise RuntimeError("MCP server response missing result field") if isinstance(result, dict): normalized: Dict[str, object] = {} for key, value in result.items(): normalized[key] = value LOGGER.debug("Parsed JSON-RPC result dictionary", extra={"keys": list(normalized.keys())}) return normalized LOGGER.debug("Wrapped non-dict JSON-RPC result", extra={"type": type(result).__name__}) return {"value": result} def _readline_with_timeout(stream, timeout: float) -> str: """Read a line from stream, respecting timeout when possible.""" if timeout is None: return stream.readline() if hasattr(stream, "fileno"): try: fileno_method = stream.fileno except AttributeError: fileno_method = None if fileno_method is not None: try: _ = fileno_method() readable, _, _ = select.select([stream], [], [], timeout) if not readable: raise TimeoutError() except (OSError, ValueError): pass return stream.readline() def use_client(client: Dict[str, object]): """Return a context manager that manages MCP client lifecycle.""" state = {"entered": False, "started": False} def __enter__(self): if state["entered"]: raise RuntimeError("Client context already in use") active = int(client.get("active_contexts", 0)) if active > 0: raise RuntimeError("Client context already active") state["entered"] = True client["active_contexts"] = active + 1 start_client(client) state["started"] = True return client def __exit__(self, exc_type, exc, exc_tb): if state["started"]: stop_client(client) state["started"] = False client["active_contexts"] = 0 state["entered"] = False return False context_type = type("_ClientContext", (), {"__enter__": __enter__, "__exit__": __exit__}) return context_type()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mberjans/google-jules-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server