Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
test_alerts.pyβ€’7.12 kB
""" Tests for alerts tools. Tests the ListTriggeredAlerts tool using FastMCP patterns with minimal mocking. """ from unittest.mock import Mock, patch import pytest class TestListTriggeredAlerts: """Test suite for ListTriggeredAlerts tool.""" @pytest.fixture def mock_service(self): """Create mock Splunk service for testing.""" service = Mock() # Mock alert group with multiple alerts alert_group1 = Mock() alert_group1.name = "High CPU Alert" alert_group1.count = 2 alert_group1.content = {"description": "CPU usage alert"} alert_group1.state = {"enabled": True} alert_group1.access = {"read": True, "write": True} # Mock individual alerts in the group alert1 = Mock() alert1.trigger_time = "2024-01-15T10:30:00" alert1.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDE" alert1.saved_search_name = "High CPU Alert" alert1.app = "search" alert1.owner = "admin" alert1.trigger_reason = "Number of results: 5 (greater than 3)" alert1.digest_mode = False alert1.result_count = 5 alert1.server_host = "splunk-server1" alert1.server_uri = "https://splunk-server1:8089" alert2 = Mock() alert2.trigger_time = "2024-01-15T10:25:00" alert2.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDI" alert2.saved_search_name = "High CPU Alert" alert2.app = "search" alert2.owner = "admin" alert2.trigger_reason = "Number of results: 4 (greater than 3)" alert2.digest_mode = False alert2.result_count = 4 alert2.server_host = "splunk-server2" alert2.server_uri = "https://splunk-server2:8089" alert_group1.alerts = [alert1, alert2] # Mock second alert group with one alert alert_group2 = Mock() alert_group2.name = "Disk Space Alert" alert_group2.count = 1 alert_group2.content = {"description": "Disk space alert"} alert_group2.state = {"enabled": True} alert_group2.access = {"read": True, "write": False} alert3 = Mock() alert3.trigger_time = "2024-01-15T09:45:00" alert3.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDM" alert3.saved_search_name = "Disk Space Alert" alert3.app = "unix" alert3.owner = "admin" alert3.trigger_reason = "Number of results: 1 (greater than 0)" alert3.digest_mode = True alert3.result_count = 1 alert3.server_host = "splunk-server1" alert3.server_uri = "https://splunk-server1:8089" alert_group2.alerts = [alert3] # Configure service.fired_alerts to return our mock groups service.fired_alerts.return_value = [alert_group1, alert_group2] return service async def test_list_triggered_alerts_success( self, fastmcp_client, extract_tool_result, mock_service ): """Test successful listing of triggered alerts.""" # Mock the fired_alerts method to return our test data with patch.object( mock_service, "fired_alerts", return_value=mock_service.fired_alerts.return_value ): async with fastmcp_client as client: # Execute tool through FastMCP result = await client.call_tool("list_triggered_alerts", {}) data = extract_tool_result(result) # In test mode, we expect either success or error status if data.get("status") == "success": # Verify response structure assert "triggered_alerts" in data assert "total_alert_groups" in data assert "total_individual_alerts" in data assert "search_parameters" in data # Verify search parameters params = data["search_parameters"] assert params["count"] == 50 # default assert params["earliest_time"] == "-24h@h" # default assert params["latest_time"] == "now" # default assert params["search_filter"] is None elif data.get("status") == "error": # In test environment, Splunk may not be available - this is expected assert "error" in data else: # Ensure we get some kind of response assert isinstance(data, dict) async def test_list_triggered_alerts_with_parameters(self, fastmcp_client, extract_tool_result): """Test listing triggered alerts with custom parameters.""" async with fastmcp_client as client: # Execute tool with custom parameters result = await client.call_tool( "list_triggered_alerts", {"count": 100, "earliest_time": "-1h@h", "latest_time": "-30m@m", "search": "CPU"}, ) data = extract_tool_result(result) # Verify we get a response assert isinstance(data, dict) # If successful, verify parameters were processed if data.get("status") == "success": params = data["search_parameters"] assert params["count"] == 100 assert params["earliest_time"] == "-1h@h" assert params["latest_time"] == "-30m@m" assert params["search_filter"] == "CPU" async def test_list_triggered_alerts_basic_functionality( self, fastmcp_client, extract_tool_result ): """Test basic alerts tool functionality.""" async with fastmcp_client as client: # Execute tool with basic parameters result = await client.call_tool("list_triggered_alerts", {}) data = extract_tool_result(result) # Verify we get a proper response structure assert isinstance(data, dict) # Should have status field assert "status" in data # If successful, should have expected structure if data.get("status") == "success": assert "triggered_alerts" in data assert "total_alert_groups" in data assert "total_individual_alerts" in data assert "search_parameters" in data async def test_list_triggered_alerts_tool_availability(self, fastmcp_client): """Test that the list_triggered_alerts tool is available.""" async with fastmcp_client as client: tools = await client.list_tools() tool_names = [tool.name for tool in tools] # Verify our new tool is available assert "list_triggered_alerts" in tool_names # Find the tool and check its metadata alerts_tool = next(tool for tool in tools if tool.name == "list_triggered_alerts") assert alerts_tool.description is not None assert "alert" in alerts_tool.description.lower() assert alerts_tool.inputSchema is not None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server