Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
utils.py5.42 kB
""" Common utilities for the MCP Splunk server core framework. Provides shared utility functions used across tools, resources, and prompts. """ import logging from typing import Any logger = logging.getLogger(__name__) def validate_splunk_connection(ctx: Any) -> tuple[bool, Any, str]: """ Validate Splunk connection from MCP context. Args: ctx: MCP context containing lifespan context Returns: Tuple of (is_available, service, error_message) """ try: splunk_ctx = ctx.request_context.lifespan_context if not splunk_ctx.is_connected or not splunk_ctx.service: return ( False, None, "Splunk service is not available. MCP server is running in degraded mode.", ) return True, splunk_ctx.service, "" except AttributeError as e: logger.error(f"Invalid context structure: {e}") return False, None, "Invalid context structure" except Exception as e: logger.error(f"Unexpected error validating Splunk connection: {e}") return False, None, f"Connection validation error: {str(e)}" def format_error_response(error: str, **kwargs) -> dict[str, Any]: """ Format a consistent error response. Args: error: Error message **kwargs: Additional error context Returns: Formatted error response dictionary """ return {"status": "error", "error": error, **kwargs} def format_success_response(data: dict[str, Any]) -> dict[str, Any]: """ Format a consistent success response. Args: data: Response data Returns: Formatted success response dictionary """ return {"status": "success", **data} def sanitize_search_query(query: str) -> str: """ Sanitize and prepare a Splunk search query. Args: query: Raw search query Returns: Sanitized search query with 'search' command added if needed """ query = query.strip() # Add 'search' command if not present and query doesn't start with a pipe if not query.lower().startswith(("search ", "| ")): query = f"search {query}" return query def validate_time_range(earliest_time: str, latest_time: str) -> tuple[bool, str]: """ Validate time range parameters. Args: earliest_time: Start time for search latest_time: End time for search Returns: Tuple of (is_valid, error_message) """ # Basic validation - could be expanded with more sophisticated checks if not earliest_time or not latest_time: return False, "Both earliest_time and latest_time must be provided" # Check for obviously invalid formats invalid_chars = ["<", ">", ";", "&", "|", "`"] for char in invalid_chars: if char in earliest_time or char in latest_time: return False, f"Invalid character '{char}' in time range" return True, "" def log_tool_execution(tool_name: str, **kwargs): """ Log tool execution for monitoring and debugging. Args: tool_name: Name of the tool being executed **kwargs: Additional context to log """ logger.info(f"Executing tool: {tool_name}") if kwargs: logger.debug(f"Tool parameters: {kwargs}") def filter_customer_indexes(indexes): """ Filter out internal Splunk indexes from the collection to improve performance and focus on customer-defined indexes. Internal indexes typically start with underscore (_) and include: - _internal: Splunk's internal logs - _audit: Audit logs - _introspection: Performance monitoring - _thefishbucket: Internal tracking - _telemetry: Usage data Args: indexes: Splunk indexes collection or list Returns: List of customer-defined indexes only """ # Handle both iterables and individual index objects customer_indexes = [] try: for idx in indexes: # Handle both index objects with .name attribute and string names index_name = idx.name if hasattr(idx, "name") else str(idx) # Skip any index that starts with underscore (Splunk internal convention) if not index_name.startswith("_"): customer_indexes.append(idx) except (AttributeError, TypeError) as e: logger.warning(f"Error filtering indexes: {e}") # If filtering fails, return original collection as fallback return list(indexes) if indexes else [] return customer_indexes def truncate_large_response(data: Any, max_items: int = 1000) -> tuple[Any, bool]: """ Truncate large responses to prevent overwhelming the client. Args: data: Response data to potentially truncate max_items: Maximum number of items to include Returns: Tuple of (truncated_data, was_truncated) """ if isinstance(data, list) and len(data) > max_items: return data[:max_items], True if isinstance(data, dict) and "results" in data and isinstance(data["results"], list): if len(data["results"]) > max_items: truncated_data = data.copy() truncated_data["results"] = data["results"][:max_items] truncated_data["truncated"] = True truncated_data["original_count"] = len(data["results"]) return truncated_data, True return data, False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server