Skip to main content
Glama
splunk_client.py11.4 kB
""" Splunk API Client Handles authentication and API interactions with Splunk Enterprise/Cloud """ import asyncio import json import logging import time from typing import Any, Dict, List, Optional from urllib.parse import urljoin import xml.etree.ElementTree as ET import aiohttp import xmltodict from config import SplunkConfig logger = logging.getLogger(__name__) class SplunkClient: """Async Splunk REST API client""" def __init__(self, config: SplunkConfig): self.config = config self.session: Optional[aiohttp.ClientSession] = None self.session_key: Optional[str] = None self.base_url = f"{config.scheme}://{config.host}:{config.port}" async def connect(self): """Establish connection and authenticate with Splunk""" if self.session: await self.session.close() # Create SSL context for HTTPS ssl_context = None if self.config.scheme == "https": import ssl ssl_context = ssl.create_default_context() if not self.config.verify_ssl: ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # Create session with appropriate settings connector = aiohttp.TCPConnector(ssl=ssl_context) timeout = aiohttp.ClientTimeout(total=self.config.timeout) self.session = aiohttp.ClientSession( connector=connector, timeout=timeout, headers={"User-Agent": "Splunk-MCP-Client/1.0"} ) await self._authenticate() async def close(self): """Close the HTTP session""" if self.session: await self.session.close() self.session = None async def _authenticate(self): """Authenticate with Splunk and get session key""" auth_url = urljoin(self.base_url, "/services/auth/login") # Prepare authentication data if self.config.token: # Token-based authentication headers = {"Authorization": f"Bearer {self.config.token}"} self.session.headers.update(headers) # Test the token by making a simple API call test_url = urljoin(self.base_url, "/services/server/info") async with self.session.get(test_url) as response: if response.status == 200: logger.info("Token authentication successful") return else: raise Exception(f"Token authentication failed: {response.status}") elif self.config.username and self.config.password: # Username/password authentication auth_data = { "username": self.config.username, "password": self.config.password, "output_mode": "json" } async with self.session.post(auth_url, data=auth_data) as response: if response.status == 200: result = await response.json() self.session_key = result["sessionKey"] # Update session headers with session key self.session.headers.update({ "Authorization": f"Splunk {self.session_key}" }) logger.info("Username/password authentication successful") else: error_text = await response.text() raise Exception(f"Authentication failed: {response.status} - {error_text}") else: raise Exception("No authentication method provided. Use either token or username/password.") async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: """Make authenticated request to Splunk API""" if not self.session: raise Exception("Client not connected. Call connect() first.") url = urljoin(self.base_url, endpoint) # Set default output mode to JSON if "params" not in kwargs: kwargs["params"] = {} if "output_mode" not in kwargs["params"]: kwargs["params"]["output_mode"] = "json" async with self.session.request(method, url, **kwargs) as response: if response.status in (200, 201): content_type = response.headers.get("content-type", "") if "application/json" in content_type: return await response.json() elif "text/xml" in content_type or "application/xml" in content_type: xml_content = await response.text() return xmltodict.parse(xml_content) else: return {"text": await response.text()} else: error_text = await response.text() raise Exception(f"API request failed: {response.status} - {error_text}") async def search( self, query: str, earliest_time: str = "-24h@h", latest_time: str = "now", max_count: int = 100, timeout: int = 60 ) -> Dict[str, Any]: """Execute a search query""" # Create search job search_params = { "search": query, "earliest_time": earliest_time, "latest_time": latest_time, "max_count": max_count, "output_mode": "json" } job_response = await self._make_request( "POST", "/services/search/jobs", data=search_params ) search_id = job_response["sid"] logger.info(f"Created search job: {search_id}") # Wait for search to complete start_time = time.time() while time.time() - start_time < timeout: job_status = await self._make_request( "GET", f"/services/search/jobs/{search_id}" ) entry = job_status["entry"][0] if "entry" in job_status else {} content = entry.get("content", {}) if content.get("dispatchState") == "DONE": break elif content.get("dispatchState") == "FAILED": raise Exception(f"Search failed: {content.get('messages', 'Unknown error')}") await asyncio.sleep(1) else: raise Exception(f"Search timeout after {timeout} seconds") # Get search results results_response = await self._make_request( "GET", f"/services/search/jobs/{search_id}/results" ) # Clean up search job await self._make_request("DELETE", f"/services/search/jobs/{search_id}") return { "results": results_response.get("results", []), "messages": results_response.get("messages", []), "search_time": time.time() - start_time } async def list_indexes(self, pattern: Optional[str] = None) -> List[Dict[str, Any]]: """List available indexes""" response = await self._make_request("GET", "/services/data/indexes") indexes = [] for entry in response.get("entry", []): index_name = entry.get("name", "") # Apply pattern filter if provided if pattern: import fnmatch if not fnmatch.fnmatch(index_name, pattern): continue content = entry.get("content", {}) indexes.append({ "name": index_name, "currentDBSizeMB": content.get("currentDBSizeMB", 0), "maxDataSize": content.get("maxDataSize", "auto"), "totalEventCount": content.get("totalEventCount", 0), "disabled": content.get("disabled", False) }) return sorted(indexes, key=lambda x: x["name"]) async def list_saved_searches( self, search_name: Optional[str] = None, owner: Optional[str] = None ) -> List[Dict[str, Any]]: """List saved searches""" endpoint = "/services/saved/searches" params = {} if owner: params["owner"] = owner response = await self._make_request("GET", endpoint, params=params) saved_searches = [] for entry in response.get("entry", []): name = entry.get("name", "") # Filter by search name if provided if search_name and search_name.lower() not in name.lower(): continue content = entry.get("content", {}) saved_searches.append({ "name": name, "search": content.get("search", ""), "description": content.get("description", ""), "owner": entry.get("author", ""), "app": entry.get("acl", {}).get("app", ""), "disabled": content.get("disabled", False), "cron_schedule": content.get("cron_schedule", ""), "next_scheduled_time": content.get("next_scheduled_time", "") }) return sorted(saved_searches, key=lambda x: x["name"]) async def list_apps(self, visible_only: bool = True) -> List[Dict[str, Any]]: """List installed applications""" response = await self._make_request("GET", "/services/apps/local") apps = [] for entry in response.get("entry", []): content = entry.get("content", {}) # Filter visible apps only if requested if visible_only and content.get("visible", True) is False: continue apps.append({ "name": entry.get("name", ""), "label": content.get("label", ""), "description": content.get("description", ""), "version": content.get("version", ""), "author": entry.get("author", ""), "disabled": content.get("disabled", False), "configured": content.get("configured", False) }) return sorted(apps, key=lambda x: x["name"]) async def get_server_info(self) -> Dict[str, Any]: """Get server information""" response = await self._make_request("GET", "/services/server/info") if "entry" in response and len(response["entry"]) > 0: content = response["entry"][0].get("content", {}) return { "version": content.get("version", ""), "build": content.get("build", ""), "serverName": content.get("serverName", ""), "host": content.get("host", ""), "product_type": content.get("product_type", ""), "license_state": content.get("license_state", ""), "mode": content.get("mode", ""), "startup_time": content.get("startup_time", "") } else: return {}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rootiq-ai/splunk-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server