list_kvstore_collections

Retrieve all KV store collections with metadata including app, fields, and accelerated fields across Splunk applications.

Instructions

Lists all KV store collections across apps. Returns a list of KV store collections, each with metadata including its name, app, fields, accelerated fields, and record count.
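To make the return shape concrete, here is a minimal sketch of one result item, based on the keys the handler puts into its `collection_data` dict. The `KVStoreCollection` type name and all sample values are hypothetical and exist only for illustration; the tool itself is annotated as returning `List[Dict[str, Any]]`.

```python
from typing import List, TypedDict


class KVStoreCollection(TypedDict):
    """Hypothetical type describing one item of the tool's result list."""

    name: str                      # collection name
    app: str                       # Splunk app that owns the collection
    fields: List[str]              # names taken from "field.<name>" keys
    accelerated_fields: List[str]  # names taken from "accelerated_field.<name>" keys
    record_count: int              # 0 when no stats are available


# Made-up sample data showing what a single-collection result could look like.
example_result: List[KVStoreCollection] = [
    {
        "name": "assets",
        "app": "search",
        "fields": ["host", "owner"],
        "accelerated_fields": ["host_idx"],
        "record_count": 42,
    }
]
```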

Input Schema

No arguments

Implementation Reference

  • The handler function for the 'list_kvstore_collections' MCP tool. Decorated with @mcp.tool(), which registers it with FastMCP. Lists all KV Store collections across Splunk apps, including metadata like fields, accelerated fields, app name, and record count. Uses get_splunk_connection() helper.
    @mcp.tool()
    async def list_kvstore_collections() -> List[Dict[str, Any]]:
        """
        List all KV store collections across apps.

        Returns:
            List of KV store collections with metadata including app, fields, and accelerated fields
        """
        try:
            service = get_splunk_connection()
            logger.info("📚 Fetching KV store collections...")

            collections = []
            app_count = 0
            collections_found = 0

            # Get KV store collection stats to retrieve record counts
            collection_stats = {}
            try:
                stats_response = service.get("/services/server/introspection/kvstore/collectionstats", output_mode="json")
                stats_data = json.loads(stats_response.body.read())
                if "entry" in stats_data and len(stats_data["entry"]) > 0:
                    entry = stats_data["entry"][0]
                    content = entry.get("content", {})
                    data = content.get("data", {})
                    for kvstore in data:
                        kvstore = json.loads(kvstore)
                        if "ns" in kvstore and "count" in kvstore:
                            collection_stats[kvstore["ns"]] = kvstore["count"]
                logger.debug(f"✅ Retrieved stats for {len(collection_stats)} KV store collections")
            except Exception as e:
                logger.warning(f"⚠️ Error retrieving KV store collection stats: {str(e)}")

            try:
                for entry in service.kvstore:
                    try:
                        collection_name = entry['name']
                        fieldsList = [f.replace('field.', '') for f in entry['content'] if f.startswith('field.')]
                        accelFields = [f.replace('accelerated_field.', '') for f in entry['content'] if f.startswith('accelerated_field.')]
                        app_name = entry['access']['app']
                        collection_data = {
                            "name": collection_name,
                            "app": app_name,
                            "fields": fieldsList,
                            "accelerated_fields": accelFields,
                            "record_count": collection_stats.get(f"{app_name}.{collection_name}", 0)
                        }
                        collections.append(collection_data)
                        collections_found += 1
                        logger.debug(f"✅ Added collection: {collection_name} from app: {app_name}")
                    except Exception as e:
                        logger.warning(f"⚠️ Error processing collection entry: {str(e)}")
                        continue

                logger.info(f"✅ Found {collections_found} KV store collections")
                return collections
            except Exception as e:
                logger.error(f"❌ Error accessing KV store collections: {str(e)}")
                raise
        except Exception as e:
            logger.error(f"❌ Error listing KV store collections: {str(e)}")
            raise
  • splunk_mcp.py:603-603 (registration)
    The @mcp.tool() decorator registers the list_kvstore_collections function as an MCP tool.
    @mcp.tool()
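
The handler's two parsing steps (building a record-count lookup keyed by `app.collection`, then deriving field names from prefixed content keys) can be sketched without a Splunk connection. Everything below is an illustration under assumptions: the sample payloads are made up, and the helper names `build_record_counts`, `strip_prefix`, and `summarize_entry` are hypothetical and do not appear in splunk_mcp.py. The sketch also strips the prefix by slicing rather than with `str.replace`, so a name that happens to contain `field.` a second time is left intact; for ordinary field names both approaches give the same result.

```python
import json
from typing import Any, Dict, List

# Hypothetical stand-in for the "data" list in the collectionstats response:
# each item is a JSON-encoded string with a namespace ("ns") and a "count".
SAMPLE_STATS_DATA: List[str] = [
    json.dumps({"ns": "search.assets", "count": 42}),
    json.dumps({"ns": "my_app.sessions", "count": 7}),
    json.dumps({"size": 1024}),  # no "ns"/"count", so it is skipped
]

# Hypothetical collection entries shaped like the ones the handler reads.
SAMPLE_ENTRIES: List[Dict[str, Any]] = [
    {
        "name": "assets",
        "access": {"app": "search"},
        "content": {
            "field.host": "string",
            "field.owner": "string",
            "accelerated_field.host_idx": '{"host": 1}',
            "disabled": "0",
        },
    },
    {
        "name": "orphans",
        "access": {"app": "my_app"},
        "content": {"field.id": "number"},
    },
]


def build_record_counts(data: List[str]) -> Dict[str, int]:
    """Map each "app.collection" namespace to its record count."""
    counts: Dict[str, int] = {}
    for raw in data:
        item = json.loads(raw)
        if "ns" in item and "count" in item:
            counts[item["ns"]] = item["count"]
    return counts


def strip_prefix(keys: Any, prefix: str) -> List[str]:
    """Keep keys that start with prefix and drop only that leading prefix."""
    return [key[len(prefix):] for key in keys if key.startswith(prefix)]


def summarize_entry(entry: Dict[str, Any], counts: Dict[str, int]) -> Dict[str, Any]:
    """Build the same dict shape the handler appends to its result list."""
    app_name = entry["access"]["app"]
    name = entry["name"]
    return {
        "name": name,
        "app": app_name,
        "fields": strip_prefix(entry["content"], "field."),
        "accelerated_fields": strip_prefix(entry["content"], "accelerated_field."),
        # Collections missing from the stats lookup fall back to 0.
        "record_count": counts.get(f"{app_name}.{name}", 0),
    }


if __name__ == "__main__":
    record_counts = build_record_counts(SAMPLE_STATS_DATA)
    for sample in SAMPLE_ENTRIES:
        print(summarize_entry(sample, record_counts))
```

Note that a collection with no matching stats entry reports a record count of 0, which is indistinguishable from a genuinely empty collection; callers that care about the difference would need to consult the stats lookup separately.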
