Skip to main content
Glama
gujord

OpenAPI to Model Context Protocol (MCP)

server.py (24.9 kB)
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Roger Gujord
# https://github.com/gujord/OpenAPI-MCP
"""
DEPRECATED: This module is deprecated and will be removed in a future version.

Please use `fastmcp_server.py` instead:

    from openapi_mcp.fastmcp_server import FastMCPOpenAPIServer
    from openapi_mcp.config import ServerConfig

    config = ServerConfig()
    server = FastMCPOpenAPIServer(config)
    await server.initialize()
    server.run_stdio()

The FastMCPOpenAPIServer provides:
- Simpler API with better FastMCP integration
- Automatic retry logic with exponential backoff
- Debug logging support via MCP_DEBUG environment variable
- Better error handling and validation
"""

__all__ = [
    "MCPResource",
    "Prompt",
    "ResourceManager",
    "PromptGenerator",
    "MCPServer",
    "main",
]

import os
import sys
import time
import logging
import warnings
from typing import Dict, Any, List, Optional
from fastmcp import FastMCP

# Emit deprecation warning when module is imported
warnings.warn(
    "openapi_mcp.server is deprecated. Use openapi_mcp.fastmcp_server instead. "
    "See migration guide at https://github.com/gujord/OpenAPI-MCP#migration",
    DeprecationWarning,
    stacklevel=2,
)

# Package-relative imports first; fall back to flat imports when the module is
# executed as a plain script from inside the package directory.
try:
    from .config import ServerConfig
    from .auth import AuthenticationManager
    from .openapi_loader import OpenAPILoader, OpenAPIParser
    from .request_handler import RequestHandler
    from .tool_factory import ToolMetadataBuilder, ToolFunctionFactory
    from .schema_converter import SchemaConverter, NameSanitizer, ResourceNameProcessor
    from .sse_handler import SSEManager, SSEToolFactory, ChunkProcessors
    from .sse_server import SSEServerManager
    from .mcp_transport import MCPHttpTransport
    from .exceptions import *
except ImportError:
    from config import ServerConfig
    from auth import AuthenticationManager
    from openapi_loader import OpenAPILoader, OpenAPIParser
    from request_handler import RequestHandler
    from tool_factory import ToolMetadataBuilder, ToolFunctionFactory
    from schema_converter import SchemaConverter, NameSanitizer, ResourceNameProcessor
    from sse_handler import SSEManager, SSEToolFactory, ChunkProcessors
    from sse_server import SSEServerManager
    from mcp_transport import MCPHttpTransport
    from exceptions import *

# HTTP verbs that are documented as callable tools in the usage prompt.
_HTTP_METHODS = frozenset({"get", "post", "put", "delete", "patch"})


class MCPResource:
    """Represents an MCP resource.

    Holds the resource name, its converted schema and a description, and
    derives the resource URI as ``/resource/<name>``.
    """

    def __init__(self, name: str, schema: dict, description: str):
        self.name = name
        self.schema = schema
        self.description = description
        self.uri = f"/resource/{name}"


class Prompt:
    """Represents an MCP prompt (name, markdown content, optional description)."""

    def __init__(self, name: str, content: str, description: str = ""):
        self.name = name
        self.content = content
        self.description = description


class ResourceManager:
    """Manages MCP resources from OpenAPI schemas."""

    def __init__(self, server_name: str, api_category: Optional[str] = None):
        self.server_name = server_name
        self.api_category = api_category
        # safe resource name -> {"schema": ..., "metadata": ...}
        self.registered_resources: Dict[str, Dict[str, Any]] = {}

    def register_resources_from_openapi(self, openapi_spec: Dict[str, Any], mcp_server: FastMCP) -> int:
        """Register resources from OpenAPI component schemas.

        Every entry under ``components.schemas`` is converted, wrapped in an
        :class:`MCPResource`, handed to ``mcp_server.add_resource`` and recorded
        in ``self.registered_resources``.

        Args:
            openapi_spec: Parsed OpenAPI document.
            mcp_server: Server instance that receives the resources.

        Returns:
            Number of schemas processed.
        """
        schemas = openapi_spec.get("components", {}).get("schemas", {})
        # The category tag is only appended when a category is known.
        category_tags = [self.api_category] if self.api_category else []
        resource_count = 0

        for schema_name, schema in schemas.items():
            prefixed_name = f"{self.server_name}_{schema_name}"
            safe_name = NameSanitizer.sanitize_resource_name(prefixed_name)
            resource_schema = SchemaConverter.convert_openapi_to_mcp_schema(schema)
            resource_description = (
                f"[{self.server_name}] {schema.get('description', f'Resource for {schema_name}')}"
            )

            resource_obj = MCPResource(name=safe_name, schema=resource_schema, description=resource_description)
            mcp_server.add_resource(resource_obj)

            self.registered_resources[safe_name] = {
                "schema": resource_schema,
                "metadata": {
                    "name": safe_name,
                    "description": resource_description,
                    "serverInfo": {"name": self.server_name},
                    "tags": ["resource", self.server_name] + category_tags,
                },
            }
            resource_count += 1

        return resource_count


