Skip to main content
Glama
config_loader.py77.6 kB
# # MCP Foxxy Bridge - Configuration Loader # # Copyright (C) 2024 Billy Bryant # Portions copyright (C) 2024 Sergey Parfenyuk (original MIT-licensed author) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # MIT License attribution: Portions of this file were originally licensed # under the MIT License by Sergey Parfenyuk (2024). # """Configuration Loading and Management for MCP Foxzy Bridge. This module provides comprehensive configuration loading, validation, and management functionality for the MCP Foxxy Bridge system, including: - JSON configuration file parsing with environment variable expansion - Command substitution with security validation - Server configuration validation and normalization - Bridge settings management - Schema validation with detailed error reporting Key Features: - Environment variable expansion with ${VAR_NAME:default} syntax - Secure command substitution with $(command) syntax - Comprehensive configuration validation - Support for multiple transport types (STDIO, SSE) - Health check configuration management - OAuth and authentication configuration - Detailed error reporting and logging Example: Basic configuration loading: >>> loader = ConfigLoader("config.json") >>> config = await loader.load_config() >>> print(f"Loaded {len(config.servers)} servers") Environment variable expansion: >>> # In config.json: {"api_key": "${API_KEY:default-key}"} >>> config = load_bridge_config("config.json", {}) >>> # api_key will be expanded from environment Command substitution: >>> # In config.json: {"token": "$(vault read -field=token secret/api)"} >>> config = load_bridge_config("config.json", {}) >>> # token will be retrieved from vault command """ import json import os import re import shlex import shutil import subprocess from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from mcp_foxxy_bridge.security.config import BridgeSecurityConfig, ServerSecurityConfig from mcp.client.stdio import StdioServerParameters from mcp_foxxy_bridge.utils.config_migration import get_config_dir try: import jsonschema JSONSCHEMA_AVAILABLE = True except ImportError: JSONSCHEMA_AVAILABLE = False from mcp_foxxy_bridge.utils.logging import get_logger logger = get_logger(__name__, facility="CONFIG") # Security constants for command validation MAX_COMMAND_LENGTH = 1000 # Maximum allowed command string length MAX_ARG_LENGTH = 500 # Maximum allowed individual argument length COMMAND_TIMEOUT = 30 # Timeout in seconds for command execution def _sanitize_command_for_logging(command: str) -> str: """Sanitize command string for safe logging by removing/escaping potentially dangerous characters.""" # Remove or escape characters that could be interpreted by shells or log viewers # Replace backticks, dollar signs, semicolons, pipes, and other shell metacharacters sanitized = re.sub(r"[`$;|&<>(){}[\]*?~]", "?", command) # Limit length to prevent log spam if len(sanitized) > 100: sanitized = sanitized[:97] + "..." # Escape any remaining quotes to prevent log injection return sanitized.replace('"', '\\"').replace("'", "\\'") def _migrate_oauth_fields(config_data: dict[str, Any]) -> bool: """Migrate legacy 'oauth' field to 'oauth_config' in server configurations. This handles backward compatibility where older configs or CLI commands may have used 'oauth' instead of the expected 'oauth_config' field name. Args: config_data: The parsed configuration data Returns: True if any migrations were performed, False otherwise """ migrated = False try: servers = config_data.get("mcpServers", {}) for server_name, server_config in servers.items(): if isinstance(server_config, dict): # Check if server has 'oauth' but not 'oauth_config' if "oauth" in server_config and "oauth_config" not in server_config: # Migrate oauth to oauth_config server_config["oauth_config"] = server_config.pop("oauth") logger.debug(f"Migrated 'oauth' to 'oauth_config' for server '{server_name}'") migrated = True elif "oauth" in server_config and "oauth_config" in server_config: # Both exist - prefer oauth_config and remove oauth server_config.pop("oauth") logger.debug(f"Removed duplicate 'oauth' field for server '{server_name}' (keeping 'oauth_config')") migrated = True except Exception as e: logger.debug(f"Error during oauth field migration: {e}") return migrated def _write_config_to_disk(config_path: str, config_data: dict[str, Any]) -> None: """Write configuration data back to disk with backup. Args: config_path: Path to the configuration file config_data: The configuration data to write """ try: config_file_path = Path(config_path) # Create backup backup_path = config_file_path.with_suffix(".json.backup") try: shutil.copy2(config_file_path, backup_path) logger.debug("Created configuration backup") except Exception as e: logger.debug(f"Could not create backup: {e}") # Write updated config with config_file_path.open("w") as f: json.dump(config_data, f, indent=2) logger.info("Updated configuration file with migrations") except Exception as e: logger.debug(f"Failed to write config to disk: {e}") def _ensure_schema_reference(config_path: str, config_data: dict[str, Any]) -> bool: """Ensure the config file has a $schema reference for IDE support. Args: config_path: Path to the configuration file config_data: The parsed configuration data Returns: True if the schema reference was added and file was updated, False otherwise """ try: # Check if schema reference already exists and is correct current_schema = config_data.get("$schema", "") # Get the config directory and build absolute path to schema config_dir = get_config_dir() schema_path = config_dir / "bridge_config_schema.json" correct_schema = str(schema_path) if current_schema == correct_schema: logger.debug("Config already has correct schema reference") return False # Add or update schema reference, ensuring it's first in the JSON # Create new ordered dict with schema first updated_config = {"$schema": correct_schema} # Add all other keys (except existing $schema if present) updated_config.update({key: value for key, value in config_data.items() if key != "$schema"}) # Write the updated config back to file config_file_path = Path(config_path) # Create backup backup_path = config_file_path.with_suffix(".json.backup") try: shutil.copy2(config_file_path, backup_path) logger.debug("Created configuration backup") except Exception as e: logger.debug(f"Could not create backup: {e}") # Write updated config with config_file_path.open("w") as f: json.dump(updated_config, f, indent=2) if current_schema: logger.info("Updated schema reference in configuration file for IDE support") else: logger.info("Added schema reference to configuration file for IDE support") return True except Exception as e: logger.debug(f"Failed to add schema reference to config: {e}") return False def _ensure_config_schema() -> None: """Copy the JSON schema to the config directory if it doesn't exist or is outdated. This ensures users always have access to the current schema for IDE auto-completion and validation, matching the exact version of the bridge they're running. """ try: # Get the config directory and schema paths config_dir = get_config_dir() config_schema_path = config_dir / "bridge_config_schema.json" # Find the schema file in the package directory # This works whether we're installed via pip or running from source current_file_dir = Path(__file__).parent.parent.parent.parent source_schema_path = current_file_dir / "bridge_config_schema.json" if not source_schema_path.exists(): logger.debug("Schema file not found in source directory, skipping schema copy") return # Check if we need to copy the schema should_copy = False if not config_schema_path.exists(): logger.debug("Config schema not found, will copy from source") should_copy = True else: # Check if the source is newer try: source_mtime = source_schema_path.stat().st_mtime config_mtime = config_schema_path.stat().st_mtime if source_mtime > config_mtime: logger.debug("Source schema is newer, will update config schema") should_copy = True except OSError: logger.debug("Could not compare schema file times, will copy to be safe") should_copy = True if should_copy: logger.info("Copying JSON schema to config directory for IDE support") shutil.copy2(source_schema_path, config_schema_path) logger.debug(f"Schema copied to: {config_schema_path}") except Exception as e: # Don't fail configuration loading if schema copy fails logger.debug(f"Failed to copy schema file: {e}") class ConfigLoader: """Comprehensive configuration loader with validation and expansion. This class provides a high-level interface for loading MCP Foxxy Bridge configurations from JSON files with support for environment variable expansion, command substitution, and comprehensive validation. Attributes: config_file_path: Path to the configuration file base_env: Base environment variables for server processes Example: >>> loader = ConfigLoader("config.json") >>> config = await loader.load_config() >>> >>> # With custom environment >>> loader = ConfigLoader("config.json", {"DEBUG": "1"}) >>> config = await loader.load_config() """ def __init__(self, config_file_path: str, base_env: dict[str, str] | None = None) -> None: """Initialize configuration loader. Args: config_file_path: Path to the JSON configuration file base_env: Base environment variables (defaults to empty dict) """ self.config_file_path = Path(config_file_path) self.base_env = base_env or {} logger.debug(f"Initialized ConfigLoader for: {config_file_path}") def load_config(self) -> "BridgeConfiguration": """Load and validate configuration from file. Returns: BridgeConfiguration object with all settings Raises: FileNotFoundError: If configuration file doesn't exist ValueError: If configuration is invalid Example: >>> loader = ConfigLoader("config.json") >>> config = loader.load_config() >>> print(f"Bridge listening on {config.bridge.host}:{config.bridge.port}") """ return load_bridge_config_from_file(str(self.config_file_path), self.base_env) def validate_config(self) -> list[str]: """Validate configuration file and return any errors. Returns: List of validation error messages (empty if valid) Example: >>> loader = ConfigLoader("config.json") >>> errors = loader.validate_config() >>> if errors: ... print("Configuration errors:", errors) """ try: self.load_config() return [] except (FileNotFoundError, json.JSONDecodeError, ValueError) as e: return [str(e)] except Exception as e: logger.warning("Unexpected configuration validation error: %s", str(e)) return [f"Unexpected configuration error: {e!s}"] # Configuration data classes @dataclass class OAuthConfig: """OAuth configuration for MCP servers. Attributes: enabled: Whether OAuth is enabled for this server issuer: OAuth issuer URL verify_ssl: Whether to verify SSL/TLS certificates keep_alive_interval: Keep-alive ping interval for OAuth servers in milliseconds token_refresh_interval: Proactive token refresh interval in milliseconds connection_check_interval: Connection health check interval in milliseconds """ enabled: bool = False issuer: str | None = None verify_ssl: bool = True keep_alive_interval: int = 20000 # 20 seconds - more frequent for OAuth token_refresh_interval: int = 1800000 # 30 minutes - proactive token refresh connection_check_interval: int = 10000 # 10 seconds - frequent connection checks # Additional fields for backward compatibility with existing configurations client_id: str | None = None authorization_url: str | None = None token_url: str | None = None type: str | None = None def __getitem__(self, key: str) -> Any: """Allow dictionary-style access for backward compatibility.""" return getattr(self, key) def get(self, key: str, default: Any = None) -> Any: """Allow dictionary-style get method for backward compatibility.""" return getattr(self, key, default) def __contains__(self, key: str) -> bool: """Allow 'key in oauth_config' checks.""" return hasattr(self, key) def to_dict(self) -> dict[str, Any]: """Convert to dictionary for backward compatibility with legacy APIs.""" return {field_name: field_value for field_name, field_value in self.__dict__.items() if field_value is not None} @dataclass class HealthCheckConfig: """Configuration for server health checks and monitoring. This class defines comprehensive health check settings including operation types, intervals, timeouts, and recovery behavior. Attributes: enabled: Whether health checks are enabled interval: Health check interval in milliseconds timeout: Health check timeout in milliseconds keep_alive_interval: Keep-alive ping interval in milliseconds keep_alive_timeout: Keep-alive ping timeout in milliseconds max_consecutive_failures: Maximum failures before marking server as failed auto_restart: Whether to automatically restart failed servers restart_delay: Delay before restart attempt in milliseconds max_restart_attempts: Maximum restart attempts before giving up operation: MCP operation to use for health checks tool_name: Specific tool name if operation is "call_tool" tool_arguments: Arguments for tool calls resource_uri: Resource URI if operation is "read_resource" prompt_name: Prompt name if operation is "get_prompt" prompt_arguments: Arguments for prompt calls http_path: Custom HTTP path for health checks http_method: HTTP method for health checks expected_status: Expected HTTP status code expected_content: Expected content substring """ enabled: bool = True interval: int = 30000 # milliseconds timeout: int = 5000 # milliseconds keep_alive_interval: int = 60000 # milliseconds - frequent keep-alive pings keep_alive_timeout: int = 10000 # milliseconds - timeout for keep-alive pings max_consecutive_failures: int = 3 # failures before marking server as failed auto_restart: bool = True # automatically restart failed servers restart_delay: int = 5000 # milliseconds - delay before restart attempt max_restart_attempts: int = 5 # maximum restart attempts before giving up # Health check operation configuration operation: str = "list_tools" # MCP operation to use for health checks tool_name: str | None = None # Specific tool name if operation is "call_tool" tool_arguments: dict[str, str] | None = None # Arguments for tool calls resource_uri: str | None = None # Resource URI if operation is "read_resource" prompt_name: str | None = None # Prompt name if operation is "get_prompt" prompt_arguments: dict[str, str] | None = None # Arguments for prompt calls # HTTP-specific health check options (for remote MCP servers) http_path: str | None = None # Custom HTTP path for health checks http_method: str = "GET" # HTTP method for health checks expected_status: int = 200 # Expected HTTP status code expected_content: str | None = None # Expected content substring @dataclass class BridgeServerConfig: """Enhanced configuration for a single MCP server in the bridge. This class represents the complete configuration for an individual MCP server including connection details, authentication, health checks, and namespace settings. Attributes: name: Server name (must be unique) enabled: Whether the server is enabled command: Command to execute for STDIO servers args: Command-line arguments env: Environment variables for the server process timeout: Server startup timeout in seconds transport_type: Transport protocol ("stdio" or "sse") url: URL for SSE transport servers retry_attempts: Number of connection retry attempts retry_delay: Delay between retries in milliseconds health_check: Health check configuration tool_namespace: Namespace prefix for tools resource_namespace: Namespace prefix for resources prompt_namespace: Namespace prefix for prompts priority: Server priority for conflict resolution tags: Server tags for filtering and grouping log_level: Logging level for this server oauth_config: OAuth configuration dictionary authentication: General authentication configuration headers: Custom HTTP headers verify_ssl: Whether to verify SSL/TLS certificates working_directory: Working directory for server process Example: >>> config = BridgeServerConfig( ... name="filesystem", ... command="npx", ... args=["-y", "@modelcontextprotocol/server-filesystem", "./"], ... transport_type="stdio" ... ) >>> print(f"Server enabled: {config.enabled}") """ name: str enabled: bool = True command: str = "" args: list[str] | None = None env: dict[str, str] | None = None timeout: int = 60 transport_type: str = "stdio" url: str | None = None # URL for SSE transport retry_attempts: int = 3 retry_delay: int = 1000 # milliseconds health_check: HealthCheckConfig | None = None tool_namespace: str | None = None resource_namespace: str | None = None prompt_namespace: str | None = None priority: int = 100 tags: list[str] | None = None log_level: str = "ERROR" # Default to quiet (only errors) oauth_config: OAuthConfig | None = None # OAuth configuration authentication: dict[str, Any] | None = None # General authentication config headers: dict[str, str] | None = None # Custom headers for HTTP requests verify_ssl: bool = True # SSL/TLS verification for HTTPS connections working_directory: str | None = None # Working directory for server process security: "ServerSecurityConfig | None" = None # Security configuration for this server def __post_init__(self) -> None: """Initialize default values for optional fields.""" if self.args is None: self.args = [] if self.env is None: self.env = {} if self.health_check is None: self.health_check = HealthCheckConfig() if self.tags is None: self.tags = [] def is_oauth_enabled(self) -> bool: """Check if OAuth is enabled for this server. Returns: True if OAuth is enabled, False otherwise Example: >>> if server.is_oauth_enabled(): ... print("OAuth authentication required") """ if self.oauth_config is None: return False return self.oauth_config.enabled def needs_oauth_proxy(self) -> bool: """Check if this server needs OAuth proxy routes (not passthrough). Returns: True if OAuth proxy is needed, False otherwise Example: >>> if server.needs_oauth_proxy(): ... print("Setting up OAuth proxy routes") """ if not self.is_oauth_enabled(): return False # Passthrough OAuth means the server handles its own OAuth flow if not self.oauth_config: return False oauth_type = self.oauth_config.get("type", "proxy") result: bool = oauth_type != "passthrough" return result @dataclass class AggregationConfig: """Configuration for capability aggregation. Controls which types of capabilities are aggregated from multiple servers into the main bridge endpoint. """ tools: bool = True resources: bool = True prompts: bool = True @dataclass class FailoverConfig: """Configuration for server failover behavior. Controls how the bridge handles server failures and recovery operations. """ enabled: bool = True max_failures: int = 3 recovery_interval: int = 60000 # milliseconds @dataclass class ConfigReloadConfig: """Configuration for dynamic config file reloading. Controls whether and how configuration files are monitored for changes and reloaded automatically. """ enabled: bool = True debounce_ms: int = 1000 # milliseconds validate_only: bool = False # if true, only validate but don't apply changes @dataclass class BridgeConfig: """Configuration for bridge-specific behavior. This class contains all bridge-level settings including networking, aggregation, failover, and operational behavior. Attributes: conflict_resolution: How to handle conflicts between servers default_namespace: Whether to use default namespacing aggregation: Capability aggregation configuration failover: Server failover configuration config_reload: Configuration reloading settings host: Host address to bind to port: Port number to listen on mcp_log_level: Default log level for MCP servers """ conflict_resolution: str = "namespace" # priority, namespace, first, error default_namespace: bool = True aggregation: AggregationConfig | None = None failover: FailoverConfig | None = None config_reload: ConfigReloadConfig | None = None host: str = "127.0.0.1" # Default to localhost for security port: int = 8080 # Default port oauth_port: int = 8090 # Dedicated OAuth port (always consistent, independent of bridge port) mcp_log_level: str = "ERROR" # Default log level for all MCP servers allow_command_substitution: bool = False # Enable command substitution in configuration allowed_commands: list[str] | None = None # Whitelist of allowed commands for substitution allow_dangerous_commands: bool = False # UNSAFE: Allow any command without validation read_only_mode: bool = True # Block write operations when True (defaults to True for security) security: "BridgeSecurityConfig | None" = None # Security configuration for the bridge def __post_init__(self) -> None: """Initialize default values for bridge configuration.""" if self.aggregation is None: self.aggregation = AggregationConfig() if self.failover is None: self.failover = FailoverConfig() if self.config_reload is None: self.config_reload = ConfigReloadConfig() @dataclass class BridgeConfiguration: """Complete bridge configuration including all servers and bridge settings. This is the top-level configuration object that contains all server configurations and bridge-level settings. Attributes: servers: Dictionary of server configurations by name bridge: Bridge-level configuration settings Example: >>> config = BridgeConfiguration( ... servers={"fs": filesystem_config}, ... bridge=BridgeConfig(port=8080) ... ) >>> print(f"Bridge has {len(config.servers)} servers") """ servers: dict[str, BridgeServerConfig] bridge: BridgeConfig | None = None def __post_init__(self) -> None: """Initialize default bridge configuration.""" if self.bridge is None: self.bridge = BridgeConfig() # Utility functions for configuration processing def normalize_server_name(server_name: str) -> str: """Normalize server name for URL-safe usage. Converts server names to lowercase, replaces spaces and special characters with underscores, and ensures the name is URL-safe for use in endpoints. Args: server_name: The original server name from configuration Returns: Normalized server name suitable for URLs Example: >>> normalize_server_name("File System") 'file_system' >>> normalize_server_name("Example API") 'example_api' >>> normalize_server_name("My_Special Server!") 'my_special_server' """ # Convert to lowercase normalized = server_name.lower() # Replace spaces, hyphens, and other non-alphanumeric chars with underscores normalized = re.sub(r"[^a-z0-9]+", "_", normalized) # Remove leading/trailing underscores normalized = normalized.strip("_") # Ensure we don't have empty string or just underscores if not normalized or normalized == "_": normalized = "unnamed_server" return normalized def expand_environment_variables(value: Any) -> Any: """Recursively expand environment variables and command substitutions. Supports both ${VAR_NAME:default} syntax for environment variables and $(command) syntax for secure command substitution. Args: value: The configuration value to expand (str, dict, list, or other) Returns: The value with environment variables and command substitutions expanded Example: >>> os.environ["API_KEY"] = "secret123" >>> expand_environment_variables("${API_KEY}") 'secret123' >>> expand_environment_variables("${MISSING:default}") 'default' >>> expand_environment_variables("$(echo hello)") 'hello' """ return expand_env_vars(value) # Command substitution with security validation def get_default_allowed_commands() -> set[str]: """Get the default set of allowed commands for command substitution. Returns: Set of command names that are considered safe for read-only operations """ return { # Basic safe output commands "echo", "printf", # Date/time (read-only) "date", # Environment info (read-only) "whoami", "hostname", "pwd", "uname", # Secret management tools (read-only operations only) "op", # 1Password CLI "vault", # HashiCorp Vault # Base64 encoding/decoding "base64", # JSON processing (safe) "jq", # Environment variable expansion "printenv", # Git operations (read-only) "git", # GitHub CLI (read-only operations) "gh", # Text processing (read-only) "grep", "cat", "head", "tail", # Network tools (read-only) "curl", "wget", } def validate_command_security( cmd_parts: list[str], allowed_commands: set[str] | None = None, allow_dangerous: bool | None = None ) -> None: """Validate command using an allow list approach for maximum security. Only explicitly allowed commands can be executed in command substitution. This prevents accidental execution of dangerous operations. Args: cmd_parts: List of command parts from shlex.split() allowed_commands: Set of allowed commands, defaults to safe read-only commands allow_dangerous: If True, skip all security validation (UNSAFE!) Raises: ValueError: If command is not in the allow list Example: >>> validate_command_security(["echo", "hello"]) # OK >>> validate_command_security(["rm", "-rf", "/"]) # Raises ValueError """ if not cmd_parts: return # Validate command length to prevent resource exhaustion full_command_string = " ".join(cmd_parts) if len(full_command_string) > MAX_COMMAND_LENGTH: raise ValueError("Command too long") # Validate individual argument lengths for arg in cmd_parts: if len(arg) > MAX_ARG_LENGTH: raise ValueError("Command validation failed") # Check for dangerous commands bypass (UNSAFE MODE) if allow_dangerous is None: allow_dangerous = os.getenv("MCP_ALLOW_DANGEROUS_COMMANDS", "false").lower() in ("true", "1", "yes", "on") if allow_dangerous: logger.warning(f"UNSAFE MODE: Allowing potentially dangerous command without validation: {' '.join(cmd_parts)}") return # Skip all validation! command = cmd_parts[0].lower() # Use provided allowed commands or get default safe commands if allowed_commands is None: allowed_commands = get_default_allowed_commands() else: # Convert to lowercase for comparison allowed_commands = {cmd.lower() for cmd in allowed_commands} # Also check environment variable for additional commands env_commands = os.getenv("MCP_ALLOWED_COMMANDS", "") if env_commands: # Add environment variable commands to the allowed set additional_commands = {cmd.strip().lower() for cmd in env_commands.split(",") if cmd.strip()} allowed_commands = allowed_commands.union(additional_commands) if command not in allowed_commands: raise ValueError(f"Command '{command}' not in allow list") # Enhanced validation: check for dangerous arguments if command in ["vault", "op"]: # Validate that these commands are used in read-only mode if "write" in " ".join(cmd_parts).lower() or "delete" in " ".join(cmd_parts).lower(): raise ValueError(f"Write operations not allowed for {command}") full_command_string = " ".join(cmd_parts) dangerous_patterns = ["|", "||", "&", "&&", ";", "`", ">", ">>", "<", "$()"] fork_bomb_patterns = [ ":()", ":bomb:", "while true", "for((;;))", "while(1)", "while :;", "while :", "exec", "ulimit -u unlimited", ] resource_exhaustion_patterns = [ "dd if=/dev/zero", "dd if=/dev/urandom", "yes", "cat /dev/zero", ">/dev/random", "mkfifo", "nohup", "disown", "setsid", "tail -f /dev/null", ] for pattern in dangerous_patterns: if pattern in full_command_string: raise ValueError("Command validation failed") for pattern in fork_bomb_patterns: if pattern.lower() in full_command_string.lower(): raise ValueError("Command validation failed") for pattern in resource_exhaustion_patterns: if pattern.lower() in full_command_string.lower(): raise ValueError("Command validation failed") suspicious_exact = ["sudo", "su", "chmod", "chown"] suspicious_substring = ["/bin/", "/usr/bin/", "$(", "`", "/dev/zero", "/dev/random", ">/tmp/", ">>/tmp/"] for arg in cmd_parts: if arg.lower() in suspicious_exact: raise ValueError("Command validation failed") for suspicious in suspicious_substring: if suspicious in arg.lower(): raise ValueError("Command validation failed") # Validate specific commands for read-only operations _validate_command_args(command, cmd_parts) def _validate_command_args(command: str, cmd_parts: list[str]) -> None: """Validate command arguments to ensure read-only operations. Args: command: The base command (already lowercased) cmd_parts: Full command parts including the command itself Raises: ValueError: If command contains write/destructive operations """ args = [arg.lower() for arg in cmd_parts[1:]] # Skip the command itself args_str = " ".join(args) if command == "git": # Only allow safe read-only git operations allowed_git_ops = { "status", "log", "show", "diff", "branch", "tag", "remote", "config", "rev-parse", "ls-files", "ls-tree", "cat-file", "describe", "blame", "shortlog", "reflog", "symbolic-ref", } if args: git_subcommand = args[0] if git_subcommand not in allowed_git_ops: raise ValueError( f"SECURITY: Git operation '{git_subcommand}' blocked - only read-only git " f"operations allowed to prevent repository modification. " f"Allowed: {', '.join(sorted(allowed_git_ops))}" ) # Check for dangerous flags even in allowed operations dangerous_git_flags = ["--force", "-f", "--delete", "-d"] for flag in dangerous_git_flags: if flag in args: raise ValueError(f"Dangerous git flag '{flag}' not allowed") elif command == "vault": # Only allow read operations if not args or args[0] not in ["read", "kv", "list", "auth", "status", "version"]: raise ValueError("Only vault read operations allowed (read, kv get, list, auth, status, version)") # Additional validation for kv operations if len(args) >= 2 and args[0] == "kv" and args[1] not in ["get", "list", "metadata"]: raise ValueError("Only vault kv read operations allowed (get, list, metadata)") # Check for write/delete flags write_flags = ["put", "delete", "destroy", "undelete", "patch"] if any(flag in args_str for flag in write_flags): raise ValueError("Vault write/delete operations not allowed") elif command == "op": # 1Password CLI - only allow read operations if not args or args[0] not in ["read", "get", "list", "whoami", "signin", "signout"]: raise ValueError( "SECURITY: 1Password operation blocked - only read-only operations " "allowed to prevent credential modification. Allowed: read, get, list, whoami, signin, signout" ) # Check for dangerous op flags write_flags = ["create", "edit", "delete", "archive", "restore"] if any(flag in args for flag in write_flags): raise ValueError( "SECURITY: 1Password write operation blocked - command substitution only allows " "credential reading, not modification" ) elif command == "gh": # GitHub CLI - only allow read operations if not args: return # `gh` by itself is fine read_only_gh_ops = { "repo", "issue", "pr", "release", "gist", "auth", "config", "status", "browse", "search", "api", "alias", "completion", } gh_subcommand = args[0] if gh_subcommand not in read_only_gh_ops: raise ValueError( f"SECURITY: GitHub CLI operation '{gh_subcommand}' blocked - only read-only " f"operations allowed to prevent repository/issue modification. " f"Allowed: {', '.join(sorted(read_only_gh_ops))}" ) # Check for write flags even in allowed operations if len(args) > 1: write_flags = ["create", "edit", "delete", "close", "merge", "reopen"] if any(flag in args[1:] for flag in write_flags): raise ValueError( "SECURITY: GitHub CLI write operation blocked - command substitution only allows " "reading repository/issue data, not modification" ) elif command in {"curl", "wget"}: # Network tools - check for upload/POST operations if any(flag in args for flag in ["-X", "--request", "--data", "--upload-file", "-T", "--form", "-F"]): raise ValueError( f"SECURITY: {command.title()} upload/POST operation blocked - command substitution " "only allows safe data retrieval, not data transmission" ) elif command == "cat": # Ensure we're not trying to write (though cat typically can't write) if ">" in args_str or ">>" in args_str: raise ValueError("File write operations not allowed with cat") def execute_command_substitution( command: str, allow_substitution: bool | None = None, allowed_commands: set[str] | None = None ) -> str: """Execute a command and return its output for command substitution. Args: command: The command to execute allow_substitution: Override to allow/disallow substitution, if None uses env var allowed_commands: Set of allowed commands for security validation Returns: The command output with trailing whitespace stripped Raises: ValueError: If command execution fails or contains dangerous operations Example: >>> execute_command_substitution("echo hello") 'hello' >>> execute_command_substitution("date +%Y") '2024' """ # Check if command substitution is allowed if allow_substitution is None: # Fall back to environment variable env_substitution = os.getenv("MCP_ALLOW_COMMAND_SUBSTITUTION", "false").lower() substitution_allowed = env_substitution in ("true", "1", "yes", "on") else: substitution_allowed = allow_substitution if not substitution_allowed: raise ValueError( "Command substitution is disabled by default for security. " "Set MCP_ALLOW_COMMAND_SUBSTITUTION=true to enable." ) try: # Validate command length before parsing if len(command) > MAX_COMMAND_LENGTH: raise ValueError("Command too long") # Parse command safely using shlex cmd_parts = shlex.split(command) if not cmd_parts: raise ValueError("Empty command in substitution") # Validate command for security issues validate_command_security(cmd_parts, allowed_commands) # Execute command with security considerations # S603: subprocess call with shell=False is safe with validated commands # S602: shell=False prevents shell injection, cmd_parts are validated above result = subprocess.run( # noqa: S603 cmd_parts, capture_output=True, text=True, timeout=COMMAND_TIMEOUT, # 30 second timeout check=True, shell=False, # Explicitly disable shell for security env=os.environ.copy(), # Inherit current environment ) # Return output with trailing whitespace stripped output = result.stdout.rstrip() logger.debug( "Command substitution '%s' completed successfully (%d chars)", _sanitize_command_for_logging(command), len(output), ) return output except subprocess.TimeoutExpired as e: error_msg = f"Command substitution timed out: {_sanitize_command_for_logging(command)}" logger.exception("Command substitution timed out: %s", _sanitize_command_for_logging(command)) raise ValueError(error_msg) from e except subprocess.CalledProcessError as e: error_msg = f"Command substitution failed: {_sanitize_command_for_logging(command)} (exit code {e.returncode})" if e.stderr: logger.debug("Command substitution stderr available (%d chars)", len(e.stderr.strip())) logger.exception("Command substitution failed: %s", _sanitize_command_for_logging(command)) raise ValueError(error_msg) from e except (OSError, ValueError) as e: error_msg = f"Invalid command substitution: {_sanitize_command_for_logging(command)} - {e}" logger.exception("Invalid command substitution: %s", _sanitize_command_for_logging(command)) raise ValueError(error_msg) from e def expand_env_vars( value: Any, allow_command_substitution: bool | None = None, allowed_commands: set[str] | None = None ) -> Any: """Recursively expand environment variables and command substitutions in configuration values. Supports: - ${VAR_NAME} syntax with optional defaults: ${VAR_NAME:default_value} - $(command) syntax for command substitution (bash-style) Args: value: The configuration value to expand (can be str, dict, list, or other) allow_command_substitution: Override to allow/disallow command substitution allowed_commands: Set of allowed commands for security validation Returns: The value with environment variables and command substitutions expanded """ if isinstance(value, str): # First expand command substitutions $(command) cmd_pattern = r"\$\(([^)]+)\)" def replace_command(match: re.Match[str]) -> str: command = match.group(1).strip() original_match = match.group(0) # Full original match including $() logger.debug(f"Executing command substitution: {command}") try: return execute_command_substitution(command, allow_command_substitution, allowed_commands) except ValueError as e: # Log the error but return the original substitution pattern unchanged # This allows the bridge to continue running with non-critical command failures logger.warning( f"Command substitution failed for '{command}': {e}. " "Leaving pattern unchanged to allow bridge to continue." ) # Return the original pattern so users can see what failed return original_match except Exception: # Handle any unexpected errors logger.exception( f"Unexpected error in command substitution for '{command}'. " "Leaving pattern unchanged to allow bridge to continue." ) return original_match # Apply command substitutions first value = re.sub(cmd_pattern, replace_command, value) # Then expand environment variables ${VAR_NAME} or ${VAR_NAME:default} env_pattern = r"\$\{([^}:]+)(?::([^}]*))?\}" def replace_env_var(match: re.Match[str]) -> str: var_name = match.group(1) default_value = match.group(2) if match.group(2) is not None else "" env_value = os.getenv(var_name, default_value) if env_value == "" and match.group(2) is None: logger.warning(f"Environment variable '{var_name}' not found and no default provided") return env_value return re.sub(env_pattern, replace_env_var, value) if isinstance(value, dict): return {k: expand_env_vars(v, allow_command_substitution, allowed_commands) for k, v in value.items()} if isinstance(value, list): return [expand_env_vars(item, allow_command_substitution, allowed_commands) for item in value] return value # Configuration validation functions def validate_server_config(name: str, server_config: dict[str, Any]) -> list[str]: """Validate individual server configuration and return list of warnings. Args: name: The server name server_config: The server configuration to validate Returns: List of warning messages Example: >>> config = {"command": "python", "args": ["server.py"]} >>> warnings = validate_server_config("test", config) >>> if not warnings: ... print("Configuration is valid") """ warnings = [] # Check required fields based on transport type transport_type = server_config.get("transport", "stdio") if transport_type == "stdio": if not server_config.get("command"): warnings.append(f"Server '{name}' missing required 'command' field for stdio transport") elif transport_type in ["sse", "http"] and not server_config.get("url"): warnings.append(f"Server '{name}' missing required 'url' field for {transport_type} transport") # Check args format args = server_config.get("args", []) if not isinstance(args, list): warnings.append(f"Server '{name}' has invalid 'args' field (must be array)") elif not all(isinstance(arg, str) for arg in args): warnings.append(f"Server '{name}' has non-string values in 'args' array") # Check env format env = server_config.get("env", {}) if not isinstance(env, dict): warnings.append(f"Server '{name}' has invalid 'env' field (must be object)") elif not all(isinstance(k, str) and isinstance(v, str) for k, v in env.items()): warnings.append(f"Server '{name}' has non-string keys/values in 'env' object") # Check timeout value timeout = server_config.get("timeout", 60) if not isinstance(timeout, (int, float)) or timeout <= 0: warnings.append(f"Server '{name}' has invalid 'timeout' value (must be positive number)") # Check retry settings retry_attempts = server_config.get("retryAttempts", 3) if not isinstance(retry_attempts, int) or retry_attempts < 0: warnings.append(f"Server '{name}' has invalid 'retryAttempts' value (must be non-negative integer)") retry_delay = server_config.get("retryDelay", 1000) if not isinstance(retry_delay, (int, float)) or retry_delay < 0: warnings.append(f"Server '{name}' has invalid 'retryDelay' value (must be non-negative number)") # Check priority priority = server_config.get("priority", 100) if not isinstance(priority, (int, float)) or priority < 0: warnings.append(f"Server '{name}' has invalid 'priority' value (must be non-negative number)") # Check tags tags = server_config.get("tags", []) if not isinstance(tags, list): warnings.append(f"Server '{name}' has invalid 'tags' field (must be array)") elif not all(isinstance(tag, str) for tag in tags): warnings.append(f"Server '{name}' has non-string values in 'tags' array") # Check namespace values for namespace_field in ["toolNamespace", "resourceNamespace", "promptNamespace"]: namespace = server_config.get(namespace_field) if namespace is not None and not isinstance(namespace, str): warnings.append(f"Server '{name}' has invalid '{namespace_field}' value (must be string)") elif namespace is not None and not namespace.strip(): warnings.append(f"Server '{name}' has empty '{namespace_field}' value") # Validate health check configuration health_check = server_config.get("healthCheck", {}) if not isinstance(health_check, dict): warnings.append(f"Server '{name}' has invalid 'healthCheck' field (must be object)") else: # Validate numeric fields with minimum values numeric_fields = [ ("interval", 1000), ("timeout", 1000), ("keepAliveInterval", 1000), ("keepAliveTimeout", 1000), ("maxConsecutiveFailures", 1), ("restartDelay", 0), ("maxRestartAttempts", 1), ("expectedStatus", 100), ] for field, min_val in numeric_fields: value = health_check.get(field) if value is not None and (not isinstance(value, (int, float)) or value < min_val): warnings.append(f"Server '{name}' has invalid healthCheck.{field} value (must be >= {min_val})") # Validate operation field operation = health_check.get("operation", "list_tools") valid_operations = [ "list_tools", "list_resources", "list_prompts", "call_tool", "read_resource", "get_prompt", "ping", "health", "status", ] if operation not in valid_operations: warnings.append( f"Server '{name}' has invalid healthCheck.operation '{operation}' (must be one of {valid_operations})" ) # Validate operation-specific requirements if operation == "call_tool" and not health_check.get("toolName"): warnings.append(f"Server '{name}' healthCheck operation 'call_tool' requires 'toolName'") elif operation == "read_resource" and not health_check.get("resourceUri"): warnings.append(f"Server '{name}' healthCheck operation 'read_resource' requires 'resourceUri'") elif operation == "get_prompt" and not health_check.get("promptName"): warnings.append(f"Server '{name}' healthCheck operation 'get_prompt' requires 'promptName'") # Validate HTTP-specific fields http_method = health_check.get("httpMethod", "GET") if http_method not in ["GET", "POST", "PUT", "HEAD"]: warnings.append(f"Server '{name}' has invalid healthCheck.httpMethod '{http_method}'") expected_status = health_check.get("expectedStatus", 200) if expected_status is not None and (expected_status < 100 or expected_status > 599): warnings.append( f"Server '{name}' has invalid healthCheck.expectedStatus '{expected_status}' (must be 100-599)" ) return warnings def normalize_server_config(server_config: dict[str, Any]) -> dict[str, Any]: """Normalize server configuration with default values. Args: server_config: Raw server configuration Returns: Normalized server configuration with defaults applied Example: >>> config = {"command": "python"} >>> normalized = normalize_server_config(config) >>> print(normalized["enabled"]) # True (default) """ normalized = server_config.copy() # Apply defaults normalized.setdefault("enabled", True) normalized.setdefault("args", []) normalized.setdefault("env", {}) normalized.setdefault("timeout", 60) normalized.setdefault("transport_type", "stdio") normalized.setdefault("retry_attempts", 3) normalized.setdefault("retry_delay", 1000) normalized.setdefault("priority", 100) normalized.setdefault("tags", []) normalized.setdefault("log_level", "ERROR") normalized.setdefault("verify_ssl", True) return normalized # Main configuration loading functions def load_bridge_config( config_file_path: str, base_env: dict[str, str] | None = None, allow_command_substitution: bool | None = None ) -> BridgeConfiguration: """Load bridge configuration from file. Convenience function for loading configuration without creating a ConfigLoader instance. Args: config_file_path: Path to the JSON configuration file base_env: Base environment variables (defaults to empty dict) allow_command_substitution: Whether to allow command substitution (overrides config file setting) Returns: BridgeConfiguration object with all settings Example: >>> config = load_bridge_config("config.json") >>> print(f"Loaded {len(config.servers)} servers") """ return load_bridge_config_from_file(config_file_path, base_env or {}, allow_command_substitution) def load_bridge_config_from_file( config_file_path: str, base_env: dict[str, str], allow_command_substitution: bool | None = None ) -> BridgeConfiguration: """Load enhanced bridge configuration from a JSON file. This is the main configuration loading function that handles file parsing, environment variable expansion, validation, and object creation. Args: config_file_path: Path to the JSON configuration file base_env: The base environment dictionary to be inherited by servers allow_command_substitution: Override for command substitution setting Returns: A BridgeConfiguration object with all server and bridge settings Raises: FileNotFoundError: If the config file is not found json.JSONDecodeError: If the config file contains invalid JSON ValueError: If the config file format is invalid Example: >>> config = load_bridge_config_from_file("config.json", {"DEBUG": "1"}) >>> for name, server in config.servers.items(): ... print(f"Server: {name}, Enabled: {server.enabled}") """ logger.info(f"Loading bridge configuration from: {config_file_path}") # Ensure schema is available in config directory for IDE support _ensure_config_schema() # Load and parse JSON file try: with Path(config_file_path).open() as f: config_data = json.load(f) except FileNotFoundError: logger.exception(f"Configuration file not found: {config_file_path}") raise except json.JSONDecodeError: logger.exception(f"Error decoding JSON from configuration file: {config_file_path}") raise except Exception as e: logger.exception(f"Unexpected error opening or reading configuration file {config_file_path}") error_message = f"Could not read configuration file: {e}" raise ValueError(error_message) from e if not isinstance(config_data, dict) or "mcpServers" not in config_data: msg = f"Invalid config file format in {config_file_path}. Missing 'mcpServers' key." logger.error(msg) raise ValueError(msg) # Ensure config has schema reference for IDE support schema_updated = _ensure_schema_reference(config_file_path, config_data) # Migrate legacy oauth field names oauth_migrated = _migrate_oauth_fields(config_data) # If either migration happened, reload to get the current state and apply all changes if schema_updated or oauth_migrated: # Reload to get schema reference that was written to disk try: with Path(config_file_path).open() as f: temp_config = json.load(f) # Apply oauth migration to the reloaded config (in case schema was updated) if oauth_migrated: _migrate_oauth_fields(temp_config) # Write the complete updated config _write_config_to_disk(config_file_path, temp_config) config_data = temp_config except Exception as e: logger.warning(f"Failed to consolidate migrations: {e}") # Reload config from disk if it was modified to ensure we have the updated version config_needs_reload = schema_updated or oauth_migrated if config_needs_reload: logger.debug("Reloading config from disk after migrations to prevent expansion persistence bug") try: with Path(config_file_path).open() as f: config_data = json.load(f) except Exception as e: logger.warning(f"Failed to reload config after migrations: {e}") # CRITICAL: Preserve original config data to prevent expansions from being persisted to disk original_config_data = json.loads(json.dumps(config_data)) # Load bridge configuration settings bridge_config = _load_bridge_settings(config_data, allow_command_substitution) logger.debug(f"Bridge config loaded: substitution={bridge_config.allow_command_substitution}") # Expand environment variables for internal use only logger.debug("Expanding environment variables in server configurations") default_allowed_commands = get_default_allowed_commands() expanded_config_data = expand_env_vars( config_data, bridge_config.allow_command_substitution, default_allowed_commands ) # Validate configuration against schema try: validate_bridge_config(expanded_config_data) except ValueError: logger.exception(f"Configuration validation failed for {config_file_path}") raise # Load server configurations with expanded variables servers, config_updated, name_mappings = _load_server_configurations( expanded_config_data, config_file_path, base_env ) # Save normalized config if server names were updated (use original data to preserve expansions) if config_updated: _apply_server_name_mappings(original_config_data, name_mappings) _save_normalized_config(original_config_data, config_file_path) # Show warning if dangerous commands are enabled via config if bridge_config.allow_dangerous_commands: logger.warning("DANGER: UNSAFE MODE ENABLED via configuration!") logger.warning("'allow_dangerous_commands: true' found in bridge config") logger.warning("Command substitution validation is DISABLED!") logger.warning("Any command can execute including rm, curl uploads, etc.") logger.warning("Only use this for testing/development environments!") return BridgeConfiguration(servers=servers, bridge=bridge_config) def _load_bridge_settings(config_data: dict[str, Any], allow_command_substitution: bool | None = None) -> BridgeConfig: """Load bridge configuration settings from config data. Args: config_data: The full configuration dictionary allow_command_substitution: CLI override for command substitution setting Returns: BridgeConfig object with all bridge settings """ bridge_data = config_data.get("bridge", {}) # Parse aggregation config aggregation_data = bridge_data.get("aggregation", {}) aggregation = AggregationConfig( tools=aggregation_data.get("tools", True), resources=aggregation_data.get("resources", True), prompts=aggregation_data.get("prompts", True), ) # Parse failover config failover_data = bridge_data.get("failover", {}) failover = FailoverConfig( enabled=failover_data.get("enabled", True), max_failures=failover_data.get("maxFailures", 3), recovery_interval=failover_data.get("recoveryInterval", 60000), ) # Parse config reload config config_reload_data = bridge_data.get("configReload", {}) config_reload = ConfigReloadConfig( enabled=config_reload_data.get("enabled", False), debounce_ms=config_reload_data.get("debounceMs", 1000), validate_only=config_reload_data.get("validateOnly", False), ) # Get command substitution setting - CLI parameter overrides config file, then env var, then default False config_allow_substitution = bridge_data.get("allow_command_substitution", False) if allow_command_substitution is not None: # CLI parameter takes highest priority effective_allow_substitution = allow_command_substitution elif config_allow_substitution: # Config file setting takes second priority effective_allow_substitution = config_allow_substitution else: # Fall back to environment variable as third priority env_substitution = os.getenv("MCP_ALLOW_COMMAND_SUBSTITUTION", "false").lower() effective_allow_substitution = env_substitution in ("true", "1", "yes", "on") return BridgeConfig( conflict_resolution=bridge_data.get("conflictResolution", "namespace"), default_namespace=bridge_data.get("defaultNamespace", True), aggregation=aggregation, failover=failover, config_reload=config_reload, host=bridge_data.get("host", "127.0.0.1"), port=bridge_data.get("port", 8080), oauth_port=bridge_data.get("oauth_port", bridge_data.get("port", 8080)), mcp_log_level=bridge_data.get("mcp_log_level", "ERROR"), allow_command_substitution=effective_allow_substitution, allowed_commands=bridge_data.get("allowed_commands"), allow_dangerous_commands=bridge_data.get("allow_dangerous_commands", False), read_only_mode=bridge_data.get("read_only_mode", True), security=_load_bridge_security_config(bridge_data.get("security", {})), ) def _load_bridge_security_config(security_data: dict[str, Any]) -> "BridgeSecurityConfig | None": """Load bridge security configuration from config data. Args: security_data: Security configuration dictionary Returns: BridgeSecurityConfig object or None if no security config """ if not security_data: return None from mcp_foxxy_bridge.security.config import BridgeSecurityConfig, ToolSecurityConfig # noqa: PLC0415 # Load tool security config if present tool_config = None if "tool_security" in security_data: tool_data = security_data["tool_security"] tool_config = ToolSecurityConfig( allow_patterns=tool_data.get("allow_patterns", []), block_patterns=tool_data.get("block_patterns", []), allow_tools=tool_data.get("allow_tools", []), block_tools=tool_data.get("block_tools", []), classification_overrides=tool_data.get("classification_overrides", {}), ) return BridgeSecurityConfig( read_only_mode=security_data.get("read_only_mode", True), tools=tool_config, ) def _load_server_configurations( config_data: dict[str, Any], config_file_path: str, base_env: dict[str, str] ) -> tuple[dict[str, BridgeServerConfig], bool, dict[str, str]]: """Load and parse server configurations from config data. Args: config_data: The full configuration dictionary with expanded variables config_file_path: Path to config file for error reporting base_env: Base environment variables for servers Returns: Tuple of (server configs dict, config_updated flag, name_mappings dict) """ servers = {} config_updated = False mcp_servers = config_data.get("mcpServers", {}) normalized_servers = {} name_mappings = {} # Track original_name -> normalized_name mappings for name, server_config in mcp_servers.items(): if not isinstance(server_config, dict): logger.warning( f"Skipping invalid server config for '{name}' in {config_file_path}. Entry is not a dictionary." ) continue # Validate server configuration and log warnings warnings = validate_server_config(name, server_config) for warning in warnings: logger.warning(warning) # Create health check config health_check_data = server_config.get("healthCheck", {}) health_check = HealthCheckConfig( enabled=health_check_data.get("enabled", True), interval=health_check_data.get("interval", 30000), timeout=health_check_data.get("timeout", 5000), keep_alive_interval=health_check_data.get("keepAliveInterval", 60000), keep_alive_timeout=health_check_data.get("keepAliveTimeout", 10000), max_consecutive_failures=health_check_data.get("maxConsecutiveFailures", 3), auto_restart=health_check_data.get("autoRestart", True), restart_delay=health_check_data.get("restartDelay", 5000), max_restart_attempts=health_check_data.get("maxRestartAttempts", 5), operation=health_check_data.get("operation", "list_tools"), tool_name=health_check_data.get("toolName"), tool_arguments=health_check_data.get("toolArguments"), resource_uri=health_check_data.get("resourceUri"), prompt_name=health_check_data.get("promptName"), prompt_arguments=health_check_data.get("promptArguments"), http_path=health_check_data.get("httpPath"), http_method=health_check_data.get("httpMethod", "GET"), expected_status=health_check_data.get("expectedStatus", 200), expected_content=health_check_data.get("expectedContent"), ) # Create server environment server_env = base_env.copy() server_env.update(server_config.get("env", {})) # Normalize server name for consistency with OAuth token storage from mcp_foxxy_bridge.oauth.utils import _validate_server_name # noqa: PLC0415 normalized_name = _validate_server_name(name) # Track if server name was normalized (changed) if normalized_name != name: config_updated = True logger.debug(f"Normalized server name '{name}' -> '{normalized_name}'") name_mappings[name] = normalized_name # Store normalized server config for potential config file update normalized_servers[normalized_name] = server_config # Create OAuth configuration oauth_data = server_config.get("oauth_config", {}) oauth_config = None if oauth_data: oauth_config = OAuthConfig( enabled=oauth_data.get("enabled", False), issuer=oauth_data.get("issuer"), verify_ssl=oauth_data.get("verify_ssl", True), keep_alive_interval=oauth_data.get("keepAliveInterval", 20000), token_refresh_interval=oauth_data.get("tokenRefreshInterval", 1800000), connection_check_interval=oauth_data.get("connectionCheckInterval", 10000), client_id=oauth_data.get("client_id"), authorization_url=oauth_data.get("authorization_url"), token_url=oauth_data.get("token_url"), type=oauth_data.get("type"), ) # Create server configuration server = BridgeServerConfig( name=normalized_name, enabled=server_config.get("enabled", True), command=server_config.get("command", ""), args=server_config.get("args", []), env=server_env, timeout=server_config.get("timeout", 60), transport_type=server_config.get("transport", "stdio"), url=server_config.get("url"), retry_attempts=server_config.get("retryAttempts", 3), retry_delay=server_config.get("retryDelay", 1000), health_check=health_check, tool_namespace=server_config.get("toolNamespace"), resource_namespace=server_config.get("resourceNamespace"), prompt_namespace=server_config.get("promptNamespace"), priority=server_config.get("priority", 100), tags=server_config.get("tags", []), log_level=server_config.get("log_level", "ERROR"), oauth_config=oauth_config, authentication=server_config.get("authentication"), headers=server_config.get("headers"), verify_ssl=server_config.get("verify_ssl", True), working_directory=server_config.get("working_directory") or server_config.get("cwd"), ) # Validate required fields based on transport type if server.transport_type in ["sse", "http"]: if not server.url: logger.warning( f"{server.transport_type.upper()} server '{name}' from config is missing 'url'. Skipping." ) continue # Default to stdio - requires command elif server.transport_type == "stdio" and not server.command: logger.warning(f"STDIO server '{name}' from config is missing 'command'. Skipping.") continue if not isinstance(server.args, list): logger.warning(f"Named server '{name}' from config has invalid 'args' (must be a list). Skipping.") continue servers[normalized_name] = server logger.debug(f'MCP Server configured: {name} - "{server.command}" {" ".join(server.args)}') return servers, config_updated, name_mappings def _apply_server_name_mappings(config_data: dict[str, Any], name_mappings: dict[str, str]) -> None: """Apply server name normalizations to config data. Args: config_data: The configuration data to modify name_mappings: Dictionary mapping original_name -> normalized_name """ if not name_mappings: return mcp_servers = config_data.get("mcpServers", {}) new_mcp_servers = {} for original_name, server_config in mcp_servers.items(): # Use normalized name if available, otherwise keep original normalized_name = name_mappings.get(original_name, original_name) new_mcp_servers[normalized_name] = server_config config_data["mcpServers"] = new_mcp_servers def _save_normalized_config(config_data: dict[str, Any], config_file_path: str) -> None: """Save the config file with normalized server names. Args: config_data: Updated configuration data with normalized server names config_file_path: Path to the config file to update """ try: config_path = Path(config_file_path) # Create backup of original config backup_path = config_path.with_suffix(config_path.suffix + ".backup") if config_path.exists() and not backup_path.exists(): shutil.copy2(config_path, backup_path) logger.debug(f"Created backup: {backup_path}") # Write normalized config with config_path.open("w", encoding="utf-8") as f: json.dump(config_data, f, indent=2, ensure_ascii=False) # Set secure permissions config_path.chmod(0o600) logger.debug("Updated config file with normalized server names.") except Exception as e: logger.warning(f"Failed to save normalized config: {e}") def validate_bridge_config(config_data: dict[str, Any]) -> None: """Validate bridge configuration against JSON schema. Args: config_data: The configuration data to validate Raises: ValueError: If the configuration is invalid Example: >>> config = {"mcpServers": {"fs": {"command": "npx"}}} >>> validate_bridge_config(config) # No exception = valid """ if not JSONSCHEMA_AVAILABLE: logger.warning("jsonschema not available, skipping configuration validation") return # JSON Schema for configuration validation schema = { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "mcpServers": { "type": "object", "patternProperties": { "^[a-zA-Z0-9_-]+$": { "type": "object", "properties": { "enabled": {"type": "boolean"}, "command": {"type": "string"}, "args": {"type": "array", "items": {"type": "string"}}, "env": { "type": "object", "additionalProperties": {"type": "string"}, }, "timeout": {"type": "number", "minimum": 1}, "transport": { "type": "string", "enum": ["stdio", "sse", "http"], }, "url": {"type": "string", "format": "uri"}, "retryAttempts": {"type": "number", "minimum": 0}, "retryDelay": {"type": "number", "minimum": 0}, "priority": {"type": "number", "minimum": 0}, "tags": {"type": "array", "items": {"type": "string"}}, "headers": { "type": "object", "additionalProperties": {"type": "string"}, }, "working_directory": {"type": ["string", "null"]}, }, "anyOf": [ { "properties": {"transport": {"const": "stdio"}}, "required": ["command"], }, { "properties": {"transport": {"const": "sse"}}, "required": ["url"], }, { "properties": {"transport": {"const": "http"}}, "required": ["url"], }, ], }, }, }, "bridge": { "type": "object", "properties": { "conflictResolution": { "type": "string", "enum": ["priority", "namespace", "first", "error"], }, "defaultNamespace": {"type": "boolean"}, "host": {"type": "string"}, "port": {"type": "number", "minimum": 1, "maximum": 65535}, "oauth_port": {"type": "number", "minimum": 1, "maximum": 65535}, "mcp_log_level": {"type": "string"}, }, }, }, "required": ["mcpServers"], } try: jsonschema.validate(config_data, schema) # type: ignore[no-untyped-call] except jsonschema.ValidationError as e: logger.exception("Configuration validation failed") msg = f"Invalid configuration: {e.message}" raise ValueError(msg) from e except Exception as e: logger.exception("Unexpected error during configuration validation") msg = f"Configuration validation error: {e}" raise ValueError(msg) from e # Legacy compatibility functions def load_named_server_configs_from_file( config_file_path: str, base_env: dict[str, str], ) -> dict[str, StdioServerParameters]: """Load named server configurations in legacy format. This function provides compatibility with the original stdio-only configuration format for backwards compatibility. Args: config_file_path: Path to the JSON configuration file base_env: The base environment dictionary to be inherited by servers Returns: A dictionary of named server parameters Raises: FileNotFoundError: If the config file is not found json.JSONDecodeError: If the config file contains invalid JSON ValueError: If the config file format is invalid """ # Load the full bridge configuration bridge_config = load_bridge_config_from_file(config_file_path, base_env) # Convert to legacy format return bridge_config_to_stdio_params(bridge_config) def bridge_config_to_stdio_params( bridge_config: BridgeConfiguration, ) -> dict[str, StdioServerParameters]: """Convert BridgeConfiguration to the legacy StdioServerParameters format. Args: bridge_config: The bridge configuration to convert Returns: A dictionary of named server parameters compatible with existing code Example: >>> config = load_bridge_config("config.json") >>> stdio_params = bridge_config_to_stdio_params(config) >>> for name, params in stdio_params.items(): ... print(f"Server: {name}, Command: {params.command}") """ stdio_params = {} for name, server in bridge_config.servers.items(): if not server.enabled: logger.info(f"Named server '{name}' is disabled. Skipping.") continue # Only include STDIO servers in legacy format if server.transport_type == "stdio": stdio_params[name] = StdioServerParameters( command=server.command, args=server.args or [], env=server.env or {}, cwd=None, ) return stdio_params

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server