Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
config.pyβ€’14.2 kB
""" Tool for retrieving Splunk configurations. """ from typing import Any try: from splunklib import client as splunk_client # type: ignore except Exception: # splunklib may not be available in some test contexts splunk_client = None # type: ignore from fastmcp import Context from src.core.base import BaseTool, ToolMetadata from src.core.utils import log_tool_execution class GetConfigurations(BaseTool): """ Get Splunk configurations. """ METADATA = ToolMetadata( name="get_configurations", description=( "Retrieves Splunk configuration settings from specified .conf files. " "Use this tool when you need to access or inspect Splunk configurations, " "such as for troubleshooting, auditing, or understanding settings in files like props.conf or inputs.conf. " "Access settings from any Splunk configuration file (props.conf, transforms.conf, " "inputs.conf, outputs.conf, etc.) either by entire file or specific stanza. " "Returns structured configuration data showing all settings and their values.\n\n" "Args:\n" " conf_file (str): Configuration file name without .conf extension " "(e.g., 'props', 'transforms', 'inputs', 'outputs', 'server', 'web')\n" " stanza (str, optional): Specific stanza name within the conf file to retrieve. " "If not provided, returns all stanzas in the file.\n" " app (str, optional): Filter results to stanzas owned by this app (namespace).\n" " owner (str, optional): Filter results to stanzas owned by this owner (user)." ), category="admin", tags=["configuration", "settings", "administration"], requires_connection=True, ) async def execute( self, ctx: Context, conf_file: str, stanza: str = "", app: str = "", owner: str = "" ) -> dict[str, Any]: """ Get Splunk configurations from specific configuration files. Args: conf_file (str): Configuration file name without .conf extension (e.g., 'props', 'transforms', 'inputs', 'outputs', 'server', 'web') stanza (str, optional): Specific stanza name within the conf file to retrieve. If not provided, returns all stanzas in the file. Returns: Dict containing configuration settings with status, file name, and configuration data """ log_tool_execution( "get_configurations", conf_file=conf_file, stanza=stanza, app=app, owner=owner ) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: return self.format_error_response(error_msg) # Normalize inputs to avoid SDK url-encoding issues (e.g., stray newlines) normalized_conf = (conf_file or "").strip() if normalized_conf.endswith(".conf"): normalized_conf = normalized_conf[: -len(".conf")] normalized_stanza = (stanza or "").strip() normalized_app = (app or "").strip() normalized_owner = (owner or "").strip() if not normalized_conf: return self.format_error_response("conf_file is required") # Debug the normalized parameters self.logger.debug( "GetConfigurations called with conf_file=%r, stanza=%r, app=%r, owner=%r", conf_file, stanza, app, owner, ) self.logger.debug( "Normalized conf=%r, stanza=%r, app=%r, owner=%r", normalized_conf, normalized_stanza, normalized_app, normalized_owner, ) self.logger.info("Retrieving configurations from %s", normalized_conf) await ctx.info(f"Retrieving configurations from {normalized_conf}") try: # Use REST API endpoint first to avoid url-encoding issues with certain configs import json def parse_stanzas_from_json(json_bytes: bytes) -> dict[str, dict[str, Any]]: stanzas: dict[str, dict[str, Any]] = {} try: data = json.loads(json_bytes.decode("utf-8")) except Exception: return stanzas entries = data.get("entry", []) if isinstance(data, dict) else [] for entry in entries: stanza_name = entry.get("name") if not stanza_name: continue content = entry.get("content", {}) or {} acl = entry.get("acl", {}) or {} # Keep values as-is from Splunk JSON stanza_settings: dict[str, Any] = {k: v for k, v in content.items()} stanzas[stanza_name] = { "settings": stanza_settings, "app": acl.get("app"), "owner": acl.get("owner"), } return stanzas endpoint = f"/services/configs/conf-{normalized_conf}" # Build namespace attempts, prioritizing provided filters if set attempts: list[tuple[str | None, str | None]] = [] if normalized_app or normalized_owner: attempts.append((normalized_owner or None, normalized_app or None)) # Default fallbacks attempts.extend([(None, None), ("nobody", "search"), ("nobody", "system")]) # Deduplicate while preserving order seen = set() dedup_attempts: list[tuple[str | None, str | None]] = [] for t in attempts: if t not in seen: dedup_attempts.append(t) seen.add(t) attempts = dedup_attempts if normalized_stanza: self.logger.info("Retrieving configuration for stanza: %s", normalized_stanza) await ctx.info(f"Retrieving configuration for stanza: {normalized_stanza}") for ns_owner, ns_app in attempts: try: stanza_endpoint = f"{endpoint}/{normalized_stanza}" self.logger.debug( "REST GET %s (owner=%s, app=%s)", stanza_endpoint, ns_owner, ns_app ) resp = service.get( stanza_endpoint, owner=ns_owner, app=ns_app, output_mode="json" ) parsed = parse_stanzas_from_json(resp.body.read()) if parsed: stanza_info = parsed.get( normalized_stanza, next(iter(parsed.values()), {}) ) # Apply optional filters on app/owner acl_app = stanza_info.get("app") acl_owner = stanza_info.get("owner") if (normalized_app and acl_app != normalized_app) or ( normalized_owner and acl_owner != normalized_owner ): self.logger.debug( "Stanza found but filtered out by app/user (app=%s, owner=%s)", acl_app, acl_owner, ) # continue searching continue result = { "stanza": normalized_stanza, "app": stanza_info.get("app"), "owner": stanza_info.get("owner"), "settings": stanza_info.get("settings", {}), } await ctx.info( f"Retrieved configuration for stanza: {normalized_stanza}" ) return self.format_success_response(result) except Exception as rest_err: self.logger.debug( "REST stanza attempt failed (owner=%s, app=%s): %s", ns_owner, ns_app, repr(rest_err), ) # Fallback to SDK confs access with namespace fallbacks try: confs = service.confs[normalized_conf] stanza_obj = confs[normalized_stanza] result = {"stanza": normalized_stanza, "settings": dict(stanza_obj.content)} await ctx.info(f"Retrieved configuration for stanza: {normalized_stanza}") return self.format_success_response(result) except Exception: if splunk_client is not None: for fb_owner, fb_app in attempts[1:]: try: fb_service = splunk_client.Service( scheme=getattr(service, "scheme", "https"), host=getattr(service, "host", "localhost"), port=getattr(service, "port", 8089), token=getattr(service, "token", None), owner=fb_owner, app=fb_app, ) stanza_obj = fb_service.confs[normalized_conf][normalized_stanza] result = { "stanza": normalized_stanza, "settings": dict(stanza_obj.content), } await ctx.info( f"Retrieved configuration for stanza: {normalized_stanza}" ) return self.format_success_response(result) except Exception: continue msg = f"Stanza '{normalized_stanza}' not found in {normalized_conf}." self.logger.error("%s", msg) await ctx.error(msg) return self.format_error_response(msg) # All stanzas via REST first all_stanzas: dict[str, dict[str, Any]] = {} for ns_owner, ns_app in attempts: try: self.logger.debug("REST GET %s (owner=%s, app=%s)", endpoint, ns_owner, ns_app) resp = service.get( endpoint, owner=ns_owner, app=ns_app, output_mode="json", count=0 ) stanzas = parse_stanzas_from_json(resp.body.read()) if stanzas: # Apply optional filters by app/owner if normalized_app or normalized_owner: filtered: dict[str, dict[str, Any]] = {} for name, info in stanzas.items(): acl_app = info.get("app") acl_owner = info.get("owner") if normalized_app and acl_app != normalized_app: continue if normalized_owner and acl_owner != normalized_owner: continue filtered[name] = info stanzas = filtered if stanzas: all_stanzas.update(stanzas) break except Exception as rest_err: self.logger.debug( "REST list attempt failed (owner=%s, app=%s): %s", ns_owner, ns_app, repr(rest_err), ) if not all_stanzas: # Fallback to SDK confs iteration with namespace fallback try: confs = service.confs[normalized_conf] for stanza_obj in confs: all_stanzas[stanza_obj.name] = dict(stanza_obj.content) except Exception: if splunk_client is not None: for fb_owner, fb_app in attempts[1:]: try: fb_service = splunk_client.Service( scheme=getattr(service, "scheme", "https"), host=getattr(service, "host", "localhost"), port=getattr(service, "port", 8089), token=getattr(service, "token", None), owner=fb_owner, app=fb_app, ) confs = fb_service.confs[normalized_conf] for stanza_obj in confs: all_stanzas[stanza_obj.name] = dict(stanza_obj.content) if all_stanzas: break except Exception: continue self.logger.debug( "Collected %d stanzas: %s", len(all_stanzas), list(all_stanzas.keys())[:10] ) await ctx.info(f"Retrieved {len(all_stanzas)} stanzas from {normalized_conf}") return self.format_success_response({"file": normalized_conf, "stanzas": all_stanzas}) except Exception as e: # Log full stack trace for diagnostics self.logger.exception("Failed to get configurations") await ctx.error(f"Failed to get configurations: {str(e)}") return self.format_error_response(str(e))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server