Skip to main content
Glama

Coreflux MQTT MCP Server

Official
config_validator.py9.93 kB
""" Configuration validation and management for Coreflux MCP Server This module provides comprehensive configuration validation, environment variable checking, and secure configuration management. """ import os import logging import re from typing import Dict, List, Tuple, Any, Optional from pathlib import Path import json class ConfigurationError(Exception): """Raised when configuration validation fails""" pass class ConfigurationValidator: """Validates and manages server configuration""" REQUIRED_MQTT_VARS = ['MQTT_BROKER'] OPTIONAL_MQTT_VARS = ['MQTT_PORT', 'MQTT_USER', 'MQTT_PASSWORD', 'MQTT_CLIENT_ID'] TLS_VARS = ['MQTT_CA_CERT', 'MQTT_CLIENT_CERT', 'MQTT_CLIENT_KEY'] def __init__(self, logger: logging.Logger): self.logger = logger self.config = {} self.validation_errors = [] self.validation_warnings = [] def validate_environment(self) -> Tuple[bool, List[str], List[str]]: """ Validate all environment variables and configuration Returns: (is_valid, errors, warnings) """ self.validation_errors = [] self.validation_warnings = [] # Check required variables self._check_required_vars() # Validate MQTT configuration self._validate_mqtt_config() # Validate TLS configuration self._validate_tls_config() # Validate API configuration self._validate_api_config() # Validate logging configuration self._validate_logging_config() # Check file permissions self._check_file_permissions() is_valid = len(self.validation_errors) == 0 return is_valid, self.validation_errors, self.validation_warnings def _check_required_vars(self): """Check if required environment variables are set""" for var in self.REQUIRED_MQTT_VARS: value = os.environ.get(var) if not value: self.validation_errors.append(f"Required environment variable {var} is not set") elif var == 'MQTT_BROKER': if not self._is_valid_hostname(value): self.validation_errors.append(f"Invalid MQTT_BROKER hostname: {value}") def _validate_mqtt_config(self): """Validate MQTT configuration parameters""" # Validate port port = os.environ.get('MQTT_PORT', '1883') try: port_int = int(port) if not (1 <= port_int <= 65535): self.validation_errors.append(f"MQTT_PORT must be between 1-65535, got: {port}") except ValueError: self.validation_errors.append(f"MQTT_PORT must be a valid integer, got: {port}") # Validate client ID client_id = os.environ.get('MQTT_CLIENT_ID') if client_id: if len(client_id) > 65535: # MQTT spec limit self.validation_errors.append(f"MQTT_CLIENT_ID too long (max 65535 chars): {len(client_id)}") if not re.match(r'^[a-zA-Z0-9_-]+$', client_id): self.validation_warnings.append(f"MQTT_CLIENT_ID contains special characters: {client_id}") # Check credentials mqtt_user = os.environ.get('MQTT_USER') mqtt_password = os.environ.get('MQTT_PASSWORD') if mqtt_user and not mqtt_password: self.validation_warnings.append("MQTT_USER set but MQTT_PASSWORD is empty") elif mqtt_password and not mqtt_user: self.validation_warnings.append("MQTT_PASSWORD set but MQTT_USER is empty") def _validate_tls_config(self): """Validate TLS configuration""" use_tls = os.environ.get('MQTT_USE_TLS', 'false').lower() == 'true' if use_tls: ca_cert = os.environ.get('MQTT_CA_CERT') client_cert = os.environ.get('MQTT_CLIENT_CERT') client_key = os.environ.get('MQTT_CLIENT_KEY') if not ca_cert: self.validation_errors.append("MQTT_USE_TLS=true but MQTT_CA_CERT not specified") else: if not Path(ca_cert).exists(): self.validation_errors.append(f"CA certificate file not found: {ca_cert}") if client_cert and not Path(client_cert).exists(): self.validation_errors.append(f"Client certificate file not found: {client_cert}") if client_key and not Path(client_key).exists(): self.validation_errors.append(f"Client key file not found: {client_key}") if client_cert and not client_key: self.validation_errors.append("MQTT_CLIENT_CERT specified but MQTT_CLIENT_KEY missing") elif client_key and not client_cert: self.validation_errors.append("MQTT_CLIENT_KEY specified but MQTT_CLIENT_CERT missing") def _validate_api_config(self): """Validate API configuration""" api_key = os.environ.get('DO_AGENT_API_KEY') if api_key: # Basic API key format validation if len(api_key) < 10: self.validation_warnings.append("DO_AGENT_API_KEY appears to be too short") elif api_key.startswith('your_') or api_key.endswith('_here'): self.validation_warnings.append("DO_AGENT_API_KEY appears to be a placeholder value") def _validate_logging_config(self): """Validate logging configuration""" log_level = os.environ.get('LOG_LEVEL', 'INFO').upper() valid_levels = ['NONE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] if log_level not in valid_levels: self.validation_errors.append(f"Invalid LOG_LEVEL: {log_level}. Valid options: {valid_levels}") def _check_file_permissions(self): """Check file permissions for certificate files""" cert_files = [] for var in self.TLS_VARS: cert_path = os.environ.get(var) if cert_path and Path(cert_path).exists(): cert_files.append(cert_path) for cert_file in cert_files: try: path = Path(cert_file) # Check if file is readable if not os.access(cert_file, os.R_OK): self.validation_errors.append(f"Certificate file not readable: {cert_file}") # Warn about overly permissive permissions (Unix-like systems) if hasattr(os, 'stat'): stat_info = path.stat() # Check if file is world-readable (others can read) if stat_info.st_mode & 0o044: self.validation_warnings.append(f"Certificate file has permissive permissions: {cert_file}") except Exception as e: self.validation_warnings.append(f"Could not check permissions for {cert_file}: {str(e)}") def _is_valid_hostname(self, hostname: str) -> bool: """Validate hostname format""" if not hostname or len(hostname) > 253: return False # Allow localhost and IP addresses if hostname in ['localhost', '127.0.0.1', '::1']: return True # Basic IP address check if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', hostname): return True # Basic hostname validation if re.match(r'^[a-zA-Z0-9.-]+$', hostname): return True return False def get_sanitized_config_summary(self) -> Dict[str, Any]: """Get a summary of current configuration with sensitive data redacted""" summary = { 'mqtt': { 'broker': os.environ.get('MQTT_BROKER', 'NOT_SET'), 'port': os.environ.get('MQTT_PORT', '1883'), 'user': '[REDACTED]' if os.environ.get('MQTT_USER') else 'NOT_SET', 'password': '[REDACTED]' if os.environ.get('MQTT_PASSWORD') else 'NOT_SET', 'client_id': os.environ.get('MQTT_CLIENT_ID', 'AUTO_GENERATED'), 'use_tls': os.environ.get('MQTT_USE_TLS', 'false'), }, 'tls': { 'ca_cert': '[SET]' if os.environ.get('MQTT_CA_CERT') else 'NOT_SET', 'client_cert': '[SET]' if os.environ.get('MQTT_CLIENT_CERT') else 'NOT_SET', 'client_key': '[SET]' if os.environ.get('MQTT_CLIENT_KEY') else 'NOT_SET', }, 'api': { 'do_agent_key': '[SET]' if os.environ.get('DO_AGENT_API_KEY') else 'NOT_SET', }, 'logging': { 'level': os.environ.get('LOG_LEVEL', 'INFO'), } } return summary def log_configuration_status(self): """Log current configuration status""" is_valid, errors, warnings = self.validate_environment() self.logger.info("Configuration validation completed") if errors: self.logger.error(f"Configuration errors found ({len(errors)}):") for error in errors: self.logger.error(f" ❌ {error}") if warnings: self.logger.warning(f"Configuration warnings ({len(warnings)}):") for warning in warnings: self.logger.warning(f" ⚠️ {warning}") if is_valid: self.logger.info("✅ Configuration validation passed") else: self.logger.error("❌ Configuration validation failed") # Log sanitized configuration summary config_summary = self.get_sanitized_config_summary() self.logger.debug(f"Configuration summary: {json.dumps(config_summary, indent=2)}") return is_valid

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CorefluxCommunity/Coreflux-MQTT-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server