Skip to main content
Glama
mcp_registry.py6.42 kB
""" MCP server discovery registry. Manages .nisaba/mcp_servers.json for server registration and discovery. Thread-safe registry for tracking running MCP server instances. """ import json import logging import os from pathlib import Path from typing import Dict, Optional from datetime import datetime from nisaba.workspace_files import WorkspaceFiles logger = logging.getLogger(__name__) class MCPServerRegistry: """ Thread-safe registry for MCP server instances. Manages .nisaba/mcp_servers.json in project root for server discovery. Automatically cleans up stale entries (dead processes). Uses WorkspaceFiles singleton for shared cache consistency across components. """ REGISTRY_VERSION = "1.0" def __init__(self, registry_path: Optional[Path] = None): """ Initialize registry. Args: registry_path: Path to registry JSON file (legacy parameter). Ignored in favor of WorkspaceFiles singleton. Logs warning if different from singleton path. """ # Use shared singleton instead of creating own instance self._file = WorkspaceFiles.instance().mcp_servers self.registry_path = self._file.file_path # Validate registry_path if provided (backward compatibility) if registry_path is not None and Path(registry_path) != self.registry_path: logger.warning( f"Custom registry_path {registry_path} ignored, " f"using WorkspaceFiles singleton: {self.registry_path}" ) logger.debug(f"MCPServerRegistry initialized via WorkspaceFiles singleton: {self.registry_path}") def register_server(self, server_id: str, server_info: dict) -> None: """ Register or update a server entry. Atomically updates registry file with new server information. Cleans up stale entries before writing. Args: server_id: Unique server identifier (e.g., "nabu_12345") server_info: Server metadata dict with keys: - name: Server name - pid: Process ID - stdio_active: Whether STDIO transport is active - http: HTTP transport info (enabled, host, port, url) - started_at: ISO timestamp - cwd: Working directory """ try: def update_registry(data: dict) -> dict: """Modifier function for atomic update.""" # Ensure structure if "servers" not in data: data["servers"] = {} if "version" not in data: data["version"] = self.REGISTRY_VERSION # Cleanup stale entries data["servers"] = self._cleanup_stale_entries(data["servers"]) # Add/update this server data["servers"][server_id] = server_info return data # Atomic update using JsonStructuredFile self._file.atomic_update_json(update_registry) logger.info(f"Registered server: {server_id}") except Exception as e: logger.error(f"Failed to register server {server_id}: {e}", exc_info=True) raise def unregister_server(self, server_id: str) -> None: """ Remove a server entry from registry. Uses atomic transaction to prevent races. Args: server_id: Server identifier to remove """ try: if not self.registry_path.exists(): logger.debug(f"Registry does not exist, nothing to unregister: {server_id}") return def remove_server(data: dict) -> dict: """Modifier function for atomic removal.""" if "servers" in data and server_id in data["servers"]: del data["servers"][server_id] logger.info(f"Unregistered server: {server_id}") else: logger.debug(f"Server not in registry: {server_id}") return data # Atomic update using JsonStructuredFile self._file.atomic_update_json(remove_server) except Exception as e: logger.error(f"Failed to unregister server {server_id}: {e}", exc_info=True) raise def list_servers(self) -> Dict[str, dict]: """ List all active servers. Filters out entries for dead processes. Returns: Dict mapping server_id -> server_info for active servers """ try: if not self.registry_path.exists(): return {} data = self._file.load_json() servers = data.get("servers", {}) # Filter to only alive processes return self._cleanup_stale_entries(servers) except Exception as e: logger.error(f"Failed to list servers: {e}", exc_info=True) return {} def _is_process_alive(self, pid: int) -> bool: """ Check if process with given PID is alive. Uses os.kill(pid, 0) which doesn't send a signal but checks existence. Args: pid: Process ID to check Returns: True if process exists, False otherwise """ try: os.kill(pid, 0) return True except OSError: return False except Exception as e: logger.warning(f"Error checking PID {pid}: {e}") return False def _cleanup_stale_entries(self, servers: Dict[str, dict]) -> Dict[str, dict]: """ Remove entries for dead processes. Args: servers: Dict of server_id -> server_info Returns: Filtered dict with only alive servers """ alive = {} for server_id, info in servers.items(): pid = info.get("pid") if pid is None: logger.warning(f"Server entry missing PID: {server_id}") continue if self._is_process_alive(pid): alive[server_id] = info else: logger.debug(f"Removing stale entry for dead process: {server_id} (PID {pid})") return alive

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server