# test_mcp_server.py • 13.7 kB
"""
Pytest tests for MCP Server for Splunk
Tests using FastMCP's in-memory testing patterns following best practices from:
https://gofastmcp.com/patterns/testing
"""
import time
import pytest
from fastmcp.exceptions import ToolError
# Integration tests using FastMCP Client (recommended approach)
@pytest.mark.integration
class TestMCPClientIntegration:
    """Drive the server through the in-memory FastMCP client (tools, resources, ping)."""

    async def test_fastmcp_client_health_check(self, fastmcp_client, extract_tool_result):
        """Call ``get_splunk_health`` and check that it reports a recognised status."""
        # Every state the tool may report is acceptable here.
        accepted_states = ("connected", "disconnected", "error")
        async with fastmcp_client as client:
            raw_response = await client.call_tool("get_splunk_health")
            payload = extract_tool_result(raw_response)

            assert "status" in payload
            assert payload["status"] in accepted_states

    async def test_fastmcp_client_list_tools(self, fastmcp_client):
        """Check that each expected tool name appears in the server's tool listing."""
        required_names = (
            "get_splunk_health",
            "list_indexes",
            "run_oneshot_search",
            "run_splunk_search",
            "list_apps",
            "list_users",
        )
        async with fastmcp_client as client:
            registered = [tool.name for tool in await client.list_tools()]

            for required in required_names:
                assert required in registered

    async def test_fastmcp_client_list_resources(self, fastmcp_client):
        """Check that the ``health://status`` resource is advertised."""
        async with fastmcp_client as client:
            advertised = await client.list_resources()

            # URIs are compared in their string form.
            uri_strings = {str(entry.uri) for entry in advertised}
            assert "health://status" in uri_strings

    async def test_fastmcp_client_read_health_resource(self, fastmcp_client):
        """Read ``health://status`` and expect a first content item whose text is ``OK``."""
        async with fastmcp_client as client:
            contents = await client.read_resource("health://status")

            assert len(contents) >= 1
            first_item = contents[0]
            assert hasattr(first_item, "text")
            assert first_item.text == "OK"

    async def test_fastmcp_client_ping(self, fastmcp_client):
        """Ping the server; the test passes as long as no exception is raised."""
        async with fastmcp_client as client:
            await client.ping()
