Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
ssl_config.py10.3 kB
#!/usr/bin/env python3 """ Production-grade SSL configuration and certificate handling for Wazuh MCP Server. Provides secure defaults while supporting various deployment scenarios. """ import os import ssl import warnings import urllib3 import logging from pathlib import Path from typing import Optional, Dict, Any from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class SSLConfig: """SSL configuration settings for production deployment.""" verify_ssl: bool = True ca_bundle_path: Optional[str] = None client_cert_path: Optional[str] = None client_key_path: Optional[str] = None allow_self_signed: bool = False ssl_timeout: int = 30 auto_detect_issues: bool = True min_tls_version: str = "TLSv1.2" class SSLConfigurationManager: """Production-grade SSL configuration manager.""" def __init__(self): self.logger = logger self._ssl_warnings_disabled = False def create_ssl_context(self, config: SSLConfig) -> Optional[ssl.SSLContext]: """ Create SSL context with security-first configuration. Args: config: SSL configuration settings Returns: Configured SSL context or None for unverified connections """ if not config.verify_ssl: self._handle_unverified_ssl(config) return None # Create secure SSL context context = ssl.create_default_context() # Set minimum TLS version if config.min_tls_version == "TLSv1.3": context.minimum_version = ssl.TLSVersion.TLSv1_3 elif config.min_tls_version == "TLSv1.2": context.minimum_version = ssl.TLSVersion.TLSv1_2 else: self.logger.warning(f"Unsupported TLS version: {config.min_tls_version}, using TLSv1.2") context.minimum_version = ssl.TLSVersion.TLSv1_2 # Configure custom CA bundle if config.ca_bundle_path: if Path(config.ca_bundle_path).exists(): context.load_verify_locations(config.ca_bundle_path) self.logger.info(f"Loaded custom CA bundle: {config.ca_bundle_path}") else: self.logger.error(f"CA bundle not found: {config.ca_bundle_path}") raise FileNotFoundError(f"CA bundle not found: {config.ca_bundle_path}") # Configure client certificates if config.client_cert_path and config.client_key_path: if Path(config.client_cert_path).exists() and Path(config.client_key_path).exists(): context.load_cert_chain(config.client_cert_path, config.client_key_path) self.logger.info("Client certificate authentication configured") else: self.logger.error("Client certificate or key file not found") raise FileNotFoundError("Client certificate or key file not found") # Handle self-signed certificates if config.allow_self_signed: context.check_hostname = False context.verify_mode = ssl.CERT_NONE self.logger.warning("Self-signed certificates allowed - not recommended for production") return context def _handle_unverified_ssl(self, config: SSLConfig): """Handle unverified SSL connections with appropriate warnings.""" if not self._ssl_warnings_disabled: # Show warning but don't disable globally self.logger.warning( "SSL verification disabled - not recommended for production. " "Consider using proper certificates or configuring ca_bundle_path." ) # Only disable urllib3 warnings for this specific case urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self._ssl_warnings_disabled = True def get_aiohttp_connector_args(self, config: SSLConfig) -> Dict[str, Any]: """ Get aiohttp connector arguments for SSL configuration. Args: config: SSL configuration settings Returns: Dictionary of connector arguments """ connector_args = {} if config.verify_ssl: ssl_context = self.create_ssl_context(config) if ssl_context: connector_args["ssl_context"] = ssl_context else: connector_args["ssl"] = False return connector_args def validate_ssl_config(self, config: SSLConfig) -> bool: """ Validate SSL configuration for security best practices. Args: config: SSL configuration to validate Returns: True if configuration is valid and secure """ issues = [] # Check for insecure configurations if not config.verify_ssl: issues.append("SSL verification is disabled") if config.allow_self_signed: issues.append("Self-signed certificates are allowed") if config.min_tls_version not in ["TLSv1.2", "TLSv1.3"]: issues.append(f"Insecure TLS version: {config.min_tls_version}") # Check file paths if config.ca_bundle_path and not Path(config.ca_bundle_path).exists(): issues.append(f"CA bundle file not found: {config.ca_bundle_path}") if config.client_cert_path and not Path(config.client_cert_path).exists(): issues.append(f"Client certificate file not found: {config.client_cert_path}") if config.client_key_path and not Path(config.client_key_path).exists(): issues.append(f"Client key file not found: {config.client_key_path}") # Log issues if issues: self.logger.warning("SSL configuration issues found:") for issue in issues: self.logger.warning(f" - {issue}") return False self.logger.info("SSL configuration validated successfully") return True def auto_detect_ssl_issues(self, host: str, port: int) -> Dict[str, Any]: """ Auto-detect SSL configuration issues for a given host. Args: host: Target hostname port: Target port Returns: Dictionary with detection results and recommendations """ import socket import ssl results = { "host": host, "port": port, "ssl_available": False, "certificate_valid": False, "self_signed": False, "tls_versions": [], "recommendations": [] } try: # Test SSL connection context = ssl.create_default_context() with socket.create_connection((host, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=host) as ssock: results["ssl_available"] = True results["certificate_valid"] = True # Get certificate info cert = ssock.getpeercert() if cert: issuer = dict(x[0] for x in cert.get('issuer', [])) subject = dict(x[0] for x in cert.get('subject', [])) # Check for self-signed if issuer == subject: results["self_signed"] = True results["recommendations"].append( "Certificate appears to be self-signed. Consider using a proper CA." ) except ssl.SSLError as e: results["ssl_available"] = True results["certificate_valid"] = False self.logger.warning(f"SSL error connecting to {host}:{port}: {e}") if "CERTIFICATE_VERIFY_FAILED" in str(e): results["recommendations"].append( "Certificate verification failed. Check if using self-signed certificates." ) except Exception as e: self.logger.error(f"Failed to connect to {host}:{port}: {e}") results["recommendations"].append( "Connection failed. Check if SSL/TLS is enabled on the target." ) return results @classmethod def from_environment(cls, prefix: str = "") -> SSLConfig: """ Create SSL configuration from environment variables. Args: prefix: Environment variable prefix (e.g., "WAZUH_") Returns: SSL configuration object """ return SSLConfig( verify_ssl=os.getenv(f"{prefix}VERIFY_SSL", "true").lower() == "true", ca_bundle_path=os.getenv(f"{prefix}CA_BUNDLE_PATH"), client_cert_path=os.getenv(f"{prefix}CLIENT_CERT_PATH"), client_key_path=os.getenv(f"{prefix}CLIENT_KEY_PATH"), allow_self_signed=os.getenv(f"{prefix}ALLOW_SELF_SIGNED", "false").lower() == "true", ssl_timeout=int(os.getenv(f"{prefix}SSL_TIMEOUT", "30")), auto_detect_issues=os.getenv(f"{prefix}AUTO_DETECT_SSL_ISSUES", "true").lower() == "true", min_tls_version=os.getenv(f"{prefix}MIN_TLS_VERSION", "TLSv1.2") ) # Global SSL manager instance ssl_manager = SSLConfigurationManager() def get_ssl_config(prefix: str = "") -> SSLConfig: """Get SSL configuration from environment variables.""" return SSLConfigurationManager.from_environment(prefix) def create_secure_ssl_context(config: SSLConfig) -> Optional[ssl.SSLContext]: """Create a secure SSL context.""" return ssl_manager.create_ssl_context(config) def validate_ssl_setup(config: SSLConfig) -> bool: """Validate SSL configuration.""" return ssl_manager.validate_ssl_config(config) # SSL configuration testing and validation module # For testing, run: python -m wazuh_mcp_server.utils.ssl_config

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server