Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
test_mcp_server.pyβ€’13.7 kB
""" Pytest tests for MCP Server for Splunk Tests using FastMCP's in-memory testing patterns following best practices from: https://gofastmcp.com/patterns/testing """ import time import pytest from fastmcp.exceptions import ToolError # Integration tests using FastMCP Client (recommended approach) @pytest.mark.integration class TestMCPClientIntegration: """Integration tests using FastMCP in-memory client following FastMCP best practices""" async def test_fastmcp_client_health_check(self, fastmcp_client, extract_tool_result): """Test health check via FastMCP client""" async with fastmcp_client as client: # Call the health check tool result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) # The test should handle all possible states assert "status" in health_data assert health_data["status"] in ["connected", "disconnected", "error"] async def test_fastmcp_client_list_tools(self, fastmcp_client): """Test listing tools via FastMCP client""" async with fastmcp_client as client: tools = await client.list_tools() # Check that we have the expected tools tool_names = [tool.name for tool in tools] expected_tools = [ "get_splunk_health", "list_indexes", "run_oneshot_search", "run_splunk_search", "list_apps", "list_users", ] for expected_tool in expected_tools: assert expected_tool in tool_names async def test_fastmcp_client_list_resources(self, fastmcp_client): """Test listing resources via FastMCP client""" async with fastmcp_client as client: resources = await client.list_resources() # Check that we have the health resource resource_uris = [str(resource.uri) for resource in resources] assert "health://status" in resource_uris async def test_fastmcp_client_read_health_resource(self, fastmcp_client): """Test reading health resource via FastMCP client""" async with fastmcp_client as client: result = await client.read_resource("health://status") assert len(result) > 0 assert hasattr(result[0], "text") assert result[0].text == "OK" async def test_fastmcp_client_ping(self, fastmcp_client): """Test ping functionality""" async with fastmcp_client as client: # Should not raise an exception await client.ping() @pytest.mark.integration class TestSplunkToolsIntegration: """Integration tests for Splunk tools using FastMCP in-memory testing""" async def test_splunk_health_check(self, fastmcp_client, extract_tool_result): """Test Splunk health check tool via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) assert "status" in health_data # In test environment without Splunk connection, we expect error or disconnected assert health_data["status"] in ["connected", "disconnected", "error"] if health_data["status"] == "connected": assert "version" in health_data assert "server_name" in health_data async def test_list_indexes(self, fastmcp_client, extract_tool_result): """Test listing Splunk indexes via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_indexes") indexes_data = extract_tool_result(result) # Should have either success response or error response if "status" in indexes_data and indexes_data["status"] == "success": assert "indexes" in indexes_data assert "count" in indexes_data assert isinstance(indexes_data["indexes"], list) elif "status" in indexes_data and indexes_data["status"] == "error": assert "error" in indexes_data async def test_oneshot_search(self, fastmcp_client, extract_tool_result): """Test oneshot search via FastMCP client""" async with fastmcp_client as client: search_params = { "query": "index=_internal | head 5", "earliest_time": "-15m", "latest_time": "now", "max_results": 5, } result = await client.call_tool("run_oneshot_search", search_params) search_data = extract_tool_result(result) # Should have either results or error if "status" in search_data and search_data["status"] == "success": assert "results" in search_data assert "results_count" in search_data assert "query_executed" in search_data elif "status" in search_data and search_data["status"] == "error": assert "error" in search_data async def test_job_search(self, fastmcp_client, extract_tool_result): """Test job-based search via FastMCP client""" async with fastmcp_client as client: search_params = { "query": "index=_internal | stats count", "earliest_time": "-5m", "latest_time": "now", } result = await client.call_tool("run_splunk_search", search_params) search_data = extract_tool_result(result) # Should have either results or error if "job_id" in search_data: assert "results" in search_data assert "scan_count" in search_data or "event_count" in search_data elif "status" in search_data and search_data["status"] == "error": assert "error" in search_data async def test_list_apps(self, fastmcp_client, extract_tool_result): """Test listing Splunk apps via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_apps") apps_data = extract_tool_result(result) # Should have either apps or error if "apps" in apps_data: assert "count" in apps_data assert isinstance(apps_data["apps"], list) elif "status" in apps_data and apps_data["status"] == "error": assert "error" in apps_data async def test_list_users(self, fastmcp_client, extract_tool_result): """Test listing Splunk users via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_users") users_data = extract_tool_result(result) # Should have either users or error if "users" in users_data: assert "count" in users_data assert isinstance(users_data["users"], list) elif "status" in users_data and users_data["status"] == "error": assert "error" in users_data # Helper function tests @pytest.mark.unit class TestHelperFunctions: """Test helper functions and utilities""" def test_extract_tool_result_with_json(self, extract_tool_result): """Test extracting JSON from tool result""" class MockContent: text = '{"status": "success", "data": "test"}' mock_result = [MockContent()] result = extract_tool_result(mock_result) assert result["status"] == "success" assert result["data"] == "test" def test_extract_tool_result_with_plain_text(self, extract_tool_result): """Test extracting plain text from tool result""" class MockContent: text = "plain text response" mock_result = [MockContent()] result = extract_tool_result(mock_result) assert result["raw_text"] == "plain text response" def test_extract_tool_result_with_direct_data(self, extract_tool_result): """Test extracting data that's already in the right format""" direct_data = {"status": "success", "count": 5} result = extract_tool_result(direct_data) assert result == direct_data # Error handling tests @pytest.mark.integration class TestErrorHandling: """Test error handling and edge cases using FastMCP patterns""" async def test_invalid_tool_call(self, fastmcp_client): """Test calling non-existent tool""" async with fastmcp_client as client: with pytest.raises(ToolError): await client.call_tool("non_existent_tool") async def test_invalid_tool_parameters(self, fastmcp_client): """Test calling tool with invalid parameters""" async with fastmcp_client as client: # Missing required parameter should raise an error with pytest.raises(ToolError): await client.call_tool("get_configurations", {}) async def test_search_with_invalid_query(self, fastmcp_client, extract_tool_result): """Test search tool with invalid query""" async with fastmcp_client as client: search_params = { "query": "index=nonexistent_index invalid_command", "earliest_time": "-1h", "max_results": 5, } result = await client.call_tool("run_oneshot_search", search_params) search_data = extract_tool_result(result) # Should return an error status or handle gracefully if "status" in search_data: assert search_data["status"] in ["success", "error"] # Performance/load testing @pytest.mark.slow @pytest.mark.integration class TestPerformance: """Performance and load tests using FastMCP patterns""" async def test_multiple_rapid_health_checks(self, fastmcp_client, extract_tool_result): """Test multiple rapid health check calls""" async with fastmcp_client as client: start_time = time.time() # Call health check multiple times for _ in range(10): # Reduced from 100 to be more reasonable result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) assert "status" in health_data end_time = time.time() duration = end_time - start_time # Should complete 10 calls in under 5 seconds assert duration < 5.0, f"Health checks took too long: {duration}s" # Workflow integration tests @pytest.mark.integration class TestWorkflowIntegration: """Test realistic workflows using FastMCP patterns""" async def test_discovery_workflow(self, fastmcp_client, extract_tool_result): """Test a realistic discovery workflow""" async with fastmcp_client as client: # 1. Check health first health_result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(health_result) assert "status" in health_data # 2. List available indexes indexes_result = await client.call_tool("list_indexes") indexes_data = extract_tool_result(indexes_result) # 3. List apps apps_result = await client.call_tool("list_apps") apps_data = extract_tool_result(apps_result) # All should return structured data for data in [health_data, indexes_data, apps_data]: assert isinstance(data, dict) async def test_search_workflow(self, fastmcp_client, extract_tool_result): """Test a realistic search workflow""" async with fastmcp_client as client: # 1. Start with a simple search simple_search = await client.call_tool( "run_oneshot_search", {"query": "| metadata type=hosts", "max_results": 5} ) simple_data = extract_tool_result(simple_search) assert isinstance(simple_data, dict) # 2. Try a more complex search complex_search = await client.call_tool( "run_splunk_search", { "query": "| rest /services/server/info", "earliest_time": "-1m", "latest_time": "now", }, ) complex_data = extract_tool_result(complex_search) assert isinstance(complex_data, dict) # Backward compatibility tests (for migration period) @pytest.mark.integration class TestBackwardCompatibility: """Test backward compatibility during migration to FastMCP patterns""" @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client") async def test_traefik_connection(self, traefik_client, mcp_helpers): """Legacy test - use fastmcp_client instead""" pass @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client") async def test_direct_connection(self, direct_client, mcp_helpers): """Legacy test - use fastmcp_client instead""" pass async def test_resource_access_patterns(self, fastmcp_client): """Test that resources are accessible in expected ways""" async with fastmcp_client as client: # Test resource listing resources = await client.list_resources() assert len(resources) > 0 # Test specific resource access health_resource = await client.read_resource("health://status") assert len(health_resource) > 0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server