class PromptGenerator:
    """Generates MCP prompts from OpenAPI specifications."""

    def __init__(self, server_name: str, openapi_spec: Dict[str, Any]):
        self.server_name = server_name
        self.openapi_spec = openapi_spec

    def generate_api_usage_prompt(self) -> Prompt:
        """Generate general API usage prompt.

        Produces one markdown section per GET/POST/PUT/DELETE/PATCH operation
        found under ``paths``, listing its path, description and parameters.

        Returns:
            A :class:`Prompt` named ``<server_name>_api_general_usage``.
        """
        info = self.openapi_spec.get("info", {})
        api_title = info.get("title", "API")

        # Collect fragments and join once instead of repeated string +=.
        parts = [
            f"""# {self.server_name} - API Usage Guide for {api_title}

This API provides the following capabilities:
"""
        ]

        for path, methods in self.openapi_spec.get("paths", {}).items():
            for method, details in methods.items():
                # Path items may also carry non-operation keys (e.g. "parameters").
                if method.lower() not in _HTTP_METHODS or not isinstance(details, dict):
                    continue

                raw_tool_name = details.get("operationId") or f"{method}_{path}"
                tool_name = f"{self.server_name}_{raw_tool_name}"
                description = details.get("description") or details.get("summary", "No description")

                parts.append(f"\n## {tool_name}\n")
                parts.append(f"- Path: `{path}` (HTTP {method.upper()})\n")
                parts.append(f"- Description: {description}\n")

                parameters = details.get("parameters")
                if parameters:
                    parts.append("- Parameters:\n")
                    for param in parameters:
                        required = "Required" if param.get("required") else "Optional"
                        parts.append(
                            f"  - `{param.get('name')}` ({param.get('in')}): "
                            f"{param.get('description', 'No description')} [{required}]\n"
                        )

        prompt_name = f"{self.server_name}_api_general_usage"
        prompt_description = f"[{self.server_name}] General guidance for using {api_title} API"
        return Prompt(prompt_name, "".join(parts), prompt_description)

    def generate_example_prompts(self) -> List[Prompt]:
        """Generate example usage prompts for CRUD operations.

        One prompt is produced per resource returned by
        :meth:`_identify_crud_operations`. Only the ``list``, ``get`` and
        ``create`` operations are rendered as examples.

        Returns:
            List of :class:`Prompt` objects, one per resource.
        """
        crud_ops = self._identify_crud_operations()
        prompts = []

        for resource, operations in crud_ops.items():
            content = f"""# {self.server_name} - Examples for working with {resource}

Common scenarios for handling {resource} resources:
"""
            # The quadrupled braces render as literal "{{ ... }}" in the output.
            if "list" in operations:
                prefixed_op = f"{self.server_name}_{operations['list']}"
                content += f"""
## Listing {resource} resources

To list all {resource} resources:
```
{{{{tool.{prefixed_op}()}}}}
```
"""
            if "get" in operations:
                prefixed_op = f"{self.server_name}_{operations['get']}"
                content += f"""
## Getting a specific {resource}

To retrieve a specific {resource} by ID:
```
{{{{tool.{prefixed_op}(id="example-id")}}}}
```
"""
            if "create" in operations:
                prefixed_op = f"{self.server_name}_{operations['create']}"
                content += f"""
## Creating a new {resource}

To create a new {resource}:
```
{{{{tool.{prefixed_op}(
    name="Example name",
    description="Example description"
    # Add other required fields
)}}}}
```
"""
            prompt_name = f"{self.server_name}_{resource}_examples"
            prompt_description = f"[{self.server_name}] Example usage patterns for {resource} resources"
            prompts.append(Prompt(prompt_name, content, prompt_description))

        return prompts

    def _identify_crud_operations(self) -> Dict[str, Dict[str, str]]:
        """Identify CRUD operations from OpenAPI paths.

        The resource name is taken from the last non-templated path segment.
        GET maps to ``get`` when the path contains a ``{...}`` template and to
        ``list`` otherwise; POST maps to ``create``, PUT/PATCH to ``update``
        and DELETE to ``delete``.

        Returns:
            Mapping ``resource -> {crud_kind: sanitized operation id}``.
        """
        crud_ops: Dict[str, Dict[str, str]] = {}

        for path, methods in self.openapi_spec.get("paths", {}).items():
            path_parts = [p for p in path.split("/") if p and not p.startswith("{")]
            if not path_parts:
                continue

            resource = ResourceNameProcessor.singularize_resource(path_parts[-1])
            operations = crud_ops.setdefault(resource, {})
            is_item_path = "{" in path and "}" in path

            for method, details in methods.items():
                # Skip path-level keys such as "parameters" or "summary", whose
                # values are not operation objects and have no .get().
                if not isinstance(details, dict):
                    continue

                op_id = NameSanitizer.sanitize_name(details.get("operationId") or f"{method}_{path}")
                verb = method.lower()
                if verb == "get":
                    operations["get" if is_item_path else "list"] = op_id
                elif verb == "post":
                    operations["create"] = op_id
                elif verb in {"put", "patch"}:
                    operations["update"] = op_id
                elif verb == "delete":
                    operations["delete"] = op_id

        return crud_ops


