Skip to main content
Glama
meta_api_tool.py44.8 kB
"""API Meta-Tool for dynamically exposing FastAPI endpoints via MCP. This module provides a tool for automatically discovering and integrating FastAPI-compatible REST APIs into the MCP server by pointing it at the FastAPI server's OpenAPI specification (e.g., /openapi.json). Usage Examples: 1. Register an API: ```python result = await client.tools.register_api( api_name="petstore", openapi_url="https://petstore.swagger.io/v2/swagger.json" ) print(f"Registered {result['tools_count']} tools for the Petstore API") ``` 2. List all registered APIs: ```python apis = await client.tools.list_registered_apis() for api_name, api_info in apis["apis"].items(): print(f"{api_name}: {api_info['tools_count']} tools") ``` 3. Call a dynamically registered tool: ```python # Get a pet by ID pet = await client.tools.call_dynamic_tool( tool_name="petstore_getPetById", inputs={"petId": 123} ) print(f"Pet name: {pet['name']}") # Add a new pet new_pet = await client.tools.call_dynamic_tool( tool_name="petstore_addPet", inputs={ "body": { "id": 0, "name": "Fluffy", "status": "available" } } ) print(f"Added pet with ID: {new_pet['id']}") ``` 4. Unregister an API: ```python result = await client.tools.unregister_api(api_name="petstore") print(f"Unregistered {result['tools_count']} tools") ``` """ import asyncio import json import re import time from typing import Any, Dict, List, Optional from urllib.parse import urlparse import httpx from ultimate_mcp_server.exceptions import ToolError, ToolInputError from ultimate_mcp_server.services.cache import with_cache from ultimate_mcp_server.tools.base import ( with_error_handling, with_state_management, with_tool_metrics, ) from ultimate_mcp_server.utils import get_logger logger = get_logger("ultimate_mcp_server.tools.meta_api") async def fetch_openapi_spec( url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """Fetches the OpenAPI spec from the given URL. Args: url: URL of the OpenAPI spec (typically ending in /openapi.json) timeout: Timeout for the HTTP request in seconds headers: Optional headers to include in the request (e.g., for authentication) Returns: Parsed OpenAPI spec as a dictionary Raises: ToolError: If the fetch or parsing fails """ try: async with httpx.AsyncClient() as client: response = await client.get(url, timeout=timeout, headers=headers) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise ToolError( f"Failed to fetch OpenAPI spec: HTTP {e.response.status_code}", details={"url": url, "status_code": e.response.status_code}, ) from e except httpx.RequestError as e: raise ToolError( f"Failed to fetch OpenAPI spec: {str(e)}", details={"url": url, "error": str(e)} ) from e except json.JSONDecodeError as e: raise ToolError( f"Failed to parse OpenAPI spec as JSON: {str(e)}", details={"url": url, "error": str(e)} ) from e def extract_endpoint_info(openapi_spec: Dict[str, Any]) -> List[Dict[str, Any]]: """Extracts endpoint information from an OpenAPI spec. Args: openapi_spec: Parsed OpenAPI spec as a dictionary Returns: List of dictionaries containing endpoint information, each with keys: - path: The endpoint path - method: The HTTP method (GET, POST, etc.) - operation_id: The operationId from the spec (used as tool name) - parameters: List of parameter objects - request_body: Request body schema (if applicable) - responses: Response schemas - summary: Endpoint summary - description: Endpoint description """ endpoints = [] paths = openapi_spec.get("paths", {}) for path, path_item in paths.items(): for method, operation in path_item.items(): if method.lower() not in ["get", "post", "put", "delete", "patch"]: continue # Skip non-HTTP methods like "parameters" # Extract operation ID (fall back to generating one if not provided) operation_id = operation.get("operationId") if not operation_id: # Generate operation ID from path and method path_parts = [p for p in path.split("/") if p and not p.startswith("{")] if path_parts: operation_id = f"{method.lower()}_{path_parts[-1]}" else: operation_id = f"{method.lower()}_root" # Ensure operation_id is a valid Python identifier operation_id = re.sub(r"[^a-zA-Z0-9_]", "_", operation_id) if operation_id[0].isdigit(): operation_id = f"op_{operation_id}" # Extract parameters parameters = [] # Include parameters from the path item if "parameters" in path_item: parameters.extend(path_item["parameters"]) # Include parameters from the operation (overriding path item parameters if same name) if "parameters" in operation: # Remove any path item parameters with the same name path_param_names = { p["name"] for p in path_item.get("parameters", []) if "name" in p } op_params = [] for p in operation["parameters"]: if p.get("name") in path_param_names: # This parameter overrides a path item parameter parameters = [ param for param in parameters if param.get("name") != p.get("name") ] op_params.append(p) parameters.extend(op_params) # Extract request body schema request_body = None if "requestBody" in operation: request_body = operation["requestBody"] # Extract response schemas responses = operation.get("responses", {}) endpoints.append( { "path": path, "method": method.lower(), "operation_id": operation_id, "parameters": parameters, "request_body": request_body, "responses": responses, "summary": operation.get("summary", ""), "description": operation.get("description", ""), "tags": operation.get("tags", []), } ) return endpoints def generate_tool_function_code( endpoint_info: Dict[str, Any], base_url: str, api_name: str, cache_ttl: Optional[int] = None, auth_header: Optional[str] = None, ) -> str: """Generates Python code for a tool function based on endpoint info. Args: endpoint_info: Dictionary containing endpoint information base_url: Base URL of the API api_name: Name of the API (used for function documentation) cache_ttl: Optional TTL for caching tool results in seconds auth_header: Optional authentication header name to include in requests Returns: String containing Python code for the tool function """ operation_id = endpoint_info["operation_id"] path = endpoint_info["path"] method = endpoint_info["method"] summary = endpoint_info["summary"] description = endpoint_info["description"] tags = ", ".join(endpoint_info.get("tags", [])) # Generate a clean function name (no API prefix, will be added during registration) function_name = operation_id # Generate docstring docstring = ( f'"""{summary}\n\n' if summary else f'"""Calls the {method.upper()} {path} endpoint of the {api_name} API.\n\n' ) if description: docstring += f"{description}\n\n" if tags: docstring += f"Tags: {tags}\n\n" docstring += "Args:\n" # Generate function parameters params = [] path_params = [] query_params = [] header_params = [] body_param = None for param in endpoint_info.get("parameters", []): param_name = param["name"] # Clean the parameter name to be a valid Python identifier clean_param_name = re.sub(r"[^a-zA-Z0-9_]", "_", param_name) if clean_param_name[0].isdigit(): clean_param_name = f"p_{clean_param_name}" param_type = param.get("schema", {}).get("type", "string") required = param.get("required", False) param_in = param.get("in", "query") param_description = param.get("description", "") python_type = "str" if param_type == "integer": python_type = "int" elif param_type == "number": python_type = "float" elif param_type == "boolean": python_type = "bool" elif param_type == "array": python_type = "List[Any]" elif param_type == "object": python_type = "Dict[str, Any]" if required: params.append(f"{clean_param_name}: {python_type}") docstring += f" {clean_param_name}: {param_description} (in: {param_in})\n" else: params.append(f"{clean_param_name}: Optional[{python_type}] = None") docstring += ( f" {clean_param_name}: (Optional) {param_description} (in: {param_in})\n" ) # Store parameter location for request building if param_in == "path": path_params.append((param_name, clean_param_name)) elif param_in == "query": query_params.append((param_name, clean_param_name)) elif ( param_in == "header" and param_name.lower() != auth_header.lower() if auth_header else True ): header_params.append((param_name, clean_param_name)) # Handle request body if endpoint_info.get("request_body"): content = endpoint_info["request_body"].get("content", {}) if "application/json" in content: body_param = "body" schema_desc = "Request body" # Try to get schema description from the content schema schema = content.get("application/json", {}).get("schema", {}) if "description" in schema: schema_desc = schema["description"] params.append("body: Dict[str, Any]") docstring += f" body: {schema_desc}\n" # Add timeout and auth_token params if needed params.append("timeout: float = 30.0") docstring += " timeout: Timeout for the HTTP request in seconds\n" if auth_header: params.append("auth_token: Optional[str] = None") docstring += f" auth_token: Optional authentication token to include in the '{auth_header}' header\n" docstring += '\n Returns:\n API response data as a dictionary\n """' # Generate function body function_body = [] function_body.append(" async with httpx.AsyncClient() as client:") # Format URL with path params if path_params: # For path params, replace {param} with {clean_param_name} url_format = path for param_name, clean_name in path_params: url_format = url_format.replace(f"{{{param_name}}}", f"{{{clean_name}}}") function_body.append(f' url = f"{base_url}{url_format}"') else: function_body.append(f' url = "{base_url}{path}"') # Prepare query params if query_params: function_body.append(" params = {}") for param_name, clean_name in query_params: function_body.append(f" if {clean_name} is not None:") function_body.append(f' params["{param_name}"] = {clean_name}') else: function_body.append(" params = None") # Prepare headers function_body.append(" headers = {}") if auth_header: function_body.append(" if auth_token is not None:") function_body.append(f' headers["{auth_header}"] = auth_token') if header_params: for param_name, clean_name in header_params: function_body.append(f" if {clean_name} is not None:") function_body.append(f' headers["{param_name}"] = {clean_name}') # Prepare request request_args = ["url"] if query_params: request_args.append("params=params") if header_params or auth_header: request_args.append("headers=headers") if body_param: request_args.append(f"json={body_param}") request_args.append("timeout=timeout") function_body.append(" try:") function_body.append(" response = await client.{method}({', '.join(request_args)})") function_body.append(" response.raise_for_status()") function_body.append( " if response.headers.get('content-type', '').startswith('application/json'):" ) function_body.append(" return response.json()") function_body.append(" else:") function_body.append(" return {{'text': response.text}}") function_body.append(" except httpx.HTTPStatusError as e:") function_body.append(" error_detail = e.response.text") function_body.append(" try:") function_body.append(" error_json = e.response.json()") function_body.append(" if isinstance(error_json, dict):") function_body.append(" error_detail = error_json") function_body.append(" except Exception:") function_body.append(" pass # Couldn't parse JSON error") function_body.append(" raise ToolError(") function_body.append(' f"API request failed: HTTP {{e.response.status_code}}",') function_body.append( ' details={{"status_code": e.response.status_code, "response": error_detail}}' ) function_body.append(" )") function_body.append(" except httpx.RequestError as e:") function_body.append(" raise ToolError(") function_body.append(' f"API request failed: {{str(e)}}",') function_body.append(' details={{"error": str(e)}}') function_body.append(" )") # Generate the full function param_str = ", ".join(params) if param_str: param_str = f", {param_str}" # Add decorators based on configuration decorators = ["@with_tool_metrics", "@with_error_handling"] if cache_ttl is not None: decorators.insert(0, f"@with_cache(ttl={cache_ttl})") function_code = [ *decorators, f"async def {function_name}(self{param_str}):", f"{docstring}", *function_body, ] return "\n".join(function_code) # After the generate_tool_function_code function and before register_api_meta_tools @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def register_api( api_name: str, openapi_url: str, base_url: Optional[str] = None, cache_ttl: Optional[int] = None, auth_header: Optional[str] = None, auth_token: Optional[str] = None, tool_name_prefix: Optional[str] = None, timeout: float = 30.0, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Registers an API with the MCP server by fetching its OpenAPI spec. Dynamically generates MCP tools for each endpoint in the API and registers them with the MCP server. The tools are prefixed with the API name by default, resulting in tool names like "api_name_operation_id". Args: api_name: A unique name for the API (used as a prefix for tool names) openapi_url: URL of the OpenAPI spec (typically ending in /openapi.json) base_url: Base URL of the API (if different from the OpenAPI URL) cache_ttl: Optional TTL for caching tool results in seconds auth_header: Optional name of the header to use for authentication auth_token: Optional token to use when fetching the OpenAPI spec tool_name_prefix: Optional prefix for tool names (default: api_name) timeout: Timeout for the HTTP request in seconds ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary containing the registration results: { "success": true, "api_name": "example_api", "base_url": "https://api.example.com", "tools_registered": ["example_api_get_users", "example_api_create_user", ...], "tools_count": 5, "processing_time": 1.23 } """ # Validate inputs if not api_name: raise ToolInputError("api_name cannot be empty") # Check if API name has invalid characters if not re.match(r"^[a-zA-Z0-9_]+$", api_name): raise ToolInputError( "api_name must contain only alphanumeric characters and underscores" ) if not openapi_url: raise ToolInputError("openapi_url cannot be empty") # Get registered APIs from state store registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) # Check if API is already registered if api_name in registered_apis: raise ToolInputError( f"API {api_name} is already registered. Use a different name or unregister it first." ) # Set tool name prefix tool_name_prefix = tool_name_prefix or api_name # Determine base URL if not provided if not base_url: # Extract base URL from OpenAPI URL try: parsed_url = urlparse(openapi_url) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" logger.info(f"Using base_url: {base_url} (derived from openapi_url)") except Exception as e: raise ToolInputError(f"Could not determine base_url from openapi_url: {str(e)}") from e # Prepare headers for fetching OpenAPI spec headers = None if auth_token and auth_header: headers = {auth_header: auth_token} # Fetch OpenAPI spec logger.info(f"Fetching OpenAPI spec from {openapi_url}") start_time = time.time() openapi_spec = await fetch_openapi_spec(openapi_url, timeout, headers) # Extract endpoint information endpoints = extract_endpoint_info(openapi_spec) logger.info(f"Extracted {len(endpoints)} endpoints from OpenAPI spec") # Get MCP server from context mcp = ctx.get('mcp') if not mcp: raise ToolError("MCP server context not available") # Generate and register tools for each endpoint registered_tools = [] generated_code = {} for endpoint in endpoints: operation_id = endpoint["operation_id"] tool_name = f"{tool_name_prefix}_{operation_id}" # Skip if this tool is already registered if tool_name in generated_tools: logger.warning(f"Tool {tool_name} already registered, skipping") continue # Generate tool function code tool_code = generate_tool_function_code( endpoint, base_url, api_name, cache_ttl, auth_header ) # Store the generated code for debugging generated_code[tool_name] = tool_code # Create and register the tool function try: # Create a namespace for the exec namespace = {} # Add required imports to the namespace namespace.update( { "httpx": httpx, "ToolError": ToolError, "Dict": Dict, "Any": Any, "Optional": Optional, "List": List, "with_tool_metrics": with_tool_metrics, "with_error_handling": with_error_handling, "with_cache": with_cache, } ) # Execute the generated code exec(tool_code, namespace) # Get the generated function from the namespace generated_func = namespace[operation_id] # Register with MCP server mcp.tool(name=tool_name)(generated_func) # Store the generated tool in state generated_tools[tool_name] = tool_code registered_tools.append(tool_name) logger.info( f"Registered tool {tool_name} for endpoint {endpoint['method'].upper()} {endpoint['path']}" ) except Exception as e: logger.error(f"Failed to register tool {tool_name}: {str(e)}", exc_info=True) if "tool_code" in locals(): logger.error(f"Generated code that failed:\n{tool_code}") # Store API information in state store registered_apis[api_name] = { "base_url": base_url, "openapi_url": openapi_url, "spec": openapi_spec, "tools": registered_tools, "tool_name_prefix": tool_name_prefix, "generated_code": generated_code, "auth_header": auth_header, } # Update state store await set_state("registered_apis", registered_apis) await set_state("generated_tools", generated_tools) processing_time = time.time() - start_time logger.success( f"API {api_name} registered with {len(registered_tools)} tools in {processing_time:.2f}s" ) return { "success": True, "api_name": api_name, "base_url": base_url, "tools_registered": registered_tools, "tools_count": len(registered_tools), "processing_time": processing_time, } @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def list_registered_apis( ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Lists all registered APIs and their endpoints. Args: ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary containing the registered APIs: { "success": true, "apis": { "example_api": { "base_url": "https://api.example.com", "openapi_url": "https://api.example.com/openapi.json", "tools_count": 5, "tools": ["example_api_get_users", "example_api_create_user", ...] }, ... }, "total_apis": 2, "total_tools": 12 } """ # Get state data registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) result = { "success": True, "apis": {}, "total_apis": len(registered_apis), "total_tools": len(generated_tools), } for api_name, api_info in registered_apis.items(): result["apis"][api_name] = { "base_url": api_info["base_url"], "openapi_url": api_info["openapi_url"], "tools_count": len(api_info["tools"]), "tools": api_info["tools"], "tool_name_prefix": api_info["tool_name_prefix"], } return result @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def get_api_details( api_name: str, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Gets detailed information about a registered API. Args: api_name: The name of the API to get details for ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary containing the API details: { "success": true, "api_name": "example_api", "base_url": "https://api.example.com", "openapi_url": "https://api.example.com/openapi.json", "tools": [ { "name": "example_api_get_users", "method": "get", "path": "/users", "summary": "Get all users", "description": "Returns a list of all users in the system", "parameters": [...] }, ... ], "endpoints_count": 5 } """ # Get registered APIs from state registered_apis = await get_state("registered_apis", {}) if api_name not in registered_apis: raise ToolInputError(f"API {api_name} not found") api_info = registered_apis[api_name] # Extract endpoint details from the OpenAPI spec endpoints = [] spec = api_info["spec"] for endpoint_info in extract_endpoint_info(spec): tool_name = f"{api_info['tool_name_prefix']}_{endpoint_info['operation_id']}" endpoints.append( { "name": tool_name, "method": endpoint_info["method"], "path": endpoint_info["path"], "summary": endpoint_info["summary"], "description": endpoint_info["description"], "parameters": endpoint_info["parameters"], "tags": endpoint_info.get("tags", []), } ) return { "success": True, "api_name": api_name, "base_url": api_info["base_url"], "openapi_url": api_info["openapi_url"], "tools": endpoints, "endpoints_count": len(endpoints), } @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def unregister_api( api_name: str, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Unregisters an API and all its tools from the MCP server. Args: api_name: The name of the API to unregister ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary indicating the result: { "success": true, "api_name": "example_api", "tools_unregistered": ["example_api_get_users", "example_api_create_user", ...], "tools_count": 5 } """ # Get state data registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) if api_name not in registered_apis: raise ToolInputError(f"API {api_name} not found") api_info = registered_apis[api_name] tools = api_info["tools"] # Get MCP server from context mcp = ctx.get('mcp') if not mcp: raise ToolError("MCP server context not available") # Unregister tools from MCP server for tool_name in tools: try: # Check if the MCP server has a method for unregistering tools if hasattr(mcp, "unregister_tool"): mcp.unregister_tool(tool_name) # If not, try to remove from the tools dictionary elif hasattr(mcp, "tools"): if tool_name in mcp.tools: del mcp.tools[tool_name] # Remove from our generated tools dictionary if tool_name in generated_tools: del generated_tools[tool_name] logger.info(f"Unregistered tool {tool_name}") except Exception as e: logger.error(f"Failed to unregister tool {tool_name}: {str(e)}", exc_info=True) # Remove API from registered APIs del registered_apis[api_name] # Update state await set_state("registered_apis", registered_apis) await set_state("generated_tools", generated_tools) logger.success(f"API {api_name} unregistered with {len(tools)} tools") return { "success": True, "api_name": api_name, "tools_unregistered": tools, "tools_count": len(tools), } @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def call_dynamic_tool( tool_name: str, inputs: Optional[Dict[str, Any]] = None, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Calls a dynamically registered tool by name. This is a convenience function for calling tools registered via register_api, allowing direct invocation of API endpoints. Args: tool_name: Name of the tool to call inputs: Inputs to pass to the tool (parameters for the API endpoint) ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: The result of the tool call """ # Get MCP server from context mcp = ctx.get('mcp') if not mcp: raise ToolError("MCP server context not available") # Get registered APIs and generated tools from state registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) if not tool_name: raise ToolInputError("tool_name cannot be empty") # Check if tool exists if tool_name not in generated_tools: valid_tools = list(generated_tools.keys()) raise ToolInputError( f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools[:10])}..." if len(valid_tools) > 10 else f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools)}" ) # Initialize inputs if inputs is None: inputs = {} # Find which API this tool belongs to api_name = None for name, info in registered_apis.items(): if tool_name in info["tools"]: api_name = name break if not api_name: logger.warning(f"Could not determine which API {tool_name} belongs to") # Add auth_token to inputs if specified and the API has an auth_header api_info = registered_apis.get(api_name, {}) if api_info.get("auth_header") and "auth_token" in ctx: inputs["auth_token"] = ctx["auth_token"] # Call the tool directly through MCP logger.info(f"Calling dynamic tool {tool_name} with inputs: {inputs}") start_time = time.time() # MCP execute may be different from mcp.call_tool, handle appropriately if hasattr(mcp, "execute"): result = await mcp.execute(tool_name, inputs) else: result = await mcp.call_tool(tool_name, inputs) processing_time = time.time() - start_time # Add metadata to result if isinstance(result, dict): result["processing_time"] = processing_time result["success"] = True else: result = {"data": result, "processing_time": processing_time, "success": True} logger.info(f"Called dynamic tool {tool_name} in {processing_time:.4f}s") return result @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def refresh_api( api_name: str, update_base_url: Optional[str] = None, timeout: float = 30.0, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Refreshes an API by re-fetching its OpenAPI spec and updating tools. This is useful when the API has been updated with new endpoints or modifications to existing endpoints. Args: api_name: The name of the API to refresh update_base_url: Optional new base URL for the API timeout: Timeout for the HTTP request in seconds ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary indicating the result: { "success": true, "api_name": "example_api", "tools_added": ["example_api_new_endpoint", ...], "tools_updated": ["example_api_modified_endpoint", ...], "tools_removed": ["example_api_deleted_endpoint", ...], "tools_count": 8 } """ # Get registered APIs from state registered_apis = await get_state("registered_apis", {}) if api_name not in registered_apis: raise ToolInputError(f"API {api_name} not found") api_info = registered_apis[api_name] old_tools = set(api_info["tools"]) # Determine if we need to update the base URL base_url = update_base_url or api_info["base_url"] # First, unregister the API await unregister_api(api_name, ctx=ctx, get_state=get_state, set_state=set_state, delete_state=delete_state) # Re-register with the same parameters but potentially updated base URL result = await register_api( api_name=api_name, openapi_url=api_info["openapi_url"], base_url=base_url, auth_header=api_info.get("auth_header"), tool_name_prefix=api_info["tool_name_prefix"], timeout=timeout, ctx=ctx, get_state=get_state, set_state=set_state, delete_state=delete_state ) # Determine which tools were added, updated, or removed new_tools = set(result["tools_registered"]) tools_added = list(new_tools - old_tools) tools_removed = list(old_tools - new_tools) tools_updated = list(new_tools.intersection(old_tools)) logger.success( f"API {api_name} refreshed: " f"{len(tools_added)} added, {len(tools_removed)} removed, {len(tools_updated)} updated" ) return { "success": True, "api_name": api_name, "tools_added": tools_added, "tools_updated": tools_updated, "tools_removed": tools_removed, "tools_count": len(new_tools), } @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def get_tool_details( tool_name: str, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Gets detailed information about a dynamically registered tool. Args: tool_name: Name of the tool to get details for ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary containing the tool details: { "success": true, "tool_name": "example_api_get_users", "api_name": "example_api", "method": "get", "path": "/users", "summary": "Get all users", "description": "Returns a list of all users in the system", "parameters": [...], "source_code": "..." } """ # Get registered APIs and generated tools from state registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) if tool_name not in generated_tools: raise ToolInputError(f"Tool {tool_name} not found") # Find which API this tool belongs to api_name = None for name, info in registered_apis.items(): if tool_name in info["tools"]: api_name = name break if not api_name: raise ToolError(f"Could not determine which API {tool_name} belongs to") api_info = registered_apis[api_name] # Find endpoint information in the API's endpoint list endpoint_info = None for endpoint in extract_endpoint_info(api_info["spec"]): if f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}" == tool_name: endpoint_info = endpoint break if not endpoint_info: raise ToolError(f"Could not find endpoint information for tool {tool_name}") # Get the source code source_code = api_info.get("generated_code", {}).get(tool_name, "Source code not available") return { "success": True, "tool_name": tool_name, "api_name": api_name, "method": endpoint_info["method"], "path": endpoint_info["path"], "summary": endpoint_info["summary"], "description": endpoint_info["description"], "parameters": endpoint_info["parameters"], "tags": endpoint_info.get("tags", []), "source_code": source_code, } @with_tool_metrics @with_error_handling @with_state_management(namespace="meta_api") async def list_available_tools( include_source_code: bool = False, ctx: Optional[Dict[str, Any]] = None, get_state=None, set_state=None, delete_state=None ) -> Dict[str, Any]: """Lists all available tools registered via the API Meta-Tool. Args: include_source_code: Whether to include source code in the response ctx: MCP context get_state: Function to get state from store (injected by decorator) set_state: Function to set state in store (injected by decorator) delete_state: Function to delete state from store (injected by decorator) Returns: A dictionary containing the available tools: { "success": true, "tools": [ { "name": "example_api_get_users", "api_name": "example_api", "method": "get", "path": "/users", "summary": "Get all users", "source_code": "..." # Only if include_source_code=True }, ... ], "tools_count": 12 } """ # Get registered APIs from state registered_apis = await get_state("registered_apis", {}) generated_tools = await get_state("generated_tools", {}) tools = [] for api_name, api_info in registered_apis.items(): spec = api_info["spec"] endpoints = extract_endpoint_info(spec) for endpoint in endpoints: tool_name = f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}" if tool_name in generated_tools: tool_info = { "name": tool_name, "api_name": api_name, "method": endpoint["method"], "path": endpoint["path"], "summary": endpoint["summary"], } if include_source_code: tool_info["source_code"] = api_info.get("generated_code", {}).get( tool_name, "Source code not available" ) tools.append(tool_info) return {"success": True, "tools": tools, "tools_count": len(tools)} # Now we have all our stateless functions defined: # register_api, list_registered_apis, get_api_details, unregister_api # call_dynamic_tool, refresh_api, get_tool_details, list_available_tools def register_api_meta_tools(mcp_server): """Registers API Meta-Tool with the MCP server. Args: mcp_server: MCP server instance """ # Register tools with MCP server mcp_server.tool(name="register_api")(register_api) mcp_server.tool(name="list_registered_apis")(list_registered_apis) mcp_server.tool(name="get_api_details")(get_api_details) mcp_server.tool(name="unregister_api")(unregister_api) mcp_server.tool(name="call_dynamic_tool")(call_dynamic_tool) mcp_server.tool(name="refresh_api")(refresh_api) mcp_server.tool(name="get_tool_details")(get_tool_details) mcp_server.tool(name="list_available_tools")(list_available_tools) logger.info("Registered API Meta-Tool functions") return None # No need to return an instance anymore # Example usage if this module is run directly if __name__ == "__main__": import argparse import asyncio from ultimate_mcp_server import create_app async def main(): # Parse command line arguments parser = argparse.ArgumentParser(description="API Meta-Tool for Ultimate MCP Server") parser.add_argument("--register", help="Register an API with the given name") parser.add_argument("--url", help="OpenAPI spec URL") parser.add_argument("--list", action="store_true", help="List registered APIs") parser.add_argument("--details", help="Get details for the given API") parser.add_argument("--unregister", help="Unregister the given API") parser.add_argument("--refresh", help="Refresh the given API") parser.add_argument("--base-url", help="Base URL for API requests") args = parser.parse_args() # Create MCP server create_app() # In FastMCP 2.0+, access the MCP server directly from the Gateway instance # The create_app() should return the gateway instance or we need to get it differently from ultimate_mcp_server.core import _gateway_instance mcp_server = _gateway_instance.mcp if _gateway_instance else None if not mcp_server: raise RuntimeError("Gateway instance not initialized or MCP server not available") # Register API Meta-Tool register_api_meta_tools(mcp_server) # Create context for stateless functions ctx = {"mcp": mcp_server} # Process commands if args.register and args.url: result = await register_api( api_name=args.register, openapi_url=args.url, base_url=args.base_url, ctx=ctx ) print(f"Registered API {args.register} with {result['tools_count']} tools") print(f"Tools: {', '.join(result['tools_registered'])}") elif args.list: result = await list_registered_apis(ctx=ctx) print(f"Registered APIs ({result['total_apis']}):") for api_name, api_info in result["apis"].items(): print( f"- {api_name}: {api_info['tools_count']} tools, Base URL: {api_info['base_url']}" ) elif args.details: result = await get_api_details(args.details, ctx=ctx) print(f"API {args.details} ({result['endpoints_count']} endpoints):") print(f"Base URL: {result['base_url']}") print(f"OpenAPI URL: {result['openapi_url']}") print("Endpoints:") for endpoint in result["tools"]: print(f"- {endpoint['method'].upper()} {endpoint['path']} ({endpoint['name']})") if endpoint["summary"]: print(f" Summary: {endpoint['summary']}") elif args.unregister: result = await unregister_api(args.unregister, ctx=ctx) print(f"Unregistered API {args.unregister} with {result['tools_count']} tools") elif args.refresh: result = await refresh_api( api_name=args.refresh, update_base_url=args.base_url, ctx=ctx ) print( f"Refreshed API {args.refresh}: {len(result['tools_added'])} added, {len(result['tools_removed'])} removed, {len(result['tools_updated'])} updated" ) else: print("No action specified. Use --help for usage information.") # Run the main function asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server