Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
embedded.py • 32.9 kB
""" Enhanced Embedded Resources for MCP Server. Provides enhanced resource functionality including embedded content, resource templates, and improved discovery mechanisms following MCP patterns. """ import asyncio import base64 import hashlib import json import logging import mimetypes import re import time from abc import abstractmethod from collections.abc import Callable from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any from fastmcp import Context from ..core.base import BaseResource, ResourceMetadata from ..core.client_identity import get_client_manager from ..core.enhanced_config_extractor import EnhancedConfigExtractor from ..core.registry import resource_registry logger = logging.getLogger(__name__) @dataclass class ContentValidationResult: """Result of content validation.""" is_valid: bool errors: list[str] warnings: list[str] class ContentValidator: """Validator for embedded resource content.""" @staticmethod def validate_json(content: str) -> ContentValidationResult: """Validate JSON content.""" errors = [] warnings = [] try: json.loads(content) except json.JSONDecodeError as e: errors.append(f"Invalid JSON: {e}") return ContentValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings ) @staticmethod def validate_markdown(content: str) -> ContentValidationResult: """Validate Markdown content.""" errors = [] warnings = [] # Basic markdown validation if not content.strip(): errors.append("Empty content") # Check for common markdown issues if content.count('#') > 10: warnings.append("Many headers detected - consider structure") return ContentValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings ) @staticmethod def validate_text(content: str) -> ContentValidationResult: """Validate text content.""" errors = [] warnings = [] if not content.strip(): errors.append("Empty content") if len(content) > 1000000: # 1MB warnings.append("Large content detected") 
return ContentValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings ) class EmbeddedResource(BaseResource): """ Enhanced base class for embedded resources that can contain both text and binary content. Embedded resources provide enhanced functionality including: - Embedded content within resource metadata - Binary data support with base64 encoding - MIME type detection and validation - Content caching and optimization - Content validation - ETag support for caching """ def __init__( self, uri: str, name: str, description: str, mime_type: str = "text/plain", embedded_content: str | bytes | None = None, cache_ttl: int = 300, validate_content: bool = True, etag_enabled: bool = True ): super().__init__(uri, name, description, mime_type) self.embedded_content = embedded_content self.cache_ttl = cache_ttl self.validate_content = validate_content self.etag_enabled = etag_enabled self._cached_content = None self._cache_timestamp = 0 self._etag = None self._content_hash = None # Back-reference to a registry that may manage this resource's stats self._registry = None async def get_content(self, ctx: Context) -> str: """ Get resource content with embedded content support. 
Args: ctx: MCP context Returns: Resource content as string (JSON for binary data) """ start_time = time.time() try: # Check cache first if self._is_cache_valid(): content = self._cached_content self._update_registry_stats(start_time, error=False) return content # Get content from embedded data or generate dynamically if self.embedded_content is not None: content = await self._process_embedded_content() else: content = await self._generate_dynamic_content(ctx) # Validate content if enabled if self.validate_content: validation_result = self._validate_content(content) if not validation_result.is_valid: logger.warning(f"Content validation failed for {self.uri}: {validation_result.errors}") # Return structured error so callers can safely parse # Include original content snapshot to aid debugging/tests error_response = self._create_error_response( f"Content validation failed: {validation_result.errors}; original={content}" ) self._update_registry_stats(start_time, error=True) return error_response if validation_result.warnings: logger.info(f"Content validation warnings for {self.uri}: {validation_result.warnings}") # Cache the result self._cache_content(content) self._update_registry_stats(start_time, error=False) return content except Exception as e: logger.error(f"Error getting content for {self.uri}: {e}") error_response = self._create_error_response(str(e)) self._update_registry_stats(start_time, error=True) return error_response def _update_registry_stats(self, start_time: float, error: bool = False) -> None: """Update registry access statistics if this resource is managed by a registry.""" registry = getattr(self, "_registry", None) if not registry: return stats = registry._access_stats.get(self.uri) if stats is None: return duration = max(0.0, time.time() - start_time) # Update running average response time access_count_before = stats.get("access_count", 0) avg_before = stats.get("average_response_time", 0.0) avg_after = ( (avg_before * access_count_before + 
duration) / (access_count_before + 1) if access_count_before >= 0 else duration ) stats["access_count"] = access_count_before + 1 stats["last_accessed"] = datetime.now() stats["average_response_time"] = avg_after if error: stats["error_count"] = stats.get("error_count", 0) + 1 async def _process_embedded_content(self) -> str: """Process embedded content and return appropriate format.""" if isinstance(self.embedded_content, str): # Text content return self.embedded_content elif isinstance(self.embedded_content, bytes): # Binary content - encode as base64 and return as JSON encoded = base64.b64encode(self.embedded_content).decode('utf-8') return json.dumps({ "type": "binary", "mime_type": self.mime_type, "data": encoded, "size": len(self.embedded_content), "etag": self._generate_etag() if self.etag_enabled else None }) else: raise ValueError(f"Unsupported embedded content type: {type(self.embedded_content)}") async def _generate_dynamic_content(self, ctx: Context) -> str: """Generate dynamic content - override in subclasses.""" raise NotImplementedError("Subclasses must implement _generate_dynamic_content") def _is_cache_valid(self) -> bool: """Check if cached content is still valid.""" if self._cached_content is None: return False return (time.time() - self._cache_timestamp) < self.cache_ttl def _cache_content(self, content: str) -> None: """Cache content with timestamp.""" self._cached_content = content self._cache_timestamp = time.time() self._content_hash = self._generate_content_hash(content) def _generate_content_hash(self, content: str) -> str: """Generate content hash for ETag.""" return hashlib.md5(content.encode('utf-8')).hexdigest() def _generate_etag(self) -> str | None: """Generate ETag for the resource.""" if not self.etag_enabled: return None # Prefer previously computed content hash if self._content_hash: return f'"{self._content_hash}"' # Fall back to hashing the embedded content when available if self.embedded_content is not None: try: if 
isinstance(self.embedded_content, bytes): content_hash = hashlib.md5(self.embedded_content).hexdigest() else: content_hash = hashlib.md5(str(self.embedded_content).encode('utf-8')).hexdigest() self._content_hash = content_hash return f'"{content_hash}"' except Exception: return None return None def _validate_content(self, content: str) -> ContentValidationResult: """Validate content based on MIME type.""" if self.mime_type == "application/json": return ContentValidator.validate_json(content) elif self.mime_type in ["text/markdown", "text/x-markdown"]: return ContentValidator.validate_markdown(content) else: return ContentValidator.validate_text(content) def _create_error_response(self, error_message: str) -> str: """Create error response in consistent format.""" return json.dumps({ "error": error_message, "uri": self.uri, "type": "error", "timestamp": datetime.now().isoformat() }) def get_metadata(self) -> ResourceMetadata: """Get enhanced metadata for this resource.""" return ResourceMetadata( uri=self.uri, name=self.name, description=self.description, mime_type=self.mime_type, category="embedded", tags=["embedded", "content", "cached" if self.cache_ttl > 0 else "dynamic"] ) class FileEmbeddedResource(EmbeddedResource): """ Enhanced embedded resource that loads content from files. Supports both text and binary files with automatic MIME type detection, file watching, and content validation. 
""" def __init__( self, uri: str, name: str, description: str, file_path: str, mime_type: str | None = None, encoding: str = "utf-8", watch_file: bool = False ): # Auto-detect MIME type if not provided if mime_type is None: detected_mime_type, _ = mimetypes.guess_type(file_path) mime_type = detected_mime_type or "application/octet-stream" super().__init__(uri, name, description, mime_type) self.file_path = Path(file_path) self.encoding = encoding self.watch_file = watch_file self._file_mtime = None async def get_content(self, ctx: Context) -> str: """Get content from file with enhanced error handling.""" start_time = time.time() try: if not self.file_path.exists(): error_response = self._create_error_response(f"File not found: {self.file_path}") self._update_registry_stats(start_time, error=True) return error_response # Check if file has changed (for file watching) if self.watch_file and self._file_mtime: current_mtime = self.file_path.stat().st_mtime if current_mtime > self._file_mtime: # File changed, invalidate cache self._cached_content = None self._cache_timestamp = 0 # Read file content is_text_like = ( self.mime_type.startswith('text/') or self.mime_type == 'application/json' or self.mime_type.endswith('+json') ) if is_text_like: # Text file content = self.file_path.read_text(encoding=self.encoding) self._file_mtime = self.file_path.stat().st_mtime self._update_registry_stats(start_time, error=False) return content else: # Binary file binary_content = self.file_path.read_bytes() encoded = base64.b64encode(binary_content).decode('utf-8') self._file_mtime = self.file_path.stat().st_mtime result = json.dumps({ "type": "binary", "mime_type": self.mime_type, "data": encoded, "size": len(binary_content), "filename": self.file_path.name, "etag": self._generate_etag() if self.etag_enabled else None }) self._update_registry_stats(start_time, error=False) return result except UnicodeDecodeError as e: logger.error(f"Encoding error reading file {self.file_path}: {e}") 
error_response = self._create_error_response(f"Encoding error: {str(e)}") self._update_registry_stats(start_time, error=True) return error_response except PermissionError as e: logger.error(f"Permission error reading file {self.file_path}: {e}") error_response = self._create_error_response(f"Permission denied: {str(e)}") self._update_registry_stats(start_time, error=True) return error_response except Exception as e: logger.error(f"Error reading file {self.file_path}: {e}") error_response = self._create_error_response(f"Error reading file: {str(e)}") self._update_registry_stats(start_time, error=True) return error_response class TemplateEmbeddedResource(EmbeddedResource): """ Enhanced template-based embedded resource that supports URI parameters. Allows dynamic resource generation based on URI parameters with validation and type conversion. """ def __init__( self, uri_template: str, name: str, description: str, mime_type: str = "text/plain", parameter_validators: dict[str, Callable] | None = None ): super().__init__(uri_template, name, description, mime_type) self.uri_template = uri_template self.parameter_validators = parameter_validators or {} async def get_content(self, ctx: Context, uri: str = None) -> str: """ Get content with URI parameter extraction and validation. 
Args: ctx: MCP context uri: Specific URI (optional) Returns: Generated content based on URI parameters """ try: # Extract parameters from URI params = self._extract_uri_parameters(uri or self.uri) # Validate parameters validation_errors = self._validate_parameters(params) if validation_errors: return self._create_error_response(f"Parameter validation failed: {validation_errors}") # Generate content based on parameters content = await self._generate_content_from_params(ctx, params) return content except Exception as e: logger.error(f"Error generating template content: {e}") return self._create_error_response(f"Template error: {str(e)}") def _extract_uri_parameters(self, uri: str) -> dict[str, str]: """Extract parameters from URI based on template with enhanced parsing.""" params = {} # Enhanced parameter extraction if '{' in self.uri_template and '}' in self.uri_template: # Extract parameter names from template param_names = re.findall(r'\{([^}]+)\}', self.uri_template) # Create regex pattern for matching pattern = self.uri_template for param_name in param_names: pattern = pattern.replace(f'{{{param_name}}}', f'(?P<{param_name}>[^/]+)') # Match URI against pattern match = re.match(pattern, uri) if match: params = match.groupdict() return params def _validate_parameters(self, params: dict[str, str]) -> list[str]: """Validate extracted parameters.""" errors = [] for param_name, validator in self.parameter_validators.items(): if param_name in params: try: validator(params[param_name]) except Exception as e: errors.append(f"Parameter '{param_name}' validation failed: {e}") return errors @abstractmethod async def _generate_content_from_params(self, ctx: Context, params: dict[str, str]) -> str: """Generate content based on URI parameters.""" pass class SplunkEmbeddedResource(EmbeddedResource): """ Enhanced embedded resource with Splunk integration. Provides enhanced Splunk-specific resource functionality with client isolation, connection pooling, and error recovery. 
""" def __init__( self, uri: str, name: str, description: str, mime_type: str = "application/json", connection_timeout: int = 30, retry_attempts: int = 3 ): super().__init__(uri, name, description, mime_type) self.client_manager = get_client_manager() self.connection_timeout = connection_timeout self.retry_attempts = retry_attempts async def get_content(self, ctx: Context) -> str: """Get content with enhanced Splunk client isolation and retry logic.""" last_error = None for attempt in range(self.retry_attempts): try: # Extract client configuration config_extractor = EnhancedConfigExtractor() client_config = await config_extractor.extract_client_config(ctx) if not client_config: return self._create_error_response("No Splunk configuration available") # Get client connection identity, service = await self.client_manager.get_client_connection(ctx, client_config) # Generate Splunk-specific content content = await self._generate_splunk_content(ctx, identity, service) return content except Exception as e: last_error = e logger.warning(f"Splunk connection attempt {attempt + 1} failed: {e}") if attempt < self.retry_attempts - 1: # Wait before retry (exponential backoff) await asyncio.sleep(2 ** attempt) logger.error(f"All Splunk connection attempts failed for {self.uri}") return self._create_error_response(f"Splunk error after {self.retry_attempts} attempts: {str(last_error)}") @abstractmethod async def _generate_splunk_content(self, ctx: Context, identity, service) -> str: """Generate content using Splunk service.""" pass class ResourceTemplate: """ Enhanced template for creating dynamic resources. Provides URI template functionality following RFC 6570 with parameter validation and type conversion. 
""" def __init__( self, uri_template: str, name: str, description: str, mime_type: str = "text/plain", parameter_types: dict[str, type] | None = None, parameter_defaults: dict[str, Any] | None = None ): self.uri_template = uri_template self.name = name self.description = description self.mime_type = mime_type self.parameter_types = parameter_types or {} self.parameter_defaults = parameter_defaults or {} def expand(self, **params) -> str: """Expand template with parameters and type conversion.""" try: # Apply type conversions converted_params = {} for key, value in params.items(): if key in self.parameter_types: try: converted_params[key] = self.parameter_types[key](value) except (ValueError, TypeError) as e: raise ValueError(f"Parameter '{key}' type conversion failed: {e}") from e else: converted_params[key] = value # Apply defaults for missing parameters for key, default_value in self.parameter_defaults.items(): if key not in converted_params: converted_params[key] = default_value return self.uri_template.format(**converted_params) except KeyError as e: raise ValueError(f"Missing required parameter: {e}") from e except Exception as e: raise ValueError(f"Template expansion error: {e}") from e def get_metadata(self) -> ResourceMetadata: """Get enhanced metadata for this template.""" return ResourceMetadata( uri=self.uri_template, name=self.name, description=self.description, mime_type=self.mime_type, category="template", tags=["template", "dynamic", "parameterized"] ) def validate_parameters(self, **params) -> list[str]: """Validate parameters against template requirements.""" errors = [] # Validate parameter types strictly (no implicit conversion here) for param, value in params.items(): if param in self.parameter_types: expected_type = self.parameter_types[param] if not isinstance(value, expected_type): errors.append(f"Parameter '{param}' type validation failed: expected {expected_type.__name__}") return errors class EmbeddedResourceRegistry: """ Enhanced registry 
for embedded resources with improved discovery. Provides centralized management of embedded resources with template support, automatic discovery, and performance monitoring. """ def __init__(self): self._resources: dict[str, EmbeddedResource] = {} self._templates: dict[str, ResourceTemplate] = {} self._metadata: dict[str, ResourceMetadata] = {} self._access_stats: dict[str, dict[str, Any]] = {} def register_embedded_resource(self, resource: EmbeddedResource) -> None: """Register an embedded resource with statistics tracking.""" self._resources[resource.uri] = resource self._metadata[resource.uri] = resource.get_metadata() self._access_stats[resource.uri] = { "access_count": 0, "last_accessed": None, "average_response_time": 0.0, "error_count": 0 } # Attach back-reference so the resource can update stats on access try: resource._registry = self except Exception: pass logger.info(f"Registered embedded resource: {resource.uri}") def register_template(self, template: ResourceTemplate) -> None: """Register a resource template with validation.""" # Validate template parameters validation_errors = template.validate_parameters() if validation_errors: logger.warning(f"Template validation warnings: {validation_errors}") self._templates[template.uri_template] = template self._metadata[template.uri_template] = template.get_metadata() logger.info(f"Registered resource template: {template.uri_template}") def get_resource(self, uri: str) -> EmbeddedResource | None: """Get embedded resource by URI with access tracking.""" resource = self._resources.get(uri) if resource: # Update access statistics stats = self._access_stats[uri] stats["access_count"] += 1 stats["last_accessed"] = datetime.now() return resource def get_template(self, uri_template: str) -> ResourceTemplate | None: """Get resource template by URI template.""" return self._templates.get(uri_template) def list_resources(self) -> list[ResourceMetadata]: """List all registered embedded resources with statistics.""" return 
list(self._metadata.values()) def list_templates(self) -> list[ResourceTemplate]: """List all registered templates.""" return list(self._templates.values()) def get_statistics(self) -> dict[str, Any]: """Get registry statistics.""" total_resources = len(self._resources) total_templates = len(self._templates) # Count how many resources were accessed at least once total_accesses = sum(1 for stats in self._access_stats.values() if stats.get("access_count", 0) > 0) total_errors = sum(stats["error_count"] for stats in self._access_stats.values()) return { "total_resources": total_resources, "total_templates": total_templates, "total_accesses": total_accesses, "total_errors": total_errors, "resource_stats": self._access_stats } def create_from_template(self, template_uri: str, **params) -> EmbeddedResource | None: """Create resource instance from template with validation.""" template = self.get_template(template_uri) if not template: return None try: # Validate parameters validation_errors = template.validate_parameters(**params) if validation_errors: logger.error(f"Template parameter validation failed: {validation_errors}") return None expanded_uri = template.expand(**params) # Create a new resource instance with the expanded URI return EmbeddedResource( uri=expanded_uri, name=f"{template.name} ({expanded_uri})", description=template.description, mime_type=template.mime_type ) except Exception as e: logger.error(f"Error creating resource from template: {e}") return None def cleanup_expired_cache(self) -> int: """Clean up expired cache entries and return count of cleaned items.""" cleaned_count = 0 current_time = time.time() for resource in self._resources.values(): if hasattr(resource, '_cache_timestamp') and resource._cache_timestamp > 0: if current_time - resource._cache_timestamp > resource.cache_ttl: resource._cached_content = None resource._cache_timestamp = 0 cleaned_count += 1 if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} expired cache entries") 
return cleaned_count # Global registry instance embedded_resource_registry = EmbeddedResourceRegistry() def register_embedded_resources(): """Register all embedded resources with the registry.""" try: # Register file-based embedded resources _register_file_resources() # Register template-based resources _register_template_resources() # Register Splunk embedded resources _register_splunk_embedded_resources() # Register with the main resource registry _register_with_main_registry() logger.info("Successfully registered embedded resources") except Exception as e: logger.error(f"Error registering embedded resources: {e}") def _register_file_resources(): """Register file-based embedded resources.""" # Enhanced file resources with better error handling file_resources = [ { "uri": "embedded://docs/README.md", "name": "README Documentation", "description": "Project README file with enhanced formatting", "file_path": "README.md", "mime_type": "text/markdown", "encoding": "utf-8", "watch_file": True }, { "uri": "embedded://docs/CHANGELOG.md", "name": "Changelog", "description": "Project changelog and version history", "file_path": "CHANGELOG.md", "mime_type": "text/markdown", "encoding": "utf-8", "watch_file": True } # { # "uri": "embedded://config/settings.json", # "name": "Application Settings", # "description": "Application configuration settings", # "file_path": "config/settings.json", # "mime_type": "application/json", # "encoding": "utf-8", # "watch_file": False # } ] for resource_config in file_resources: try: resource = FileEmbeddedResource( uri=resource_config["uri"], name=resource_config["name"], description=resource_config["description"], file_path=resource_config["file_path"], mime_type=resource_config.get("mime_type"), encoding=resource_config.get("encoding", "utf-8"), watch_file=resource_config.get("watch_file", False) ) embedded_resource_registry.register_embedded_resource(resource) # Also register a concrete instance with the discovery registry so that # 
FastMCP can retrieve the same instance without losing file_path state. try: metadata = resource.get_metadata() # Ensure the discovery registry knows this exact instance resource_registry.register_instance(resource, metadata) except Exception as e: logger.debug(f"Could not register instance for discovery: {e}") except Exception as e: logger.warning(f"Could not register file resource {resource_config['uri']}: {e}") def _register_template_resources(): """Register enhanced template-based resources.""" templates = [ ResourceTemplate( uri_template="embedded://docs/{doc_type}/{filename}", name="Documentation Template", description="Template for accessing documentation files with type validation", mime_type="text/markdown", parameter_types={"doc_type": str, "filename": str}, parameter_defaults={"doc_type": "general"} ), ResourceTemplate( uri_template="embedded://config/{config_type}", name="Configuration Template", description="Template for accessing configuration files", mime_type="text/plain", parameter_types={"config_type": str} ), ResourceTemplate( uri_template="embedded://data/{dataset}/{format}", name="Dataset Template", description="Template for accessing datasets in different formats", mime_type="application/json", parameter_types={"dataset": str, "format": str}, parameter_defaults={"format": "json"} ) ] for template in templates: embedded_resource_registry.register_template(template) def _register_splunk_embedded_resources(): """Register Splunk-specific embedded resources.""" # This would register Splunk-specific embedded resources # Implementation depends on your specific Splunk integration needs logger.info("Splunk embedded resources registration placeholder") def _register_with_main_registry(): """Register embedded resources with the main resource registry.""" try: from ..core.registry import resource_registry # Register all embedded resources with the main registry for _uri, resource in embedded_resource_registry._resources.items(): metadata = 
resource.get_metadata() resource_registry.register(type(resource), metadata) logger.info("Registered embedded resources with main registry") except Exception as e: logger.error(f"Error registering with main registry: {e}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.