class MCPServer:
    """Main MCP server class with modular architecture.

    .. deprecated::
        Use :class:`openapi_mcp.fastmcp_server.FastMCPOpenAPIServer` instead.
    """

    def __init__(self, config: ServerConfig):
        warnings.warn(
            "MCPServer is deprecated. Use FastMCPOpenAPIServer from " "openapi_mcp.fastmcp_server instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.config = config
        self.server_name = config.server_name
        self.mcp = FastMCP(self.server_name)

        # Initialize components
        self.authenticator = AuthenticationManager(config)
        self.request_handler = RequestHandler(self.authenticator)
        self.resource_manager = None

        # SSE components (deprecated - kept for backward compatibility)
        self.sse_manager = SSEManager() if config.sse_enabled else None
        self.sse_server_manager = None
        self.sse_tool_factory = None

        if self.sse_manager:
            self.sse_server_manager = SSEServerManager(self.sse_manager, config.sse_host, config.sse_port)
            self.sse_tool_factory = SSEToolFactory(self.sse_manager)
            logging.info("SSE support enabled (deprecated - use MCP_HTTP_ENABLED)")

        # MCP HTTP Transport
        self.mcp_transport = None
        if config.mcp_http_enabled:
            self.mcp_transport = MCPHttpTransport(
                mcp_server=self,
                host=config.mcp_http_host,
                port=config.mcp_http_port,
                cors_origins=config.mcp_cors_origins,
                message_size_limit=config.mcp_message_size_limit,
                batch_timeout=config.mcp_batch_timeout,
                session_timeout=config.mcp_session_timeout,
            )
            logging.info(f"MCP HTTP transport enabled on {config.mcp_http_host}:{config.mcp_http_port}")

        # Server state
        self.registered_tools: Dict[str, Dict[str, Any]] = {}
        self.openapi_spec: Dict[str, Any] = {}
        self.operations_info: Dict[str, Dict[str, Any]] = {}
        self.api_category: Optional[str] = None

        # Populated by initialize(); declared here so the attributes always exist.
        self.tool_factory = None
        self.metadata_builder = None

        # Strong references to fire-and-forget asyncio tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks: set = set()

    def initialize(self):
        """Initialize the server with OpenAPI spec and components.

        Loads the spec from ``config.openapi_url``, parses its operations and
        creates the resource manager, tool factory and metadata builder.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # Get custom headers for loading the spec
            auth_headers = self.authenticator.get_custom_headers() if self.authenticator else None

            # Load OpenAPI spec with auth headers
            self.openapi_spec = OpenAPILoader.load_spec(self.config.openapi_url, auth_headers)
            server_url = OpenAPILoader.extract_server_url(self.openapi_spec, self.config.openapi_url)

            # Parse operations
            parser = OpenAPIParser(NameSanitizer.sanitize_name)
            self.operations_info = parser.parse_operations(self.openapi_spec)

            # Extract API info
            api_title, self.api_category = parser.extract_api_info(self.openapi_spec)

            # Initialize resource manager
            self.resource_manager = ResourceManager(self.server_name, self.api_category)

            # Initialize tool factory
            self.tool_factory = ToolFunctionFactory(self.request_handler, server_url)
            self.metadata_builder = ToolMetadataBuilder(self.server_name, self.api_category)

            logging.info(
                "Loaded API: %s (version: %s)", api_title, self.openapi_spec.get("info", {}).get("version", "Unknown")
            )
        except Exception as e:
            logging.error("Failed to initialize server: %s", e)
            raise

    def register_openapi_tools(self) -> int:
        """Register tools from OpenAPI operations.

        A failure on one operation is logged and does not stop the others.

        Returns:
            Number of operations registered successfully.
        """
        tool_count = 0

        for op_id, info in self.operations_info.items():
            try:
                # Create tool function
                tool_function = self.tool_factory.create_tool_function(
                    op_id, info["method"], info["path"], info.get("parameters", [])
                )

                # Build metadata
                tool_metadata = self.metadata_builder.build_tool_metadata({op_id: info})[0]

                # Note: Custom streaming support removed for MCP compliance
                # MCP transport layer handles streaming via SSE according to official spec

                # Register tool
                self._add_tool(op_id, tool_function, info.get("summary", op_id), tool_metadata)
                tool_count += 1
            except Exception as e:
                logging.error("Failed to register tool for operation %s: %s", op_id, e)

        return tool_count

    def register_standard_tools(self):
        """Register standard MCP tools (plus two SSE tools when SSE is enabled)."""
        self._add_tool("initialize", self._initialize_tool, "Initialize MCP server.")
        self._add_tool("tools_list", self._tools_list_tool, "List available tools with extended metadata.")
        self._add_tool("tools_call", self._tools_call_tool, "Call a tool by name with provided arguments.")

        # Register SSE-specific tools if enabled
        if self.sse_manager:
            self._add_tool("sse_connections", self._sse_connections_tool, "Get SSE connection information.")
            self._add_tool("sse_broadcast", self._sse_broadcast_tool, "Broadcast message to all SSE connections.")

    def register_resources(self) -> int:
        """Register resources from OpenAPI schemas.

        Returns:
            Number of resources registered, or 0 before ``initialize()`` ran.
        """
        if not self.resource_manager:
            return 0
        return self.resource_manager.register_resources_from_openapi(self.openapi_spec, self.mcp)

    def generate_prompts(self) -> int:
        """Generate and register prompts.

        Returns:
            Total number of prompts added (general prompt plus examples).
        """
        prompt_generator = PromptGenerator(self.server_name, self.openapi_spec)

        # Generate general usage prompt
        general_prompt = prompt_generator.generate_api_usage_prompt()
        self.mcp.add_prompt(general_prompt)

        # Generate example prompts
        example_prompts = prompt_generator.generate_example_prompts()
        for prompt in example_prompts:
            self.mcp.add_prompt(prompt)

        return 1 + len(example_prompts)

    def _add_tool(self, name: str, func: Any, description: str, metadata: Optional[Dict[str, Any]] = None):
        """Add a tool to the server.

        The tool is stored under ``<server_name>_<name>`` (sanitized) in
        ``self.registered_tools`` and registered with FastMCP.

        Args:
            name: Unprefixed tool name.
            func: Callable implementing the tool.
            description: Human-readable description; the server name is prepended.
            metadata: Optional metadata dict. It is updated in place with the
                final name, description, default tags and server info.
        """
        prefixed_name = f"{self.server_name}_{name}"
        safe_name = NameSanitizer.sanitize_tool_name(prefixed_name)
        enhanced_description = f"[{self.server_name}] {description}"

        if metadata is None:
            metadata = {
                "name": safe_name,
                "description": enhanced_description,
                "tags": ["openapi", "api", self.server_name],
                "serverInfo": {"name": self.server_name},
            }
        else:
            metadata["name"] = safe_name
            metadata["description"] = enhanced_description
            metadata.setdefault("tags", ["openapi", "api", self.server_name])
            metadata["serverInfo"] = {"name": self.server_name}

        self.registered_tools[safe_name] = {"function": func, "metadata": metadata}

        # Use the tool decorator pattern for FastMCP compatibility
        self.mcp.tool(name=safe_name, description=enhanced_description)(func)

    def _initialize_tool(self, req_id: Any = None, **kwargs):
        """Initialize tool implementation.

        Returns:
            A JSON-RPC 2.0 result describing protocol version, capabilities
            and server info derived from the loaded OpenAPI ``info`` object.
        """
        server_description = self.openapi_spec.get("info", {}).get(
            "description", f"OpenAPI Proxy for {self.server_name}"
        )
        api_title = self.openapi_spec.get("info", {}).get("title", "API")

        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": True}},
                "serverInfo": {
                    "name": self.server_name,
                    "version": "1.0.0",
                    "description": f"OpenAPI Proxy for {api_title}: {server_description}",
                    "category": self.api_category or "API Integration",
                    "tags": ["openapi", "api", self.server_name] + ([self.api_category] if self.api_category else []),
                },
            },
        }

    def _tools_list_tool(self, req_id: Any = None):
        """List tools implementation.

        Returns:
            A JSON-RPC 2.0 result whose ``tools`` entry is the metadata of
            every registered tool.
        """
        tool_list = [data["metadata"] for data in self.registered_tools.values()]
        return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tool_list}}

    def _tools_call_tool(
        self, req_id: Any = None, name: Optional[str] = None, arguments: Optional[Dict[str, Any]] = None
    ):
        """Call tool implementation.

        Args:
            req_id: JSON-RPC request id echoed back in the response.
            name: Tool name, with or without the ``<server_name>_`` prefix.
            arguments: Keyword arguments forwarded to the tool.

        Returns:
            The tool's own return value, or a JSON-RPC error object when the
            name is missing, unknown, or the tool raises.
        """
        if not name:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": "Missing tool name"}}

        # Handle case where user forgot the server prefix
        if name not in self.registered_tools:
            prefixed_name = f"{self.server_name}_{name}"
            if prefixed_name in self.registered_tools:
                name = prefixed_name
            else:
                error = ToolNotFoundError(name, f"{self.server_name}_{name}")
                return error.to_json_rpc_error(req_id)

        try:
            func = self.registered_tools[name]["function"]
            return func(req_id=req_id, **(arguments or {}))
        except Exception as e:
            logging.error("Error calling tool %s: %s", name, e)
            error = RequestExecutionError(str(e))
            return error.to_json_rpc_error(req_id)

    def _get_chunk_processor(self, operation_info: Dict[str, Any]):
        """Determine appropriate chunk processor for an operation.

        The choice is made by substring search in the string form of the
        operation's ``responseSchema``.
        """
        response_schema = operation_info.get("responseSchema", {})

        # Check for common streaming content types
        if "application/json" in str(response_schema):
            return ChunkProcessors.json_lines_processor
        elif "text/csv" in str(response_schema):
            return ChunkProcessors.csv_processor
        else:
            return ChunkProcessors.text_processor

    def _sse_connections_tool(self, req_id: Any = None, **kwargs):
        """SSE connections information tool.

        Returns:
            A JSON-RPC 2.0 result listing every connection known to the SSE
            manager, or a ``-32601`` error when SSE is disabled.
        """
        if not self.sse_manager:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "SSE not enabled"}}

        connections_info = [
            {
                "connection_id": connection_id,
                "connected": connection.connected,
                "last_heartbeat": connection.last_heartbeat,
                "heartbeat_interval": connection.heartbeat_interval,
            }
            for connection_id, connection in self.sse_manager.connections.items()
        ]

        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "active_connections": len(connections_info),
                "connections": connections_info,
                "sse_server_url": self.sse_server_manager.get_health_url() if self.sse_server_manager else None,
            },
        }

    def _sse_broadcast_tool(self, req_id: Any = None, message: Optional[str] = None, **kwargs):
        """SSE broadcast tool.

        Schedules a broadcast of ``message`` to all SSE connections on the
        currently running event loop and returns immediately.

        Returns:
            A JSON-RPC 2.0 result on success; an error object when SSE is
            disabled (``-32601``), ``message`` is missing (``-32602``) or the
            broadcast cannot be scheduled (``-32603``).
        """
        if not self.sse_manager:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "SSE not enabled"}}

        if not message:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": "Missing 'message' parameter"}}

        import asyncio

        try:
            from .sse_handler import SSEEvent, SSEEventType
        except ImportError:
            from sse_handler import SSEEvent, SSEEventType

        async def broadcast():
            event = SSEEvent(type=SSEEventType.DATA, data={"broadcast_message": message, "from": "mcp_server"})
            await self.sse_manager.broadcast_to_all(event)

        # Resolve the loop before creating the coroutine: without a running loop
        # there is nothing to schedule on, and an un-awaited coroutine object
        # would otherwise be leaked.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError as e:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": f"Broadcast failed: {e}"}}

        try:
            # Run the broadcast in the background; keep a reference until done
            # because the event loop itself only holds a weak reference.
            task = loop.create_task(broadcast())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "success": True,
                    "message": "Broadcast initiated",
                    "connection_count": self.sse_manager.get_connection_count(),
                },
            }
        except Exception as e:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": f"Broadcast failed: {e}"}}

    async def start_sse_server(self):
        """Start the SSE HTTP server if enabled."""
        if self.sse_server_manager:
            await self.sse_server_manager.start()

    async def stop_sse_server(self):
        """Stop the SSE HTTP server if running."""
        if self.sse_server_manager:
            await self.sse_server_manager.stop()

    def run(self):
        """Run the MCP server.

        Transport precedence: MCP HTTP transport if configured, otherwise the
        deprecated SSE server alongside stdio, otherwise plain stdio.
        """
        import asyncio

        # Choose transport mode
        if self.mcp_transport:
            # Run with MCP HTTP transport
            async def run_with_mcp_transport():
                # NOTE(review): on recent Python versions asyncio.run() cancels the
                # main task on Ctrl-C, so this handler may never fire - confirm
                # whether stop() should live in a ``finally`` instead.
                try:
                    await self.mcp_transport.start()
                except KeyboardInterrupt:
                    logging.info("Shutting down MCP HTTP transport...")
                    await self.mcp_transport.stop()

            asyncio.run(run_with_mcp_transport())
        elif self.sse_server_manager:
            # Backward compatibility: Run with deprecated SSE server
            async def run_with_sse():
                await self.start_sse_server()
                # Give SSE server time to start
                await asyncio.sleep(1)
                # An event loop is already running here, so the blocking
                # ``self.mcp.run()`` (which starts its own loop) cannot be used;
                # await the async stdio entry point instead.
                await self.mcp.run_stdio_async()

            try:
                asyncio.run(run_with_sse())
            except KeyboardInterrupt:
                logging.info("Shutting down servers...")
                asyncio.run(self.stop_sse_server())
        else:
            # Default: stdio transport
            self.mcp.run(transport="stdio")


def main():
    """Main entry point.

    Builds the configuration, initializes the server, registers tools,
    resources and prompts, logs a summary and starts serving. Exits with
    status 1 on any startup failure.

    .. deprecated::
        Use ``openapi_mcp.fastmcp_server.main()`` instead.
    """
    warnings.warn(
        "openapi_mcp.server.main() is deprecated. " "Use openapi_mcp.fastmcp_server.main() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    try:
        # Load configuration
        config = ServerConfig()

        logging.info("Starting OpenAPI-MCP server with name: %s", config.server_name)
        logging.info("OpenAPI URL: %s", config.openapi_url)

        start_time = time.time()

        # Create and initialize server
        server = MCPServer(config)
        server.initialize()

        # Register components
        api_tool_count = server.register_openapi_tools()
        server.register_standard_tools()
        resource_count = server.register_resources()
        prompt_count = server.generate_prompts()

        # Log summary
        total_tools = len(server.registered_tools)
        # Derived rather than hard-coded: SSE adds two more standard tools.
        standard_tool_count = total_tools - api_tool_count
        setup_time = time.time() - start_time

        logging.info("Successfully registered %d/%d API tools", api_tool_count, len(server.operations_info))
        logging.info(
            "Total registered tools: %d (API tools: %d, Standard tools: %d)",
            total_tools,
            api_tool_count,
            standard_tool_count,
        )
        logging.info("Total registered resources: %d", resource_count)
        logging.info("Generated %d prompts", prompt_count)
        logging.info("Server setup completed in %.2f seconds", setup_time)
        logging.info("Server %s ready", config.server_name)

        # Start server
        logging.info("Starting MCP server...")
        server.run()

    except ConfigurationError as e:
        logging.error("Configuration error: %s", e.message)
        sys.exit(1)
    except Exception as e:
        logging.error("Failed to start server: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gujord/OpenAPI-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.