Skip to main content
Glama
validation.py29.5 kB
# # MCP Foxxy Bridge - Configuration Validation # # Copyright (C) 2024 Billy Bryant # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # """Configuration Validation Utilities. This module provides comprehensive validation functionality for MCP Foxxy Bridge configurations, including schema validation, constraint checking, and detailed error reporting. Key Features: - JSON Schema validation for configuration structure - Business logic validation for configuration constraints - Detailed error reporting with suggestions - Validation for different configuration aspects - Support for custom validation rules Example: Basic validation: >>> errors = validate_bridge_config(config_data) >>> if errors: ... for error in errors: ... print(f"Error: {error}") Schema validation: >>> try: ... validate_config_schema(config_data) ... print("Schema is valid") ... except ConfigValidationError as e: ... print(f"Schema error: {e}") """ import re import urllib.parse from typing import Any try: import jsonschema JSONSCHEMA_AVAILABLE = True except ImportError: JSONSCHEMA_AVAILABLE = False from mcp_foxxy_bridge.utils.logging import get_logger logger = get_logger(__name__, facility="CONFIG") class ConfigValidationError(Exception): """Exception raised when configuration validation fails. This exception provides detailed information about configuration validation failures including the specific error, location, and suggested fixes. Attributes: message: Human-readable error message path: Path to the configuration element that failed validation suggestion: Optional suggestion for fixing the error Example: >>> try: ... validate_config_schema(config) ... except ConfigValidationError as e: ... print(f"Error at {e.path}: {e.message}") ... if e.suggestion: ... print(f"Suggestion: {e.suggestion}") """ def __init__(self, message: str, path: str | None = None, suggestion: str | None = None) -> None: """Initialize configuration validation error. Args: message: Human-readable error message path: Path to the configuration element that failed suggestion: Optional suggestion for fixing the error """ self.message = message self.path = path self.suggestion = suggestion full_message = message if path: full_message = f"At {path}: {message}" if suggestion: full_message += f" (Suggestion: {suggestion})" super().__init__(full_message) class ValidationResult: """Container for validation results with errors and warnings. This class provides a structured way to collect and report validation issues with different severity levels. Attributes: errors: List of error messages warnings: List of warning messages is_valid: Whether validation passed (no errors) Example: >>> result = ValidationResult() >>> result.add_error("Missing required field", "servers.fs.command") >>> result.add_warning("Deprecated option", "servers.fs.legacy_mode") >>> if not result.is_valid: ... print("Validation failed") """ def __init__(self) -> None: """Initialize empty validation result.""" self.errors: list[str] = [] self.warnings: list[str] = [] @property def is_valid(self) -> bool: """Check if validation passed (no errors).""" return len(self.errors) == 0 def add_error(self, message: str, path: str | None = None) -> None: """Add an error to the validation result. Args: message: Error message path: Optional path to the problematic configuration element """ if path: self.errors.append(f"At {path}: {message}") else: self.errors.append(message) def add_warning(self, message: str, path: str | None = None) -> None: """Add a warning to the validation result. Args: message: Warning message path: Optional path to the configuration element """ if path: self.warnings.append(f"At {path}: {message}") else: self.warnings.append(message) def merge(self, other: "ValidationResult") -> None: """Merge another validation result into this one. Args: other: Another ValidationResult to merge """ self.errors.extend(other.errors) self.warnings.extend(other.warnings) # Core validation functions def validate_bridge_config(config_data: dict[str, Any]) -> list[str]: """Validate complete bridge configuration and return list of errors. Performs comprehensive validation including schema validation, business logic checks, and constraint validation. Args: config_data: The configuration data to validate Returns: List of error messages (empty if configuration is valid) Example: >>> config = {"mcpServers": {"fs": {"command": "npx"}}} >>> errors = validate_bridge_config(config) >>> if not errors: ... print("Configuration is valid") """ result = ValidationResult() # Schema validation try: validate_config_schema(config_data) except ConfigValidationError as e: result.add_error(e.message, e.path) # Business logic validation _validate_business_logic(config_data, result) # Constraint validation _validate_constraints(config_data, result) return result.errors def validate_config_schema(config_data: dict[str, Any]) -> None: """Validate configuration against JSON schema. Args: config_data: The configuration data to validate Raises: ConfigValidationError: If schema validation fails Example: >>> try: ... validate_config_schema(config) ... print("Schema is valid") ... except ConfigValidationError as e: ... print(f"Schema error: {e}") """ if not JSONSCHEMA_AVAILABLE: logger.warning("jsonschema not available, skipping schema validation") return schema = _get_configuration_schema() try: jsonschema.validate(config_data, schema) # type: ignore[no-untyped-call] except jsonschema.ValidationError as e: path = ".".join(str(p) for p in e.path) if e.path else "root" suggestion = _get_schema_error_suggestion(e) raise ConfigValidationError(message=e.message, path=path, suggestion=suggestion) from e except Exception as e: raise ConfigValidationError( message=f"Schema validation failed: {e}", suggestion="Check that your configuration file is valid JSON", ) from e def get_validation_errors(config_data: dict[str, Any]) -> ValidationResult: """Get comprehensive validation results with errors and warnings. Args: config_data: The configuration data to validate Returns: ValidationResult with detailed error and warning information Example: >>> result = get_validation_errors(config) >>> for error in result.errors: ... print(f"ERROR: {error}") >>> for warning in result.warnings: ... print(f"WARNING: {warning}") """ result = ValidationResult() # Schema validation try: validate_config_schema(config_data) except ConfigValidationError as e: result.add_error(e.message, e.path) # Business logic validation _validate_business_logic(config_data, result) # Constraint validation _validate_constraints(config_data, result) # Additional warnings _add_configuration_warnings(config_data, result) return result # Internal validation functions def _get_configuration_schema() -> dict[str, Any]: """Get the JSON schema for configuration validation.""" return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "mcpServers": { "type": "object", "patternProperties": { "^[a-zA-Z0-9_-]+$": { "type": "object", "properties": { "enabled": {"type": "boolean"}, "command": {"type": "string"}, "args": {"type": "array", "items": {"type": "string"}}, "env": {"type": "object", "additionalProperties": {"type": "string"}}, "timeout": {"type": "number", "minimum": 1}, "transport": {"type": "string", "enum": ["stdio", "sse", "http"]}, "url": {"type": "string"}, "retryAttempts": {"type": "number", "minimum": 0}, "retryDelay": {"type": "number", "minimum": 0}, "priority": {"type": "number", "minimum": 0}, "tags": {"type": "array", "items": {"type": "string"}}, "toolNamespace": {"type": "string"}, "resourceNamespace": {"type": "string"}, "promptNamespace": {"type": "string"}, "log_level": {"type": "string"}, "verify_ssl": {"type": "boolean"}, "oauth_config": { "type": "object", "properties": { "enabled": {"type": ["boolean", "string"]}, "type": {"type": "string"}, "client_id": {"type": "string"}, "client_secret": {"type": "string"}, }, }, "authentication": { "type": "object", "properties": { "type": { "type": "string", "enum": ["bearer", "api_key", "basic", "custom"], }, "token": {"type": "string"}, "key": {"type": "string"}, "header": {"type": "string"}, "username": {"type": "string"}, "password": {"type": "string"}, "headers": { "type": "object", "additionalProperties": {"type": "string"}, }, }, }, "headers": { "type": "object", "additionalProperties": {"type": "string"}, }, "healthCheck": { "type": "object", "properties": { "enabled": {"type": "boolean"}, "interval": {"type": "number", "minimum": 1000}, "timeout": {"type": "number", "minimum": 1000}, "keepAliveInterval": {"type": "number", "minimum": 1000}, "keepAliveTimeout": {"type": "number", "minimum": 1000}, "maxConsecutiveFailures": {"type": "number", "minimum": 1}, "autoRestart": {"type": "boolean"}, "restartDelay": {"type": "number", "minimum": 0}, "maxRestartAttempts": {"type": "number", "minimum": 1}, "operation": { "type": "string", "enum": [ "list_tools", "list_resources", "list_prompts", "call_tool", "read_resource", "get_prompt", "ping", "health", "status", ], }, "toolName": {"type": "string"}, "toolArguments": { "type": "object", "additionalProperties": {"type": "string"}, }, "resourceUri": {"type": "string"}, "promptName": {"type": "string"}, "promptArguments": { "type": "object", "additionalProperties": {"type": "string"}, }, "httpPath": {"type": "string"}, "httpMethod": { "type": "string", "enum": ["GET", "POST", "PUT", "HEAD"], }, "expectedStatus": { "type": "number", "minimum": 100, "maximum": 599, }, "expectedContent": {"type": "string"}, }, }, }, "anyOf": [ { "properties": {"transport": {"const": "stdio"}}, "required": ["command"], }, { "properties": {"transport": {"const": "sse"}}, "required": ["url"], }, { "properties": {"transport": {"const": "http"}}, "required": ["url"], }, ], }, }, }, "bridge": { "type": "object", "properties": { "conflictResolution": { "type": "string", "enum": ["priority", "namespace", "first", "error"], }, "defaultNamespace": {"type": "boolean"}, "host": {"type": "string"}, "port": {"type": "number", "minimum": 1, "maximum": 65535}, "mcp_log_level": {"type": "string"}, "aggregation": { "type": "object", "properties": { "tools": {"type": "boolean"}, "resources": {"type": "boolean"}, "prompts": {"type": "boolean"}, }, }, "failover": { "type": "object", "properties": { "enabled": {"type": "boolean"}, "maxFailures": {"type": "number", "minimum": 1}, "recoveryInterval": {"type": "number", "minimum": 1000}, }, }, "configReload": { "type": "object", "properties": { "enabled": {"type": "boolean"}, "debounceMs": {"type": "number", "minimum": 100}, "validateOnly": {"type": "boolean"}, }, }, }, }, }, "required": ["mcpServers"], } def _get_schema_error_suggestion(error: "jsonschema.ValidationError") -> str | None: """Get suggestion for fixing schema validation error.""" if "is not valid under any of the given schemas" in error.message: return "Check that stdio servers have 'command' and sse servers have 'url'" if "is not one of" in error.message and isinstance(error.schema, dict): return f"Valid values are: {', '.join(error.schema.get('enum', []))}" if "is not of type" in error.message and isinstance(error.schema, dict): expected_type = error.schema.get("type", "unknown") return f"Expected type: {expected_type}" if "is a required property" in error.message: return "Add the missing required field to your configuration" if "does not match" in error.message: return "Server names must contain only letters, numbers, underscores, and hyphens" return None def _validate_business_logic(config_data: dict[str, Any], result: ValidationResult) -> None: """Validate business logic constraints.""" servers = config_data.get("mcpServers", {}) # Validate server configurations for server_name, server_config in servers.items(): if not isinstance(server_config, dict): result.add_error("Server configuration must be an object", f"mcpServers.{server_name}") continue _validate_server_business_logic(server_name, server_config, result) # Validate bridge configuration bridge_config = config_data.get("bridge", {}) if bridge_config: _validate_bridge_business_logic(bridge_config, result) def _validate_server_business_logic(server_name: str, server_config: dict[str, Any], result: ValidationResult) -> None: """Validate business logic for individual server configuration.""" base_path = f"mcpServers.{server_name}" # Transport-specific validation transport_type = server_config.get("transport", "stdio") if transport_type == "stdio": if not server_config.get("command"): result.add_error("STDIO servers require a 'command' field", base_path) elif transport_type in ["sse", "http"]: url = server_config.get("url") if not url: result.add_error(f"{transport_type.upper()} servers require a 'url' field", base_path) elif not _is_valid_url(url): result.add_error("Invalid URL format", f"{base_path}.url") # Authentication validation auth_config = server_config.get("authentication", {}) if auth_config: _validate_authentication_config(auth_config, result, f"{base_path}.authentication") # OAuth validation oauth_config = server_config.get("oauth_config", {}) if oauth_config: _validate_oauth_config(oauth_config, result, f"{base_path}.oauth_config") # Health check validation health_check = server_config.get("healthCheck", {}) if health_check: _validate_health_check_config(health_check, result, f"{base_path}.healthCheck") def _validate_bridge_business_logic(bridge_config: dict[str, Any], result: ValidationResult) -> None: """Validate bridge-level business logic.""" base_path = "bridge" # Port validation port = bridge_config.get("port") if port is not None and port < 1024 and port not in {80, 443}: result.add_warning( f"Port {port} is a privileged port and may require elevated permissions", f"{base_path}.port", ) # Host validation host = bridge_config.get("host") if host == "0.0.0.0": # noqa: S104 result.add_warning( "Binding to 0.0.0.0 exposes the bridge to all network interfaces. " "Consider using 127.0.0.1 for local-only access.", f"{base_path}.host", ) def _validate_authentication_config(auth_config: dict[str, Any], result: ValidationResult, base_path: str) -> None: """Validate authentication configuration.""" auth_type = auth_config.get("type") if not auth_type: result.add_error("Authentication type is required", f"{base_path}.type") return if auth_type == "bearer": if not auth_config.get("token"): result.add_error("Bearer authentication requires a 'token' field", base_path) elif auth_type == "api_key": if not auth_config.get("key"): result.add_error("API key authentication requires a 'key' field", base_path) elif auth_type == "basic": if not auth_config.get("username"): result.add_error("Basic authentication requires a 'username' field", base_path) elif auth_type == "custom" and not auth_config.get("headers"): result.add_error("Custom authentication requires a 'headers' field", base_path) def _validate_oauth_config(oauth_config: dict[str, Any], result: ValidationResult, base_path: str) -> None: """Validate OAuth configuration.""" enabled = oauth_config.get("enabled") if enabled: oauth_type = oauth_config.get("type", "proxy") if oauth_type == "proxy": # Proxy OAuth requires client configuration if not oauth_config.get("client_id"): result.add_warning("OAuth proxy mode typically requires a 'client_id'", f"{base_path}.client_id") def _validate_health_check_config(health_check: dict[str, Any], result: ValidationResult, base_path: str) -> None: """Validate health check configuration.""" operation = health_check.get("operation", "list_tools") # Operation-specific validation if operation == "call_tool" and not health_check.get("toolName"): result.add_error("Health check operation 'call_tool' requires 'toolName'", f"{base_path}.toolName") elif operation == "read_resource" and not health_check.get("resourceUri"): result.add_error( "Health check operation 'read_resource' requires 'resourceUri'", f"{base_path}.resourceUri", ) elif operation == "get_prompt" and not health_check.get("promptName"): result.add_error("Health check operation 'get_prompt' requires 'promptName'", f"{base_path}.promptName") # Interval validation interval = health_check.get("interval", 30000) timeout = health_check.get("timeout", 5000) if timeout >= interval: result.add_warning( f"Health check timeout ({timeout}ms) should be less than interval ({interval}ms)", base_path, ) def _validate_constraints(config_data: dict[str, Any], result: ValidationResult) -> None: """Validate configuration constraints and relationships.""" servers = config_data.get("mcpServers", {}) # Check for duplicate server names (case-insensitive) server_names_lower = {} for server_name in servers: lower_name = server_name.lower() if lower_name in server_names_lower: result.add_error( f"Duplicate server name '{server_name}' (case-insensitive)", f"mcpServers.{server_name}", ) server_names_lower[lower_name] = server_name # Check for port conflicts _validate_port_conflicts(servers, result) # Check for namespace conflicts _validate_namespace_conflicts(servers, result) def _validate_port_conflicts(servers: dict[str, Any], result: ValidationResult) -> None: """Validate for port conflicts between servers.""" used_ports: dict[int, str] = {} for server_name, server_config in servers.items(): if not isinstance(server_config, dict): continue # Check for embedded servers that might use ports if server_config.get("transport") in ["sse", "http"]: url = server_config.get("url", "") port = _extract_port_from_url(url) if port and port in used_ports: result.add_warning( f"Potential port conflict with server '{used_ports[port]}' on port {port}", f"mcpServers.{server_name}.url", ) elif port: used_ports[port] = server_name def _validate_namespace_conflicts(servers: dict[str, Any], result: ValidationResult) -> None: """Validate for namespace conflicts between servers.""" used_namespaces: dict[str, set[str]] = {"tool": set(), "resource": set(), "prompt": set()} for server_name, server_config in servers.items(): if not isinstance(server_config, dict): continue # Check tool namespace tool_ns = server_config.get("toolNamespace") if tool_ns: if tool_ns in used_namespaces["tool"]: result.add_warning( f"Tool namespace '{tool_ns}' is used by multiple servers", f"mcpServers.{server_name}.toolNamespace", ) used_namespaces["tool"].add(tool_ns) # Check resource namespace resource_ns = server_config.get("resourceNamespace") if resource_ns: if resource_ns in used_namespaces["resource"]: result.add_warning( f"Resource namespace '{resource_ns}' is used by multiple servers", f"mcpServers.{server_name}.resourceNamespace", ) used_namespaces["resource"].add(resource_ns) # Check prompt namespace prompt_ns = server_config.get("promptNamespace") if prompt_ns: if prompt_ns in used_namespaces["prompt"]: result.add_warning( f"Prompt namespace '{prompt_ns}' is used by multiple servers", f"mcpServers.{server_name}.promptNamespace", ) used_namespaces["prompt"].add(prompt_ns) def _add_configuration_warnings(config_data: dict[str, Any], result: ValidationResult) -> None: """Add additional configuration warnings.""" servers = config_data.get("mcpServers", {}) # Warn about insecure configurations for server_name, server_config in servers.items(): if not isinstance(server_config, dict): continue # SSL verification warning if server_config.get("verify_ssl") is False: result.add_warning( "SSL verification is disabled - this may be insecure", f"mcpServers.{server_name}.verify_ssl", ) # Plaintext URLs warning url = server_config.get("url", "") if ( url.startswith("http://") and not url.startswith("http://localhost") and not url.startswith("http://127.0.0.1") ): result.add_warning( "Using HTTP (not HTTPS) for remote server - this may be insecure", f"mcpServers.{server_name}.url", ) # Empty environment variables warning env = server_config.get("env", {}) for env_var, env_value in env.items(): if not env_value: result.add_warning( f"Environment variable '{env_var}' has empty value", f"mcpServers.{server_name}.env.{env_var}", ) # Utility functions def _is_valid_url(url: str) -> bool: """Check if URL is valid.""" try: parsed = urllib.parse.urlparse(url) return all([parsed.scheme, parsed.netloc]) except (ValueError, TypeError) as e: logger.debug("URL parsing error: %s", e) return False except Exception as e: logger.warning("Unexpected URL validation error: %s", str(e)) return False def _extract_port_from_url(url: str) -> int | None: """Extract port number from URL.""" try: parsed = urllib.parse.urlparse(url) return parsed.port except (ValueError, TypeError) as e: logger.debug("URL port extraction error: %s", e) return None except Exception as e: logger.warning("Unexpected port extraction error: %s", str(e)) return None def _is_valid_server_name(name: str) -> bool: """Check if server name is valid.""" return re.match(r"^[a-zA-Z0-9_-]+$", name) is not None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server