Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
test_running_agents.py31.7 kB
"""Tests for the get_wazuh_running_agents tool.""" import pytest import json from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch from wazuh_mcp_server.main import WazuhMCPServer from wazuh_mcp_server.utils.validation import validate_running_agents_query, ValidationError @pytest.fixture def mock_agents_data(): """Mock agent data for testing.""" base_time = datetime.utcnow() return [ { "id": "001", "name": "web-server-01", "status": "active", "last_keep_alive": (base_time - timedelta(minutes=2)).isoformat() + "Z", "os": { "platform": "ubuntu", "version": "20.04" }, "version": "4.8.0", "group": ["web", "production"], "node_name": "master", "ip": "192.168.1.10", "register_ip": "192.168.1.10" }, { "id": "002", "name": "db-server-01", "status": "active", "last_keep_alive": (base_time - timedelta(minutes=10)).isoformat() + "Z", "os": { "platform": "centos", "version": "8" }, "version": "4.8.0", "group": ["database", "production"], "node_name": "master", "ip": "192.168.1.20", "register_ip": "192.168.1.20" }, { "id": "003", "name": "win-server-01", "status": "disconnected", "last_keep_alive": (base_time - timedelta(hours=2)).isoformat() + "Z", "os": { "platform": "windows", "version": "2019" }, "version": "4.7.5", "group": ["windows", "production"], "node_name": "worker-1", "ip": "192.168.1.30", "register_ip": "192.168.1.30" }, { "id": "004", "name": "test-server-01", "status": "active", "last_keep_alive": (base_time - timedelta(minutes=1)).isoformat() + "Z", "os": { "platform": "ubuntu", "version": "22.04" }, "version": "4.8.1", "group": ["test"], "node_name": "master", "ip": "192.168.1.40", "register_ip": "192.168.1.40" }, { "id": "005", "name": "dev-server-01", "status": "never_connected", "os": { "platform": "debian", "version": "11" }, "version": "4.8.0", "group": ["development"], "node_name": "worker-2", "ip": "192.168.1.50", "register_ip": "192.168.1.50" } ] @pytest.fixture def mock_agent_stats(): """Mock agent stats data.""" return { "001": { "data": { "cpu_usage": 15.2, "memory_usage": 45.8, "disk_usage": 67.3, "events_received": 1250, "events_sent": 1248 } }, "002": { "data": { "cpu_usage": 8.1, "memory_usage": 52.3, "disk_usage": 78.9, "events_received": 890, "events_sent": 888 } } } class TestRunningAgentsValidation: """Test validation of running agents query parameters.""" def test_valid_basic_query(self): """Test validation with basic valid parameters.""" params = {"group_by": "status"} result = validate_running_agents_query(params) assert result.group_by == "status" assert result.inactive_threshold == 300 assert result.include_disconnected is False assert result.max_agents == 1000 def test_valid_complete_query(self): """Test validation with all parameters.""" params = { "status_filter": ["active", "disconnected"], "os_filter": ["ubuntu", "centos"], "version_filter": "4.8.0", "group_filter": ["production", "web"], "inactive_threshold": 600, "include_disconnected": True, "include_health_metrics": False, "include_last_activity": False, "group_by": "os", "max_agents": 500 } result = validate_running_agents_query(params) assert result.status_filter == ["active", "disconnected"] assert result.os_filter == ["ubuntu", "centos"] assert result.version_filter == "4.8.0" assert result.group_filter == ["production", "web"] assert result.inactive_threshold == 600 assert result.include_disconnected is True assert result.group_by == "os" assert result.max_agents == 500 def test_invalid_status_filter(self): """Test validation with invalid status.""" params = {"status_filter": ["invalid_status"]} with pytest.raises(ValidationError): validate_running_agents_query(params) def test_invalid_group_by(self): """Test validation with invalid group_by field.""" params = {"group_by": "invalid_field"} with pytest.raises(ValidationError): validate_running_agents_query(params) def test_invalid_inactive_threshold(self): """Test validation with invalid inactive threshold.""" # Test below minimum params = {"inactive_threshold": 30} with pytest.raises(ValidationError): validate_running_agents_query(params) # Test above maximum params = {"inactive_threshold": 4000} with pytest.raises(ValidationError): validate_running_agents_query(params) def test_max_agents_boundary(self): """Test max_agents boundary validation.""" # Test minimum params = {"max_agents": 1} result = validate_running_agents_query(params) assert result.max_agents == 1 # Test maximum params = {"max_agents": 5000} result = validate_running_agents_query(params) assert result.max_agents == 5000 # Test over maximum params = {"max_agents": 6000} with pytest.raises(ValidationError): validate_running_agents_query(params) @pytest.mark.asyncio class TestRunningAgentsTool: """Test the running agents tool functionality.""" @pytest.fixture async def wazuh_server(self): """Create a mock Wazuh MCP server.""" with patch('wazuh_mcp_server.main.WazuhConfig') as mock_config: mock_config.from_env.return_value = MagicMock() mock_config.from_env.return_value.log_level = "INFO" mock_config.from_env.return_value.debug = False mock_config.from_env.return_value.request_timeout_seconds = 30 server = WazuhMCPServer() server.api_client = AsyncMock() return server async def test_basic_running_agents(self, wazuh_server, mock_agents_data): """Test basic running agents functionality.""" # Mock API response wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) assert len(result) == 1 response_data = json.loads(result[0].text) # Check basic structure assert "query_parameters" in response_data assert "summary" in response_data assert "grouped_analysis" in response_data assert "infrastructure_health" in response_data assert "agent_details" in response_data assert "recommendations" in response_data assert "analysis_metadata" in response_data # Check summary data assert response_data["summary"]["total_agents"] == 5 assert response_data["summary"]["active_agents"] == 3 assert response_data["summary"]["disconnected_agents"] == 1 assert response_data["summary"]["never_connected_agents"] == 1 # Check grouped analysis assert response_data["grouped_analysis"]["grouping_field"] == "status" groups = response_data["grouped_analysis"]["groups"] assert "active" in groups assert "disconnected" in groups assert "never_connected" in groups async def test_status_grouping(self, wazuh_server, mock_agents_data): """Test grouping by status.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) groups = response_data["grouped_analysis"]["groups"] # Check status distribution assert groups["active"]["count"] == 3 assert groups["disconnected"]["count"] == 1 assert groups["never_connected"]["count"] == 1 # Check percentages assert groups["active"]["percentage"] == 60.0 assert groups["disconnected"]["percentage"] == 20.0 assert groups["never_connected"]["percentage"] == 20.0 async def test_os_grouping(self, wazuh_server, mock_agents_data): """Test grouping by operating system.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "os"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) groups = response_data["grouped_analysis"]["groups"] # Check OS distribution assert "ubuntu" in groups assert "centos" in groups assert "windows" in groups assert "debian" in groups assert groups["ubuntu"]["count"] == 2 # web-server-01, test-server-01 assert groups["centos"]["count"] == 1 # db-server-01 assert groups["windows"]["count"] == 1 # win-server-01 assert groups["debian"]["count"] == 1 # dev-server-01 async def test_version_grouping(self, wazuh_server, mock_agents_data): """Test grouping by agent version.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "version"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) groups = response_data["grouped_analysis"]["groups"] # Check version distribution assert "4.8.0" in groups assert "4.7.5" in groups assert "4.8.1" in groups assert groups["4.8.0"]["count"] == 3 # web, db, dev servers assert groups["4.7.5"]["count"] == 1 # win server assert groups["4.8.1"]["count"] == 1 # test server async def test_status_filtering(self, wazuh_server, mock_agents_data): """Test filtering by agent status.""" # Filter only active agents active_agents = [agent for agent in mock_agents_data if agent["status"] == "active"] wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": active_agents} } arguments = {"status_filter": ["active"], "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should only have active agents assert response_data["summary"]["total_agents"] == 3 assert response_data["summary"]["active_agents"] == 3 assert response_data["summary"]["disconnected_agents"] == 0 async def test_os_filtering(self, wazuh_server, mock_agents_data): """Test filtering by operating system.""" # Filter only Ubuntu agents ubuntu_agents = [agent for agent in mock_agents_data if agent["os"]["platform"] == "ubuntu"] wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": ubuntu_agents} } arguments = {"os_filter": ["ubuntu"], "group_by": "os"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should only have Ubuntu agents assert response_data["summary"]["total_agents"] == 2 groups = response_data["grouped_analysis"]["groups"] assert "ubuntu" in groups assert groups["ubuntu"]["count"] == 2 async def test_inactive_threshold_analysis(self, wazuh_server, mock_agents_data): """Test inactive threshold analysis.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } # Set a very low threshold (120 seconds) to make some agents inactive arguments = {"inactive_threshold": 120, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # db-server-01 should be considered inactive (last seen 10 minutes ago) assert response_data["summary"]["inactive_agents"] > 0 assert response_data["summary"]["health_issues_count"] > 0 # Check health issues health_issues = response_data["infrastructure_health"]["health_issues"] assert len(health_issues) > 0 assert any(issue["issue"] == "inactive" for issue in health_issues) async def test_health_metrics_collection(self, wazuh_server, mock_agents_data, mock_agent_stats): """Test health metrics collection.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } def mock_get_agent_stats(agent_id): return mock_agent_stats.get(agent_id, {"data": {}}) wazuh_server.api_client.get_agent_stats.side_effect = mock_get_agent_stats arguments = {"include_health_metrics": True, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should have health metrics assert "health_metrics" in response_data health_metrics = response_data["health_metrics"] assert "agent_stats" in health_metrics assert len(health_metrics["agent_stats"]) > 0 # Check that stats were collected for stat in health_metrics["agent_stats"]: assert "agent_id" in stat assert "stats" in stat async def test_activity_analysis(self, wazuh_server, mock_agents_data): """Test activity analysis functionality.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"include_last_activity": True, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should have activity analysis assert "activity_analysis" in response_data activity = response_data["activity_analysis"] assert "last_seen_analysis" in activity assert "activity_patterns" in activity # Check last seen analysis buckets last_seen = activity["last_seen_analysis"] assert "last_5_minutes" in last_seen assert "last_15_minutes" in last_seen assert "last_hour" in last_seen assert "last_day" in last_seen assert "older" in last_seen # Should have some agents in recent buckets assert last_seen["last_5_minutes"] > 0 or last_seen["last_15_minutes"] > 0 async def test_infrastructure_health_scoring(self, wazuh_server, mock_agents_data): """Test infrastructure health scoring.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) health = response_data["infrastructure_health"] # Check health score structure assert "overall_health_score" in health assert "health_rating" in health assert "active_percentage" in health assert "coverage_analysis" in health # Health score should be reasonable assert 0 <= health["overall_health_score"] <= 100 assert health["health_rating"] in ["Excellent", "Good", "Fair", "Poor", "Critical"] # Coverage analysis should show diversity coverage = health["coverage_analysis"] assert coverage["operating_systems"] >= 3 # ubuntu, centos, windows, debian assert coverage["agent_versions"] >= 2 # Multiple versions in test data async def test_recommendations_generation(self, wazuh_server, mock_agents_data): """Test recommendations generation.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) recommendations = response_data["recommendations"] # Should have recommendations due to disconnected agents assert len(recommendations) > 0 # Check recommendation structure for rec in recommendations: assert "priority" in rec assert "category" in rec assert "title" in rec assert "description" in rec assert "action" in rec assert "impact" in rec assert rec["priority"] in ["HIGH", "MEDIUM", "LOW"] # Should have connectivity recommendation due to disconnected agent connectivity_recs = [r for r in recommendations if r["category"] == "connectivity"] assert len(connectivity_recs) > 0 async def test_empty_agent_response(self, wazuh_server): """Test handling of empty agent response.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": []} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should handle empty response gracefully assert response_data["summary"]["total_agents"] == 0 assert "No agents found" in response_data["summary"]["message"] assert response_data["analysis_metadata"]["total_agents_analyzed"] == 0 async def test_agent_details_structure(self, wazuh_server, mock_agents_data): """Test agent details structure.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) agent_details = response_data["agent_details"] # Should have agent details assert len(agent_details) > 0 # Check agent detail structure for agent in agent_details: assert "id" in agent assert "name" in agent assert "status" in agent assert "os" in agent assert "version" in agent assert "group" in agent assert "node" in agent assert "ip" in agent async def test_error_handling(self, wazuh_server): """Test error handling in running agents analysis.""" # Mock API error wazuh_server.api_client.get_agents.side_effect = Exception("API Error") arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) # Should return error response response_data = json.loads(result[0].text) assert "error" in response_data assert "API Error" in response_data["error"] class TestRunningAgentsHelperMethods: """Test helper methods for running agents analysis.""" @pytest.fixture def wazuh_server(self): """Create a minimal server instance for testing helper methods.""" with patch('wazuh_mcp_server.main.WazuhConfig') as mock_config: mock_config.from_env.return_value = MagicMock() mock_config.from_env.return_value.log_level = "INFO" mock_config.from_env.return_value.debug = False server = WazuhMCPServer() return server def test_calculate_infrastructure_health_score(self, wazuh_server): """Test infrastructure health score calculation.""" agents = [{"id": f"{i:03d}", "version": "4.8.0"} for i in range(10)] active_agents = agents[:8] # 80% active health_issues = [{"issue": "test"}] * 2 # 2 issues score = wazuh_server._calculate_infrastructure_health_score(agents, active_agents, health_issues) # Should be around 80% - 4% (2 issues * 2) = 76% assert 70 <= score <= 80 assert isinstance(score, float) def test_get_health_rating(self, wazuh_server): """Test health rating classification.""" assert wazuh_server._get_health_rating(95) == "Excellent" assert wazuh_server._get_health_rating(85) == "Good" assert wazuh_server._get_health_rating(65) == "Fair" assert wazuh_server._get_health_rating(45) == "Poor" assert wazuh_server._get_health_rating(25) == "Critical" def test_analyze_agent_activity(self, wazuh_server, mock_agents_data): """Test agent activity analysis.""" activity = wazuh_server._analyze_agent_activity(mock_agents_data, 300) assert "last_seen_analysis" in activity assert "activity_patterns" in activity # Check buckets structure buckets = activity["last_seen_analysis"] assert all(bucket in buckets for bucket in [ "last_5_minutes", "last_15_minutes", "last_hour", "last_day", "older" ]) # Should have some recent activity recent_activity = buckets["last_5_minutes"] + buckets["last_15_minutes"] assert recent_activity > 0 def test_generate_infrastructure_recommendations(self, wazuh_server): """Test recommendation generation logic.""" # Mock summary with issues summary = { "disconnected_agents": 2, "inactive_agents": 3 } # Mock health with low score health = { "overall_health_score": 65, "coverage_analysis": { "agent_versions": 5 } } issues = [{"issue": "test"}] * 2 recommendations = wazuh_server._generate_infrastructure_recommendations(summary, health, issues) # Should generate multiple recommendations assert len(recommendations) > 0 # Check for expected recommendation categories categories = [rec["category"] for rec in recommendations] assert "connectivity" in categories # For disconnected agents assert "health" in categories # For low health score assert "monitoring" in categories # For inactive agents # Check priority levels priorities = [rec["priority"] for rec in recommendations] assert all(priority in ["HIGH", "MEDIUM", "LOW"] for priority in priorities) @pytest.mark.asyncio class TestRunningAgentsEdgeCases: """Test edge cases for running agents analysis.""" @pytest.fixture async def wazuh_server(self): """Create a mock server for edge case testing.""" with patch('wazuh_mcp_server.main.WazuhConfig') as mock_config: mock_config.from_env.return_value = MagicMock() mock_config.from_env.return_value.log_level = "INFO" mock_config.from_env.return_value.debug = False mock_config.from_env.return_value.request_timeout_seconds = 30 server = WazuhMCPServer() server.api_client = AsyncMock() return server async def test_malformed_agent_data(self, wazuh_server): """Test handling of malformed agent data.""" # Mock response with malformed data malformed_agents = [ {"id": "001"}, # Missing required fields {"invalid": "structure"}, # Completely wrong structure { "id": "002", "name": "test-agent", "status": "active", "last_keep_alive": "invalid_timestamp" } ] wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": malformed_agents} } arguments = {"group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) # Should not crash and should handle gracefully response_data = json.loads(result[0].text) assert "summary" in response_data assert response_data["summary"]["total_agents"] >= 0 async def test_health_metrics_collection_errors(self, wazuh_server, mock_agents_data): """Test handling of health metrics collection errors.""" wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": mock_agents_data[:2]} # Limit for testing } # Mock stats collection to always fail wazuh_server.api_client.get_agent_stats.side_effect = Exception("Stats API Error") arguments = {"include_health_metrics": True, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should handle errors gracefully assert "health_metrics" in response_data health_metrics = response_data["health_metrics"] assert "collection_errors" in health_metrics assert len(health_metrics["collection_errors"]) > 0 async def test_large_agent_dataset(self, wazuh_server): """Test handling of large agent datasets.""" # Create large dataset large_agent_list = [] for i in range(1000): large_agent_list.append({ "id": f"{i:03d}", "name": f"agent-{i:03d}", "status": "active" if i % 4 != 0 else "disconnected", "os": {"platform": "ubuntu", "version": "20.04"}, "version": "4.8.0", "group": ["production"], "node_name": "master", "last_keep_alive": datetime.utcnow().isoformat() + "Z" }) wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": large_agent_list} } arguments = {"group_by": "status", "max_agents": 1000} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should handle large dataset successfully assert response_data["summary"]["total_agents"] == 1000 assert "analysis_metadata" in response_data assert "processing_time_seconds" in response_data["analysis_metadata"] # Agent details should be limited for performance assert len(response_data["agent_details"]) <= 50 async def test_missing_last_keep_alive(self, wazuh_server): """Test handling agents without last_keep_alive data.""" agents_without_keepalive = [ { "id": "001", "name": "agent-001", "status": "active", "os": {"platform": "ubuntu", "version": "20.04"}, "version": "4.8.0" # Missing last_keep_alive }, { "id": "002", "name": "agent-002", "status": "active", "last_keep_alive": None, # Explicit None "os": {"platform": "centos", "version": "8"}, "version": "4.8.0" } ] wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": agents_without_keepalive} } arguments = {"include_last_activity": True, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should handle missing keepalive gracefully assert response_data["summary"]["total_agents"] == 2 assert response_data["summary"]["truly_active_agents"] == 2 # Should assume active # Activity analysis should not crash if "activity_analysis" in response_data: assert "last_seen_analysis" in response_data["activity_analysis"] async def test_all_agents_disconnected(self, wazuh_server): """Test scenario where all agents are disconnected.""" disconnected_agents = [ { "id": f"{i:03d}", "name": f"agent-{i:03d}", "status": "disconnected", "os": {"platform": "ubuntu", "version": "20.04"}, "version": "4.8.0" } for i in range(5) ] wazuh_server.api_client.get_agents.return_value = { "data": {"affected_items": disconnected_agents} } arguments = {"include_disconnected": True, "group_by": "status"} result = await wazuh_server._handle_get_wazuh_running_agents(arguments) response_data = json.loads(result[0].text) # Should handle all disconnected scenario assert response_data["summary"]["total_agents"] == 5 assert response_data["summary"]["active_agents"] == 0 assert response_data["summary"]["disconnected_agents"] == 5 # Health score should be very low assert response_data["infrastructure_health"]["overall_health_score"] < 50 assert response_data["infrastructure_health"]["health_rating"] in ["Poor", "Critical"] # Should have high priority recommendations recommendations = response_data["recommendations"] high_priority_recs = [r for r in recommendations if r["priority"] == "HIGH"] assert len(high_priority_recs) > 0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server