Skip to main content
Glama
gujord

OpenAPI to Model Context Protocol (MCP)

fastmcp_server.py25.3 kB
# SPDX-License-Identifier: MIT # Copyright (c) 2025 Roger Gujord # https://github.com/gujord/OpenAPI-MCP """ FastMCP-compliant OpenAPI proxy server. Follows FastMCP patterns and best practices. """ __all__ = ["FastMCPOpenAPIServer", "OpenAPITool", "main", "retry_with_backoff"] import json import os import sys import logging import asyncio import random from typing import Dict, Any, List, Optional from dataclasses import dataclass import httpx from fastmcp import FastMCP # Retry configuration DEFAULT_MAX_RETRIES = 3 DEFAULT_RETRY_BASE_DELAY = 1.0 # seconds DEFAULT_RETRY_MAX_DELAY = 30.0 # seconds RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504} async def retry_with_backoff( func, max_retries: int = DEFAULT_MAX_RETRIES, base_delay: float = DEFAULT_RETRY_BASE_DELAY, max_delay: float = DEFAULT_RETRY_MAX_DELAY, retryable_exceptions: tuple = (httpx.ConnectError, httpx.TimeoutException), logger: Optional[logging.Logger] = None, ): """ Execute an async function with exponential backoff retry logic. Args: func: Async function to execute (should be a coroutine function or awaitable) max_retries: Maximum number of retry attempts base_delay: Initial delay between retries in seconds max_delay: Maximum delay between retries in seconds retryable_exceptions: Tuple of exception types that should trigger a retry logger: Optional logger for retry messages Returns: The result of the function call Raises: The last exception if all retries are exhausted """ last_exception = None for attempt in range(max_retries + 1): try: return await func() except retryable_exceptions as e: last_exception = e if attempt < max_retries: # Exponential backoff with jitter delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) if logger: logger.warning( f"Request failed (attempt {attempt + 1}/{max_retries + 1}): {e}. " f"Retrying in {delay:.2f}s..." ) await asyncio.sleep(delay) else: if logger: logger.error(f"Request failed after {max_retries + 1} attempts: {e}") raise except httpx.HTTPStatusError as e: # Retry on certain status codes (e.g., 429, 5xx) if e.response.status_code in RETRYABLE_STATUS_CODES: last_exception = e if attempt < max_retries: # Check for Retry-After header retry_after = e.response.headers.get("Retry-After") if retry_after: try: delay = float(retry_after) except ValueError: delay = base_delay * (2**attempt) else: delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) if logger: logger.warning( f"Request failed with status {e.response.status_code} " f"(attempt {attempt + 1}/{max_retries + 1}). Retrying in {delay:.2f}s..." ) await asyncio.sleep(delay) else: if logger: logger.error( f"Request failed with status {e.response.status_code} " f"after {max_retries + 1} attempts" ) raise else: # Non-retryable status code raise # Should not reach here, but just in case if last_exception: raise last_exception try: from .config import ServerConfig from .auth import AuthenticationManager from .openapi_loader import OpenAPILoader, OpenAPIParser from .request_handler import RequestHandler from .schema_converter import SchemaConverter, NameSanitizer from .exceptions import * except ImportError: from config import ServerConfig from auth import AuthenticationManager from openapi_loader import OpenAPILoader, OpenAPIParser from request_handler import RequestHandler from schema_converter import SchemaConverter, NameSanitizer from exceptions import * @dataclass class OpenAPITool: """Represents an OpenAPI operation as an MCP tool.""" operation_id: str method: str path: str summary: str description: str parameters: List[Dict[str, Any]] server_url: str class FastMCPOpenAPIServer: """FastMCP-based OpenAPI proxy server following best practices.""" def __init__(self, config: ServerConfig): self.config = config self.mcp = FastMCP(config.server_name) # Core components self.authenticator = AuthenticationManager(config) self.request_handler = RequestHandler(self.authenticator) # Server state self.openapi_spec: Dict[str, Any] = {} self.operations: List[OpenAPITool] = [] self.server_url: str = "" self.api_info: Dict[str, Any] = {} # Initialize logging with debug level based on config log_level = logging.DEBUG if config.debug else logging.INFO log_format = ( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" if config.debug else "%(levelname)s - %(message)s" ) logging.basicConfig(level=log_level, format=log_format) self.logger = logging.getLogger(__name__) self.logger.setLevel(log_level) if config.debug: self.logger.debug("Debug mode enabled - verbose logging active") async def initialize(self): """Initialize the server with OpenAPI spec and register tools.""" try: self.logger.debug(f"Loading OpenAPI spec from: {self.config.openapi_url}") # Get custom headers for loading the spec auth_headers = self.authenticator.get_custom_headers() if self.authenticator else None if auth_headers and self.config.debug: self.logger.debug(f"Using {len(auth_headers)} custom auth headers") # Load OpenAPI specification with auth headers self.openapi_spec = OpenAPILoader.load_spec(self.config.openapi_url, auth_headers) self.server_url = OpenAPILoader.extract_server_url(self.openapi_spec, self.config.openapi_url) self.logger.debug(f"Server URL: {self.server_url}") # Parse operations into tools parser = OpenAPIParser(NameSanitizer.sanitize_name) operations_info = parser.parse_operations(self.openapi_spec) self.logger.debug(f"Found {len(operations_info)} operations in spec") # Extract API info self.api_info = self.openapi_spec.get("info", {}) api_title = self.api_info.get("title", "API") # Create tool objects for op_id, info in operations_info.items(): tool = OpenAPITool( operation_id=op_id, method=info["method"], path=info["path"], summary=info.get("summary", ""), description=info.get("description", ""), parameters=info.get("parameters", []), server_url=self.server_url, ) self.operations.append(tool) # Register all tools using FastMCP decorators self._register_tools() # Register resources self._register_resources() # Register prompts self._register_prompts() self.logger.info(f"Initialized OpenAPI proxy for {api_title}") self.logger.info(f"Registered {len(self.operations)} API operations as tools") except (OpenAPIError, FileNotFoundError) as e: self.logger.error(f"Failed to initialize server: {e}") raise except (httpx.RequestError, ValueError, KeyError) as e: self.logger.error(f"Failed to initialize server: {e}") raise RuntimeError(f"Server initialization failed: {e}") from e def _register_tools(self): """Register OpenAPI operations as FastMCP tools using decorators.""" # Register each OpenAPI operation as a tool for tool in self.operations: self._register_single_tool(tool) # Register server management tools self._register_management_tools() def _create_tool_function(self, tool: OpenAPITool): """Create a tool function for testing purposes.""" return self._make_generic_tool_function(tool) def _register_single_tool(self, tool: OpenAPITool): """Register a single OpenAPI operation as an MCP tool.""" # Create generic tool function generic_tool_function = self._make_generic_tool_function(tool) # Store function for testing tool._function = generic_tool_function # Register the tool with FastMCP using the decorator pattern tool_name = f"{self.config.server_name}_{tool.operation_id}" tool_description = f"[{self.config.server_name}] {tool.summary or tool.description}" # Use the tool decorator to register with name and description self.mcp.tool(name=tool_name, description=tool_description)(generic_tool_function) def _make_generic_tool_function(self, tool: OpenAPITool): """Create generic tool function for a specific tool.""" async def generic_tool_function( dry_run: bool = False, req_id: Optional[str] = None, # Common OpenAPI parameters id: Optional[str] = None, status: Optional[str] = None, tags: Optional[str] = None, name: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, q: Optional[str] = None, query: Optional[str] = None, # Additional common parameters page: Optional[int] = None, size: Optional[int] = None, sort: Optional[str] = None, filter: Optional[str] = None, # Weather API specific lat: Optional[float] = None, lon: Optional[float] = None, altitude: Optional[int] = None, ) -> Dict[str, Any]: """Generic tool function for OpenAPI operation.""" try: # Build kwargs from function parameters import inspect frame = inspect.currentframe() args = inspect.getargvalues(frame) kwargs = { k: v for k, v in args.locals.items() if k != "self" and v is not None and k not in ["frame", "args"] } if req_id is None: req_id = f"{tool.operation_id}_{int(asyncio.get_event_loop().time())}" # Handle dry run if dry_run: request_data, error = self.request_handler.prepare_request( req_id, kwargs, tool.parameters, tool.path, tool.server_url, tool.operation_id ) if error: return error full_url, req_params, req_headers, req_body, _ = request_data return { "jsonrpc": "2.0", "id": req_id, "result": { "dry_run": True, "request": { "method": tool.method, "url": full_url, "params": req_params, "headers": req_headers, "body": req_body, }, }, } # Execute real request request_data, error = self.request_handler.prepare_request( req_id, kwargs, tool.parameters, tool.path, tool.server_url, tool.operation_id ) if error: return error full_url, req_params, req_headers, req_body, _ = request_data # Make HTTP request with retry logic retry_config = self.config.get_http_retry_config() # Log request details (hide sensitive headers in non-debug mode) if self.config.debug: safe_headers = {k: v for k, v in req_headers.items() if k.lower() not in ("authorization", "x-api-key")} self.logger.info( f"REQUEST: {tool.method.upper()} {full_url}\n" f" Headers: {safe_headers}\n" f" Params: {req_params}\n" f" Body: {req_body}" ) else: self.logger.debug( f"Executing {tool.method.upper()} {full_url} " f"(timeout={retry_config['timeout']}s, max_retries={retry_config['max_retries']})" ) async with httpx.AsyncClient(timeout=retry_config["timeout"]) as client: async def make_request(): response = await client.request( method=tool.method, url=full_url, headers=req_headers, params=req_params, json=req_body ) response.raise_for_status() return response response = await retry_with_backoff( make_request, max_retries=retry_config["max_retries"], base_delay=retry_config["base_delay"], max_delay=retry_config["max_delay"], logger=self.logger, ) # Handle response try: response_data = response.json() except (ValueError, json.JSONDecodeError): response_data = response.text # Log response details if self.config.debug: response_preview = str(response_data)[:500] if response_data else "(empty)" self.logger.info( f"RESPONSE: {response.status_code} ({len(response.content)} bytes)\n" f" Headers: {dict(response.headers)}\n" f" Body: {response_preview}..." ) else: self.logger.debug(f"Response: {response.status_code} ({len(response.content)} bytes)") return { "jsonrpc": "2.0", "id": req_id, "result": { "status_code": response.status_code, "headers": dict(response.headers), "data": response_data, }, } except httpx.HTTPStatusError as e: self.logger.error(f"Tool {tool.operation_id} HTTP error: {e.response.status_code}") return { "jsonrpc": "2.0", "id": req_id or "unknown", "error": {"code": -32603, "message": f"HTTP error {e.response.status_code}: {e.response.text[:200]}"}, } except httpx.RequestError as e: self.logger.error(f"Tool {tool.operation_id} request error: {e}") return { "jsonrpc": "2.0", "id": req_id or "unknown", "error": {"code": -32603, "message": f"Request failed: {str(e)}"}, } except (ValueError, KeyError, TypeError) as e: self.logger.error(f"Tool {tool.operation_id} error: {e}") return { "jsonrpc": "2.0", "id": req_id or "unknown", "error": {"code": -32602, "message": f"Invalid parameter: {str(e)}"}, } return generic_tool_function def _build_parameter_schema(self, tool: OpenAPITool) -> Dict[str, Any]: """Build parameter schema for tool from OpenAPI parameters.""" schema = {"type": "object", "properties": {}, "required": []} for param in tool.parameters: param_name = param.get("name", "") param_schema = param.get("schema", {}) schema["properties"][param_name] = { "type": param_schema.get("type", "string"), "description": param.get("description", ""), **param_schema, } if param.get("required", False): schema["required"].append(param_name) # Add common parameters schema["properties"]["dry_run"] = { "type": "boolean", "description": "Show request details without executing", "default": False, } return schema def _register_management_tools(self): """Register server management tools.""" async def server_info() -> Dict[str, Any]: """Get server information.""" return { "server_name": self.config.server_name, "api_title": self.api_info.get("title", "API"), "api_version": self.api_info.get("version", "Unknown"), "api_description": self.api_info.get("description", ""), "server_url": self.server_url, "total_operations": len(self.operations), "authentication": { "oauth_configured": self.config.is_oauth_configured(), "username_auth_configured": self.config.is_username_auth_configured(), }, } async def list_operations() -> Dict[str, Any]: """List all available API operations.""" operations = [] for tool in self.operations: operations.append( { "operation_id": tool.operation_id, "method": tool.method, "path": tool.path, "summary": tool.summary, "description": tool.description, "tool_name": f"{self.config.server_name}_{tool.operation_id}", } ) return {"total_operations": len(operations), "operations": operations} # Register management tools using the decorator pattern self.mcp.tool( name=f"{self.config.server_name}_server_info", description=f"Get information about the {self.config.server_name} API server", )(server_info) self.mcp.tool( name=f"{self.config.server_name}_list_operations", description=f"List all available API operations for {self.config.server_name}", )(list_operations) def _register_resources(self): """Register OpenAPI schemas as MCP resources.""" schemas = self.openapi_spec.get("components", {}).get("schemas", {}) for schema_name, schema in schemas.items(): resource_name = f"{self.config.server_name}_{schema_name}" safe_name = NameSanitizer.sanitize_resource_name(resource_name) # Create resource function with closure to capture schema def make_schema_resource(schema_data): async def get_schema() -> str: """Get OpenAPI schema.""" return SchemaConverter.convert_openapi_to_mcp_schema(schema_data) return get_schema # Use the resource decorator to register self.mcp.resource( uri=f"schema://{safe_name}", name=safe_name, description=f"[{self.config.server_name}] Schema for {schema_name}", mime_type="application/json", )(make_schema_resource(schema)) def _register_prompts(self): """Register contextual prompts for API usage.""" async def api_usage_prompt() -> str: """Generate API usage prompt.""" api_title = self.api_info.get("title", "API") content = f"""# {self.config.server_name} - {api_title} Usage Guide This server provides access to {len(self.operations)} API operations from {api_title}. ## Available Operations: """ for tool in self.operations[:10]: # Show first 10 operations content += f"\n### {tool.operation_id}\n" content += f"- **Method:** {tool.method.upper()}\n" content += f"- **Path:** {tool.path}\n" content += f"- **Description:** {tool.summary or tool.description}\n" content += f"- **Tool Name:** `{self.config.server_name}_{tool.operation_id}`\n" if len(self.operations) > 10: content += f"\n... and {len(self.operations) - 10} more operations.\n" content += "\n## Usage Tips:\n" content += "- Use `dry_run=true` to see request details without executing\n" content += f"- Check the `{self.config.server_name}_server_info` tool for server details\n" content += f"- Use `{self.config.server_name}_list_operations` to see all available operations\n" return content # Use the prompt decorator to register with name and description self.mcp.prompt( name=f"{self.config.server_name}_api_usage", description=f"Guide for using the {self.api_info.get('title', 'API')} via {self.config.server_name}", )(api_usage_prompt) def run_stdio(self): """Run server with stdio transport (for MCP clients).""" self.mcp.run() async def run_sse_async(self, host: str = "127.0.0.1", port: int = 8000): """Run server with SSE transport asynchronously.""" import uvicorn app = self.mcp.sse_app() config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) await server.serve() async def run_http_async(self, host: str = "127.0.0.1", port: int = 8000): """Run server with streamable HTTP transport asynchronously.""" import uvicorn app = self.mcp.streamable_http_app() config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) await server.serve() def run_sse(self, host: str = "127.0.0.1", port: int = 8000): """Run server with SSE transport.""" # FastMCP uses uvicorn to run SSE server import uvicorn app = self.mcp.sse_app() uvicorn.run(app, host=host, port=port) def run_http(self, host: str = "127.0.0.1", port: int = 8000): """Run server with streamable HTTP transport.""" import uvicorn app = self.mcp.streamable_http_app() uvicorn.run(app, host=host, port=port) def get_sse_app(self): """Get SSE app for custom deployment.""" return self.mcp.sse_app() def get_http_app(self): """Get HTTP app for custom deployment.""" return self.mcp.streamable_http_app() def main(): """Main entry point following FastMCP patterns.""" try: # Load configuration config = ServerConfig() # Create and initialize server server = FastMCPOpenAPIServer(config) # Initialize synchronously since FastMCP handles async internally import asyncio asyncio.run(server.initialize()) # Choose transport based on configuration if config.mcp_http_enabled: # Check if we should use streamable HTTP or SSE transport_type = os.environ.get("MCP_TRANSPORT_TYPE", "sse").lower() if transport_type == "http" or transport_type == "streamable": logging.info( f"Starting FastMCP Streamable HTTP server on {config.mcp_http_host}:{config.mcp_http_port}" ) server.run_http(host=config.mcp_http_host, port=config.mcp_http_port) else: logging.info(f"Starting FastMCP SSE server on {config.mcp_http_host}:{config.mcp_http_port}") server.run_sse(host=config.mcp_http_host, port=config.mcp_http_port) else: logging.info("Starting FastMCP stdio server") server.run_stdio() except ConfigurationError as e: logging.error(f"Configuration error: {e}") sys.exit(1) except (OpenAPIError, FileNotFoundError) as e: logging.error(f"Failed to load OpenAPI spec: {e}") sys.exit(1) except RuntimeError as e: logging.error(f"Failed to start server: {e}") sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gujord/OpenAPI-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server