Skip to main content
Glama
xplainable

Xplainable MCP Server

Official
by xplainable
server.py25.6 kB
"""
Xplainable MCP Server implementation using FastMCP.

This server provides secure access to Xplainable AI platform capabilities
through standardized MCP tools.
"""

import os
import sys
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime

from fastmcp import FastMCP
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from .response_handlers import (
    handle_none_as_empty_list,
    safe_model_dump_list,
    safe_model_dump,
    safe_list_response,
    safe_client_call
)

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Default platform URL, shared by the model field default and load_config()
# so the two cannot drift apart.
_DEFAULT_HOSTNAME = "https://platform.xplainable.io"

# Spellings accepted for boolean environment variables (compared after
# stripping whitespace and lower-casing).
_TRUE_VALUES = frozenset({"1", "true", "yes", "on"})
_FALSE_VALUES = frozenset({"0", "false", "no", "off"})


class ServerConfig(BaseModel):
    """Server configuration model."""

    api_key: str = Field(..., description="Xplainable API key")
    hostname: str = Field(
        default=_DEFAULT_HOSTNAME,
        description="Xplainable API hostname"
    )
    org_id: Optional[str] = Field(None, description="Organization ID")
    team_id: Optional[str] = Field(None, description="Team ID")
    enable_write_tools: bool = Field(
        default=False,
        description="Enable write operations (deploy, activate, etc.)"
    )
    rate_limit_enabled: bool = Field(
        default=True,
        description="Enable rate limiting"
    )


