test_alerts.pyβ’7.12 kB
"""
Tests for alerts tools.
Tests the ListTriggeredAlerts tool using FastMCP patterns with minimal mocking.
"""
from unittest.mock import Mock, patch
import pytest
class TestListTriggeredAlerts:
"""Test suite for ListTriggeredAlerts tool."""
@pytest.fixture
def mock_service(self):
"""Create mock Splunk service for testing."""
service = Mock()
# Mock alert group with multiple alerts
alert_group1 = Mock()
alert_group1.name = "High CPU Alert"
alert_group1.count = 2
alert_group1.content = {"description": "CPU usage alert"}
alert_group1.state = {"enabled": True}
alert_group1.access = {"read": True, "write": True}
# Mock individual alerts in the group
alert1 = Mock()
alert1.trigger_time = "2024-01-15T10:30:00"
alert1.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDE"
alert1.saved_search_name = "High CPU Alert"
alert1.app = "search"
alert1.owner = "admin"
alert1.trigger_reason = "Number of results: 5 (greater than 3)"
alert1.digest_mode = False
alert1.result_count = 5
alert1.server_host = "splunk-server1"
alert1.server_uri = "https://splunk-server1:8089"
alert2 = Mock()
alert2.trigger_time = "2024-01-15T10:25:00"
alert2.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDI"
alert2.saved_search_name = "High CPU Alert"
alert2.app = "search"
alert2.owner = "admin"
alert2.trigger_reason = "Number of results: 4 (greater than 3)"
alert2.digest_mode = False
alert2.result_count = 4
alert2.server_host = "splunk-server2"
alert2.server_uri = "https://splunk-server2:8089"
alert_group1.alerts = [alert1, alert2]
# Mock second alert group with one alert
alert_group2 = Mock()
alert_group2.name = "Disk Space Alert"
alert_group2.count = 1
alert_group2.content = {"description": "Disk space alert"}
alert_group2.state = {"enabled": True}
alert_group2.access = {"read": True, "write": False}
alert3 = Mock()
alert3.trigger_time = "2024-01-15T09:45:00"
alert3.sid = "scheduler_admin_U3BsdW5rX01DTFNWTAV2ZXI_YWxlcnRfMDM"
alert3.saved_search_name = "Disk Space Alert"
alert3.app = "unix"
alert3.owner = "admin"
alert3.trigger_reason = "Number of results: 1 (greater than 0)"
alert3.digest_mode = True
alert3.result_count = 1
alert3.server_host = "splunk-server1"
alert3.server_uri = "https://splunk-server1:8089"
alert_group2.alerts = [alert3]
# Configure service.fired_alerts to return our mock groups
service.fired_alerts.return_value = [alert_group1, alert_group2]
return service
async def test_list_triggered_alerts_success(
self, fastmcp_client, extract_tool_result, mock_service
):
"""Test successful listing of triggered alerts."""
# Mock the fired_alerts method to return our test data
with patch.object(
mock_service, "fired_alerts", return_value=mock_service.fired_alerts.return_value
):
async with fastmcp_client as client:
# Execute tool through FastMCP
result = await client.call_tool("list_triggered_alerts", {})
data = extract_tool_result(result)
# In test mode, we expect either success or error status
if data.get("status") == "success":
# Verify response structure
assert "triggered_alerts" in data
assert "total_alert_groups" in data
assert "total_individual_alerts" in data
assert "search_parameters" in data
# Verify search parameters
params = data["search_parameters"]
assert params["count"] == 50 # default
assert params["earliest_time"] == "-24h@h" # default
assert params["latest_time"] == "now" # default
assert params["search_filter"] is None
elif data.get("status") == "error":
# In test environment, Splunk may not be available - this is expected
assert "error" in data
else:
# Ensure we get some kind of response
assert isinstance(data, dict)
async def test_list_triggered_alerts_with_parameters(self, fastmcp_client, extract_tool_result):
"""Test listing triggered alerts with custom parameters."""
async with fastmcp_client as client:
# Execute tool with custom parameters
result = await client.call_tool(
"list_triggered_alerts",
{"count": 100, "earliest_time": "-1h@h", "latest_time": "-30m@m", "search": "CPU"},
)
data = extract_tool_result(result)
# Verify we get a response
assert isinstance(data, dict)
# If successful, verify parameters were processed
if data.get("status") == "success":
params = data["search_parameters"]
assert params["count"] == 100
assert params["earliest_time"] == "-1h@h"
assert params["latest_time"] == "-30m@m"
assert params["search_filter"] == "CPU"
async def test_list_triggered_alerts_basic_functionality(
self, fastmcp_client, extract_tool_result
):
"""Test basic alerts tool functionality."""
async with fastmcp_client as client:
# Execute tool with basic parameters
result = await client.call_tool("list_triggered_alerts", {})
data = extract_tool_result(result)
# Verify we get a proper response structure
assert isinstance(data, dict)
# Should have status field
assert "status" in data
# If successful, should have expected structure
if data.get("status") == "success":
assert "triggered_alerts" in data
assert "total_alert_groups" in data
assert "total_individual_alerts" in data
assert "search_parameters" in data
async def test_list_triggered_alerts_tool_availability(self, fastmcp_client):
"""Test that the list_triggered_alerts tool is available."""
async with fastmcp_client as client:
tools = await client.list_tools()
tool_names = [tool.name for tool in tools]
# Verify our new tool is available
assert "list_triggered_alerts" in tool_names
# Find the tool and check its metadata
alerts_tool = next(tool for tool in tools if tool.name == "list_triggered_alerts")
assert alerts_tool.description is not None
assert "alert" in alerts_tool.description.lower()
assert alerts_tool.inputSchema is not None