MCP-OpenAPI
by rmasters
- mcp-openapi
- mcp_openapi
from http import HTTPMethod
from typing import Annotated, Any
from mcp.server.fastmcp.server import FastMCP, Settings as FastMCPSettings
from mcp.server.fastmcp.tools.tool_manager import (
ToolManager,
logger as tool_manager_logger,
)
from mcp.server.fastmcp.tools.base import Tool
from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, ArgModelBase
from pydantic import Field, WithJsonSchema, create_model
from pydantic.fields import FieldInfo
from pydantic_core import PydanticUndefined
import yarl
from openapi_parser import parse as openapi_parser_parse
import openapi_parser.specification as openapi_spec
from mcp_openapi.api_client import APIClient
from mcp_openapi.settings import Settings as OpenAPISettings
class ServerFactory:
openapi_settings: OpenAPISettings
fastmcp_settings: FastMCPSettings
def __init__(
self, openapi_settings: OpenAPISettings, fastmcp_settings: FastMCPSettings
):
self.openapi_settings = openapi_settings
self.fastmcp_settings = fastmcp_settings
async def load_openapi_spec(self) -> openapi_spec.Specification:
"""Load the OpenAPI spec from the URL - this should resolve any components in other specs as well"""
return openapi_parser_parse(
uri=str(self.openapi_settings.openapi_url), strict_enum=False
)
async def build_server(self) -> FastMCP:
"""Build the server from the OpenAPI spec"""
spec = await self.load_openapi_spec()
# Get first server URL
# TODO: Allow this to be overridden by a CLI arg
if not spec.servers:
raise ValueError("No servers found in OpenAPI spec")
server_url = yarl.URL(spec.servers[0].url)
if not server_url.is_absolute():
# TODO: test this jank
server_url = yarl.URL(str(self.openapi_settings.openapi_url)).join(
server_url
)
# Initialize API client
api_client = APIClient(str(server_url).rstrip("/"))
# Register each path operation as a tool
# TODO: Filter by tags
# TODO: Filter by operation_id matches
# TODO: Filter by HTTP method
# TODO: Restrict to specified subset of paths
mcp = FastMCP()
for path in spec.paths:
for op in path.operations:
tool = tool_from_path(api_client, path, op)
register_tool(mcp, tool)
return mcp
def register_tool(server: FastMCP, tool: Tool) -> None:
tool_manager: ToolManager = server._tool_manager
# Duplicated code from ToolManager.add_tool, without the Tool.function introspector
# Perhaps we should go lower level than FastMCP, might make all the pydantic stuff redundant
existing = tool_manager._tools.get(tool.name)
if existing:
if tool_manager.warn_on_duplicate_tools:
tool_manager_logger.warning(f"Tool already exists: {tool.name}")
return
tool_manager._tools[tool.name] = tool
def json_schema_type_to_python_type(
schema: openapi_spec.Schema,
) -> type[int | str | bool | list[Any] | dict[str, Any] | None | float]:
"""Convert a JSON Schema type to a Python type - likely incomplete"""
if schema.type == openapi_spec.DataType.INTEGER:
return int
if schema.type == openapi_spec.DataType.STRING:
return str
if schema.type == openapi_spec.DataType.BOOLEAN:
return bool
if schema.type == openapi_spec.DataType.ARRAY:
return list[Any]
if schema.type == openapi_spec.DataType.OBJECT:
return dict[str, Any]
if schema.type == openapi_spec.DataType.NUMBER:
return float
if schema.type == openapi_spec.DataType.NULL:
return type(None)
raise ValueError(f"Unsupported type: {schema.type}")
def tool_params_from_operation(op: openapi_spec.Operation) -> FuncMetadata:
"""
Collect the parameters required for the operation
This can include path parameters, query parameters, and body parameters.
TODO: Here be bugs for sure.
"""
def create_param(
p: openapi_spec.Parameter | openapi_spec.Property,
is_required: bool | None = None,
) -> tuple[type, FieldInfo]:
type_ = json_schema_type_to_python_type(p.schema)
if isinstance(p, openapi_spec.Parameter):
is_required = is_required if is_required is not None else p.required
annotation = Annotated[
type_, # type: ignore[valid-type] # This seems impossible to make mypy happy, hopefully we can remove it later
Field(),
WithJsonSchema(
{
"title": p.name,
"type": p.schema.type.value,
"description": p.schema.description,
"default": p.schema.default,
"required": is_required,
"enum": p.schema.enum,
"examples": p.schema.example,
"deprecated": p.schema.deprecated,
"readOnly": p.schema.read_only,
"writeOnly": p.schema.write_only,
}
),
]
default = p.schema.default if p.schema.default else PydanticUndefined
field_info = FieldInfo.from_annotated_attribute(
annotation, # type: ignore[arg-type]
default,
)
assert field_info.annotation is not None
return field_info.annotation, field_info
params = {}
if op.parameters:
for p in op.parameters:
params[p.name] = create_param(p)
if rb := op.request_body:
for content in rb.content:
if isinstance(content.schema, openapi_spec.Object):
for prop in content.schema.properties:
params[prop.name] = create_param(
prop, is_required=prop.name in content.schema.required
)
arg_model = create_model(
f"{op.operation_id}Arguments",
**params,
__base__=ArgModelBase, # type: ignore[call-overload]
)
return FuncMetadata(
arg_model=arg_model,
)
def tool_from_path(
api_client: APIClient, path: openapi_spec.Path, op: openapi_spec.Operation
) -> Tool:
"""Convert a path operation to a tool"""
async def fn(*args, **kwargs) -> str:
"""The function that will be called when the tool is invoked"""
assert op.operation_id is not None
return await api_client.call(op.operation_id, *args, **kwargs)
# Register the operation with the API client, so it can match up parameters to path/query/body parameters
assert op.operation_id is not None
api_client.add_request(
op.operation_id, HTTPMethod(op.method.value.upper()), path.url, op
)
# Create the tool metadata
fn_metadata = tool_params_from_operation(op)
# Build the tool
return Tool(
fn=fn,
fn_metadata=fn_metadata,
name=op.operation_id,
description=" - ".join(filter(None, [op.summary, op.description])),
parameters=fn_metadata.arg_model.model_json_schema(),
is_async=True,
context_kwarg=None,
)