def _env_flag(name: str, default: bool) -> bool:
    """
    Read a boolean flag from the environment.

    Args:
        name: Environment variable name
        default: Value used when the variable is unset, empty or unrecognised

    Returns:
        True for 1/true/yes/on, False for 0/false/no/off (case-insensitive,
        surrounding whitespace ignored), otherwise ``default``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default

    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False

    # Fall back to the safe default instead of silently treating a typo
    # (or "1"/"yes") as False, which previously switched rate limiting off.
    logger.warning(
        f"Unrecognised value {raw!r} for {name}; using default {default}"
    )
    return default


def load_config() -> ServerConfig:
    """
    Load configuration from environment variables.

    Reads XPLAINABLE_API_KEY, XPLAINABLE_HOST, XPLAINABLE_ORG_ID,
    XPLAINABLE_TEAM_ID, ENABLE_WRITE_TOOLS and RATE_LIMIT_ENABLED.

    Returns:
        A populated ServerConfig

    Raises:
        SystemExit: With exit code 1 when XPLAINABLE_API_KEY is unset or empty
    """
    api_key = os.getenv("XPLAINABLE_API_KEY")
    if not api_key:
        logger.error("XPLAINABLE_API_KEY environment variable not set")
        sys.exit(1)

    return ServerConfig(
        api_key=api_key,
        hostname=os.getenv("XPLAINABLE_HOST", _DEFAULT_HOSTNAME),
        org_id=os.getenv("XPLAINABLE_ORG_ID"),
        team_id=os.getenv("XPLAINABLE_TEAM_ID"),
        enable_write_tools=_env_flag("ENABLE_WRITE_TOOLS", False),
        rate_limit_enabled=_env_flag("RATE_LIMIT_ENABLED", True),
    )


# Initialize configuration
# NOTE: this runs at import time, so importing the module without an API key
# terminates the process via load_config().
config = load_config()

# Import the shared MCP instance
from .mcp_instance import mcp

# Import get_client from
# client_manager
from .client_manager import get_client

# Import all modular tools - they self-register with @mcp.tool() decorator
from . import tools

# ============================================================================
# DISCOVERY/METADATA TOOLS
# ============================================================================


def categorize_tool(tool_name: str, tool_func) -> str:
    """
    Automatically categorize a tool based on its name.

    Args:
        tool_name: Name of the tool
        tool_func: FunctionTool object from FastMCP registry (not consulted
            by the current implementation; kept for interface compatibility)

    Returns:
        Category string: 'discovery', 'read', 'write' or 'admin'. A tool whose
        name looks like a write operation is reported as 'disabled' when
        write tools are switched off in the server configuration.
    """
    # Discovery tools
    if tool_name in ['list_tools']:
        return 'discovery'

    lowered = tool_name.lower()

    # Write operations (only enabled when config allows)
    # NOTE(review): these are substring matches, so a name such as
    # "list_datasets" contains "set" and is classified as a write tool.
    # Left as-is because tightening it could reclassify existing tools.
    write_patterns = [
        'generate', 'create', 'activate', 'deactivate', 'deploy',
        'delete', 'update', 'modify', 'set', 'enable', 'disable', 'gpt_'
    ]
    if any(pattern in lowered for pattern in write_patterns):
        return 'write' if config.enable_write_tools else 'disabled'

    # Admin tools (if any)
    admin_patterns = ['admin', 'config', 'manage_users']
    if any(pattern in lowered for pattern in admin_patterns):
        return 'admin'

    # Default to read operations
    return 'read'


def extract_tool_info(tool_name: str, tool_func) -> Dict[str, Any]:
    """
    Extract tool information from the function signature and docstring.

    Args:
        tool_name: Name of the tool
        tool_func: FunctionTool object from FastMCP registry, or a plain
            callable

    Returns:
        Dictionary with "name", "description" (first docstring line) and
        "parameters". If the signature cannot be inspected, "parameters" is
        empty and the description falls back to "Tool: <name>".
    """
    import inspect

    # Handle FastMCP FunctionTool objects: unwrap to the underlying function
    actual_func = tool_func
    if hasattr(tool_func, 'func'):
        actual_func = tool_func.func
    elif hasattr(tool_func, '_func'):
        actual_func = tool_func._func
    elif hasattr(tool_func, '__call__') and not inspect.isfunction(tool_func):
        # Try to get the underlying function from callable objects
        if hasattr(tool_func, '__func__'):
            actual_func = tool_func.__func__

    empty = inspect.Parameter.empty

    try:
        sig = inspect.signature(actual_func)

        parameters = []
        for param_name, param in sig.parameters.items():
            # Identity checks against the sentinel: "==" would invoke the
            # default value's own __eq__, which may not return a plain bool.
            has_annotation = param.annotation is not empty
            has_default = param.default is not empty
            parameters.append({
                "name": param_name,
                "type": str(param.annotation) if has_annotation else "Any",
                "required": not has_default,
                "default": str(param.default) if has_default else None,
                "description": f"Parameter {param_name}"
            })

        # Extract description from the first docstring line
        doc = inspect.getdoc(actual_func) or f"Tool: {tool_name}"
        description = doc.split('\n')[0].strip()

    except Exception as e:
        logger.warning(f"Could not extract signature for {tool_name}: {e}")
        parameters = []
        description = f"Tool: {tool_name}"

    return {
        "name": tool_name,
        "description": description,
        "parameters": parameters
    }


def _discover_available_tools() -> List[Dict[str, Any]]:
    """
    Discover available tools by introspecting xplainable-client classes.

    Public functions defined on ModelsClient, DeploymentsClient and
    PreprocessingClient are inspected (no client is instantiated) and turned
    into tool descriptions built from their signatures and docstrings.

    Returns:
        List of tool dictionaries with name, description, category and
        parameters. If discovery as a whole fails (for example the client
        package cannot be imported) a small hard-coded list is returned.
    """
    try:
        import inspect

        logger.info("Starting true dynamic tool discovery via class introspection")

        # Import client classes directly (no instantiation needed)
        from xplainable_client.client.models import ModelsClient
        from xplainable_client.client.deployments import DeploymentsClient
        from xplainable_client.client.preprocessing import PreprocessingClient
        # TODO: Add other clients like GPTClient, CollectionsClient when needed

        empty = inspect.Parameter.empty
        http_verbs = ('get', 'post', 'put', 'patch', 'delete')
        complex_modules = ('pandas', 'xplainable.preprocessing')
        write_keywords = (
            "create", "add", "update", "delete", "deploy",
            "activate", "deactivate", "generate", "revoke"
        )

        # Utility tools that don't correspond to client methods
        available_tools = [
            {
                "name": "list_tools",
                "description": "List all available MCP tools and their descriptions",
                "category": "discovery",
                "parameters": []
            },
            {
                "name": "get_connection_info",
                "description": "Return connection and user info for diagnostics",
                "category": "read",
                "parameters": []
            },
            {
                "name": "misc_get_version_info",
                "description": "Return client/server version info",
                "category": "read",
                "parameters": []
            }
        ]

        # Map client classes to their module names
        client_modules = [
            ("models", ModelsClient),
            ("deployments", DeploymentsClient),
            ("preprocessing", PreprocessingClient)
        ]

        # Introspect each client class
        for module_name, client_class in client_modules:
            logger.info(f"Introspecting {module_name} client...")

            for name, method in inspect.getmembers(client_class, predicate=inspect.isfunction):
                # Skip private methods and HTTP convenience methods
                if name.startswith('_') or name.lower() in http_verbs:
                    continue

                # Everything for one method, including signature lookup, is
                # guarded so a single bad method cannot abort discovery.
                try:
                    sig = inspect.signature(method)
                    params = [
                        (param_name, param)
                        for param_name, param in sig.parameters.items()
                        if param_name != 'self'
                    ]

                    # Skip methods that take complex types (DataFrames,
                    # pipelines, etc.), judged by the annotation's module.
                    requires_complex = any(
                        param.annotation
                        and hasattr(param.annotation, '__module__')
                        and any(
                            mod in (getattr(param.annotation, '__module__', '') or '')
                            for mod in complex_modules
                        )
                        for _, param in params
                    )
                    if requires_complex:
                        logger.debug(f"Skipping {name} - requires complex parameters")
                        continue

                    # Extract method info
                    doc = inspect.getdoc(method) or ''
                    description = doc.split('\n')[0] if doc else f"{name.replace('_', ' ').title()}"

                    # Determine category based on method name
                    is_write = any(keyword in name.lower() for keyword in write_keywords)
                    category = "write" if is_write else "read"

                    # Extract parameters
                    parameters = []
                    for param_name, param in params:
                        # Reduce the annotation to a coarse type label.
                        # The branch order means "Optional[int]" maps to "int".
                        param_type = "str"  # default
                        if param.annotation is not empty:
                            type_str = str(param.annotation).lower()
                            if 'int' in type_str:
                                param_type = "int"
                            elif 'bool' in type_str:
                                param_type = "bool"
                            elif 'optional' in type_str or 'union' in type_str:
                                param_type = "Optional[str]"

                        has_default = param.default is not empty
                        param_info = {
                            "name": param_name,
                            "type": param_type,
                            "required": not has_default,
                            "description": f"Parameter {param_name}"
                        }
                        if has_default:
                            param_info["default"] = param.default

                        parameters.append(param_info)

                    # Create tool definition
                    available_tools.append({
                        "name": name,  # Use actual method name
                        "description": description,
                        "category": category,
                        "parameters": parameters
                    })
                    logger.debug(f"Added tool: {name} ({category})")

                except Exception as e:
                    logger.warning(f"Error processing method {module_name}.{name}: {e}")
                    continue

        logger.info(f"Dynamic tool discovery completed: found {len(available_tools)} tools")
        return available_tools

    except Exception as e:
        logger.error(f"Dynamic tool discovery failed: {e}")

        # Clean fallback - just return the basic tools we know work
        logger.info("Using minimal fallback tool list")
        return [
            {"name": "list_tools", "description": "List all available MCP tools", "category": "discovery", "parameters": []},
            {"name": "get_connection_info", "description": "Get connection information", "category": "read", "parameters": []},
            {"name": "list_team_models", "description": "List team models", "category": "read", "parameters": []},
            {"name": "get_model", "description": "Get model details", "category": "read", "parameters": [{"name": "model_id", "type": "str", "required": True}]},
            {"name": "list_deployments", "description": "List deployments", "category": "read", "parameters": []},
            {"name": "misc_get_version_info", "description": "Get version info", "category": "read", "parameters": []}
        ]


@mcp.tool()
def list_tools() -> Dict[str, Any]:
    """
    List all available MCP tools and their descriptions.

    Returns:
        Dictionary containing tool information organized by category
    """
    try:
        # Use modular tool discovery system
        from .tool_discovery import get_modular_tools_registry

        discovery = get_modular_tools_registry()
        available_tools = discovery.get_tools_by_category()

        # Pre-seed the well-known categories so they keep a stable order in
        # the response; any other category is appended as it is encountered.
        tools_dict = {"discovery": [], "read": [], "write": [], "admin": [], "inference": [], "analysis": []}

        for category, tools in available_tools.items():
            # Skip write tools if not enabled
            if category == "write" and not config.enable_write_tools:
                continue

            for tool in tools:
                # Convert ToolInfo to dict format expected by rest of function
                tools_dict.setdefault(category, []).append({
                    "name": tool.name,
                    "description": tool.description,
                    "category": tool.category,
                    "module": tool.module,
                    "parameters": tool.parameters,
                    "enabled": tool.enabled
                })

        # Remove empty categories
        tools_dict = {k: v for k, v in tools_dict.items() if v}

        # Calculate summary
        summary = {
            "discovery_tools": len(tools_dict.get("discovery", [])),
            "read_tools": len(tools_dict.get("read", [])),
            "write_tools": len(tools_dict.get("write", [])),
            "admin_tools": len(tools_dict.get("admin", [])),
            "inference_tools": len(tools_dict.get("inference", [])),
            "analysis_tools": len(tools_dict.get("analysis", [])),
            "write_tools_enabled": config.enable_write_tools
        }

        # Count every returned tool, including those in categories that have
        # no dedicated summary counter, so the total matches "categories".
        total_tools = sum(len(entries) for entries in tools_dict.values())

        result = {
            "server_version": "0.1.0",
            "total_tools": total_tools,
            # NOTE(review): reported equal to total_tools; the per-tool
            # "enabled" flag is not taken into account here - confirm intent.
            "enabled_tools": total_tools,
            "categories": tools_dict,
            "summary": summary
        }

        logger.info(f"Listed {total_tools} available tools")
        return result

    except Exception as e:
        logger.error(f"Error listing tools: {e}")
        # Fallback to basic info if introspection fails
        return {
            "server_version": "0.1.0",
            "total_tools": 0,
            "enabled_tools": 0,
            "categories": {"error": [{"name": "list_tools", "description": f"Error: {str(e)}", "category": "error"}]},
            "summary": {"error": str(e)}
        }


# 
============================================================================ # READ-ONLY TOOLS (LEGACY - NOW REPLACED BY MODULAR TOOLS) # ============================================================================ # OLD HARDCODED TOOLS - REPLACED BY MODULAR AUTO-GENERATED TOOLS # These tools are now available in the modular tools/ directory # @mcp.tool() # def get_connection_info() -> Dict[str, Any]: # \"\"\" # Return connection and user info for diagnostics (no secrets). # # Returns: # Dictionary containing connection information # \"\"\" # try: # client = get_client() # info = client.connection_info # # Remove sensitive information # safe_info = { # "hostname": info.get("hostname"), # "username": info.get("username"), # "api_key_expires": info.get("api_key_expires"), # "xplainable_version": info.get("xplainable_version"), # "python_version": info.get("python_version"), # "org_id": info.get("org_id"), # "team_id": info.get("team_id"), # } # logger.info(f"Connection info retrieved for user: {safe_info.get('username')}") # return safe_info # except Exception as e: # logger.error(f"Error getting connection info: {e}") # raise # @mcp.tool() # def list_team_models(team_id_override: Optional[str] = None) -> List[Dict[str, Any]]: # \"\"\" # List models for the current team or a provided team_id. 
# # Args: # team_id_override: Optional team ID to override the default # # Returns: # List of model information dictionaries # \"\"\" # try: # client = get_client() # # Only pass team_id if it's provided # if team_id_override: # models = safe_client_call( # client.models.list_team_models, # "list_team_models", # team_id_override=team_id_override # ) # else: # models = safe_client_call( # client.models.list_team_models, # "list_team_models" # ) # result = safe_model_dump_list(models, "list_team_models") # logger.info(f"Listed {len(result)} models for team: {team_id_override or config.team_id}") # return result # except Exception as e: # logger.error(f"Error listing team models: {e}") # raise # ALL OLD TOOLS BELOW ARE COMMENTED OUT - REPLACED BY MODULAR AUTO-GENERATED TOOLS # @mcp.tool() # def get_model(model_id: str) -> Dict[str, Any]: # \"\"\" # Get detailed information about a model by id. # # Args: # model_id: The model ID to retrieve # # Returns: # Dictionary containing model information # \"\"\" # try: # client = get_client() # info = client.models.get_model(model_id) # result = safe_model_dump(info, "get_model") # if result is None: # raise ValueError(f"Model {model_id} not found") # logger.info(f"Retrieved model info for: {model_id}") # return result # except Exception as e: # logger.error(f"Error getting model {model_id}: {e}") # raise # @mcp.tool() # def list_model_versions(model_id: str) -> List[Dict[str, Any]]: # \"\"\" # List all versions for a model. 
# # Args: # model_id: The model ID to list versions for # # Returns: # List of version information dictionaries # \"\"\" # try: # client = get_client() # versions = client.models.list_model_versions(model_id) # result = safe_model_dump_list(versions, "list_model_versions") # logger.info(f"Listed {len(result)} versions for model: {model_id}") # return result # except Exception as e: # logger.error(f"Error listing model versions for {model_id}: {e}") # raise # @mcp.tool() # def list_deployments(team_id_override: Optional[str] = None) -> List[Dict[str, Any]]: # \"\"\" # List deployments for the team. # # Args: # team_id_override: Optional team ID to override the default # # Returns: # List of deployment information dictionaries # \"\"\" # try: # client = get_client() # # Only pass team_id if it's provided # if team_id_override: # deployments = client.deployments.list_deployments(team_id=team_id_override) # else: # deployments = client.deployments.list_deployments() # result = safe_model_dump_list(deployments, "list_deployments") # logger.info(f"Listed {len(result)} deployments for team: {team_id_override or config.team_id}") # return result # except Exception as e: # logger.error(f"Error listing deployments: {e}") # raise # @mcp.tool() # def get_active_team_deploy_keys_count(team_id_override: Optional[str] = None) -> int: # \"\"\" # Return the count of active deploy keys for a team. 
# # Args: # team_id_override: Optional team ID to override the default # # Returns: # Count of active deploy keys # \"\"\" # try: # client = get_client() # # Only pass team_id if it's provided # if team_id_override: # count = client.deployments.get_active_team_deploy_keys_count(team_id=team_id_override) # else: # count = client.deployments.get_active_team_deploy_keys_count() # logger.info(f"Active deploy keys count for team {team_id_override or config.team_id}: {count}") # return count # except Exception as e: # logger.error(f"Error getting deploy keys count: {e}") # raise # @mcp.tool() # def list_preprocessors(team_id_override: Optional[str] = None) -> List[Dict[str, Any]]: # \"\"\" # List preprocessors for the team. # # Args: # team_id_override: Optional team ID to override the default # # Returns: # List of preprocessor information dictionaries # \"\"\" # try: # client = get_client() # # Only pass team_id if it's provided # if team_id_override: # results = client.preprocessing.list_preprocessors(team_id=team_id_override) # else: # results = client.preprocessing.list_preprocessors() # result = safe_model_dump_list(results, "list_preprocessors") # logger.info(f"Listed {len(result)} preprocessors for team: {team_id_override or config.team_id}") # return result # except Exception as e: # logger.error(f"Error listing preprocessors: {e}") # raise # @mcp.tool() # def get_preprocessor(preprocessor_id: str) -> Dict[str, Any]: # \"\"\" # Get a preprocessor by id. 
# # Args: # preprocessor_id: The preprocessor ID to retrieve # # Returns: # Dictionary containing preprocessor information # \"\"\" # try: # client = get_client() # info = client.preprocessing.get_preprocessor(preprocessor_id) # result = safe_model_dump(info, "get_preprocessor") # if result is None: # raise ValueError(f"Preprocessor {preprocessor_id} not found") # logger.info(f"Retrieved preprocessor: {preprocessor_id}") # return result # except Exception as e: # logger.error(f"Error getting preprocessor {preprocessor_id}: {e}") # raise # @mcp.tool() # def get_collection_scenarios(collection_id: str) -> List[Dict[str, Any]]: # \"\"\" # List scenarios for a collection. # # Args: # collection_id: The collection ID to list scenarios for # # Returns: # List of scenario dictionaries # \"\"\" # try: # client = get_client() # scenarios = client.collections.get_collection_scenarios(collection_id) # # This endpoint likely returns plain dicts, so use safe_list_response # result = safe_list_response(scenarios, "get_collection_scenarios") # logger.info(f"Listed {len(result)} scenarios for collection: {collection_id}") # return result # except Exception as e: # logger.error(f"Error getting collection scenarios for {collection_id}: {e}") # raise # @mcp.tool() # def misc_get_version_info() -> Dict[str, Any]: # \"\"\" # Return Xplainable client/server version info. 
#
#     Returns:
#         Dictionary containing version information
#     \"\"\"
#     try:
#         client = get_client()
#         info = client.misc.get_version_info()
#         logger.info("Retrieved version information")
#         return info.model_dump()
#     except Exception as e:
#         logger.error(f"Error getting version info: {e}")
#         raise


# ============================================================================
# WRITE TOOLS (RESTRICTED) - LEGACY, NOW REPLACED BY MODULAR TOOLS
# ============================================================================

# OLD WRITE TOOLS REMOVED - REPLACED BY MODULAR AUTO-GENERATED TOOLS
# All write tools are now available in the modular tools/ directory files


def main():
    """
    Main entry point for the server.

    Logs the effective configuration flags and runs the shared MCP instance.

    Raises:
        SystemExit: With code 0 on KeyboardInterrupt, with code 1 on any
            other exception raised while the server is running.
    """
    try:
        # Log startup information
        logger.info("Starting Xplainable MCP Server")
        logger.info(f"Write tools enabled: {config.enable_write_tools}")
        logger.info(f"Rate limiting enabled: {config.rate_limit_enabled}")

        # Don't initialize client at startup - let it happen lazily when tools are called
        # This prevents the server from crashing if API key is invalid

        # Run the MCP server
        mcp.run()

    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
        sys.exit(0)
    except Exception as e:
        # logger.exception records the traceback; the message alone is
        # rarely enough to diagnose a fatal server error.
        logger.exception(f"Server error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xplainable/xplainable-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server