@pytest.mark.integration
class TestSplunkToolsIntegration:
    """Call each Splunk tool through the FastMCP client and check the response shape.

    Most tests accept either a success payload or an error payload, so they do
    not depend on a Splunk connection being available.
    """

    async def test_splunk_health_check(self, fastmcp_client, extract_tool_result):
        """Health tool reports a known status, plus server details when connected."""
        async with fastmcp_client as client:
            response = await client.call_tool("get_splunk_health")
            health = extract_tool_result(response)

            assert "status" in health
            state = health["status"]
            # Without a Splunk connection the state is expected to be error or disconnected.
            assert state in ("connected", "disconnected", "error")

            if state == "connected":
                assert "version" in health
                assert "server_name" in health

    async def test_list_indexes(self, fastmcp_client, extract_tool_result):
        """``list_indexes`` yields an index list with a count, or an error payload."""
        async with fastmcp_client as client:
            response = await client.call_tool("list_indexes")
            payload = extract_tool_result(response)

            # Only payloads that carry a status are inspected further.
            if "status" in payload:
                state = payload["status"]
                if state == "success":
                    assert "indexes" in payload
                    assert "count" in payload
                    assert isinstance(payload["indexes"], list)
                elif state == "error":
                    assert "error" in payload

    async def test_oneshot_search(self, fastmcp_client, extract_tool_result):
        """``run_oneshot_search`` yields results with metadata, or an error payload."""
        arguments = {
            "query": "index=_internal | head 5",
            "earliest_time": "-15m",
            "latest_time": "now",
            "max_results": 5,
        }
        async with fastmcp_client as client:
            response = await client.call_tool("run_oneshot_search", arguments)
            payload = extract_tool_result(response)

            # Only payloads that carry a status are inspected further.
            if "status" in payload:
                state = payload["status"]
                if state == "success":
                    assert "results" in payload
                    assert "results_count" in payload
                    assert "query_executed" in payload
                elif state == "error":
                    assert "error" in payload

    async def test_job_search(self, fastmcp_client, extract_tool_result):
        """``run_splunk_search`` yields job results with a count field, or an error payload."""
        arguments = {
            "query": "index=_internal | stats count",
            "earliest_time": "-5m",
            "latest_time": "now",
        }
        async with fastmcp_client as client:
            response = await client.call_tool("run_splunk_search", arguments)
            payload = extract_tool_result(response)

            if "job_id" in payload:
                # A job response must carry results and at least one of the two counters.
                assert "results" in payload
                assert "scan_count" in payload or "event_count" in payload
            elif "status" in payload:
                if payload["status"] == "error":
                    assert "error" in payload

    async def test_list_apps(self, fastmcp_client, extract_tool_result):
        """``list_apps`` yields an app list with a count, or an error payload."""
        async with fastmcp_client as client:
            response = await client.call_tool("list_apps")
            payload = extract_tool_result(response)

            if "apps" in payload:
                assert "count" in payload
                assert isinstance(payload["apps"], list)
            elif "status" in payload:
                if payload["status"] == "error":
                    assert "error" in payload

    async def test_list_users(self, fastmcp_client, extract_tool_result):
        """``list_users`` yields a user list with a count, or an error payload."""
        async with fastmcp_client as client:
            response = await client.call_tool("list_users")
            payload = extract_tool_result(response)

            if "users" in payload:
                assert "count" in payload
                assert isinstance(payload["users"], list)
            elif "status" in payload:
                if payload["status"] == "error":
                    assert "error" in payload
# Helper function tests
@pytest.mark.unit
class TestHelperFunctions:
    """Unit tests for the ``extract_tool_result`` fixture helper."""

    def test_extract_tool_result_with_json(self, extract_tool_result):
        """A content item whose text is JSON is decoded into its fields."""

        class JsonTextStub:
            text = '{"status": "success", "data": "test"}'

        parsed = extract_tool_result([JsonTextStub()])

        assert parsed["status"] == "success"
        assert parsed["data"] == "test"

    def test_extract_tool_result_with_plain_text(self, extract_tool_result):
        """Non-JSON text is returned under the ``raw_text`` key."""

        class PlainTextStub:
            text = "plain text response"

        parsed = extract_tool_result([PlainTextStub()])

        assert parsed["raw_text"] == "plain text response"

    def test_extract_tool_result_with_direct_data(self, extract_tool_result):
        """A dict passed in directly compares equal to the returned value."""
        already_structured = {"status": "success", "count": 5}

        assert extract_tool_result(already_structured) == already_structured
# Error handling tests
@pytest.mark.integration
class TestErrorHandling:
    """Error paths and edge cases exercised through the FastMCP client."""

    async def test_invalid_tool_call(self, fastmcp_client):
        """Calling a tool name that does not exist must raise ``ToolError``."""
        async with fastmcp_client as client:
            with pytest.raises(ToolError):
                await client.call_tool("non_existent_tool")

    async def test_invalid_tool_parameters(self, fastmcp_client):
        """Calling ``get_configurations`` with empty arguments must raise ``ToolError``."""
        async with fastmcp_client as client:
            # The empty argument dict omits a required parameter.
            with pytest.raises(ToolError):
                await client.call_tool("get_configurations", {})

    async def test_search_with_invalid_query(self, fastmcp_client, extract_tool_result):
        """A malformed search must still produce a well-formed status, if one is given."""
        arguments = {
            "query": "index=nonexistent_index invalid_command",
            "earliest_time": "-1h",
            "max_results": 5,
        }
        async with fastmcp_client as client:
            response = await client.call_tool("run_oneshot_search", arguments)
            payload = extract_tool_result(response)

            # Either outcome is tolerated; only an unknown status value fails the test.
            if "status" in payload:
                assert payload["status"] in ("success", "error")
