
MCP Server for Splunk

utils.py (5.77 kB)
""" Common utilities for the MCP Splunk server core framework. Provides shared utility functions used across tools, resources, and prompts. """ import logging from typing import Any logger = logging.getLogger(__name__) try: from src.core.security import sanitize_search_query as _secure_sanitize from src.core.security import validate_search_query as _secure_validate _SECURITY_AVAILABLE = True except ImportError: _SECURITY_AVAILABLE = False def validate_splunk_connection(ctx: Any) -> tuple[bool, Any, str]: """ Validate Splunk connection from MCP context. Args: ctx: MCP context containing lifespan context Returns: Tuple of (is_available, service, error_message) """ try: splunk_ctx = ctx.request_context.lifespan_context if not splunk_ctx.is_connected or not splunk_ctx.service: return ( False, None, "Splunk service is not available. MCP server is running in degraded mode.", ) return True, splunk_ctx.service, "" except AttributeError as e: logger.error(f"Invalid context structure: {e}") return False, None, "Invalid context structure" except Exception as e: logger.error(f"Unexpected error validating Splunk connection: {e}") return False, None, f"Connection validation error: {str(e)}" def format_error_response(error: str, **kwargs) -> dict[str, Any]: """ Format a consistent error response. Args: error: Error message **kwargs: Additional error context Returns: Formatted error response dictionary """ return {"status": "error", "error": error, **kwargs} def format_success_response(data: dict[str, Any]) -> dict[str, Any]: """ Format a consistent success response. Args: data: Response data Returns: Formatted success response dictionary """ return {"status": "success", **data} def sanitize_search_query(query: str) -> str: """ Sanitize and prepare a Splunk search query. Args: query: Raw search query Returns: Sanitized search query with 'search' command added if needed Raises: QuerySecurityError: If the query contains security violations """ if _SECURITY_AVAILABLE: return _secure_sanitize(query) query = query.strip() if not query.lower().startswith(("search ", "| ")): query = f"search {query}" return query def validate_search_query(query: str, strict: bool = True) -> tuple[bool, list]: """ Validate a Splunk search query for security issues. Args: query: The SPL query to validate strict: If True, raise exception on violation; if False, return violations Returns: Tuple of (is_valid, violations_list) """ if _SECURITY_AVAILABLE: return _secure_validate(query, strict=strict) return True, [] def validate_time_range(earliest_time: str, latest_time: str) -> tuple[bool, str]: """ Validate time range parameters. Args: earliest_time: Start time for search latest_time: End time for search Returns: Tuple of (is_valid, error_message) """ if not earliest_time or not latest_time: return False, "Both earliest_time and latest_time must be provided" invalid_chars = ["<", ">", ";", "&", "|", "`"] for char in invalid_chars: if char in earliest_time or char in latest_time: return False, f"Invalid character '{char}' in time range" return True, "" def log_tool_execution(tool_name: str, **kwargs): """ Log tool execution for monitoring and debugging. Args: tool_name: Name of the tool being executed **kwargs: Additional context to log """ logger.info(f"Executing tool: {tool_name}") if kwargs: logger.debug(f"Tool parameters: {kwargs}") def filter_customer_indexes(indexes): """ Filter out internal Splunk indexes from the collection to improve performance and focus on customer-defined indexes. 
Internal indexes typically start with underscore (_) and include: - _internal: Splunk's internal logs - _audit: Audit logs - _introspection: Performance monitoring - _thefishbucket: Internal tracking - _telemetry: Usage data Args: indexes: Splunk indexes collection or list Returns: List of customer-defined indexes only """ customer_indexes = [] try: for idx in indexes: index_name = idx.name if hasattr(idx, "name") else str(idx) if not index_name.startswith("_"): customer_indexes.append(idx) except (AttributeError, TypeError) as e: logger.warning(f"Error filtering indexes: {e}") return list(indexes) if indexes else [] return customer_indexes def truncate_large_response(data: Any, max_items: int = 1000) -> tuple[Any, bool]: """ Truncate large responses to prevent overwhelming the client. Args: data: Response data to potentially truncate max_items: Maximum number of items to include Returns: Tuple of (truncated_data, was_truncated) """ if isinstance(data, list) and len(data) > max_items: return data[:max_items], True if isinstance(data, dict) and "results" in data and isinstance(data["results"], list): if len(data["results"]) > max_items: truncated_data = data.copy() truncated_data["results"] = data["results"][:max_items] truncated_data["truncated"] = True truncated_data["original_count"] = len(data["results"]) return truncated_data, True return data, False
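
For orientation, here is a minimal sketch of how a tool handler might combine the connection, query, and response helpers above. The import path src.core.utils and the run_search handler are assumptions for illustration, not part of this file or the project's documented API.

# Hypothetical tool handler (not part of utils.py); assumes the helpers
# above are importable as src.core.utils.
from typing import Any

from src.core.utils import (
    format_error_response,
    format_success_response,
    log_tool_execution,
    sanitize_search_query,
    validate_splunk_connection,
    validate_time_range,
)


def run_search(ctx: Any, query: str, earliest_time: str, latest_time: str) -> dict[str, Any]:
    """Illustrative only: validate inputs, then return a consistently shaped response."""
    log_tool_execution("run_search", earliest_time=earliest_time, latest_time=latest_time)

    # Fail fast if the server is running in degraded mode without Splunk.
    is_available, service, error = validate_splunk_connection(ctx)
    if not is_available:
        return format_error_response(error)

    # Reject time ranges containing shell-style metacharacters.
    is_valid, error = validate_time_range(earliest_time, latest_time)
    if not is_valid:
        return format_error_response(error)

    spl = sanitize_search_query(query)  # prepends 'search ' when needed
    # ... submit `spl` to Splunk via `service` and collect results ...
    results: list[dict[str, Any]] = []
    return format_success_response({"query": spl, "results": results})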
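
Likewise, a small sketch of filter_customer_indexes and truncate_large_response in use. The SimpleNamespace objects stand in for splunklib index entities, and the import path is again an assumption.

# Illustrative only: drop internal indexes and cap an oversized payload.
from types import SimpleNamespace

from src.core.utils import filter_customer_indexes, truncate_large_response  # assumed path

indexes = [SimpleNamespace(name=n) for n in ["_internal", "_audit", "main", "web_logs"]]
customer = filter_customer_indexes(indexes)
print([idx.name for idx in customer])  # ['main', 'web_logs']

payload = {"results": [{"event": i} for i in range(5000)]}
trimmed, was_truncated = truncate_large_response(payload, max_items=1000)
print(was_truncated, trimmed["original_count"], len(trimmed["results"]))  # True 5000 1000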

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'
