Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
saved_search_tools.py45.8 kB
""" Saved search tools for Splunk MCP server. Provides comprehensive functionality for managing and executing Splunk saved searches including listing, executing, creating, updating, and deleting saved searches. """ import time from typing import Any, Literal from fastmcp import Context from splunklib.results import JSONResultsReader from src.core.base import BaseTool, ToolMetadata from src.core.utils import log_tool_execution, sanitize_search_query class ListSavedSearches(BaseTool): """ List saved searches available in the Splunk environment with filtering options. Returns metadata about saved searches including ownership, scheduling, and permissions. """ METADATA = ToolMetadata( name="list_saved_searches", description=( "List saved searches with ownership, schedule, visibility, and permission metadata. " "Use this to discover available reports/automations and to filter by owner/app/sharing. " "Results reflect only saved searches the current user can access.\n\n" "Args:\n" " owner (str, optional): Filter by owner name (optional)\n" " app (str, optional): Filter by application name (optional)\n" " sharing (str, optional): Filter by sharing level (optional)\n" " include_disabled (bool, optional): Include disabled saved searches (default: False)\n\n" ), category="search", tags=["saved_searches", "list", "metadata"], requires_connection=True, ) async def execute( self, ctx: Context, owner: str | None = None, app: str | None = None, sharing: Literal["user", "app", "global", "system"] | None = None, include_disabled: bool = False, ) -> dict[str, Any]: """ List saved searches with optional filtering. Args: owner: Filter by owner name (optional) app: Filter by application name (optional) sharing: Filter by sharing level (optional) include_disabled: Include disabled saved searches (default: False) Returns: Dict containing: - saved_searches: List of saved search metadata - total_count: Total number of saved searches found - filtered_count: Number after applying filters Example: list_saved_searches(owner="admin", app="search", include_disabled=True) """ log_tool_execution("list_saved_searches", owner=owner, app=app, sharing=sharing) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"List saved searches failed: {error_msg}") return self.format_error_response(error_msg, saved_searches=[], total_count=0) try: await ctx.info("Retrieving saved searches list") saved_searches_list = [] total_count = 0 for saved_search in service.saved_searches: total_count += 1 # Extract saved search metadata search_info = { "name": saved_search.name, "search": saved_search.content.get("search", ""), "description": saved_search.content.get("description", ""), "owner": saved_search.content.get("eai:acl", {}).get("owner", ""), "app": saved_search.content.get("eai:acl", {}).get("app", ""), "sharing": saved_search.content.get("eai:acl", {}).get("sharing", ""), "disabled": self._convert_splunk_boolean( saved_search.content.get("disabled"), False ), "is_scheduled": self._convert_splunk_boolean( saved_search.content.get("is_scheduled"), False ), "is_visible": self._convert_splunk_boolean( saved_search.content.get("is_visible"), True ), "cron_schedule": saved_search.content.get("cron_schedule", ""), "next_scheduled_time": saved_search.content.get("next_scheduled_time", ""), "earliest_time": saved_search.content.get("dispatch.earliest_time", ""), "latest_time": saved_search.content.get("dispatch.latest_time", ""), "updated": saved_search.content.get("updated", ""), "permissions": { "read": saved_search.content.get("eai:acl", {}) .get("perms", {}) .get("read", []), "write": saved_search.content.get("eai:acl", {}) .get("perms", {}) .get("write", []), }, } # Apply filters if owner and search_info["owner"] != owner: continue if app and search_info["app"] != app: continue if sharing and search_info["sharing"] != sharing: continue if not include_disabled and search_info["disabled"]: continue saved_searches_list.append(search_info) # Sort by name for consistent output saved_searches_list.sort(key=lambda x: x["name"]) return self.format_success_response( { "saved_searches": saved_searches_list, "total_count": total_count, "filtered_count": len(saved_searches_list), "filters_applied": { "owner": owner, "app": app, "sharing": sharing, "include_disabled": include_disabled, }, } ) except Exception as e: self.logger.error(f"Failed to list saved searches: {str(e)}") await ctx.error(f"Failed to list saved searches: {str(e)}") return self.format_error_response(str(e), saved_searches=[], total_count=0) def _convert_splunk_boolean(self, value, default=False): """Convert Splunk boolean values to Python booleans""" if isinstance(value, bool): return value if isinstance(value, str): return value.lower() in ("1", "true", "yes", "on") if isinstance(value, int | float): return bool(value) return default class ExecuteSavedSearch(BaseTool): """ Execute a saved search by name with optional parameter overrides. Supports both oneshot and job execution modes based on search complexity. """ METADATA = ToolMetadata( name="execute_saved_search", description=( "Run a saved search by name with optional time overrides and mode selection. Use this to " "execute existing reports/automations quickly. Choose 'oneshot' for immediate results or " "'job' for progress tracking and large result sets.\\n\\n" "Outputs: results list (capped by max_results), mode used, timing, and job id (if job).\\n" "Security: execution and results are constrained by the authenticated user's permissions." ), category="search", tags=["saved_searches", "execute", "search"], requires_connection=True, ) async def execute( self, ctx: Context, name: str, earliest_time: str | None = None, latest_time: str | None = None, mode: Literal["oneshot", "job"] = "oneshot", max_results: int = 100, app: str | None = None, owner: str | None = None, ) -> dict[str, Any]: """ Execute a saved search by name. Args: name: Name of the saved search to execute earliest_time: Override earliest time for search (optional) latest_time: Override latest time for search (optional) mode: Execution mode - 'oneshot' for immediate results or 'job' for progress tracking max_results: Maximum number of results to return (default: 100) app: Application context for the saved search (optional) owner: Owner context for the saved search (optional) Returns: Dict containing: - saved_search_name: Name of the executed saved search - results: List of search results - results_count: Number of results returned - execution_mode: Mode used for execution - job_id: Search job ID (if job mode) - duration: Execution time in seconds Example: execute_saved_search( name="Security Alerts", earliest_time="-1h", mode="job" ) """ log_tool_execution( "execute_saved_search", name=name, mode=mode, earliest_time=earliest_time ) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"Execute saved search failed: {error_msg}") return self.format_error_response(error_msg, saved_search_name=name) try: # Find the saved search await ctx.info(f"Looking for saved search: {name}") # Build search criteria search_kwargs = {} if app: search_kwargs["app"] = app if owner: search_kwargs["owner"] = owner saved_search = None try: if search_kwargs: # Search with specific criteria for ss in service.saved_searches(**search_kwargs): if ss.name == name: saved_search = ss break else: # Direct access saved_search = service.saved_searches[name] except KeyError: # Try to find it by iterating through all saved searches for ss in service.saved_searches: if ss.name == name: if app and ss.content.get("eai:acl", {}).get("app") != app: continue if owner and ss.content.get("eai:acl", {}).get("owner") != owner: continue saved_search = ss break if not saved_search: error_msg = f"Saved search '{name}' not found" if app or owner: error_msg += f" (app: {app}, owner: {owner})" await ctx.error(error_msg) return self.format_error_response(error_msg, saved_search_name=name) # Check if saved search is disabled if self._convert_splunk_boolean(saved_search.content.get("disabled"), False): error_msg = f"Saved search '{name}' is disabled" await ctx.error(error_msg) return self.format_error_response(error_msg, saved_search_name=name) # Prepare execution parameters dispatch_kwargs = {} # Use override times if provided, otherwise use saved search defaults if earliest_time is not None: dispatch_kwargs["earliest_time"] = earliest_time elif saved_search.content.get("dispatch.earliest_time"): dispatch_kwargs["earliest_time"] = saved_search.content.get( "dispatch.earliest_time" ) if latest_time is not None: dispatch_kwargs["latest_time"] = latest_time elif saved_search.content.get("dispatch.latest_time"): dispatch_kwargs["latest_time"] = saved_search.content.get("dispatch.latest_time") await ctx.info(f"Executing saved search '{name}' in {mode} mode") start_time = time.time() if mode == "oneshot": return await self._execute_oneshot( ctx, saved_search, dispatch_kwargs, max_results, start_time ) else: return await self._execute_job( ctx, saved_search, dispatch_kwargs, max_results, start_time ) except Exception as e: self.logger.error(f"Failed to execute saved search '{name}': {str(e)}") await ctx.error(f"Failed to execute saved search '{name}': {str(e)}") return self.format_error_response(str(e), saved_search_name=name) async def _execute_oneshot( self, ctx: Context, saved_search, dispatch_kwargs: dict, max_results: int, start_time: float ) -> dict[str, Any]: """Execute saved search in oneshot mode""" dispatch_kwargs["count"] = max_results dispatch_kwargs["output_mode"] = "json" job = saved_search.dispatch(**dispatch_kwargs) results = [] result_count = 0 reader = JSONResultsReader(job) for result in reader: if isinstance(result, dict): results.append(result) result_count += 1 if result_count >= max_results: break duration = time.time() - start_time return self.format_success_response( { "saved_search_name": saved_search.name, "results": results, "results_count": result_count, "execution_mode": "oneshot", "duration": round(duration, 3), "search_query": saved_search.content.get("search", ""), "dispatch_parameters": dispatch_kwargs, } ) async def _execute_job( self, ctx: Context, saved_search, dispatch_kwargs: dict, max_results: int, start_time: float ) -> dict[str, Any]: """Execute saved search in job mode with progress tracking""" job = saved_search.dispatch(**dispatch_kwargs) # Wait for job completion with progress reporting while not job.is_done(): await ctx.report_progress( progress=int(float(job.content.get("dispatchState", {}).get("percentComplete", 0))), total=100, ) time.sleep(0.5) job.refresh() # Get job statistics stats = job.content # Get results results = [] result_count = 0 kwargs_paginate = {"count": max_results, "output_mode": "json"} reader = JSONResultsReader(job.results(**kwargs_paginate)) for result in reader: if isinstance(result, dict): results.append(result) result_count += 1 if result_count >= max_results: break duration = time.time() - start_time return self.format_success_response( { "saved_search_name": saved_search.name, "job_id": job.sid, "results": results, "results_count": result_count, "execution_mode": "job", "scan_count": int(float(stats.get("scanCount", 0))), "event_count": int(float(stats.get("eventCount", 0))), "duration": round(duration, 3), "search_query": saved_search.content.get("search", ""), "dispatch_parameters": dispatch_kwargs, "status": { "is_finalized": stats.get("isFinalized", "0") == "1", "is_failed": stats.get("isFailed", "0") == "1", "progress": 100, }, } ) def _convert_splunk_boolean(self, value, default=False): """Convert Splunk boolean values to Python booleans""" if isinstance(value, bool): return value if isinstance(value, str): return value.lower() in ("1", "true", "yes", "on") if isinstance(value, int | float): return bool(value) return default class CreateSavedSearch(BaseTool): """ Create a new saved search with specified configuration. Supports scheduling, alerting, and sharing configuration. """ METADATA = ToolMetadata( name="create_saved_search", description=( "Create a saved search (report/automation) with optional scheduling and sharing. Use this " "to persist useful SPL queries and optionally schedule them via cron.\\n\\n" "Outputs: creation status and the applied configuration.\\n" "Security: visibility and execution are constrained by permissions and chosen sharing level." ), category="search", tags=["saved_searches", "create", "management"], requires_connection=True, ) async def execute( self, ctx: Context, name: str, search: str, description: str = "", earliest_time: str = "", latest_time: str = "", app: str | None = None, sharing: Literal["user", "app", "global"] = "user", is_scheduled: bool = False, cron_schedule: str = "", is_visible: bool = True, ) -> dict[str, Any]: """ Create a new saved search. Args: name: Name for the new saved search (must be unique) search: The SPL search query to save description: Description of the saved search earliest_time: Default earliest time for the search latest_time: Default latest time for the search app: Application context (uses current app if not specified) sharing: Sharing level for the saved search is_scheduled: Whether to enable scheduling cron_schedule: Cron expression for scheduling (required if is_scheduled=True) is_visible: Whether the saved search is visible in the UI Returns: Dict containing: - name: Name of the created saved search - created: Whether creation was successful - configuration: Applied configuration Example: create_saved_search( name="Daily Error Count", search="index=main error | stats count", description="Count of daily errors", earliest_time="-24h@h", latest_time="@h", is_scheduled=True, cron_schedule="0 8 * * *" ) """ log_tool_execution("create_saved_search", name=name, search=search[:50] + "...") is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"Create saved search failed: {error_msg}") return self.format_error_response(error_msg, name=name, created=False) try: # Validate inputs if not name.strip(): return self.format_error_response( "Saved search name cannot be empty", name=name, created=False ) if not search.strip(): return self.format_error_response( "Search query cannot be empty", name=name, created=False ) if is_scheduled and not cron_schedule.strip(): return self.format_error_response( "Cron schedule is required when is_scheduled=True", name=name, created=False ) # Check if saved search already exists try: existing = service.saved_searches[name] if existing: return self.format_error_response( f"Saved search '{name}' already exists", name=name, created=False ) except KeyError: # This is expected - saved search doesn't exist yet pass # Sanitize search query search = sanitize_search_query(search) # Build configuration config = { "search": search, "description": description, "is_visible": "1" if is_visible else "0", } # Add time range if specified if earliest_time: config["dispatch.earliest_time"] = earliest_time if latest_time: config["dispatch.latest_time"] = latest_time # Add scheduling if enabled if is_scheduled: config["is_scheduled"] = "1" config["cron_schedule"] = cron_schedule else: config["is_scheduled"] = "0" await ctx.info(f"Creating saved search '{name}'") # Create the saved search if app: service.saved_searches.create(name, **config, app=app, sharing=sharing) else: service.saved_searches.create(name, **config, sharing=sharing) return self.format_success_response( { "name": name, "created": True, "configuration": { "search": search, "description": description, "earliest_time": earliest_time, "latest_time": latest_time, "app": app or "default", "sharing": sharing, "is_scheduled": is_scheduled, "cron_schedule": cron_schedule if is_scheduled else "", "is_visible": is_visible, }, "created_at": time.time(), } ) except Exception as e: self.logger.error(f"Failed to create saved search '{name}': {str(e)}") await ctx.error(f"Failed to create saved search '{name}': {str(e)}") return self.format_error_response(str(e), name=name, created=False) class UpdateSavedSearch(BaseTool): """ Update an existing saved search's configuration. Allows modification of search query, scheduling, and other properties. """ METADATA = ToolMetadata( name="update_saved_search", description=( "Update an existing saved search's configuration including query, scheduling, " "and other properties. Allows selective modification of saved search parameters " "while preserving unchanged settings. Supports updating search logic, time ranges, " "scheduling configuration, and visibility settings for flexible search management.\\n\\n" "Args:\\n" " name (str): Name of the saved search to update (required)\\n" " search (str, optional): New SPL search query\\n" " description (str, optional): New description text\\n" " earliest_time (str, optional): New default earliest time " "(e.g., '-24h@h', '-7d', '2024-01-01T00:00:00')\\n" " latest_time (str, optional): New default latest time " "(e.g., 'now', '@d', '2024-01-02T00:00:00')\\n" " is_scheduled (bool, optional): Enable or disable scheduled execution\\n" " cron_schedule (str, optional): New cron expression for scheduling\\n" " is_visible (bool, optional): Show or hide in Splunk UI\\n" " app (str, optional): Application context for saved search lookup\\n" " owner (str, optional): Owner context for saved search lookup\\n\\n" "Response Format:\\n" "Returns dictionary with 'status', 'name', 'updated', 'changes_made', and 'updated_at' fields." ), category="search", tags=["saved_searches", "update", "management"], requires_connection=True, ) async def execute( self, ctx: Context, name: str, search: str | None = None, description: str | None = None, earliest_time: str | None = None, latest_time: str | None = None, is_scheduled: bool | None = None, cron_schedule: str | None = None, is_visible: bool | None = None, app: str | None = None, owner: str | None = None, ) -> dict[str, Any]: """ Update an existing saved search. Args: name: Name of the saved search to update search: New search query (optional) description: New description (optional) earliest_time: New earliest time (optional) latest_time: New latest time (optional) is_scheduled: Enable/disable scheduling (optional) cron_schedule: New cron schedule (optional) is_visible: Show/hide in UI (optional) app: Application context (optional) owner: Owner context (optional) Returns: Dict containing: - name: Name of the updated saved search - updated: Whether update was successful - changes_made: List of properties that were changed Example: update_saved_search( name="Daily Error Count", description="Updated: Count of daily errors with severity", search="index=main error | stats count by severity", is_scheduled=False ) """ log_tool_execution("update_saved_search", name=name) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"Update saved search failed: {error_msg}") return self.format_error_response(error_msg, name=name, updated=False) try: # Find the saved search await ctx.info(f"Looking for saved search to update: {name}") saved_search = None try: # Try direct access first saved_search = service.saved_searches[name] # Check app/owner constraints if specified if app and saved_search.content.get("eai:acl", {}).get("app") != app: saved_search = None if owner and saved_search.content.get("eai:acl", {}).get("owner") != owner: saved_search = None except KeyError: # Search by iterating if direct access fails for ss in service.saved_searches: if ss.name == name: if app and ss.content.get("eai:acl", {}).get("app") != app: continue if owner and ss.content.get("eai:acl", {}).get("owner") != owner: continue saved_search = ss break if not saved_search: error_msg = f"Saved search '{name}' not found" if app or owner: error_msg += f" (app: {app}, owner: {owner})" await ctx.error(error_msg) return self.format_error_response(error_msg, name=name, updated=False) # Build update configuration update_config = {} changes_made = [] if search is not None: search = sanitize_search_query(search) update_config["search"] = search changes_made.append("search") if description is not None: update_config["description"] = description changes_made.append("description") if earliest_time is not None: update_config["dispatch.earliest_time"] = earliest_time changes_made.append("earliest_time") if latest_time is not None: update_config["dispatch.latest_time"] = latest_time changes_made.append("latest_time") if is_scheduled is not None: update_config["is_scheduled"] = "1" if is_scheduled else "0" changes_made.append("is_scheduled") if cron_schedule is not None: if is_scheduled is not False: # Only update if not explicitly disabled update_config["cron_schedule"] = cron_schedule changes_made.append("cron_schedule") if is_visible is not None: update_config["is_visible"] = "1" if is_visible else "0" changes_made.append("is_visible") if not update_config: return self.format_error_response( "No update parameters provided", name=name, updated=False ) # Validate scheduling configuration if "is_scheduled" in update_config and update_config["is_scheduled"] == "1": current_cron = saved_search.content.get("cron_schedule", "") new_cron = update_config.get("cron_schedule", current_cron) if not new_cron.strip(): return self.format_error_response( "Cron schedule is required when enabling scheduling", name=name, updated=False, ) await ctx.info(f"Updating saved search '{name}' with changes: {changes_made}") # Apply updates saved_search.update(**update_config) return self.format_success_response( { "name": name, "updated": True, "changes_made": changes_made, "updated_at": time.time(), } ) except Exception as e: self.logger.error(f"Failed to update saved search '{name}': {str(e)}") await ctx.error(f"Failed to update saved search '{name}': {str(e)}") return self.format_error_response(str(e), name=name, updated=False) class DeleteSavedSearch(BaseTool): """ Delete a saved search from Splunk. Requires confirmation and provides safety checks. """ METADATA = ToolMetadata( name="delete_saved_search", description="Delete a saved search with confirmation and safety checks", category="search", tags=["saved_searches", "delete", "management"], requires_connection=True, ) async def execute( self, ctx: Context, name: str, confirm: bool = False, app: str | None = None, owner: str | None = None, ) -> dict[str, Any]: """ Delete a saved search. Args: name: Name of the saved search to delete confirm: Confirmation that deletion is intended (must be True) app: Application context (optional) owner: Owner context (optional) Returns: Dict containing: - name: Name of the saved search - deleted: Whether deletion was successful - was_scheduled: Whether the deleted search was scheduled Example: delete_saved_search( name="Old Test Search", confirm=True ) """ log_tool_execution("delete_saved_search", name=name, confirm=confirm) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"Delete saved search failed: {error_msg}") return self.format_error_response(error_msg, name=name, deleted=False) try: # Safety check - require explicit confirmation if not confirm: return self.format_error_response( "Deletion requires explicit confirmation. Set confirm=True to proceed.", name=name, deleted=False, ) # Find the saved search await ctx.info(f"Looking for saved search to delete: {name}") saved_search = None try: # Try direct access first saved_search = service.saved_searches[name] # Check app/owner constraints if specified if app and saved_search.content.get("eai:acl", {}).get("app") != app: saved_search = None if owner and saved_search.content.get("eai:acl", {}).get("owner") != owner: saved_search = None except KeyError: # Search by iterating if direct access fails for ss in service.saved_searches: if ss.name == name: if app and ss.content.get("eai:acl", {}).get("app") != app: continue if owner and ss.content.get("eai:acl", {}).get("owner") != owner: continue saved_search = ss break if not saved_search: error_msg = f"Saved search '{name}' not found" if app or owner: error_msg += f" (app: {app}, owner: {owner})" await ctx.error(error_msg) return self.format_error_response(error_msg, name=name, deleted=False) # Get information before deletion was_scheduled = self._convert_splunk_boolean( saved_search.content.get("is_scheduled"), False ) search_app = saved_search.content.get("eai:acl", {}).get("app", "") search_owner = saved_search.content.get("eai:acl", {}).get("owner", "") await ctx.info( f"Deleting saved search '{name}' (app: {search_app}, owner: {search_owner})" ) # Delete the saved search saved_search.delete() return self.format_success_response( { "name": name, "deleted": True, "was_scheduled": was_scheduled, "app": search_app, "owner": search_owner, "deleted_at": time.time(), } ) except Exception as e: self.logger.error(f"Failed to delete saved search '{name}': {str(e)}") await ctx.error(f"Failed to delete saved search '{name}': {str(e)}") return self.format_error_response(str(e), name=name, deleted=False) def _convert_splunk_boolean(self, value, default=False): """Convert Splunk boolean values to Python booleans""" if isinstance(value, bool): return value if isinstance(value, str): return value.lower() in ("1", "true", "yes", "on") if isinstance(value, int | float): return bool(value) return default class GetSavedSearchDetails(BaseTool): """ Get detailed information about a specific saved search. Returns comprehensive metadata including scheduling, alerting, and permissions. """ METADATA = ToolMetadata( name="get_saved_search_details", description=( "Get comprehensive details about a specific saved search including configuration, " "metadata, scheduling, permissions, and alert actions. Returns detailed information " "about saved search properties, execution settings, and access control configuration. " "Essential for troubleshooting, auditing, and understanding saved search configurations.\\n\\n" "Args:\\n" " name (str): Name of the saved search to inspect (required)\\n" " app (str, optional): Application context for saved search lookup\\n" " owner (str, optional): Owner context for saved search lookup\\n\\n" "Response Format:\\n" "Returns dictionary with 'status', 'name', 'details', and 'retrieved_at' fields. " "The 'details' field contains comprehensive nested information including:\\n" "- basic_info: Name, description, search query, visibility\\n" "- scheduling: Schedule configuration and timing\\n" "- dispatch: Time range and execution settings\\n" "- permissions: Access control and sharing settings\\n" "- actions: Email, script, and other alert actions\\n" "- alert: Alert conditions and suppression settings\\n" "- metadata: Creation timestamps and authorship" ), category="search", tags=["saved_searches", "details", "metadata"], requires_connection=True, ) async def execute( self, ctx: Context, name: str, app: str | None = None, owner: str | None = None ) -> dict[str, Any]: """ Get detailed information about a saved search. Args: name: Name of the saved search app: Application context (optional) owner: Owner context (optional) Returns: Dict containing comprehensive saved search details: - basic_info: Name, description, search query - scheduling: Schedule configuration and status - dispatch: Time range and execution settings - permissions: Access control and sharing - actions: Alert actions configuration - metadata: Creation and modification timestamps Example: get_saved_search_details(name="Security Alerts") """ log_tool_execution("get_saved_search_details", name=name) is_available, service, error_msg = self.check_splunk_available(ctx) if not is_available: await ctx.error(f"Get saved search details failed: {error_msg}") return self.format_error_response(error_msg, name=name) try: # Find the saved search await ctx.info(f"Retrieving details for saved search: {name}") saved_search = None try: # Try direct access first saved_search = service.saved_searches[name] # Check app/owner constraints if specified if app and saved_search.content.get("eai:acl", {}).get("app") != app: saved_search = None if owner and saved_search.content.get("eai:acl", {}).get("owner") != owner: saved_search = None except KeyError: # Search by iterating if direct access fails for ss in service.saved_searches: if ss.name == name: if app and ss.content.get("eai:acl", {}).get("app") != app: continue if owner and ss.content.get("eai:acl", {}).get("owner") != owner: continue saved_search = ss break if not saved_search: error_msg = f"Saved search '{name}' not found" if app or owner: error_msg += f" (app: {app}, owner: {owner})" await ctx.error(error_msg) return self.format_error_response(error_msg, name=name) content = saved_search.content acl = content.get("eai:acl", {}) # Build comprehensive details details = { "basic_info": { "name": saved_search.name, "search": content.get("search", ""), "description": content.get("description", ""), "qualified_search": content.get("qualifiedSearch", ""), "disabled": self._convert_splunk_boolean(content.get("disabled"), False), "is_visible": self._convert_splunk_boolean(content.get("is_visible"), True), }, "scheduling": { "is_scheduled": self._convert_splunk_boolean( content.get("is_scheduled"), False ), "cron_schedule": content.get("cron_schedule", ""), "next_scheduled_time": content.get("next_scheduled_time", ""), "schedule_priority": content.get("schedule_priority", "default"), "schedule_window": content.get("schedule_window", ""), }, "dispatch": { "earliest_time": content.get("dispatch.earliest_time", ""), "latest_time": content.get("dispatch.latest_time", ""), "index_earliest": content.get("dispatch.index_earliest", ""), "index_latest": content.get("dispatch.index_latest", ""), "max_count": content.get("dispatch.max_count", ""), "max_time": content.get("dispatch.max_time", ""), "spawn_process": self._convert_splunk_boolean( content.get("dispatch.spawn_process"), True ), "time_format": content.get("dispatch.time_format", ""), }, "permissions": { "owner": acl.get("owner", ""), "app": acl.get("app", ""), "sharing": acl.get("sharing", ""), "modifiable": acl.get("modifiable", ""), "can_write": acl.get("can_write", ""), "can_share_app": acl.get("can_share_app", ""), "can_share_global": acl.get("can_share_global", ""), "read_permissions": acl.get("perms", {}).get("read", []), "write_permissions": acl.get("perms", {}).get("write", []), }, "actions": { "email": { "enabled": self._convert_splunk_boolean(content.get("action.email"), False), "to": content.get("action.email.to", ""), "subject": content.get("action.email.subject", ""), "message": content.get("action.email.message", ""), }, "populate_lookup": { "enabled": self._convert_splunk_boolean( content.get("action.populate_lookup"), False ), "dest": content.get("action.populate_lookup.dest", ""), }, "rss": { "enabled": self._convert_splunk_boolean(content.get("action.rss"), False) }, "script": { "enabled": self._convert_splunk_boolean( content.get("action.script"), False ), "filename": content.get("action.script.filename", ""), }, "summary_index": { "enabled": self._convert_splunk_boolean( content.get("action.summary_index"), False ), "name": content.get("action.summary_index._name", ""), }, }, "alert": { "type": content.get("alert_type", ""), "condition": content.get("alert.condition", ""), "comparator": content.get("alert.comparator", ""), "threshold": content.get("alert.threshold", ""), "track": content.get("alert.track", ""), "digest_mode": self._convert_splunk_boolean( content.get("alert.digest_mode"), False ), "suppress": { "enabled": self._convert_splunk_boolean( content.get("alert.suppress"), False ), "period": content.get("alert.suppress.period", ""), "fields": content.get("alert.suppress.fields", ""), }, }, "metadata": { "created": content.get("eai:acl", {}).get("created", ""), "updated": content.get("updated", ""), "author": content.get("eai:acl", {}).get("author", ""), "splunk_server": content.get("splunk_server", ""), "version": content.get("eai:acl", {}).get("version", ""), }, } return self.format_success_response( {"name": name, "details": details, "retrieved_at": time.time()} ) except Exception as e: self.logger.error(f"Failed to get saved search details for '{name}': {str(e)}") await ctx.error(f"Failed to get saved search details for '{name}': {str(e)}") return self.format_error_response(str(e), name=name) def _convert_splunk_boolean(self, value, default=False): """Convert Splunk boolean values to Python booleans""" if isinstance(value, bool): return value if isinstance(value, str): return value.lower() in ("1", "true", "yes", "on") if isinstance(value, int | float): return bool(value) return default

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server