# Performance/load testing
@pytest.mark.slow
@pytest.mark.integration
class TestPerformance:
    """Performance and load tests using FastMCP patterns"""

    async def test_multiple_rapid_health_checks(self, fastmcp_client, extract_tool_result):
        """Run the health tool several times in a row and bound the total elapsed time.

        Each response must carry a ``status`` key, and the whole batch must
        finish within the time budget.
        """
        call_count = 10  # Reduced from 100 to be more reasonable
        budget_seconds = 5.0

        async with fastmcp_client as client:
            # perf_counter() is monotonic, so a wall-clock adjustment (NTP, manual
            # change) during the run cannot distort the measured duration the way
            # time.time() could.
            started = time.perf_counter()

            for _ in range(call_count):
                result = await client.call_tool("get_splunk_health")
                health_data = extract_tool_result(result)
                assert "status" in health_data

            duration = time.perf_counter() - started

            # Should complete 10 calls in under 5 seconds
            assert duration < budget_seconds, f"Health checks took too long: {duration}s"
# Workflow integration tests
@pytest.mark.integration
class TestWorkflowIntegration:
    """Multi-step scenarios that chain several tool calls in one client session."""

    async def test_discovery_workflow(self, fastmcp_client, extract_tool_result):
        """Health check, then index listing, then app listing; all must yield dicts."""
        async with fastmcp_client as client:
            # Step 1: health must report a status before the workflow continues.
            health = extract_tool_result(await client.call_tool("get_splunk_health"))
            assert "status" in health

            # Step 2: enumerate indexes.
            indexes = extract_tool_result(await client.call_tool("list_indexes"))

            # Step 3: enumerate apps.
            apps = extract_tool_result(await client.call_tool("list_apps"))

            # Every step must have produced structured (dict) data.
            assert all(isinstance(step, dict) for step in (health, indexes, apps))

    async def test_search_workflow(self, fastmcp_client, extract_tool_result):
        """A oneshot search followed by a job-based search; both must yield dicts."""
        oneshot_arguments = {"query": "| metadata type=hosts", "max_results": 5}
        job_arguments = {
            "query": "| rest /services/server/info",
            "earliest_time": "-1m",
            "latest_time": "now",
        }
        async with fastmcp_client as client:
            # Step 1: simple oneshot search.
            oneshot_response = await client.call_tool("run_oneshot_search", oneshot_arguments)
            oneshot_payload = extract_tool_result(oneshot_response)
            assert isinstance(oneshot_payload, dict)

            # Step 2: more involved job-based search.
            job_response = await client.call_tool("run_splunk_search", job_arguments)
            job_payload = extract_tool_result(job_response)
            assert isinstance(job_payload, dict)
# Backward compatibility tests (for migration period)
@pytest.mark.integration
class TestBackwardCompatibility:
    """Placeholders for legacy fixtures plus a resource-access check during migration."""

    @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client")
    async def test_traefik_connection(self, traefik_client, mcp_helpers):
        """Skipped legacy placeholder; new tests should use ``fastmcp_client``."""

    @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client")
    async def test_direct_connection(self, direct_client, mcp_helpers):
        """Skipped legacy placeholder; new tests should use ``fastmcp_client``."""

    async def test_resource_access_patterns(self, fastmcp_client):
        """Resources can be listed, and ``health://status`` can be read with content."""
        async with fastmcp_client as client:
            # Listing must return at least one resource.
            listed = await client.list_resources()
            assert len(listed) >= 1

            # Reading the health resource must return at least one content item.
            health_contents = await client.read_resource("health://status")
            assert len(health_contents) >= 1