Skip to main content
Glama

Gemini MCP Server

registry.pyโ€ข8.93 kB
"""Configuration registry for clink CLI integrations.""" from __future__ import annotations import json import logging import shlex from collections.abc import Iterable from pathlib import Path from clink.constants import ( CONFIG_DIR, DEFAULT_TIMEOUT_SECONDS, INTERNAL_DEFAULTS, PROJECT_ROOT, USER_CONFIG_DIR, CLIInternalDefaults, ) from clink.models import ( CLIClientConfig, CLIRoleConfig, ResolvedCLIClient, ResolvedCLIRole, ) from utils.env import get_env from utils.file_utils import read_json_file logger = logging.getLogger("clink.registry") CONFIG_ENV_VAR = "CLI_CLIENTS_CONFIG_PATH" class RegistryLoadError(RuntimeError): """Raised when configuration files are invalid or missing critical data.""" class ClinkRegistry: """Loads CLI client definitions and exposes them for schema generation/runtime use.""" def __init__(self) -> None: self._clients: dict[str, ResolvedCLIClient] = {} self._load() def _load(self) -> None: self._clients.clear() for config_path in self._iter_config_files(): try: data = read_json_file(str(config_path)) except json.JSONDecodeError as exc: raise RegistryLoadError(f"Invalid JSON in {config_path}: {exc}") from exc if not data: logger.debug("Skipping empty configuration file: %s", config_path) continue config = CLIClientConfig.model_validate(data) resolved = self._resolve_config(config, source_path=config_path) key = resolved.name.lower() if key in self._clients: logger.info("Overriding CLI configuration for '%s' from %s", resolved.name, config_path) else: logger.debug("Loaded CLI configuration for '%s' from %s", resolved.name, config_path) self._clients[key] = resolved if not self._clients: raise RegistryLoadError( "No CLI clients configured. Ensure conf/cli_clients contains at least one definition or set " f"{CONFIG_ENV_VAR}." ) def reload(self) -> None: """Reload configurations from disk.""" self._load() def list_clients(self) -> list[str]: return sorted(client.name for client in self._clients.values()) def list_roles(self, cli_name: str) -> list[str]: config = self.get_client(cli_name) return sorted(config.roles.keys()) def get_client(self, cli_name: str) -> ResolvedCLIClient: key = cli_name.lower() if key not in self._clients: available = ", ".join(self.list_clients()) raise KeyError(f"CLI '{cli_name}' is not configured. Available clients: {available}") return self._clients[key] # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _iter_config_files(self) -> Iterable[Path]: search_paths: list[Path] = [] # 1. Built-in configs search_paths.append(CONFIG_DIR) # 2. CLI_CLIENTS_CONFIG_PATH environment override (file or directory) env_path_raw = get_env(CONFIG_ENV_VAR) if env_path_raw: env_path = Path(env_path_raw).expanduser() search_paths.append(env_path) # 3. User overrides in ~/.zen/cli_clients search_paths.append(USER_CONFIG_DIR) seen: set[Path] = set() for base in search_paths: if not base: continue if base in seen: continue seen.add(base) if base.is_file() and base.suffix.lower() == ".json": yield base continue if base.is_dir(): for path in sorted(base.glob("*.json")): if path.is_file(): yield path else: logger.debug("Configuration path does not exist: %s", base) def _resolve_config(self, raw: CLIClientConfig, *, source_path: Path) -> ResolvedCLIClient: if not raw.name: raise RegistryLoadError(f"CLI configuration at {source_path} is missing a 'name' field") normalized_name = raw.name.strip() internal_defaults = INTERNAL_DEFAULTS.get(normalized_name.lower()) if internal_defaults is None: raise RegistryLoadError(f"CLI '{raw.name}' is not supported by clink") executable = self._resolve_executable(raw, internal_defaults, source_path) internal_args = list(internal_defaults.additional_args) if internal_defaults else [] config_args = list(raw.additional_args) timeout_seconds = raw.timeout_seconds or ( internal_defaults.timeout_seconds if internal_defaults else DEFAULT_TIMEOUT_SECONDS ) parser_name = internal_defaults.parser if not parser_name: raise RegistryLoadError( f"CLI '{raw.name}' must define a parser either in configuration or internal defaults" ) runner_name = internal_defaults.runner if internal_defaults else None env = self._merge_env(raw, internal_defaults) working_dir = self._resolve_optional_path(raw.working_dir, source_path.parent) roles = self._resolve_roles(raw, internal_defaults, source_path) output_to_file = raw.output_to_file return ResolvedCLIClient( name=normalized_name, executable=executable, internal_args=internal_args, config_args=config_args, env=env, timeout_seconds=int(timeout_seconds), parser=parser_name, runner=runner_name, roles=roles, output_to_file=output_to_file, working_dir=working_dir, ) def _resolve_executable( self, raw: CLIClientConfig, internal_defaults: CLIInternalDefaults | None, source_path: Path, ) -> list[str]: command = raw.command if not command: raise RegistryLoadError(f"CLI '{raw.name}' must specify a 'command' in configuration") return shlex.split(command) def _merge_env( self, raw: CLIClientConfig, internal_defaults: CLIInternalDefaults | None, ) -> dict[str, str]: merged: dict[str, str] = {} if internal_defaults and internal_defaults.env: merged.update(internal_defaults.env) merged.update(raw.env) return merged def _resolve_roles( self, raw: CLIClientConfig, internal_defaults: CLIInternalDefaults | None, source_path: Path, ) -> dict[str, ResolvedCLIRole]: roles: dict[str, CLIRoleConfig] = dict(raw.roles) default_role_prompt = internal_defaults.default_role_prompt if internal_defaults else None if "default" not in roles: roles["default"] = CLIRoleConfig(prompt_path=default_role_prompt) elif roles["default"].prompt_path is None and default_role_prompt: roles["default"].prompt_path = default_role_prompt resolved: dict[str, ResolvedCLIRole] = {} for role_name, role_config in roles.items(): prompt_path_str = role_config.prompt_path or default_role_prompt if not prompt_path_str: raise RegistryLoadError(f"Role '{role_name}' for CLI '{raw.name}' must define a prompt_path") prompt_path = self._resolve_prompt_path(prompt_path_str, source_path.parent) resolved[role_name] = ResolvedCLIRole( name=role_name, prompt_path=prompt_path, role_args=list(role_config.role_args), description=role_config.description, ) return resolved def _resolve_prompt_path(self, prompt_path: str, base_dir: Path) -> Path: resolved = self._resolve_path(prompt_path, base_dir) if not resolved.exists(): raise RegistryLoadError(f"Prompt file not found: {resolved}") return resolved def _resolve_optional_path(self, candidate: str | None, base_dir: Path) -> Path | None: if not candidate: return None return self._resolve_path(candidate, base_dir) def _resolve_path(self, candidate: str, base_dir: Path) -> Path: path = Path(candidate) if path.is_absolute(): return path candidate_path = (base_dir / path).resolve() if candidate_path.exists(): return candidate_path project_relative = (PROJECT_ROOT / path).resolve() return project_relative _REGISTRY: ClinkRegistry | None = None def get_registry() -> ClinkRegistry: global _REGISTRY if _REGISTRY is None: _REGISTRY = ClinkRegistry() return _REGISTRY

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BeehiveInnovations/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server