Skip to main content
Glama
deslicer

MCP Server for Splunk

test_mcp_server.py18.5 kB
""" Pytest tests for MCP Server for Splunk Tests using FastMCP's in-memory testing patterns following best practices from: https://gofastmcp.com/patterns/testing """ import time import pytest from fastmcp.exceptions import ToolError # Integration tests using FastMCP Client (recommended approach) @pytest.mark.integration class TestMCPClientIntegration: """Integration tests using FastMCP in-memory client following FastMCP best practices""" async def test_fastmcp_client_health_check(self, fastmcp_client, extract_tool_result): """Test health check via FastMCP client""" async with fastmcp_client as client: # Call the health check tool result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) # The test should handle all possible states assert "status" in health_data assert health_data["status"] in ["connected", "disconnected", "error"] async def test_fastmcp_client_list_tools(self, fastmcp_client): """Test listing tools via FastMCP client""" async with fastmcp_client as client: tools = await client.list_tools() # Check that we have the expected tools tool_names = [tool.name for tool in tools] expected_tools = [ "get_splunk_health", "list_indexes", "run_oneshot_search", "run_splunk_search", "list_apps", "list_users", ] for expected_tool in expected_tools: assert expected_tool in tool_names async def test_fastmcp_client_list_resources(self, fastmcp_client): """Test listing resources via FastMCP client""" async with fastmcp_client as client: resources = await client.list_resources() # Check that we have the health resource resource_uris = [str(resource.uri) for resource in resources] assert "health://status" in resource_uris async def test_fastmcp_client_read_health_resource(self, fastmcp_client): """Test reading health resource via FastMCP client""" async with fastmcp_client as client: result = await client.read_resource("health://status") assert len(result) > 0 assert hasattr(result[0], "text") assert result[0].text == "OK" async def test_fastmcp_client_ping(self, fastmcp_client): """Test ping functionality""" async with fastmcp_client as client: # Should not raise an exception await client.ping() @pytest.mark.integration class TestSplunkToolsIntegration: """Integration tests for Splunk tools using FastMCP in-memory testing""" async def test_splunk_health_check(self, fastmcp_client, extract_tool_result): """Test Splunk health check tool via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) assert "status" in health_data # In test environment without Splunk connection, we expect error or disconnected assert health_data["status"] in ["connected", "disconnected", "error"] if health_data["status"] == "connected": assert "version" in health_data assert "server_name" in health_data async def test_list_indexes(self, fastmcp_client, extract_tool_result): """Test listing Splunk indexes via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_indexes") indexes_data = extract_tool_result(result) # Should have either success response or error response if "status" in indexes_data and indexes_data["status"] == "success": assert "indexes" in indexes_data assert "count" in indexes_data assert isinstance(indexes_data["indexes"], list) elif "status" in indexes_data and indexes_data["status"] == "error": assert "error" in indexes_data async def test_oneshot_search(self, fastmcp_client, extract_tool_result): """Test oneshot search via FastMCP client""" async with fastmcp_client as client: search_params = { "query": "index=_internal | head 5", "earliest_time": "-15m", "latest_time": "now", "max_results": 5, } result = await client.call_tool("run_oneshot_search", search_params) search_data = extract_tool_result(result) # Should have either results or error if "status" in search_data and search_data["status"] == "success": assert "results" in search_data assert "results_count" in search_data assert "query_executed" in search_data elif "status" in search_data and search_data["status"] == "error": assert "error" in search_data async def test_job_search(self, fastmcp_client, extract_tool_result): """Test job-based search via FastMCP client""" async with fastmcp_client as client: search_params = { "query": "index=_internal | stats count", "earliest_time": "-5m", "latest_time": "now", } result = await client.call_tool("run_splunk_search", search_params) search_data = extract_tool_result(result) # Should have either results or error if "job_id" in search_data: assert "results" in search_data assert "scan_count" in search_data or "event_count" in search_data elif "status" in search_data and search_data["status"] == "error": assert "error" in search_data async def test_list_apps(self, fastmcp_client, extract_tool_result): """Test listing Splunk apps via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_apps") apps_data = extract_tool_result(result) # Should have either apps or error if "apps" in apps_data: assert "count" in apps_data assert isinstance(apps_data["apps"], list) elif "status" in apps_data and apps_data["status"] == "error": assert "error" in apps_data async def test_list_users(self, fastmcp_client, extract_tool_result): """Test listing Splunk users via FastMCP client""" async with fastmcp_client as client: result = await client.call_tool("list_users") users_data = extract_tool_result(result) # Should have either users or error if "users" in users_data: assert "count" in users_data assert isinstance(users_data["users"], list) elif "status" in users_data and users_data["status"] == "error": assert "error" in users_data # Helper function tests @pytest.mark.unit class TestHelperFunctions: """Test helper functions and utilities""" def test_extract_tool_result_with_json(self, extract_tool_result): """Test extracting JSON from tool result""" class MockContent: text = '{"status": "success", "data": "test"}' mock_result = [MockContent()] result = extract_tool_result(mock_result) assert result["status"] == "success" assert result["data"] == "test" def test_extract_tool_result_with_plain_text(self, extract_tool_result): """Test extracting plain text from tool result""" class MockContent: text = "plain text response" mock_result = [MockContent()] result = extract_tool_result(mock_result) assert result["raw_text"] == "plain text response" def test_extract_tool_result_with_direct_data(self, extract_tool_result): """Test extracting data that's already in the right format""" direct_data = {"status": "success", "count": 5} result = extract_tool_result(direct_data) assert result == direct_data # Error handling tests @pytest.mark.integration class TestErrorHandling: """Test error handling and edge cases using FastMCP patterns""" async def test_invalid_tool_call(self, fastmcp_client): """Test calling non-existent tool""" async with fastmcp_client as client: with pytest.raises(ToolError): await client.call_tool("non_existent_tool") async def test_invalid_tool_parameters(self, fastmcp_client): """Test calling tool with invalid parameters""" async with fastmcp_client as client: # Missing required parameter should raise an error with pytest.raises(ToolError): await client.call_tool("get_configurations", {}) async def test_search_with_invalid_query(self, fastmcp_client, extract_tool_result): """Test search tool with invalid query""" async with fastmcp_client as client: search_params = { "query": "index=nonexistent_index invalid_command", "earliest_time": "-1h", "max_results": 5, } result = await client.call_tool("run_oneshot_search", search_params) search_data = extract_tool_result(result) # Should return an error status or handle gracefully if "status" in search_data: assert search_data["status"] in ["success", "error"] # Performance/load testing @pytest.mark.slow @pytest.mark.integration class TestPerformance: """Performance and load tests using FastMCP patterns""" async def test_multiple_rapid_health_checks(self, fastmcp_client, extract_tool_result): """Test multiple rapid health check calls""" async with fastmcp_client as client: start_time = time.time() # Call health check multiple times for _ in range(10): # Reduced from 100 to be more reasonable result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(result) assert "status" in health_data end_time = time.time() duration = end_time - start_time # Should complete 10 calls in under 5 seconds assert duration < 5.0, f"Health checks took too long: {duration}s" # Workflow integration tests @pytest.mark.integration class TestWorkflowIntegration: """Test realistic workflows using FastMCP patterns""" async def test_discovery_workflow(self, fastmcp_client, extract_tool_result): """Test a realistic discovery workflow""" async with fastmcp_client as client: # 1. Check health first health_result = await client.call_tool("get_splunk_health") health_data = extract_tool_result(health_result) assert "status" in health_data # 2. List available indexes indexes_result = await client.call_tool("list_indexes") indexes_data = extract_tool_result(indexes_result) # 3. List apps apps_result = await client.call_tool("list_apps") apps_data = extract_tool_result(apps_result) # All should return structured data for data in [health_data, indexes_data, apps_data]: assert isinstance(data, dict) async def test_search_workflow(self, fastmcp_client, extract_tool_result): """Test a realistic search workflow""" async with fastmcp_client as client: # 1. Start with a simple search simple_search = await client.call_tool( "run_oneshot_search", {"query": "| metadata type=hosts", "max_results": 5} ) simple_data = extract_tool_result(simple_search) assert isinstance(simple_data, dict) # 2. Try a more complex search complex_search = await client.call_tool( "run_splunk_search", { "query": "| rest /services/server/info", "earliest_time": "-1m", "latest_time": "now", }, ) complex_data = extract_tool_result(complex_search) assert isinstance(complex_data, dict) # Backward compatibility tests (for migration period) @pytest.mark.integration class TestBackwardCompatibility: """Test backward compatibility during migration to FastMCP patterns""" @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client") async def test_traefik_connection(self, traefik_client, mcp_helpers): """Legacy test - use fastmcp_client instead""" pass @pytest.mark.skip(reason="Legacy fixtures deprecated - use fastmcp_client") async def test_direct_connection(self, direct_client, mcp_helpers): """Legacy test - use fastmcp_client instead""" pass async def test_resource_access_patterns(self, fastmcp_client): """Test that resources are accessible in expected ways""" async with fastmcp_client as client: # Test resource listing resources = await client.list_resources() assert len(resources) > 0 # Test specific resource access health_resource = await client.read_resource("health://status") assert len(health_resource) > 0 # HTTP header/session propagation tests for Streamable HTTP transport @pytest.mark.integration class TestHttpSessionHeaders: """Verify both X-Session-ID and MCP-Session-ID are honored over HTTP.""" @pytest.mark.asyncio async def test_session_header_x_session_id(self): import json import httpx from src.server import create_root_app, get_mcp mcp = get_mcp() app = create_root_app(mcp) headers = { "accept": "application/json, text/event-stream", "content-type": "application/json", "X-Session-ID": "test-x-session-123", "X-Splunk-Host": "example", "X-Splunk-Port": "8089", "X-Splunk-Username": "admin", "X-Splunk-Password": "pass", "X-Splunk-Scheme": "https", "X-Splunk-Verify-SSL": "false", } body = { "jsonrpc": "2.0", "id": "1", "method": "tools/call", "params": {"name": "user_agent_info", "arguments": {}}, } # Ensure app lifespan is started for FastMCP's session manager try: from asgi_lifespan import LifespanManager # type: ignore async with LifespanManager(app): transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.post("/mcp", headers=headers, content=json.dumps(body)) # 400 is acceptable - server may reject without valid session initialization assert resp.status_code in (200, 303, 400) except ImportError: import inspect params = list(getattr(inspect.signature(httpx.ASGITransport), "parameters", {}).keys()) if "lifespan" in params: transport = httpx.ASGITransport(app=app, lifespan="on") # type: ignore[arg-type] async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.post("/mcp", headers=headers, content=json.dumps(body)) # 400 is acceptable - server may reject without valid session initialization assert resp.status_code in (200, 303, 400) else: pytest.skip("ASGI lifespan not available; install asgi_lifespan for this test.") # When 303 (See Other), FastMCP is streaming; follow-up GET may be needed. # For our purposes, 200 indicates JSON-response mode (e.g., tests). @pytest.mark.asyncio async def test_session_header_mcp_session_id(self): import json import httpx from src.server import create_root_app, get_mcp mcp = get_mcp() app = create_root_app(mcp) headers = { "accept": "application/json, text/event-stream", "content-type": "application/json", "MCP-Session-ID": "test-mcp-session-456", "X-Splunk-Host": "example", "X-Splunk-Port": "8089", "X-Splunk-Username": "admin", "X-Splunk-Password": "pass", "X-Splunk-Scheme": "https", "X-Splunk-Verify-SSL": "false", } body = { "jsonrpc": "2.0", "id": "1", "method": "tools/call", "params": {"name": "user_agent_info", "arguments": {}}, } try: from asgi_lifespan import LifespanManager # type: ignore async with LifespanManager(app): transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.post("/mcp", headers=headers, content=json.dumps(body)) # 400 is acceptable - server may reject without valid session initialization assert resp.status_code in (200, 303, 400) except ImportError: import inspect params = list(getattr(inspect.signature(httpx.ASGITransport), "parameters", {}).keys()) if "lifespan" in params: transport = httpx.ASGITransport(app=app, lifespan="on") # type: ignore[arg-type] async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.post("/mcp", headers=headers, content=json.dumps(body)) # 400 is acceptable - server may reject without valid session initialization assert resp.status_code in (200, 303, 400) else: pytest.skip("ASGI lifespan not available; install asgi_lifespan for this test.")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server