Skip to main content
Glama
deslicer

MCP Server for Splunk

conftest.py29.7 kB
""" Test configuration and fixtures for MCP Server for Splunk tests. """ import json import os import sys from typing import Any from unittest.mock import AsyncMock, Mock, patch import pytest # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) # Import FastMCP for proper testing try: from fastmcp import Client, Context except ImportError: # Create fallback if FastMCP not available Context = None Client = None # Mock classes that match the actual structure class MockSplunkService: """Mock Splunk service that mimics splunklib.client.Service""" def __init__(self): # Mock indexes self.indexes = [ Mock(name="_internal"), Mock(name="main"), Mock(name="security"), Mock(name="test"), ] for idx in self.indexes: idx.name = idx._mock_name # Mock info self.info = {"version": "9.0.0", "host": "so1"} # Mock jobs for search operations self.jobs = Mock() # Mock job creation and oneshot mock_job = Mock() mock_job.sid = "test_job_123" mock_job.is_done.return_value = True mock_job.content = { "scanCount": "100", "eventCount": "10", "isDone": "1", "isFinalized": "1", "isFailed": "0", "doneProgress": "1.0", } # Mock job results def mock_results(): return [ { "_time": "2024-01-01T00:00:00", "source": "/var/log/system.log", "log_level": "INFO", }, { "_time": "2024-01-01T00:01:00", "source": "/var/log/app.log", "log_level": "ERROR", }, ] mock_job.results.return_value = mock_results() self.jobs.oneshot.return_value = mock_results() self.jobs.create.return_value = mock_job # Mock apps self.apps = [ Mock(name="search"), Mock(name="splunk_monitoring_console"), Mock(name="learned"), ] for app in self.apps: app.name = app._mock_name app.content = {"version": "1.0", "visible": True} # Mock users self.users = [Mock(name="admin"), Mock(name="splunk-system-user")] for user in self.users: user.name = user._mock_name user.content = { "roles": ["admin"], "email": "admin@example.com", "realname": user._mock_name, "type": "Splunk", "defaultApp": "search", } # Mock KV Store self.kvstore = {} # Mock configurations self.confs = {} # Mock configuration files with stanzas self._setup_mock_configurations() # In-memory dashboards store self._dashboards: dict[str, dict] = {} def post(self, endpoint: str, **kwargs): """Mock POST handler for Splunk REST endpoints used by dashboard tools.""" # Basic response mock mock_response = Mock() # Create dashboard if endpoint.endswith("/data/ui/views") and "name" in kwargs: name = kwargs.get("name") # Simulate conflict for overwrite path testing if name == "exists_dashboard": raise RuntimeError("409 Conflict: Dashboard already exists") label = kwargs.get("label") or name description = kwargs.get("description") or "" eai_data = kwargs.get("eai:data") or "" owner = "nobody" app = "search" entry = { "name": name, "id": f"https://localhost:8089/servicesNS/{owner}/{app}/data/ui/views/{name}", "content": { "label": label, "description": description, "eai:data": eai_data, "version": "1.0", }, "acl": { "app": app, "owner": owner, "sharing": "global", "perms": {"read": ["*"], "write": ["admin"]}, }, } self._dashboards[name] = entry payload = {"entry": [entry]} mock_response.body.read.return_value = json.dumps(payload).encode("utf-8") return mock_response # Update dashboard (overwrite) if "/data/ui/views/" in endpoint and not endpoint.endswith("/acl"): # Endpoint like /servicesNS/owner/app/data/ui/views/{name} name = endpoint.split("/data/ui/views/")[-1] existing = self._dashboards.get(name) label = kwargs.get("label") or (existing or {}).get("content", {}).get("label", name) description = kwargs.get("description") or (existing or {}).get("content", {}).get( "description", "" ) eai_data = kwargs.get("eai:data") or (existing or {}).get("content", {}).get( "eai:data", "" ) if existing: existing["content"]["label"] = label existing["content"]["description"] = description existing["content"]["eai:data"] = eai_data entry = existing else: entry = { "name": name, "id": endpoint.replace("http://", "https://"), "content": { "label": label, "description": description, "eai:data": eai_data, "version": "1.0", }, "acl": { "app": "search", "owner": "nobody", "sharing": "global", "perms": {"read": ["*"], "write": ["admin"]}, }, } self._dashboards[name] = entry payload = {"entry": [entry]} mock_response.body.read.return_value = json.dumps(payload).encode("utf-8") return mock_response # ACL update if endpoint.endswith("/acl"): # Extract dashboard name parts = endpoint.split("/data/ui/views/")[-1].split("/") name = parts[0] entry = self._dashboards.get(name) if entry: sharing = kwargs.get("sharing") read_perms = kwargs.get("perms.read") write_perms = kwargs.get("perms.write") if sharing: entry["acl"]["sharing"] = sharing if read_perms: entry["acl"]["perms"]["read"] = read_perms.split(",") if write_perms: entry["acl"]["perms"]["write"] = write_perms.split(",") mock_response.body.read.return_value = json.dumps({"success": True}).encode("utf-8") return mock_response # Config create stanza: /servicesNS/{owner}/{app}/configs/conf-<conf> if "/configs/conf-" in endpoint and not any(seg in endpoint for seg in ["/data/", "/acl"]): # Determine if this is create or update based on presence of trailing stanza base, conf_part = endpoint.split("/configs/conf-", 1) if "/" not in conf_part: # Create stanza conf_name = conf_part name = kwargs.get("name") if not name: raise RuntimeError("Missing stanza name in create config POST") # Ensure underlying dict exists self._ensure_conf(conf_name) # type: ignore[attr-defined] conf_obj = self.confs[conf_name] # Ensure _stanzas_dict exists and get reference to it # Use try/except to reliably check attribute existence with Mock objects try: stanzas_dict = conf_obj._stanzas_dict # type: ignore[attr-defined] if not isinstance(stanzas_dict, dict): stanzas_dict = {} conf_obj._stanzas_dict = stanzas_dict # type: ignore[attr-defined] except AttributeError: stanzas_dict = {} conf_obj._stanzas_dict = stanzas_dict # type: ignore[attr-defined] # Build content from kwargs excluding 'name' content = {k: v for k, v in kwargs.items() if k != "name"} stanzas_dict[name] = content # Build response payload owner = base.split("/servicesNS/")[-1].split("/")[0] or "nobody" app = ( base.split("/servicesNS/")[-1].split("/")[1] if "/" in base.split("/servicesNS/")[-1] else "search" ) entry = { "name": name, "content": content, "acl": {"app": app, "owner": owner}, } mock_response.body.read.return_value = json.dumps({"entry": [entry]}).encode( "utf-8" ) return mock_response else: # Update existing stanza: /configs/conf-<conf>/<stanza> conf_name, stanza_name = conf_part.split("/", 1) self._ensure_conf(conf_name) # type: ignore[attr-defined] conf_obj = self.confs[conf_name] # Ensure _stanzas_dict exists and get reference to it # Use getattr with sentinel to avoid hasattr issues with Mock objects if getattr(conf_obj, "_stanzas_dict", None) is None: conf_obj._stanzas_dict = {} # type: ignore[attr-defined] stanzas_dict = conf_obj._stanzas_dict # type: ignore[attr-defined] current = stanzas_dict.get(stanza_name, {}) # Merge provided keys into existing for k, v in kwargs.items(): current[k] = v stanzas_dict[stanza_name] = current owner = base.split("/servicesNS/")[-1].split("/")[0] or "nobody" app = ( base.split("/servicesNS/")[-1].split("/")[1] if "/" in base.split("/servicesNS/")[-1] else "search" ) entry = { "name": stanza_name, "content": current, "acl": {"app": app, "owner": owner}, } mock_response.body.read.return_value = json.dumps({"entry": [entry]}).encode( "utf-8" ) return mock_response # Default empty response mock_response.body.read.return_value = json.dumps({}).encode("utf-8") return mock_response def get(self, endpoint: str, **kwargs): """Mock GET handler for Splunk REST endpoints used by admin/config tools.""" mock_response = Mock() owner = kwargs.get("owner") or "nobody" app = kwargs.get("app") or "search" output_mode = kwargs.get("output_mode", "json") # Config specific stanza: /services/configs/conf-<conf>/<stanza> # Check this BEFORE the listing pattern to avoid matching listing endpoints if ( endpoint.startswith("/services/configs/conf-") and "/" in endpoint[len("/services/configs/conf-") :] ): parts = endpoint.split("/services/configs/conf-")[-1].split("/") conf_name = parts[0] stanza_name = parts[1] entries = [] conf_obj = self.confs.get(conf_name) if conf_obj: stanzas_dict = getattr(conf_obj, "_stanzas_dict", None) if isinstance(stanzas_dict, dict): # Only return entry if stanza actually exists in dict if stanza_name in stanzas_dict: content = stanzas_dict[stanza_name] entries.append( { "name": stanza_name, "content": content, "acl": {"app": app, "owner": owner}, } ) else: # Fallback to iterator-based access content = ( conf_obj.__getitem__(stanza_name).content if hasattr(conf_obj, "__getitem__") else {} ) if content is not None: entries.append( { "name": stanza_name, "content": content, "acl": {"app": app, "owner": owner}, } ) payload = {"entry": entries} mock_response.body.read.return_value = json.dumps(payload).encode("utf-8") return mock_response # Config listing: /services/configs/conf-<conf> if endpoint.startswith("/services/configs/conf-") and output_mode == "json": conf_name = endpoint.split("/services/configs/conf-")[-1].split("/")[0] entries = [] conf_obj = self.confs.get(conf_name) if conf_obj: # Prefer dynamic dict if available stanzas_dict = getattr(conf_obj, "_stanzas_dict", None) if isinstance(stanzas_dict, dict): for name, content in stanzas_dict.items(): entries.append( {"name": name, "content": content, "acl": {"app": app, "owner": owner}} ) else: for stanza in conf_obj: entries.append( { "name": stanza.name, "content": stanza.content, "acl": {"app": app, "owner": owner}, } ) payload = {"entry": entries} mock_response.body.read.return_value = json.dumps(payload).encode("utf-8") return mock_response # Default empty JSON mock_response.body.read.return_value = json.dumps({}).encode("utf-8") return mock_response def _setup_mock_configurations(self): """Set up mock configuration files with stanzas""" # Mock configuration stanzas for different conf files mock_configs = { "props": { "default": {"SHOULD_LINEMERGE": "true", "BREAK_ONLY_BEFORE_DATE": "true"}, "source::/var/log/messages": {"sourcetype": "linux_messages_syslog"}, "splunk_web_access": {"EXTRACT-status": r"(?i)\s(?P<status>\d+)\s"}, }, "transforms": { "force_sourcetype_for_syslog": { "DEST_KEY": "_MetaData:Sourcetype", "FORMAT": "syslog", }, "dnslookup": {"external_cmd": "dnslookup.py", "fields_list": "clientip"}, }, "tags": { "eventtype=failed_login": {"authentication": "enabled", "failure": "enabled"}, "sourcetype=syslog": {"os": "enabled", "unix": "enabled"}, }, "macros": { "get_eventtype(1)": {"definition": 'eventtype="$eventtype$"', "args": "eventtype"}, "index_earliest": {"definition": "earliest=-24h@h"}, }, "inputs": { "default": {"host": "$decideOnStartup"}, "monitor:///var/log/messages": { "sourcetype": "linux_messages_syslog", "disabled": "false", }, }, "outputs": { "tcpout": {"defaultGroup": "splunk_indexers", "disabled": "false"}, "tcpout:splunk_indexers": {"server": "10.1.1.100:9997", "compressed": "true"}, }, "server": { "general": {"serverName": "splunk-server", "sessionTimeout": "1h"}, "license": {"active_group": "Enterprise"}, }, "web": { "settings": {"httpport": "8000", "mgmtHostPort": "127.0.0.1:8089"}, "feature:quarantine_files": {"enable": "true"}, }, } # Create mock configuration objects for conf_name, stanzas in mock_configs.items(): mock_conf = Mock() mock_conf.name = conf_name # Store stanzas dict for direct access (avoids recursion issues) mock_conf._stanzas_dict = stanzas.copy() # type: ignore[attr-defined] # Create mock stanza objects for iteration mock_stanzas = [] for stanza_name, content in stanzas.items(): mock_stanza = Mock() mock_stanza.name = stanza_name mock_stanza.content = content mock_stanzas.append(mock_stanza) # Make the conf object iterable to return stanzas # Generate iterator from _stanzas_dict on-demand to avoid sync issues def make_iter(mock_conf=mock_conf): # Generate mock stanza objects from dict to avoid recursion stanzas_dict = getattr(mock_conf, "_stanzas_dict", {}) return iter([Mock(name=n, content=c) for n, c in stanzas_dict.items()]) mock_conf.__iter__ = make_iter # Allow accessing specific stanzas by name mock_conf.__getitem__ = lambda key, stanzas_dict=stanzas: Mock( name=key, content=stanzas_dict.get(key, {}) ) self.confs[conf_name] = mock_conf # Support updates/creates via REST endpoints by mutating self.confs def _ensure_conf(conf_name: str): if conf_name not in self.confs: mock_conf = Mock() mock_conf.name = conf_name stanzas = {} mock_conf._stanzas_dict = stanzas # type: ignore[attr-defined] # Create iterator function that generates from dict on-demand def make_iter(mock_conf=mock_conf): stanzas_dict = getattr(mock_conf, "_stanzas_dict", {}) return iter([Mock(name=n, content=c) for n, c in stanzas_dict.items()]) mock_conf.__iter__ = make_iter mock_conf.__getitem__ = lambda key, stanzas_dict=stanzas: Mock( name=key, content=stanzas_dict.get(key, {}) ) self.confs[conf_name] = mock_conf else: # Config exists - ensure _stanzas_dict exists but preserve existing data conf_obj = self.confs[conf_name] # Only create empty dict if it truly doesn't exist (not just None) # Use try/except to reliably check attribute existence with Mock objects try: existing_dict = conf_obj._stanzas_dict # type: ignore[attr-defined] if not isinstance(existing_dict, dict): # If it exists but isn't a dict, replace it conf_obj._stanzas_dict = {} # type: ignore[attr-defined] except AttributeError: # Attribute doesn't exist, create it conf_obj._stanzas_dict = {} # type: ignore[attr-defined] # Monkey-patch helpers on instance for POST handlers to use self._ensure_conf = _ensure_conf # type: ignore[attr-defined] class MockJob: """Mock search job object""" def __init__(self, is_done=True, results=None): self._is_done = is_done self._results = results or [] self.content = { "scanCount": len(self._results), "eventCount": len(self._results), "duration": 0.123, } def is_done(self): return self._is_done def __iter__(self): return iter(self._results) class MockFastMCPContext: """Mock FastMCP Context that matches the actual Context interface""" def __init__(self, service=None, is_connected=True): self.request_context = Mock() self.request_context.lifespan_context = Mock() self.request_context.lifespan_context.service = service or MockSplunkService() self.request_context.lifespan_context.is_connected = is_connected # Mock Context methods self.info = AsyncMock() self.debug = AsyncMock() self.warning = AsyncMock() self.error = AsyncMock() self.report_progress = AsyncMock() self.read_resource = AsyncMock() self.sample = AsyncMock() # Mock Context properties self.request_id = "test-request-123" self.client_id = "test-client-456" self.session_id = "test-session-789" class MockResultsReader: """Mock for splunklib.results.ResultsReader""" def __init__(self, results): self.results = results def __iter__(self): return iter(self.results) class MCPTestHelpers: """Helper functions for MCP testing using FastMCP patterns""" async def check_connection_health(self, client) -> dict[str, Any]: """Check MCP connection health and return status""" try: # Test basic connectivity by listing tools and resources tools = await client.list_tools() resources = await client.list_resources() # Test a simple tool call health_result = await client.call_tool("get_splunk_health") return { "ping": True, "tools_count": len(tools), "resources_count": len(resources), "tools": [tool.name for tool in tools], "resources": [resource.uri for resource in resources], "health_check": health_result, } except Exception as e: return { "ping": False, "error": str(e), "tools_count": 0, "resources_count": 0, "tools": [], "resources": [], } @pytest.fixture async def fastmcp_client(): """Create FastMCP client for in-memory testing""" if Client is None: pytest.skip("FastMCP not available") # Import the actual server from src.server import mcp # Use FastMCP's in-memory transport for testing client = Client(mcp) yield client # Client cleanup is handled automatically @pytest.fixture def mcp_helpers(): """Create MCP test helpers""" return MCPTestHelpers() @pytest.fixture def extract_tool_result(): """Helper function to extract results from MCP tool calls""" def _extract(result): """Extract data from MCP tool call result""" # Fast path: CallToolResult-like with .data if hasattr(result, "data"): return result.data # Handle content-style results if hasattr(result, "contents") and getattr(result, "contents", None): first = result.contents[0] if hasattr(first, "text"): try: return json.loads(first.text) except (json.JSONDecodeError, AttributeError): return {"raw_text": first.text} if isinstance(result, dict): return result elif isinstance(result, list) and len(result) > 0: first_item = result[0] if hasattr(first_item, "text"): try: # Try to parse as JSON return json.loads(first_item.text) except (json.JSONDecodeError, AttributeError): # Return as raw text if not JSON return {"raw_text": first_item.text} elif isinstance(first_item, dict): return first_item else: return {"raw_data": first_item} else: return {"empty_result": True} return _extract @pytest.fixture def splunk_test_query(): """Sample Splunk query for testing""" return { "query": "index=_internal | head 5", "earliest_time": "-15m", "latest_time": "now", "max_results": 5, } @pytest.fixture def mock_splunk_service(): """Create a mock Splunk service for testing""" return MockSplunkService() @pytest.fixture def mock_context(mock_splunk_service): """Create a mock FastMCP Context with Splunk service""" return MockFastMCPContext(service=mock_splunk_service, is_connected=True) @pytest.fixture def mock_disconnected_context(): """Create a mock FastMCP Context with disconnected Splunk service""" return MockFastMCPContext(service=None, is_connected=False) @pytest.fixture def mock_search_results(): """Sample search results for testing""" return [ {"_time": "2024-01-01T00:00:00", "source": "/var/log/system.log", "log_level": "INFO"}, {"_time": "2024-01-01T00:01:00", "source": "/var/log/app.log", "log_level": "ERROR"}, {"_time": "2024-01-01T00:02:00", "source": "/var/log/system.log", "log_level": "WARN"}, ] @pytest.fixture def mock_oneshot_job(mock_search_results): """Create a mock oneshot job""" job = Mock() job.__iter__ = lambda: iter(mock_search_results) return job @pytest.fixture def mock_regular_job(mock_search_results): """Create a mock regular search job""" return MockJob(is_done=True, results=mock_search_results) @pytest.fixture def sample_env_vars(): """Sample environment variables for testing""" return { # Use local Splunk container defaults when available "SPLUNK_HOST": "localhost", "SPLUNK_PORT": "8089", "SPLUNK_USERNAME": "admin", "SPLUNK_PASSWORD": "Chang3d!", "SPLUNK_VERIFY_SSL": "false", } @pytest.fixture def mock_kvstore_collection_data(): """Sample KV Store collection data""" return [ {"_key": "1", "username": "admin", "role": "admin", "active": True}, {"_key": "2", "username": "user1", "role": "user", "active": True}, {"_key": "3", "username": "user2", "role": "user", "active": False}, ] @pytest.fixture(autouse=True) def setup_test_environment(sample_env_vars): """Set up test environment variables""" with patch.dict(os.environ, sample_env_vars): yield @pytest.fixture(autouse=True) def mock_splunk_get_service(mock_splunk_service): """Autouse fixture to mock Splunk service access for all tools. This keeps tests in-memory while avoiding dependence on a running Splunk. """ async def _get_service(*args, **kwargs): return mock_splunk_service patches = [] try: # Import tool classes and patch their get_splunk_service from src.tools.admin.apps import ListApps from src.tools.admin.users import ListUsers from src.tools.health.status import GetSplunkHealth from src.tools.kvstore.collections import ListKvstoreCollections from src.tools.kvstore.data import GetKvstoreData from src.tools.metadata.indexes import ListIndexes from src.tools.metadata.sources import ListSources from src.tools.metadata.sourcetypes import ListSourcetypes from src.tools.search.job_search import JobSearch from src.tools.search.oneshot_search import OneshotSearch from src.tools.search.saved_search_tools import ( CreateSavedSearch, DeleteSavedSearch, ExecuteSavedSearch, GetSavedSearchDetails, ListSavedSearches, UpdateSavedSearch, ) targets = [ GetSplunkHealth, OneshotSearch, JobSearch, ListApps, ListUsers, ListIndexes, ListSources, ListSourcetypes, ListKvstoreCollections, GetKvstoreData, CreateSavedSearch, DeleteSavedSearch, ExecuteSavedSearch, GetSavedSearchDetails, ListSavedSearches, UpdateSavedSearch, ] for cls in targets: p = patch.object(cls, "get_splunk_service", AsyncMock(side_effect=_get_service)) p.start() patches.append(p) yield finally: for p in patches: try: p.stop() except Exception: pass @pytest.fixture def mock_results_reader(): """Mock for ResultsReader""" return MockResultsReader # Legacy fixtures for backward compatibility (will be removed eventually) @pytest.fixture async def traefik_client(): """Legacy fixture - use fastmcp_client instead""" pytest.skip("Use fastmcp_client fixture for proper FastMCP testing") @pytest.fixture async def direct_client(): """Legacy fixture - use fastmcp_client instead""" pytest.skip("Use fastmcp_client fixture for proper FastMCP testing") # Async test configuration for pytest-asyncio pytest_plugins = ["pytest_asyncio"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server