list_kvstore_collections

Retrieve all KV store collections across apps. Each entry includes the collection's app, its fields, its accelerated fields, and a record count.

Instructions

List all KV store collections across apps.

Returns: List of KV store collections with metadata including app, fields, and accelerated fields
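As a rough sketch of the return shape, the keys below are taken from the handler in the Implementation Reference. The type name `KVStoreCollection` and all values are hypothetical and are not output from a real Splunk instance.

```python
from typing import List, TypedDict


class KVStoreCollection(TypedDict):
    """Hypothetical name for one entry in the returned list."""

    name: str                      # collection name
    app: str                       # app that owns the collection
    fields: List[str]              # names taken from the field.* keys
    accelerated_fields: List[str]  # names taken from the accelerated_field.* keys
    record_count: int              # 0 when no stats entry matches


# Illustrative values only.
example_result: List[KVStoreCollection] = [
    {
        "name": "assets",
        "app": "search",
        "fields": ["ip", "owner"],
        "accelerated_fields": ["ip_idx"],
        "record_count": 42,
    }
]
```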

Input Schema

No arguments
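Because the tool takes no arguments, a call passes an empty arguments object. The sketch below builds the JSON-RPC `tools/call` request defined by the MCP specification; `build_tool_call` is a hypothetical helper name, and the transport and client library are assumed to be handled elsewhere.

```python
import json


def build_tool_call(request_id: int) -> dict:
    """Build an MCP tools/call request for list_kvstore_collections."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        # The tool defines no input parameters, so arguments stays empty.
        "params": {"name": "list_kvstore_collections", "arguments": {}},
    }


print(json.dumps(build_tool_call(1), indent=2))
```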

Implementation Reference

  • The handler function decorated with @mcp.tool() that implements the list_kvstore_collections tool. It fetches KV Store collections from Splunk, extracts metadata like fields and accelerated fields, retrieves record counts from introspection stats, and returns a list of collection dictionaries.
    @mcp.tool()
    async def list_kvstore_collections() -> List[Dict[str, Any]]:
        """
        List all KV store collections across apps.

        Returns:
            List of KV store collections with metadata including app, fields, and accelerated fields
        """
        try:
            service = get_splunk_connection()
            logger.info("📚 Fetching KV store collections...")

            collections = []
            app_count = 0
            collections_found = 0

            # Get KV store collection stats to retrieve record counts
            collection_stats = {}
            try:
                stats_response = service.get("/services/server/introspection/kvstore/collectionstats", output_mode="json")
                stats_data = json.loads(stats_response.body.read())
                if "entry" in stats_data and len(stats_data["entry"]) > 0:
                    entry = stats_data["entry"][0]
                    content = entry.get("content", {})
                    data = content.get("data", {})
                    for kvstore in data:
                        kvstore = json.loads(kvstore)
                        if "ns" in kvstore and "count" in kvstore:
                            collection_stats[kvstore["ns"]] = kvstore["count"]
                logger.debug(f"✅ Retrieved stats for {len(collection_stats)} KV store collections")
            except Exception as e:
                logger.warning(f"⚠️ Error retrieving KV store collection stats: {str(e)}")

            try:
                for entry in service.kvstore:
                    try:
                        collection_name = entry['name']
                        fieldsList = [f.replace('field.', '') for f in entry['content'] if f.startswith('field.')]
                        accelFields = [f.replace('accelerated_field.', '') for f in entry['content'] if f.startswith('accelerated_field.')]
                        app_name = entry['access']['app']
                        collection_data = {
                            "name": collection_name,
                            "app": app_name,
                            "fields": fieldsList,
                            "accelerated_fields": accelFields,
                            "record_count": collection_stats.get(f"{app_name}.{collection_name}", 0)
                        }
                        collections.append(collection_data)
                        collections_found += 1
                        logger.debug(f"✅ Added collection: {collection_name} from app: {app_name}")
                    except Exception as e:
                        logger.warning(f"⚠️ Error processing collection entry: {str(e)}")
                        continue

                logger.info(f"✅ Found {collections_found} KV store collections")
                return collections

            except Exception as e:
                logger.error(f"❌ Error accessing KV store collections: {str(e)}")
                raise

        except Exception as e:
            logger.error(f"❌ Error listing KV store collections: {str(e)}")
            raise
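The per-collection step of the handler can be tried without a Splunk connection. The following is a minimal sketch that uses plain dictionaries as stand-ins for a collection entry and the stats map; `strip_prefix`, `summarize`, and the sample data are hypothetical and only mirror the key names used in the handler.

```python
from typing import Any, Dict, Iterable, List


def strip_prefix(keys: Iterable[str], prefix: str) -> List[str]:
    """Return the keys that start with `prefix`, with the prefix removed."""
    return [k[len(prefix):] for k in keys if k.startswith(prefix)]


def summarize(entry: Dict[str, Any], stats: Dict[str, int]) -> Dict[str, Any]:
    """Build one result dictionary the way the handler does per collection."""
    name = entry["name"]
    app = entry["access"]["app"]
    content = entry["content"]
    return {
        "name": name,
        "app": app,
        "fields": strip_prefix(content, "field."),
        "accelerated_fields": strip_prefix(content, "accelerated_field."),
        # Stats are keyed by "<app>.<collection>"; default to 0 when missing.
        "record_count": stats.get(f"{app}.{name}", 0),
    }


# Made-up stand-ins for one collection entry and the stats map.
entry = {
    "name": "assets",
    "access": {"app": "search"},
    "content": {
        "field.ip": "string",
        "field.owner": "string",
        "accelerated_field.ip_idx": '{"ip": 1}',
        "disabled": "0",
    },
}
stats = {"search.assets": 42}

summary = summarize(entry, stats)
print(summary)
```

Unlike the handler, which calls `str.replace`, this sketch slices the prefix off, so a name that itself contains `field.` is left unchanged after the prefix is removed.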

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/livehybrid/splunk-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.