search_splunk
Execute Splunk search queries to retrieve log data and analytics results. Specify time ranges and result limits to analyze security events, system performance, or operational data.
Instructions
Execute a Splunk search query and return the results.
Args:
search_query: The search query to execute
earliest_time: Start time for the search (default: 24 hours ago)
latest_time: End time for the search (default: now)
max_results: Maximum number of results to return (default: 100)
Returns:
List of search results
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| search_query | Yes | The SPL query to execute. `search` is prepended unless the query already starts with `\|` or `search` | |
| earliest_time | No | Start time for the search, as a Splunk time modifier | `-24h` |
| latest_time | No | End time for the search, as a Splunk time modifier | `now` |
| max_results | No | Maximum number of results to return | `100` |
Implementation Reference
- splunk_mcp.py:333-378 (handler)

  The main handler function, decorated with `@mcp.tool()`, which validates the parameters, connects to Splunk, runs the search job and returns its results. The decorator also registers the tool, so this function is both the handler and the registration point. Note what it does and does not check: the query is forwarded to Splunk unchanged apart from prepending `search`, so the tool executes whatever SPL the caller (normally a language model) supplies, with the permissions of the configured Splunk account; `earliest_time` and `latest_time` are passed straight through as job parameters; `max_results` only limits how many rows are fetched via `count`, not how much data the search job processes; and `exec_mode="blocking"` means the call does not return until the job has finished.

  ```python
  @mcp.tool()
  async def search_splunk(search_query: str, earliest_time: str = "-24h", latest_time: str = "now", max_results: int = 100) -> List[Dict[str, Any]]:
      """
      Execute a Splunk search query and return the results.

      Args:
          search_query: The search query to execute
          earliest_time: Start time for the search (default: 24 hours ago)
          latest_time: End time for the search (default: now)
          max_results: Maximum number of results to return (default: 100)

      Returns:
          List of search results
      """
      if not search_query:
          raise ValueError("Search query cannot be empty")

      # Prepend 'search' if not starting with '|' or 'search' (case-insensitive)
      stripped_query = search_query.lstrip()
      if not (stripped_query.startswith('|') or stripped_query.lower().startswith('search')):
          search_query = f"search {search_query}"

      try:
          service = get_splunk_connection()
          logger.info(f"🔍 Executing search: {search_query}")

          # Create the search job
          kwargs_search = {
              "earliest_time": earliest_time,
              "latest_time": latest_time,
              "preview": False,
              "exec_mode": "blocking"
          }
          job = service.jobs.create(search_query, **kwargs_search)

          # Get the results
          result_stream = job.results(output_mode='json', count=max_results)
          results_data = json.loads(result_stream.read().decode('utf-8'))
          return results_data.get("results", [])

      except Exception as e:
          logger.error(f"❌ Search failed: {str(e)}")
          raise
  ```
- splunk_mcp.py:298-332 (helper)

  Helper used by `search_splunk` to open a connection to the Splunk management API. It supports both username/password and token authentication: if `SPLUNK_TOKEN` is set it is sent as a `Bearer` token and the username/password pair is ignored; otherwise `SPLUNK_USERNAME` (defaulting to `admin`) and `SPLUNK_PASSWORD` are used. Whatever account is configured here is the account whose permissions every tool query runs with, so give it a read-only role scoped to the indexes the tool needs. `verify=VERIFY_SSL` controls TLS certificate checking of the Splunk endpoint and should stay enabled outside of lab setups. The debug log lines include the host and username but not the secret.

  ```python
  def get_splunk_connection() -> splunklib.client.Service:
      """
      Get a connection to the Splunk service.
      Supports both username/password and token-based authentication.
      If SPLUNK_TOKEN is set, it will be used for authentication and username/password will be ignored.

      Returns:
          splunklib.client.Service: Connected Splunk service
      """
      try:
          if SPLUNK_TOKEN:
              logger.debug(f"🔌 Connecting to Splunk at {SPLUNK_SCHEME}://{SPLUNK_HOST}:{SPLUNK_PORT} using token authentication")
              service = splunklib.client.connect(
                  host=SPLUNK_HOST,
                  port=SPLUNK_PORT,
                  scheme=SPLUNK_SCHEME,
                  verify=VERIFY_SSL,
                  token=f"Bearer {SPLUNK_TOKEN}"
              )
          else:
              username = os.environ.get("SPLUNK_USERNAME", "admin")
              logger.debug(f"🔌 Connecting to Splunk at {SPLUNK_SCHEME}://{SPLUNK_HOST}:{SPLUNK_PORT} as {username}")
              service = splunklib.client.connect(
                  host=SPLUNK_HOST,
                  port=SPLUNK_PORT,
                  username=username,
                  password=SPLUNK_PASSWORD,
                  scheme=SPLUNK_SCHEME,
                  verify=VERIFY_SSL
              )
          logger.debug(f"✅ Connected to Splunk successfully")
          return service
      except Exception as e:
          logger.error(f"❌ Failed to connect to Splunk: {str(e)}")
          raise
  ```
- splunk_mcp.py:47-52 (registration)

  Initialization of the FastMCP server instance on which tools such as `search_splunk` are registered via decorators. The server binds to `0.0.0.0`, so the tool, and through it the Splunk credentials the process holds, is reachable from every network interface on the host; when the server is only meant for a local client, bind it to `127.0.0.1` or place it behind an authenticated proxy.

  ```python
  mcp = FastMCP(
      "splunk",
      description="A FastMCP-based tool for interacting with Splunk Enterprise/Cloud through natural language",
      version="0.3.0",
      host="0.0.0.0",  # Listen on all interfaces
      port=FASTMCP_PORT
  )
  ```
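Because the handler forwards whatever SPL it receives, the checks a practitioner would want sit in front of it. The following is an added example, not code from the splunk_mcp project: it reproduces the handler's `search`-prefix rule and then adds what the handler leaves to Splunk, namely a deny-list of side-effect SPL commands (`delete`, `outputlookup`, `collect`, `sendemail` and similar), a grammar check on the `earliest_time` / `latest_time` modifiers, a hard ceiling on `max_results`, and parsing of the `output_mode=json` body with the cap applied client-side too. The command list, the simplified time-modifier regex, `HARD_RESULT_LIMIT` and the `SAMPLE_BODY` payload are all assumptions of this example; the validated values would be passed to `service.jobs.create` and `job.results` exactly as the handler does.

```python
import json
import re
from typing import Any, Dict, List

# SPL commands that write, send or execute instead of read. The set is an
# assumption of this example, not an exhaustive inventory of Splunk commands.
SIDE_EFFECT_COMMANDS = frozenset({
    "delete", "outputlookup", "outputcsv", "collect", "mcollect",
    "sendemail", "sendalert", "runshellscript", "script", "run",
})

# Relative time modifiers such as -24h, -7d@d, @d, now, or epoch seconds.
# Simplified grammar (an assumption); Splunk accepts more forms than this.
_UNIT = r"(?:mon|s|m|h|d|w|q|y)"
_TIME_MOD = re.compile(rf"^(?:now|\d{{9,11}}|[+-]\d+{_UNIT}(?:@{_UNIT})?|@{_UNIT})$")

HARD_RESULT_LIMIT = 1000  # server-side ceiling, an assumption of this example


def normalize_query(search_query: str) -> str:
    """Same rule as the tool: prepend 'search' unless the query already
    starts with '|' or 'search' (case-insensitive)."""
    stripped = search_query.lstrip()
    if stripped.startswith("|") or stripped.lower().startswith("search"):
        return search_query
    return f"search {search_query}"


def split_pipeline(query: str) -> List[str]:
    """Split SPL on '|' characters that are not inside double-quoted strings."""
    stages: List[str] = []
    buf: List[str] = []
    in_quote, prev = False, ""
    for ch in query:
        if ch == '"' and prev != "\\":
            in_quote = not in_quote
        if ch == "|" and not in_quote:
            stages.append("".join(buf).strip())
            buf = []
        else:
            buf.append(ch)
        prev = ch
    stages.append("".join(buf).strip())
    return [s for s in stages if s]


def stage_command(stage: str) -> str:
    """First command name of a pipeline stage, skipping subsearch brackets."""
    m = re.match(r"[\[\s]*([A-Za-z_]+)", stage)
    return m.group(1).lower() if m else ""


def check_query(search_query: str) -> str:
    """Normalize the query and refuse any stage whose command has side effects.
    Returns the query to hand to Splunk, or raises ValueError."""
    if not search_query.strip():
        raise ValueError("Search query cannot be empty")
    query = normalize_query(search_query)
    for stage in split_pipeline(query):
        cmd = stage_command(stage)
        if cmd in SIDE_EFFECT_COMMANDS:
            raise ValueError(f"refusing side-effect command '{cmd}'")
    return query


def check_time(value: str) -> str:
    """Accept only a Splunk-style time modifier, so a caller cannot pass
    arbitrary text through earliest_time / latest_time."""
    value = value.strip()
    if not _TIME_MOD.match(value):
        raise ValueError(f"invalid time modifier {value!r}")
    return value


def clamp_results(max_results: int) -> int:
    """Bound max_results to [1, HARD_RESULT_LIMIT] regardless of the request."""
    if max_results < 1:
        raise ValueError("max_results must be positive")
    return min(max_results, HARD_RESULT_LIMIT)


def parse_results(body: bytes, max_results: int) -> List[Dict[str, Any]]:
    """Parse a body returned with output_mode=json and apply the result cap
    client-side as well as through the 'count' request parameter."""
    data = json.loads(body.decode("utf-8"))
    return data.get("results", [])[:max_results]


# Constructed stand-in for what job.results(output_mode='json') returns.
SAMPLE_BODY = json.dumps({
    "preview": False, "init_offset": 0, "messages": [],
    "fields": [{"name": "user"}, {"name": "count"}],
    "results": [{"user": "alice", "count": "12"},
                {"user": "bob", "count": "7"},
                {"user": "carol", "count": "1"}],
}).encode("utf-8")

if __name__ == "__main__":
    print(check_query("index=auth action=failure | stats count by user"))
    print(check_time("-7d@d"), clamp_results(5000))
    print(parse_results(SAMPLE_BODY, clamp_results(2)))
```

A deny-list is the pragmatic choice here because SPL has far more read-only commands than writing ones; a stricter deployment would invert it into an allow-list of the handful of commands the assistant actually needs, and would still rely on a least-privilege Splunk role as the real boundary.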