Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
connection_validator.py15.1 kB
#!/usr/bin/env python3 """Intelligent connection validator for Wazuh MCP Server. This module provides comprehensive connection testing with automatic protocol detection, SSL validation, and configuration recommendations. """ import asyncio import socket import ssl import aiohttp import json import platform from typing import Dict, List, Optional, Tuple from pathlib import Path import sys # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from wazuh_mcp_server.config import WazuhConfig from wazuh_mcp_server.utils.logging import get_logger logger = get_logger(__name__) # Windows console compatibility def _supports_unicode(): """Check if the terminal supports Unicode characters.""" try: "✅".encode(sys.stdout.encoding or 'utf-8') return True except (UnicodeEncodeError, LookupError): return False def _safe_print(text): """Print text with Windows console compatibility.""" try: print(text) except UnicodeEncodeError: # Replace Unicode characters with ASCII equivalents replacements = { '✅': '[OK]', '❌': '[FAIL]', '🔍': '[SEARCH]', '📊': '[CHART]', '📡': '[SIGNAL]', '📋': '[INFO]', '🔒': '[SECURE]', '🔓': '[UNSECURE]', '⚠️': '[WARN]', '🎉': '[SUCCESS]', '💡': '[TIP]' } safe_text = text for unicode_char, ascii_replacement in replacements.items(): safe_text = safe_text.replace(unicode_char, ascii_replacement) print(safe_text.encode('ascii', errors='replace').decode('ascii')) class ConnectionValidator: """Intelligent connection validator with protocol detection.""" def __init__(self, config: WazuhConfig): self.config = config self.results = { 'manager': {'reachable': False, 'protocol': None, 'ssl_valid': False}, 'indexer': {'reachable': False, 'protocol': None, 'ssl_valid': False}, 'recommendations': [] } async def validate_all_connections(self) -> Dict: """Validate all configured connections.""" _safe_print("🔍 Starting comprehensive connection validation...") _safe_print("") # Test Wazuh Manager if self.config.host: _safe_print(f"📡 Testing Wazuh Manager: {self.config.host}:{self.config.port}") manager_result = await self.test_manager_connection() self.results['manager'] = manager_result self._print_connection_result("Manager", manager_result) # Test Wazuh Indexer if hasattr(self.config, 'indexer_host') and self.config.indexer_host: _safe_print(f"📊 Testing Wazuh Indexer: {self.config.indexer_host}:{self.config.indexer_port}") indexer_result = await self.test_indexer_connection() self.results['indexer'] = indexer_result self._print_connection_result("Indexer", indexer_result) # Generate recommendations self._generate_recommendations() return self.results async def test_manager_connection(self) -> Dict: """Test Wazuh Manager connection with protocol detection.""" result = { 'reachable': False, 'protocol': None, 'ssl_valid': False, 'self_signed': False, 'auth_success': False, 'api_version': None, 'error': None } # Test HTTPS first https_result = await self._test_https_connection( self.config.host, self.config.port ) if https_result['success']: result.update(https_result) result['protocol'] = 'https' # Test API authentication auth_result = await self._test_api_authentication() result['auth_success'] = auth_result.get('success', False) result['api_version'] = auth_result.get('version') if not auth_result.get('success'): result['error'] = auth_result.get('error') return result async def test_indexer_connection(self) -> Dict: """Test Wazuh Indexer connection.""" result = { 'reachable': False, 'protocol': None, 'ssl_valid': False, 'self_signed': False, 'cluster_status': None, 'error': None } # Test HTTPS connection https_result = await self._test_https_connection( self.config.indexer_host, self.config.indexer_port ) if https_result['success']: result.update(https_result) result['protocol'] = 'https' # Test Indexer API cluster_result = await self._test_indexer_api() result['cluster_status'] = cluster_result.get('status') if not cluster_result.get('success'): result['error'] = cluster_result.get('error') return result async def _test_https_connection(self, host: str, port: int) -> Dict: """Test HTTPS connection with SSL validation.""" result = { 'success': False, 'ssl_valid': False, 'self_signed': False, 'error': None } # Test with SSL verification try: context = ssl.create_default_context() with socket.create_connection((host, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=host) as ssock: result['success'] = True result['ssl_valid'] = True return result except ssl.SSLCertVerificationError: result['self_signed'] = True except Exception as e: result['error'] = str(e) # Test with disabled SSL verification try: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((host, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=host) as ssock: result['success'] = True result['ssl_valid'] = False return result except Exception as e: result['error'] = str(e) return result async def _test_api_authentication(self) -> Dict: """Test Wazuh API authentication.""" result = {'success': False, 'error': None, 'version': None} # Create SSL context based on configuration ssl_context = ssl.create_default_context() if not self.config.verify_ssl: ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE connector = aiohttp.TCPConnector(ssl=ssl_context) try: async with aiohttp.ClientSession(connector=connector) as session: # Test authentication endpoint auth_url = f"{self.config.base_url}/security/user/authenticate" auth = aiohttp.BasicAuth(self.config.username, self.config.password) async with session.post(auth_url, auth=auth) as response: if response.status == 200: result['success'] = True # Get API version version_url = f"{self.config.base_url}/" async with session.get(version_url, headers={'Authorization': f'Bearer {(await response.json()).get("token", "")}'}) as version_response: if version_response.status == 200: version_data = await version_response.json() result['version'] = version_data.get('data', {}).get('api_version') else: result['error'] = f"Authentication failed: HTTP {response.status}" except Exception as e: result['error'] = str(e) return result async def _test_indexer_api(self) -> Dict: """Test Wazuh Indexer API.""" result = {'success': False, 'error': None, 'status': None} # Create SSL context ssl_context = ssl.create_default_context() if not self.config.verify_ssl: ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE connector = aiohttp.TCPConnector(ssl=ssl_context) try: async with aiohttp.ClientSession(connector=connector) as session: indexer_url = f"https://{self.config.indexer_host}:{self.config.indexer_port}" auth = aiohttp.BasicAuth( getattr(self.config, 'indexer_username', self.config.username), getattr(self.config, 'indexer_password', self.config.password) ) # Test cluster health health_url = f"{indexer_url}/_cluster/health" async with session.get(health_url, auth=auth) as response: if response.status == 200: result['success'] = True health_data = await response.json() result['status'] = health_data.get('status', 'unknown') else: result['error'] = f"Cluster health check failed: HTTP {response.status}" except Exception as e: result['error'] = str(e) return result def _print_connection_result(self, service: str, result: Dict): """Print formatted connection result.""" if result['reachable']: if _supports_unicode(): status = "✅ CONNECTED" else: status = "[CONNECTED]" if result.get('auth_success') or result.get('cluster_status'): status += " & AUTHENTICATED" else: if _supports_unicode(): status = "❌ FAILED" else: status = "[FAILED]" _safe_print(f" {status}") if result['reachable']: if result.get('ssl_valid'): _safe_print(" 🔒 SSL: Valid certificate") elif result.get('self_signed'): _safe_print(" 🔓 SSL: Self-signed certificate") else: _safe_print(" ⚠️ SSL: Verification disabled") if result.get('api_version'): _safe_print(f" 📋 API Version: {result['api_version']}") if result.get('cluster_status'): _safe_print(f" 📊 Cluster Status: {result['cluster_status']}") if result.get('error'): _safe_print(f" ❗ Error: {result['error']}") _safe_print("") def _generate_recommendations(self): """Generate configuration recommendations.""" recommendations = [] # SSL/TLS recommendations if self.results['manager']['reachable']: if self.results['manager']['ssl_valid']: recommendations.append( "🔒 Manager has valid SSL - set VERIFY_SSL=true for production" ) elif self.results['manager']['self_signed']: recommendations.append( "🔓 Manager uses self-signed certificate - current VERIFY_SSL=false is correct" ) if self.results['indexer']['reachable']: if self.results['indexer']['ssl_valid']: recommendations.append( "🔒 Indexer has valid SSL - enable SSL verification" ) elif self.results['indexer']['self_signed']: recommendations.append( "🔓 Indexer uses self-signed certificate - SSL verification disabled is appropriate" ) # Authentication recommendations if not self.results['manager'].get('auth_success'): recommendations.append( "🔑 Authentication failed - verify WAZUH_USER and WAZUH_PASS credentials" ) # Performance recommendations if self.results['manager']['reachable'] and self.results['indexer']['reachable']: recommendations.append( "🚀 Both services reachable - full functionality available" ) elif self.results['manager']['reachable']: recommendations.append( "⚠️ Only Manager reachable - limited to basic operations" ) self.results['recommendations'] = recommendations if recommendations: _safe_print("💡 RECOMMENDATIONS:") for rec in recommendations: _safe_print(f" {rec}") _safe_print("") async def main(): """Main function for connection validation.""" try: # Load configuration config = WazuhConfig.from_env() # Create validator validator = ConnectionValidator(config) # Run validation results = await validator.validate_all_connections() # Print summary _safe_print("📋 VALIDATION SUMMARY:") if _supports_unicode(): manager_icon = '✅' if results['manager']['reachable'] else '❌' indexer_icon = '✅' if results['indexer']['reachable'] else '❌' else: manager_icon = '[OK]' if results['manager']['reachable'] else '[FAIL]' indexer_icon = '[OK]' if results['indexer']['reachable'] else '[FAIL]' _safe_print(f" Manager: {manager_icon}") _safe_print(f" Indexer: {indexer_icon}") # Exit with appropriate code if results['manager']['reachable']: _safe_print("\n🎉 Validation completed successfully!") return 0 else: _safe_print("\n❌ Validation failed - check configuration and network connectivity") return 1 except Exception as e: _safe_print(f"❌ Validation error: {e}") return 1 if __name__ == "__main__": # Set up console encoding for Windows if platform.system() == "Windows": try: # Try to set UTF-8 encoding for Windows console import codecs sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'replace') sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'replace') except Exception: # If that fails, we'll use ASCII fallbacks in the print functions pass sys.exit(asyncio.run(main()))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server