Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
test_wazuh_integration.py (26.6 kB)
"""Integration tests for Wazuh 4.8.0+ API compatibility."""

import asyncio
import json
from datetime import datetime
from unittest.mock import Mock, AsyncMock, patch

import pytest

from wazuh_mcp_server.config import WazuhConfig
from wazuh_mcp_server.api.wazuh_client_manager import WazuhClientManager
from wazuh_mcp_server.api.wazuh_indexer_client import WazuhIndexerClient
from wazuh_mcp_server.api.wazuh_field_mappings import WazuhFieldMapper, WazuhVersion


class TestWazuh48Integration:
    """Integration tests for Wazuh 4.8.0+ compatibility.

    Every test patches ``aiohttp.ClientSession`` so no network traffic is
    generated; canned Server API and Indexer API payloads are supplied by the
    fixtures defined below.
    """

    @pytest.fixture
    def wazuh_config(self):
        """Production-like Wazuh 4.8.0+ configuration."""
        return WazuhConfig(
            host="wazuh-test.local",
            port=55000,
            username="wazuh-mcp",
            password="SecureTestPassword123!",
            verify_ssl=True,
            api_version="v4",
            indexer_host="wazuh-test.local",
            indexer_port=9200,
            indexer_username="admin",
            indexer_password="SecureTestPassword123!",
            indexer_verify_ssl=True,
            wazuh_version="4.8.2",
            use_indexer_for_alerts=True,
            use_indexer_for_vulnerabilities=True
        )

    @pytest.fixture
    def mock_server_api_responses(self):
        """Mock responses for Wazuh Server API.

        Returns a dict keyed by scenario name: ``version_info``, ``agents``
        and ``cluster_status``.
        """
        return {
            "version_info": {
                "data": {
                    "title": "Wazuh API",
                    "api_version": "v4.8.2",
                    "revision": "40815",
                    "license_name": "GPL 2.0",
                    "license_url": "https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html",
                    "hostname": "wazuh-manager",
                    "timestamp": "2024-01-15T10:30:00Z"
                }
            },
            "agents": {
                "data": {
                    "affected_items": [
                        {
                            "id": "000",
                            "name": "wazuh-manager",
                            "ip": "127.0.0.1",
                            "status": "active",
                            "os": {"platform": "ubuntu", "version": "20.04"},
                            "version": "v4.8.2",
                            "manager": "wazuh-manager",
                            "lastKeepAlive": "9999-12-31T23:59:59Z",
                            "registerIP": "127.0.0.1"
                        },
                        {
                            "id": "001",
                            "name": "web-server-01",
                            "ip": "192.168.1.100",
                            "status": "active",
                            "os": {"platform": "centos", "version": "8"},
                            "version": "v4.8.2",
                            "manager": "wazuh-manager",
                            "lastKeepAlive": "2024-01-15T10:29:30Z",
                            "registerIP": "192.168.1.100"
                        }
                    ],
                    "total_affected_items": 2,
                    "total_failed_items": 0,
                    "failed_items": []
                }
            },
            "cluster_status": {
                "data": {
                    "enabled": "yes",
                    "running": "yes",
                    "name": "wazuh-cluster",
                    "node_name": "wazuh-manager",
                    "node_type": "master"
                }
            }
        }

    @pytest.fixture
    def mock_indexer_api_responses(self):
        """Mock responses for Wazuh Indexer API.

        Returns a dict keyed by scenario name: ``cluster_health``,
        ``alerts_search`` and ``vulnerabilities_search``.
        """
        return {
            "cluster_health": {
                "cluster_name": "wazuh-cluster",
                "status": "green",
                "timed_out": False,
                "number_of_nodes": 3,
                "number_of_data_nodes": 3,
                "active_primary_shards": 10,
                "active_shards": 20,
                "relocating_shards": 0,
                "initializing_shards": 0,
                "unassigned_shards": 0
            },
            "alerts_search": {
                "took": 15,
                "timed_out": False,
                "hits": {
                    "total": {"value": 150, "relation": "eq"},
                    "max_score": 1.0,
                    "hits": [
                        {
                            "_index": "wazuh-alerts-4.x-2024.01.15",
                            "_id": "alert_001",
                            "_score": 1.0,
                            "_source": {
                                "@timestamp": "2024-01-15T10:25:00.000Z",
                                "rule": {
                                    "id": 1002,
                                    "level": 5,
                                    "description": "Unknown user attempted to login",
                                    "groups": ["authentication_failed", "syslog", "ssh"]
                                },
                                "agent": {
                                    "id": "001",
                                    "name": "web-server-01",
                                    "ip": "192.168.1.100"
                                },
                                "manager": {"name": "wazuh-manager"},
                                "location": "/var/log/auth.log",
                                "full_log": "Jan 15 10:25:00 web-server-01 sshd[1234]: Failed password for invalid user test from 192.168.1.50 port 22 ssh2"
                            }
                        },
                        {
                            "_index": "wazuh-alerts-4.x-2024.01.15",
                            "_id": "alert_002",
                            "_score": 1.0,
                            "_source": {
                                "@timestamp": "2024-01-15T10:26:00.000Z",
                                "rule": {
                                    "id": 5716,
                                    "level": 12,
                                    "description": "Multiple SSH login failures",
                                    "groups": ["authentication_failures", "syslog", "ssh"]
                                },
                                "agent": {
                                    "id": "001",
                                    "name": "web-server-01",
                                    "ip": "192.168.1.100"
                                },
                                "manager": {"name": "wazuh-manager"},
                                "location": "/var/log/auth.log"
                            }
                        }
                    ]
                },
                "aggregations": {
                    "rule_levels": {
                        "buckets": [
                            {"key": 5, "doc_count": 80},
                            {"key": 12, "doc_count": 50},
                            {"key": 3, "doc_count": 20}
                        ]
                    },
                    "top_agents": {
                        "buckets": [
                            {"key": "web-server-01", "doc_count": 100},
                            {"key": "db-server-01", "doc_count": 50}
                        ]
                    }
                }
            },
            "vulnerabilities_search": {
                "took": 25,
                "timed_out": False,
                "hits": {
                    "total": {"value": 45, "relation": "eq"},
                    "max_score": 1.0,
                    "hits": [
                        {
                            "_index": "wazuh-states-vulnerabilities-2024.01.15",
                            "_id": "vuln_001",
                            "_score": 1.0,
                            "_source": {
                                "@timestamp": "2024-01-15T10:00:00.000Z",
                                "agent": {
                                    "id": "001",
                                    "name": "web-server-01",
                                    "ip": "192.168.1.100"
                                },
                                "vulnerability": {
                                    "id": "CVE-2023-12345",
                                    "cve": "CVE-2023-12345",
                                    "title": "Critical vulnerability in OpenSSL",
                                    "severity": "Critical",
                                    "published": "2023-12-01T00:00:00Z",
                                    "cvss3": {"score": 9.8}
                                },
                                "package": {
                                    "name": "openssl",
                                    "version": "1.1.1-vulnerable",
                                    "architecture": "amd64",
                                    "format": "deb"
                                },
                                "state": "vulnerable"
                            }
                        }
                    ]
                }
            }
        }

    @pytest.mark.asyncio
    async def test_version_detection_and_routing(self, wazuh_config, mock_server_api_responses):
        """Test automatic version detection and API routing."""
        with patch('aiohttp.ClientSession') as mock_session:
            # Mock server API responses
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.json = AsyncMock(return_value=mock_server_api_responses["version_info"])
            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__.return_value = mock_response

            client_manager = WazuhClientManager(wazuh_config)
            # ``async with`` guarantees the client is closed even when an
            # assertion below fails.
            async with client_manager:
                # Test version detection
                version = await client_manager.detect_wazuh_version()
                assert version == "v4.8.2", "Should detect Wazuh 4.8.2"

                # Test API routing decisions
                assert client_manager._should_use_indexer_for_alerts(), "Should use Indexer for alerts in 4.8+"
                assert client_manager._should_use_indexer_for_vulnerabilities(), "Should use Indexer for vulnerabilities in 4.8+"

    @pytest.mark.asyncio
    async def test_indexer_alerts_query_compatibility(self, wazuh_config, mock_indexer_api_responses):
        """Test Indexer API alerts query compatibility."""
        with patch('aiohttp.ClientSession') as mock_session:
            # Mock Indexer API responses
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.json = AsyncMock(return_value=mock_indexer_api_responses["alerts_search"])
            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__.return_value = mock_response

            indexer_client = WazuhIndexerClient(wazuh_config)
            async with indexer_client:
                # Test alerts search
                result = await indexer_client.search_alerts(
                    limit=100,
                    level=5,
                    time_range=3600
                )

                # Verify response format compatibility
                assert "data" in result, "Should have data field for compatibility"
                assert "affected_items" in result["data"], "Should have affected_items for compatibility"
                assert result["data"]["total_affected_items"] == 150, "Should preserve total count"

                # Verify alert structure
                alerts = result["data"]["affected_items"]
                assert len(alerts) == 2, "Should return correct number of alerts"

                first_alert = alerts[0]
                assert "@timestamp" in first_alert, "Should preserve @timestamp field"
                assert "rule" in first_alert, "Should have rule information"
                assert "agent" in first_alert, "Should have agent information"
                assert first_alert["rule"]["level"] == 5, "Should preserve rule level"

    @pytest.mark.asyncio
    async def test_indexer_vulnerabilities_query_compatibility(self, wazuh_config, mock_indexer_api_responses):
        """Test Indexer API vulnerabilities query compatibility."""
        with patch('aiohttp.ClientSession') as mock_session:
            # Mock Indexer API responses
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.json = AsyncMock(return_value=mock_indexer_api_responses["vulnerabilities_search"])
            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__.return_value = mock_response

            indexer_client = WazuhIndexerClient(wazuh_config)
            async with indexer_client:
                # Test vulnerabilities search
                result = await indexer_client.search_vulnerabilities(
                    agent_id="001",
                    limit=50
                )

                # Verify response format compatibility
                assert "data" in result, "Should have data field for compatibility"
                assert "affected_items" in result["data"], "Should have affected_items for compatibility"
                assert result["data"]["total_affected_items"] == 45, "Should preserve total count"

                # Verify vulnerability structure
                vulns = result["data"]["affected_items"]
                assert len(vulns) == 1, "Should return correct number of vulnerabilities"

                vuln = vulns[0]
                assert "vulnerability" in vuln, "Should have vulnerability information"
                assert "agent" in vuln, "Should have agent information"
                assert vuln["vulnerability"]["cve"] == "CVE-2023-12345", "Should preserve CVE information"
                assert vuln["vulnerability"]["severity"] == "Critical", "Should preserve severity"

    @pytest.mark.asyncio
    async def test_field_mapping_accuracy(self):
        """Test field mapping accuracy for 4.8.0+ compatibility."""
        field_mapper = WazuhFieldMapper(WazuhVersion.V4_8_X)

        # Test critical field mappings
        timestamp_field = field_mapper.map_server_to_indexer_field("timestamp", "alert")
        assert timestamp_field == "@timestamp", "Timestamp should map to @timestamp"

        rule_level_field = field_mapper.map_server_to_indexer_field("rule.level", "alert")
        assert rule_level_field == "rule.level", "Rule level should map consistently"

        agent_id_field = field_mapper.map_server_to_indexer_field("agent.id", "alert")
        assert agent_id_field == "agent.id", "Agent ID should map consistently"

        # Test reverse mapping
        server_timestamp = field_mapper.map_indexer_to_server_field("@timestamp", "alert")
        assert server_timestamp == "timestamp", "Should reverse map @timestamp to timestamp"

        # Test index patterns
        alerts_pattern = field_mapper.get_index_pattern("alerts")
        assert alerts_pattern == "wazuh-alerts-4.x-*", "Should use correct alerts index pattern"

        vulns_pattern = field_mapper.get_index_pattern("vulnerabilities")
        assert vulns_pattern == "wazuh-states-vulnerabilities-*", "Should use correct vulnerabilities index pattern"

    @pytest.mark.asyncio
    async def test_api_fallback_mechanism(self, wazuh_config, mock_server_api_responses, mock_indexer_api_responses):
        """Test API fallback mechanism when one API is unavailable."""
        with patch('aiohttp.ClientSession') as mock_session:
            # Mock scenario: Indexer API fails, Server API works
            # NOTE(review): this callable is installed as the response context
            # manager's ``__aenter__``, which looks like it is invoked without
            # the original request arguments; ``url`` may therefore always be
            # '' and the branches below may never be selected by URL — confirm
            # against how the client calls ``session.request``.
            def side_effect(*args, **kwargs):
                url = kwargs.get('url', str(args[1]) if len(args) > 1 else '')
                mock_response = AsyncMock()

                if ':9200' in url:  # Indexer API call
                    mock_response.status = 503  # Service unavailable
                    mock_response.json = AsyncMock(return_value={"error": "Service unavailable"})
                else:  # Server API call
                    mock_response.status = 200
                    if '/alerts' in url:
                        # In 4.8+, Server API alerts should return 404
                        mock_response.status = 404
                        mock_response.json = AsyncMock(return_value={"error": "Not found"})
                    else:
                        mock_response.json = AsyncMock(return_value=mock_server_api_responses["agents"])

                return mock_response

            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__ = AsyncMock(side_effect=side_effect)

            client_manager = WazuhClientManager(wazuh_config)
            async with client_manager:
                # Test agents query (should work via Server API)
                agents_result = await client_manager.get_agents()
                assert "data" in agents_result, "Should get agents via Server API"
                assert agents_result["data"]["total_affected_items"] == 2, "Should return correct agent count"

                # Test alerts query (should fail gracefully).
                # ``pytest.raises`` is used instead of ``assert False`` inside a
                # ``try/except Exception`` so that a missing exception is
                # reported as such rather than being swallowed by the handler.
                with pytest.raises(Exception) as exc_info:
                    await client_manager.get_alerts(limit=10)

                error_text = str(exc_info.value)
                assert "404" in error_text or "Service unavailable" in error_text, "Should provide meaningful error"

    @pytest.mark.asyncio
    async def test_ssl_configuration_enforcement(self, wazuh_config):
        """Test SSL configuration enforcement in production."""
        # Test production SSL enforcement
        prod_config = wazuh_config.copy()
        prod_config.verify_ssl = True
        prod_config.indexer_verify_ssl = True

        with patch('aiohttp.ClientSession') as mock_session:
            # Replace the session factory result with a plain Mock so that
            # creating the session performs no real I/O.
            mock_session.return_value = Mock()

            indexer_client = WazuhIndexerClient(prod_config)
            await indexer_client._create_session()

            # SSL should be enforced
            assert indexer_client.verify_ssl is True, "SSL should be enforced"

    @pytest.mark.asyncio
    async def test_health_check_comprehensive(self, wazuh_config, mock_server_api_responses, mock_indexer_api_responses):
        """Test comprehensive health check for both APIs."""
        with patch('aiohttp.ClientSession') as mock_session:
            # NOTE(review): as in the fallback test, this callable is attached
            # to ``__aenter__`` and may not receive the request URL — confirm
            # the URL-based branches are actually exercised.
            def side_effect(*args, **kwargs):
                url = kwargs.get('url', str(args[1]) if len(args) > 1 else '')
                mock_response = AsyncMock()
                mock_response.status = 200

                if ':9200' in url:  # Indexer API
                    if '_cluster/health' in url:
                        mock_response.json = AsyncMock(return_value=mock_indexer_api_responses["cluster_health"])
                elif url.endswith('/'):  # Server API root endpoint
                    mock_response.json = AsyncMock(return_value=mock_server_api_responses["version_info"])

                return mock_response

            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__ = AsyncMock(side_effect=side_effect)

            client_manager = WazuhClientManager(wazuh_config)
            async with client_manager:
                # Test comprehensive health check
                health_result = await client_manager.health_check()

                # Verify health check structure
                assert "server_api" in health_result, "Should include Server API health"
                assert "indexer_api" in health_result, "Should include Indexer API health"
                assert "overall_status" in health_result, "Should include overall status"
                assert "wazuh_version" in health_result, "Should include version info"
                assert "using_indexer_for_alerts" in health_result, "Should include API routing info"

                # Verify individual API health
                assert health_result["server_api"]["status"] == "healthy", "Server API should be healthy"
                assert health_result["indexer_api"]["status"] in ["healthy", "green"], "Indexer API should be healthy"

    @pytest.mark.asyncio
    async def test_query_optimization_and_performance(self, wazuh_config, mock_indexer_api_responses):
        """Test query optimization features for performance."""
        with patch('aiohttp.ClientSession') as mock_session:
            # Track query parameters
            captured_queries = []

            async def capture_request(*args, **kwargs):
                # Record the JSON body of every request so it can be inspected.
                if 'json' in kwargs:
                    captured_queries.append(kwargs['json'])

                mock_response = AsyncMock()
                mock_response.status = 200
                mock_response.json = AsyncMock(return_value=mock_indexer_api_responses["alerts_search"])
                return mock_response

            mock_session.return_value.__aenter__.return_value.request = AsyncMock(side_effect=capture_request)

            indexer_client = WazuhIndexerClient(wazuh_config)
            async with indexer_client:
                # Test optimized query
                await indexer_client.search_alerts(
                    limit=100,
                    level=5,
                    sort="-timestamp",
                    agent_id="001"
                )

                # Fail with a clear message instead of an IndexError when the
                # client never issued a request carrying a JSON body.
                assert captured_queries, "Should have captured at least one query"

                # Verify query optimization
                query = captured_queries[0]

                # Should use bool query with filters for performance
                assert "bool" in query["query"], "Should use bool query"
                assert "filter" in query["query"]["bool"], "Should use filters for performance"

                # Should exclude large fields
                assert "_source" in query, "Should have source filtering"
                assert "excludes" in query["_source"], "Should exclude large fields"
                assert "full_log" in query["_source"]["excludes"], "Should exclude full_log field"

                # Should use proper sort field
                assert "sort" in query, "Should have sort configuration"
                sort_field = list(query["sort"][0].keys())[0]
                assert "@timestamp" in sort_field, "Should sort by @timestamp"

                # Should have aggregations for monitoring
                assert "aggs" in query, "Should include aggregations"
                assert "rule_levels" in query["aggs"], "Should have rule levels aggregation"

    def test_version_compatibility_matrix(self):
        """Test version compatibility matrix."""
        # Test 4.8.0+ compatibility
        config_48 = WazuhConfig(
            host="test-host",
            username="test",
            password="SecurePassword123!",
            wazuh_version="4.8.0"
        )
        client_manager = WazuhClientManager(config_48)
        assert client_manager._is_version_48_or_later(), "4.8.0 should be considered 4.8+"

        # Test 4.9.x compatibility
        config_49 = WazuhConfig(
            host="test-host",
            username="test",
            password="SecurePassword123!",
            wazuh_version="4.9.1"
        )
        client_manager = WazuhClientManager(config_49)
        assert client_manager._is_version_48_or_later(), "4.9.1 should be considered 4.8+"

        # Test 4.7.x compatibility (should not use Indexer by default)
        config_47 = WazuhConfig(
            host="test-host",
            username="test",
            password="SecurePassword123!",
            wazuh_version="4.7.2",
            use_indexer_for_alerts=False
        )
        client_manager = WazuhClientManager(config_47)
        assert not client_manager._is_version_48_or_later(), "4.7.2 should not be considered 4.8+"

    @pytest.mark.asyncio
    async def test_production_error_recovery(self, wazuh_config):
        """Test production error recovery scenarios."""
        from wazuh_mcp_server.utils.production_error_handler import production_error_handler

        # Test network error recovery
        retry_count = 0

        async def flaky_operation():
            # Fails on the first two calls and succeeds on the third.
            nonlocal retry_count
            retry_count += 1
            if retry_count < 3:
                raise ConnectionError("Network error")
            return {"success": True, "attempt": retry_count}

        result = await production_error_handler.execute_with_retry(
            flaky_operation,
            "test_recovery",
            "indexer",
            "/test"
        )

        assert result["success"] is True, "Should recover from network errors"
        assert result["attempt"] == 3, "Should retry correct number of times"

    def test_minimum_version_enforcement(self):
        """Test that minimum Wazuh version is properly enforced."""
        from wazuh_mcp_server.__version__ import __min_wazuh_version__

        # Verify minimum version is set correctly
        assert __min_wazuh_version__ == "4.5.0", "Minimum version should be 4.5.0"

        # Test version comparison logic
        config = WazuhConfig(
            host="test-host",
            username="test",
            password="SecurePassword123!",
            wazuh_version="4.7.5"
        )

        client_manager = WazuhClientManager(config)
        # With explicit configuration, it should still work but not use Indexer by default
        assert not client_manager._should_use_indexer_for_alerts(), "Should not use Indexer for pre-4.8"


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.