Skip to main content
Glama

Black Orchid

by AJ-Gonzalez
black_orchid.py15.9 kB
"""Black Orchid: Hot-reloadable MCP proxy server with collision detection/ Hackable scripting for MCP""" from pathlib import Path from typing import Any from datetime import datetime from importlib.util import spec_from_file_location, module_from_spec import glob import ast import sys from fastmcp import FastMCP # ProxyHandler: Dynamic module loading with collision detection class ProxyHandler: """Proxy Handler class for loading python modules dynamically. Auto-discovers modules from: - modules/ (public, committed to git) - private/modules/ (private, gitignored) if it exists Validates all paths to prevent directory traversal attacks. """ def __init__(self): """Initialize ProxyHandler with auto-discovery of module directories.""" # Base directory (where black_orchid.py lives) self.base_dir = Path(__file__).parent.resolve() # Module directories to scan self.modules_dir = self.base_dir / "modules" self.private_modules_dir = self.base_dir / "private" / "modules" # Valid module directories (for path validation) self.valid_dirs = [self.modules_dir.resolve()] if self.private_modules_dir.exists(): self.valid_dirs.append(self.private_modules_dir.resolve()) # Registry structure: tool_name -> tool metadata self.registry = {} # Track original function names for collision detection self._name_tracker = {} # original_name -> list of (module_name, final_tool_name) # Track rejected modules for debugging self.rejected_modules = [] # list of (path, reason) tuples # Discover modules from all valid directories self.raw_modules = [] for valid_dir in self.valid_dirs: self.raw_modules.extend(glob.glob(str(valid_dir / "*.py"))) self.okmods = [] # Validate and check modules for mod in self.raw_modules: mod_path = Path(mod).resolve() is_valid_path = any( mod_path.is_relative_to(valid_dir) for valid_dir in self.valid_dirs ) if not is_valid_path: self.rejected_modules.append((mod, "path_traversal_attempt")) continue if mod_path.stem == "toolset": continue try: with open(mod_path, "r", encoding="utf-8") as f: source = f.read() ast.parse(source) self.okmods.append(str(mod_path)) except SyntaxError: self.rejected_modules.append((mod, "syntax_error")) except Exception as e: self.rejected_modules.append((mod, f"read_error: {e}")) # Load modules and build registry with collision detection self.loaded_mods = {} for mod_path in self.okmods: mod_name = Path(mod_path).stem # Import the module from file path using importlib.util spec = spec_from_file_location(mod_name, mod_path) if spec is None or spec.loader is None: continue tmod = module_from_spec(spec) sys.modules[mod_name] = tmod spec.loader.exec_module(tmod) self.loaded_mods[mod_name] = tmod # Extract toolable endpoints (functions) # Filter out dunder methods, non-lowercase, and underscore-prefixed helpers clean_list = [x for x in dir(tmod) if "__" not in x and x.islower() and not x.startswith('_')] # Register each function with collision detection for fn_name in clean_list: self._register_tool( original_name=fn_name, module_name=mod_name, function=getattr(tmod, fn_name) ) def _register_tool(self, original_name: str, module_name: str, function: callable): """Register a tool with collision detection. If this is the first time seeing this function name, register it simply. If we've seen it before (collision), retroactively rename the first one and give this one a suffixed name too. """ # Check if we've seen this function name before if original_name in self._name_tracker: # COLLISION DETECTED # Retroactively rename the first occurrence first_module, first_tool_name = self._name_tracker[original_name][0] # Only rename if it hasn't been renamed yet (still using original name) # AND the tool still exists in registry (might have been removed during reload) if first_tool_name == original_name and original_name in self.registry: new_first_name = f"{original_name}_{first_module}" # Move the registry entry self.registry[new_first_name] = self.registry.pop(original_name) self.registry[new_first_name]["had_collision"] = True # Update tracker self._name_tracker[original_name][0] = (first_module, new_first_name) # Register this new one with suffix new_tool_name = f"{original_name}_{module_name}" self.registry[new_tool_name] = { "function": function, "docstring": function.__doc__, "source_module": module_name, "original_name": original_name, "had_collision": True } # Track this collision self._name_tracker[original_name].append((module_name, new_tool_name)) else: # First time seeing this name - register simply self.registry[original_name] = { "function": function, "docstring": function.__doc__, "source_module": module_name, "original_name": original_name, "had_collision": False } # Start tracking this name self._name_tracker[original_name] = [(module_name, original_name)] def use_proxy_tool(self, tool_id: str, kwargs: dict) -> Any: """Use proxy tool by ID with keyword arguments.""" if tool_id not in self.registry: raise KeyError(f"Tool '{tool_id}' not found in registry. Available tools: {list(self.registry.keys())}") proxy_fn = self.registry[tool_id]["function"] return proxy_fn(**kwargs) def list_tools(self) -> dict: """List all registered tools with their docstrings.""" return {name: info["docstring"] for name, info in self.registry.items()} def reload_all_modules(self) -> str: """Reload all modules from scratch. Rebuilds collision detection.""" # Clear all state self.registry.clear() self._name_tracker.clear() self.loaded_mods.clear() self.rejected_modules.clear() # Re-discover modules self.raw_modules = [] for valid_dir in self.valid_dirs: self.raw_modules.extend(glob.glob(str(valid_dir / "*.py"))) self.okmods = [] # Validate and check modules for mod in self.raw_modules: mod_path = Path(mod).resolve() is_valid_path = any( mod_path.is_relative_to(valid_dir) for valid_dir in self.valid_dirs ) if not is_valid_path: self.rejected_modules.append((mod, "path_traversal_attempt")) continue if mod_path.stem == "toolset": continue try: with open(mod_path, "r", encoding="utf-8") as f: source = f.read() ast.parse(source) self.okmods.append(str(mod_path)) except SyntaxError: self.rejected_modules.append((mod, "syntax_error")) except Exception as e: self.rejected_modules.append((mod, f"read_error: {e}")) # Load modules for mod_path in self.okmods: mod_name = Path(mod_path).stem # Reload if already in sys.modules, otherwise load fresh if mod_name in sys.modules: try: from importlib import reload tmod = reload(sys.modules[mod_name]) except Exception: # If reload fails, try fresh import spec = spec_from_file_location(mod_name, mod_path) if spec is None or spec.loader is None: continue tmod = module_from_spec(spec) sys.modules[mod_name] = tmod spec.loader.exec_module(tmod) else: spec = spec_from_file_location(mod_name, mod_path) if spec is None or spec.loader is None: continue tmod = module_from_spec(spec) sys.modules[mod_name] = tmod spec.loader.exec_module(tmod) self.loaded_mods[mod_name] = tmod # Register tools # Filter out dunder methods, non-lowercase, and underscore-prefixed helpers clean_list = [x for x in dir(tmod) if "__" not in x and x.islower() and not x.startswith('_')] for fn_name in clean_list: self._register_tool( original_name=fn_name, module_name=mod_name, function=getattr(tmod, fn_name) ) # Return summary num_tools = len(self.registry) num_modules = len(self.loaded_mods) return f"Loaded {num_tools} tools from {num_modules} modules" def reload_module(self, module_name: str) -> dict: """Reload a specific module. Collision suffixes remain permanent for the session.""" if module_name not in self.loaded_mods: return { "success": False, "error": f"Module '{module_name}' not currently loaded" } # Track tools before reload old_tools = { tool_id: info for tool_id, info in self.registry.items() if info["source_module"] == module_name } old_tool_names = set(old_tools.keys()) # Try to reload the module try: # Find the module file mod_path = None for valid_dir in self.valid_dirs: candidate = valid_dir / f"{module_name}.py" if candidate.exists(): mod_path = candidate break if mod_path is None: raise FileNotFoundError(f"Module file for '{module_name}' not found") # Reload using spec (same method as initial load) spec = spec_from_file_location(module_name, mod_path) if spec is None or spec.loader is None: raise ImportError(f"Could not create spec for '{module_name}'") # Re-execute the module spec.loader.exec_module(self.loaded_mods[module_name]) reloaded_module = self.loaded_mods[module_name] # Remove old tools from registry for tool_id in old_tool_names: del self.registry[tool_id] # Register new tools from reloaded module clean_list = [x for x in dir(reloaded_module) if "__" not in x and x.islower()] for fn_name in clean_list: self._register_tool( original_name=fn_name, module_name=module_name, function=getattr(reloaded_module, fn_name) ) # Track tools after reload new_tools = { tool_id: info for tool_id, info in self.registry.items() if info["source_module"] == module_name } new_tool_names = set(new_tools.keys()) tools_added = list(new_tool_names - old_tool_names) tools_removed = list(old_tool_names - new_tool_names) result = { "success": True, "reloaded": module_name, "tools_added": tools_added, "tools_removed": tools_removed } # Add suggestion if tools changed if tools_added or tools_removed: result["suggestion"] = "Consider reload_all() to rebuild collision detection" return result except Exception as e: # Reload failed - keep old version, return error with traceback import traceback as tb error_summary = ''.join(tb.format_exception(type(e), e, e.__traceback__)) return { "success": False, "error": f"Failed to reload '{module_name}'", "traceback": error_summary, "note": "Old version of module is still loaded" } # Initialize proxy handler proxy_handler = ProxyHandler() # Initialize MCP server mcp = FastMCP("Black Orchid") # General Utilities @mcp.tool def check_time(): """Check date and time""" dt_string = str(datetime.now()).split(".", maxsplit=1)[0].split(" ") formatted_date = f"{dt_string[0]}_{dt_string[1].replace(":","-")}" return formatted_date @mcp.tool def list_proxy_tools() -> dict: """List all tools available via proxy. Returns: dict: Tool names with their docstrings """ return proxy_handler.list_tools() @mcp.tool def use_proxy_tool(tool_id: str, kwargs: dict) -> Any: """Use a proxy tool by ID. Provide tool ID (from list_proxy_tools) and arguments as a dictionary. The dictionary will be unpacked as keyword arguments. Args: tool_id (str): Tool name (may include module suffix if collision detected) kwargs (dict): Keyword arguments for the tool Returns: Any: Result from the proxied tool function """ return proxy_handler.use_proxy_tool(tool_id, kwargs) @mcp.tool def search_for_proxy_tool(search_term: str) -> dict: """Search for proxy tools by keyword. Args: search_term (str): Keyword to search for in tool names Returns: dict: Matching tool names with docstrings, or empty dict if none found """ all_tools = proxy_handler.list_tools() matches = {} for tool_name, docstring in all_tools.items(): if search_term.lower() in tool_name.lower(): matches[tool_name] = docstring return matches @mcp.tool def reload_all_modules() -> str: """Reload all proxy modules from scratch. Clears and rebuilds the entire tool registry with fresh collision detection. Use this when you've made significant changes or when tool naming gets confusing. Returns: str: Summary of loaded tools and modules """ return proxy_handler.reload_all_modules() @mcp.tool def reload_module(module_name: str) -> dict: """Reload a specific proxy module. Reloads one module while keeping collision suffixes permanent for the session. If the reload fails, the old version stays loaded. Args: module_name (str): Name of module to reload (without .py extension) Returns: dict: Detailed report with tools_added, tools_removed, and any errors """ return proxy_handler.reload_module(module_name) @mcp.tool def list_rejected_modules() -> list: """List modules that were rejected during loading. Useful for debugging why a module didn't load. Shows path and reason (syntax_error, path_traversal_attempt, etc.) Returns: list: List of (path, reason) tuples for rejected modules """ return proxy_handler.rejected_modules if __name__ == "__main__": mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AJ-Gonzalez/black-orchid'

If you have feedback or need assistance with the MCP directory API, please join our Discord server