search_splunk
Execute search queries on Splunk Enterprise/Cloud to retrieve data within specified time ranges and result limits, facilitating efficient log analysis and insights extraction.
Instructions
Execute a Splunk search query and return the results.
Args:
search_query: The search query to execute
earliest_time: Start time for the search (default: 24 hours ago)
latest_time: End time for the search (default: now)
max_results: Maximum number of results to return (default: 100)
Returns:
List of search results
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| earliest_time | No | -24h | |
| latest_time | No | now | |
| max_results | No | ||
| search_query | Yes |
Implementation Reference
- splunk_mcp.py:333-378 (handler)The search_splunk tool handler function, registered via @mcp.tool() decorator. Executes Splunk searches using splunklib.client, handles time ranges and result limits, and returns JSON results.@mcp.tool() async def search_splunk(search_query: str, earliest_time: str = "-24h", latest_time: str = "now", max_results: int = 100) -> List[Dict[str, Any]]: """ Execute a Splunk search query and return the results. Args: search_query: The search query to execute earliest_time: Start time for the search (default: 24 hours ago) latest_time: End time for the search (default: now) max_results: Maximum number of results to return (default: 100) Returns: List of search results """ if not search_query: raise ValueError("Search query cannot be empty") # Prepend 'search' if not starting with '|' or 'search' (case-insensitive) stripped_query = search_query.lstrip() if not (stripped_query.startswith('|') or stripped_query.lower().startswith('search')): search_query = f"search {search_query}" try: service = get_splunk_connection() logger.info(f"🔍 Executing search: {search_query}") # Create the search job kwargs_search = { "earliest_time": earliest_time, "latest_time": latest_time, "preview": False, "exec_mode": "blocking" } job = service.jobs.create(search_query, **kwargs_search) # Get the results result_stream = job.results(output_mode='json', count=max_results) results_data = json.loads(result_stream.read().decode('utf-8')) return results_data.get("results", []) except Exception as e: logger.error(f"❌ Search failed: {str(e)}") raise
- splunk_mcp.py:333-333 (registration)The @mcp.tool() decorator registers the search_splunk function as an MCP tool.@mcp.tool()
- splunk_mcp.py:334-346 (schema)Input schema defined by function parameters with types and defaults; output is List[Dict[str, Any]]. Docstring provides detailed descriptions.async def search_splunk(search_query: str, earliest_time: str = "-24h", latest_time: str = "now", max_results: int = 100) -> List[Dict[str, Any]]: """ Execute a Splunk search query and return the results. Args: search_query: The search query to execute earliest_time: Start time for the search (default: 24 hours ago) latest_time: End time for the search (default: now) max_results: Maximum number of results to return (default: 100) Returns: List of search results """
- splunk_mcp.py:298-331 (helper)Helper function to establish Splunk connection using environment variables, used by search_splunk.def get_splunk_connection() -> splunklib.client.Service: """ Get a connection to the Splunk service. Supports both username/password and token-based authentication. If SPLUNK_TOKEN is set, it will be used for authentication and username/password will be ignored. Returns: splunklib.client.Service: Connected Splunk service """ try: if SPLUNK_TOKEN: logger.debug(f"🔌 Connecting to Splunk at {SPLUNK_SCHEME}://{SPLUNK_HOST}:{SPLUNK_PORT} using token authentication") service = splunklib.client.connect( host=SPLUNK_HOST, port=SPLUNK_PORT, scheme=SPLUNK_SCHEME, verify=VERIFY_SSL, token=f"Bearer {SPLUNK_TOKEN}" ) else: username = os.environ.get("SPLUNK_USERNAME", "admin") logger.debug(f"🔌 Connecting to Splunk at {SPLUNK_SCHEME}://{SPLUNK_HOST}:{SPLUNK_PORT} as {username}") service = splunklib.client.connect( host=SPLUNK_HOST, port=SPLUNK_PORT, username=username, password=SPLUNK_PASSWORD, scheme=SPLUNK_SCHEME, verify=VERIFY_SSL ) logger.debug(f"✅ Connected to Splunk successfully") return service except Exception as e: logger.error(f"❌ Failed to connect to Splunk: {str(e)}") raise