Skip to main content
Glama

MCP Server for Splunk

Apache 2.0
16
  • Apple
  • Linux
discovery.pyβ€’8.71 kB
""" Discovery system for automatically finding and loading tools, resources, and prompts. Provides automatic discovery of MCP components from the file system. """ import importlib import logging import pkgutil from .base import BasePrompt, BaseResource, BaseTool from .registry import prompt_registry, resource_registry, tool_registry logger = logging.getLogger(__name__) def discover_tools(search_paths: list[str] | None = None) -> int: """ Discover and register tools from specified paths. Args: search_paths: Paths to search for tools (defaults to core and contrib tools) Returns: Number of tools discovered and registered """ if search_paths is None: search_paths = [ "src.tools", # Core tools "contrib.tools", # Community tools ] discovered_count = 0 for search_path in search_paths: try: discovered_count += _discover_modules_in_package( search_path, BaseTool, _register_tool_class ) except ImportError as e: logger.debug(f"Could not import {search_path}: {e}") except Exception as e: logger.error(f"Error discovering tools in {search_path}: {e}") logger.info(f"Discovered and registered {discovered_count} tools") return discovered_count def discover_resources(search_paths: list[str] | None = None) -> int: """ Discover and register resources from specified paths. Args: search_paths: Paths to search for resources (defaults to core and contrib resources) Returns: Number of resources discovered and registered """ if search_paths is None: search_paths = [ "src.resources", # Core and documentation resources "contrib.resources", # Community resources ] discovered_count = 0 for search_path in search_paths: try: discovered_count += _discover_modules_in_package( search_path, BaseResource, _register_resource_class ) except ImportError as e: logger.debug(f"Could not import {search_path}: {e}") except Exception as e: logger.error(f"Error discovering resources in {search_path}: {e}") logger.info(f"Discovered and registered {discovered_count} resources") return discovered_count def discover_prompts(search_paths: list[str] | None = None) -> int: """ Discover and register prompts from specified paths. Args: search_paths: Paths to search for prompts (defaults to core and contrib prompts) Returns: Number of prompts discovered and registered """ if search_paths is None: search_paths = [ "src.prompts", # Core prompts "contrib.prompts", # Community prompts ] discovered_count = 0 for search_path in search_paths: try: discovered_count += _discover_modules_in_package( search_path, BasePrompt, _register_prompt_class ) except ImportError as e: logger.debug(f"Could not import {search_path}: {e}") except Exception as e: logger.error(f"Error discovering prompts in {search_path}: {e}") logger.info(f"Discovered and registered {discovered_count} prompts") return discovered_count def _discover_modules_in_package(package_name: str, base_class: type, register_func) -> int: """ Discover modules in a package and register classes that inherit from base_class. Args: package_name: Package to search base_class: Base class to look for register_func: Function to call for registration Returns: Number of classes discovered and registered """ try: package = importlib.import_module(package_name) except ImportError: logger.debug(f"Package {package_name} not found or not importable") return 0 discovered_count = 0 # Walk through all modules in the package if hasattr(package, "__path__"): for _importer, modname, _ispkg in pkgutil.walk_packages( package.__path__, package.__name__ + "." ): try: module = importlib.import_module(modname) discovered_count += _process_module(module, base_class, register_func) except Exception as e: logger.error(f"Error importing module {modname}: {e}") else: # Single module package discovered_count += _process_module(package, base_class, register_func) return discovered_count def _process_module(module, base_class: type, register_func) -> int: """ Process a module to find and register classes. Args: module: Module to process base_class: Base class to look for register_func: Function to call for registration Returns: Number of classes processed """ discovered_count = 0 for name in dir(module): obj = getattr(module, name) # Check if it's a class that inherits from base_class but isn't base_class itself if ( isinstance(obj, type) and issubclass(obj, base_class) and obj is not base_class and not name.startswith("_") ): try: register_func(obj, module) discovered_count += 1 logger.debug(f"Discovered {base_class.__name__}: {name}") except Exception as e: logger.error(f"Error registering {name}: {e}") return discovered_count def _register_tool_class(tool_class: type[BaseTool], module) -> None: """Register a discovered tool class""" # Look for metadata in the module or class metadata = _get_tool_metadata(tool_class, module) if metadata: tool_registry.register(tool_class, metadata) def _register_resource_class(resource_class: type[BaseResource], module) -> None: """Register a discovered resource class""" # Look for metadata in the module or class metadata = _get_resource_metadata(resource_class, module) if metadata: resource_registry.register(resource_class, metadata) def _register_prompt_class(prompt_class: type[BasePrompt], module) -> None: """Register a discovered prompt class""" # Look for metadata in the module or class metadata = _get_prompt_metadata(prompt_class, module) if metadata: prompt_registry.register(prompt_class, metadata) def _get_tool_metadata(tool_class: type[BaseTool], module): """Extract tool metadata from class or module""" from .base import ToolMetadata # Look for METADATA attribute in class if hasattr(tool_class, "METADATA"): return tool_class.METADATA # Look for metadata in module if hasattr(module, "TOOL_METADATA"): return module.TOOL_METADATA # Create default metadata class_name = tool_class.__name__ return ToolMetadata( name=_camel_to_snake(class_name), description=tool_class.__doc__ or f"Tool: {class_name}", category="general", ) def _get_resource_metadata(resource_class: type[BaseResource], module): """Extract resource metadata from class or module""" from .base import ResourceMetadata # Look for METADATA attribute in class if hasattr(resource_class, "METADATA"): return resource_class.METADATA # Look for metadata in module if hasattr(module, "RESOURCE_METADATA"): return module.RESOURCE_METADATA # Create default metadata class_name = resource_class.__name__ uri = f"resource://{_camel_to_snake(class_name)}" return ResourceMetadata( uri=uri, name=class_name, description=resource_class.__doc__ or f"Resource: {class_name}", category="general", ) def _get_prompt_metadata(prompt_class: type[BasePrompt], module): """Extract prompt metadata from class or module""" from .base import PromptMetadata # Look for METADATA attribute in class if hasattr(prompt_class, "METADATA"): return prompt_class.METADATA # Look for metadata in module if hasattr(module, "PROMPT_METADATA"): return module.PROMPT_METADATA # Create default metadata class_name = prompt_class.__name__ return PromptMetadata( name=_camel_to_snake(class_name), description=prompt_class.__doc__ or f"Prompt: {class_name}", category="general", ) def _camel_to_snake(name: str) -> str: """Convert CamelCase to snake_case""" import re s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server