import json
from typing import Any, Generic, Literal, TypeVar, cast
from openapi_pydantic import (
OpenAPI,
Operation,
Parameter,
PathItem,
Reference,
RequestBody,
Response,
Schema,
)
# Import OpenAPI 3.0 models as well
from openapi_pydantic.v3.v3_0 import OpenAPI as OpenAPI_30
from openapi_pydantic.v3.v3_0 import Operation as Operation_30
from openapi_pydantic.v3.v3_0 import Parameter as Parameter_30
from openapi_pydantic.v3.v3_0 import PathItem as PathItem_30
from openapi_pydantic.v3.v3_0 import Reference as Reference_30
from openapi_pydantic.v3.v3_0 import RequestBody as RequestBody_30
from openapi_pydantic.v3.v3_0 import Response as Response_30
from openapi_pydantic.v3.v3_0 import Schema as Schema_30
from pydantic import BaseModel, Field, ValidationError
from fastmcp.utilities.logging import get_logger
from fastmcp.utilities.types import FastMCPBaseModel
# Module-level logger, named after this module for filterable output.
logger = get_logger(__name__)

# --- Intermediate Representation (IR) Definition ---
# (IR models remain the same)

# Uppercase HTTP verbs the parser emits (matches HTTPRoute.method).
HttpMethod = Literal[
    "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE"
]
# Where a parameter travels in the request — the OpenAPI "in" field.
ParameterLocation = Literal["path", "query", "header", "cookie"]
# A raw JSON Schema fragment represented as a plain dictionary.
JsonSchema = dict[str, Any]
def format_array_parameter(
values: list, parameter_name: str, is_query_parameter: bool = False
) -> str | list:
"""
Format an array parameter according to OpenAPI specifications.
Args:
values: List of values to format
parameter_name: Name of the parameter (for error messages)
is_query_parameter: If True, can return list for explode=True behavior
Returns:
String (comma-separated) or list (for query params with explode=True)
"""
# For arrays of simple types (strings, numbers, etc.), join with commas
if all(isinstance(item, str | int | float | bool) for item in values):
return ",".join(str(v) for v in values)
# For complex types, try to create a simpler representation
try:
# Try to create a simple string representation
formatted_parts = []
for item in values:
if isinstance(item, dict):
# For objects, serialize key-value pairs
item_parts = []
for k, v in item.items():
item_parts.append(f"{k}:{v}")
formatted_parts.append(".".join(item_parts))
else:
formatted_parts.append(str(item))
return ",".join(formatted_parts)
except Exception as e:
param_type = "query" if is_query_parameter else "path"
logger.warning(
f"Failed to format complex array {param_type} parameter '{parameter_name}': {e}"
)
if is_query_parameter:
# For query parameters, fallback to original list
return values
else:
# For path parameters, fallback to string representation without Python syntax
# Use str.translate() for efficient character removal
translation_table = str.maketrans("", "", "[]'\"")
str_value = str(values).translate(translation_table)
return str_value
def format_deep_object_parameter(
    param_value: dict, parameter_name: str
) -> dict[str, str]:
    """
    Format a dictionary parameter for deepObject style serialization.

    According to OpenAPI 3.0 spec, deepObject style with explode=true serializes
    object properties as separate query parameters with bracket notation.
    For example: `{"id": "123", "type": "user"}` becomes `param[id]=123&param[type]=user`.

    Args:
        param_value: Dictionary value to format
        parameter_name: Name of the parameter

    Returns:
        Dictionary with bracketed parameter names as keys
    """
    if not isinstance(param_value, dict):
        logger.warning(
            f"deepObject style parameter '{parameter_name}' expected dict, got {type(param_value)}"
        )
        return {}
    # Each property becomes its own "name[key]" entry with a stringified value.
    return {
        f"{parameter_name}[{key}]": str(value) for key, value in param_value.items()
    }
class ParameterInfo(FastMCPBaseModel):
    """Represents a single parameter for an HTTP operation in our IR."""

    name: str
    location: ParameterLocation  # Mapped from 'in' field of openapi-pydantic Parameter
    required: bool = False
    # Aliased because "schema" shadows BaseModel.schema; populated via alias="schema".
    schema_: JsonSchema = Field(..., alias="schema")  # Target name in IR
    description: str | None = None
    explode: bool | None = None  # OpenAPI explode property for array parameters
    style: str | None = None  # OpenAPI style property for parameter serialization
class RequestBodyInfo(FastMCPBaseModel):
    """Represents the request body for an HTTP operation in our IR."""

    required: bool = False
    # Maps media type (e.g. "application/json") to its JSON schema.
    content_schema: dict[str, JsonSchema] = Field(
        default_factory=dict
    )  # Key: media type
    description: str | None = None
class ResponseInfo(FastMCPBaseModel):
    """Represents response information in our IR."""

    description: str | None = None
    # Store schema per media type, key is media type
    content_schema: dict[str, JsonSchema] = Field(default_factory=dict)
class HTTPRoute(FastMCPBaseModel):
    """Intermediate Representation for a single OpenAPI operation."""

    path: str  # URL path template, e.g. "/users/{id}"
    method: HttpMethod
    operation_id: str | None = None
    summary: str | None = None
    description: str | None = None
    tags: list[str] = Field(default_factory=list)
    parameters: list[ParameterInfo] = Field(default_factory=list)
    request_body: RequestBodyInfo | None = None
    responses: dict[str, ResponseInfo] = Field(
        default_factory=dict
    )  # Key: status code str
    # Shared component schemas referenced by this route's schemas.
    schema_definitions: dict[str, JsonSchema] = Field(
        default_factory=dict
    )  # Store component schemas
    # OpenAPI "x-*" specification extensions found on the operation.
    extensions: dict[str, Any] = Field(default_factory=dict)
    openapi_version: str | None = None
# Export public symbols
__all__ = [
    "HTTPRoute",
    "ParameterInfo",
    "RequestBodyInfo",
    "ResponseInfo",
    "HttpMethod",
    "ParameterLocation",
    "JsonSchema",
    "parse_openapi_to_http_routes",
    "extract_output_schema_from_responses",
    "format_deep_object_parameter",
    "_handle_nullable_fields",
]

# Type variables for generic parser
# Each TypeVar is constrained to the 3.1 model plus its 3.0 counterpart so
# OpenAPIParser can be instantiated with either spec version's model classes.
TOpenAPI = TypeVar("TOpenAPI", OpenAPI, OpenAPI_30)
TSchema = TypeVar("TSchema", Schema, Schema_30)
TReference = TypeVar("TReference", Reference, Reference_30)
TParameter = TypeVar("TParameter", Parameter, Parameter_30)
TRequestBody = TypeVar("TRequestBody", RequestBody, RequestBody_30)
TResponse = TypeVar("TResponse", Response, Response_30)
TOperation = TypeVar("TOperation", Operation, Operation_30)
TPathItem = TypeVar("TPathItem", PathItem, PathItem_30)
def parse_openapi_to_http_routes(openapi_dict: dict[str, Any]) -> list[HTTPRoute]:
    """
    Parses an OpenAPI schema dictionary into a list of HTTPRoute objects
    using the openapi-pydantic library.

    Supports both OpenAPI 3.0.x and 3.1.x versions.

    Raises:
        ValueError: If the document fails OpenAPI model validation.
    """
    # The declared version selects which generation of models to validate with.
    version = openapi_dict.get("openapi", "")
    try:
        if version.startswith("3.0"):
            # OpenAPI 3.0.x model family.
            model_30 = OpenAPI_30.model_validate(openapi_dict)
            logger.debug(
                f"Successfully parsed OpenAPI 3.0 schema version: {model_30.openapi}"
            )
            return OpenAPIParser(
                model_30,
                Reference_30,
                Schema_30,
                Parameter_30,
                RequestBody_30,
                Response_30,
                Operation_30,
                PathItem_30,
                version,
            ).parse()

        # Anything else defaults to the OpenAPI 3.1 model family.
        model_31 = OpenAPI.model_validate(openapi_dict)
        logger.debug(
            f"Successfully parsed OpenAPI 3.1 schema version: {model_31.openapi}"
        )
        return OpenAPIParser(
            model_31,
            Reference,
            Schema,
            Parameter,
            RequestBody,
            Response,
            Operation,
            PathItem,
            version,
        ).parse()
    except ValidationError as e:
        logger.error(f"OpenAPI schema validation failed: {e}")
        error_details = e.errors()
        logger.error(f"Validation errors: {error_details}")
        raise ValueError(f"Invalid OpenAPI schema: {error_details}") from e
class OpenAPIParser(
    Generic[
        TOpenAPI,
        TReference,
        TSchema,
        TParameter,
        TRequestBody,
        TResponse,
        TOperation,
        TPathItem,
    ]
):
    """Unified parser for OpenAPI schemas with generic type parameters to handle both 3.0 and 3.1."""

    def __init__(
        self,
        openapi: TOpenAPI,
        reference_cls: type[TReference],
        schema_cls: type[TSchema],
        parameter_cls: type[TParameter],
        request_body_cls: type[TRequestBody],
        response_cls: type[TResponse],
        operation_cls: type[TOperation],
        path_item_cls: type[TPathItem],
        openapi_version: str,
    ):
        """Initialize the parser with the OpenAPI schema and type classes.

        The *_cls arguments must all come from the same openapi-pydantic
        generation (all 3.0 or all 3.1) as the validated ``openapi`` model;
        they are used for isinstance checks during traversal.
        """
        self.openapi = openapi
        self.reference_cls = reference_cls
        self.schema_cls = schema_cls
        self.parameter_cls = parameter_cls
        self.request_body_cls = request_body_cls
        self.response_cls = response_cls
        self.operation_cls = operation_cls
        self.path_item_cls = path_item_cls
        self.openapi_version = openapi_version

    def _convert_to_parameter_location(self, param_in: str) -> ParameterLocation:
        """Convert string parameter location to our ParameterLocation type."""
        if param_in in ["path", "query", "header", "cookie"]:
            return param_in  # type: ignore[return-value] # Safe cast since we checked values
        logger.warning(f"Unknown parameter location: {param_in}, defaulting to 'query'")
        return "query"  # type: ignore[return-value] # Safe cast to default value

    def _resolve_ref(self, item: Any) -> Any:
        """Resolves a reference to its target definition.

        Walks a local ``#/...`` JSON pointer through the OpenAPI model,
        traversing pydantic models, dicts, and lists. Non-reference inputs
        are returned unchanged. Raises ValueError for external references
        or unresolvable paths.
        """
        if isinstance(item, self.reference_cls):
            ref_str = item.ref
            try:
                if not ref_str.startswith("#/"):
                    raise ValueError(
                        f"External or non-local reference not supported: {ref_str}"
                    )
                parts = ref_str.strip("#/").split("/")
                target = self.openapi
                for part in parts:
                    # Numeric segments index into lists (e.g. parameters/0).
                    if part.isdigit() and isinstance(target, list):
                        target = target[int(part)]
                    elif isinstance(target, BaseModel):
                        # Check class fields first, then model_extra
                        if part in target.__class__.model_fields:
                            target = getattr(target, part, None)
                        elif target.model_extra and part in target.model_extra:
                            target = target.model_extra[part]
                        else:
                            # Special handling for components
                            if part == "components" and hasattr(target, "components"):
                                target = getattr(target, "components")
                            elif hasattr(target, part):  # Fallback check
                                target = getattr(target, part, None)
                            else:
                                target = None  # Part not found
                    elif isinstance(target, dict):
                        target = target.get(part)
                    else:
                        raise ValueError(
                            f"Cannot traverse part '{part}' in reference '{ref_str}'"
                        )
                    if target is None:
                        raise ValueError(
                            f"Reference part '{part}' not found in path '{ref_str}'"
                        )
                # Handle nested references
                if isinstance(target, self.reference_cls):
                    return self._resolve_ref(target)
                return target
            except (AttributeError, KeyError, IndexError, TypeError, ValueError) as e:
                raise ValueError(f"Failed to resolve reference '{ref_str}': {e}") from e
        return item

    def _extract_schema_as_dict(self, schema_obj: Any) -> JsonSchema:
        """Resolves a schema and returns it as a dictionary.

        Returns an empty dict on extraction failure, except for external
        reference errors which are re-raised for the caller to surface.
        """
        try:
            resolved_schema = self._resolve_ref(schema_obj)
            if isinstance(resolved_schema, (self.schema_cls)):
                # Convert schema to dictionary
                result = resolved_schema.model_dump(
                    mode="json", by_alias=True, exclude_none=True
                )
            elif isinstance(resolved_schema, dict):
                result = resolved_schema
            else:
                logger.warning(
                    f"Expected Schema after resolving, got {type(resolved_schema)}. Returning empty dict."
                )
                result = {}
            # Normalize OpenAPI $refs into JSON-Schema $defs pointers.
            return _replace_ref_with_defs(result)
        except ValueError as e:
            # Re-raise ValueError for external reference errors and other validation issues
            if "External or non-local reference not supported" in str(e):
                raise
            logger.error(f"Failed to extract schema as dict: {e}", exc_info=False)
            return {}
        except Exception as e:
            logger.error(f"Failed to extract schema as dict: {e}", exc_info=False)
            return {}

    def _extract_parameters(
        self,
        operation_params: list[Any] | None = None,
        path_item_params: list[Any] | None = None,
    ) -> list[ParameterInfo]:
        """Extract and resolve parameters from operation and path item.

        Operation-level parameters precede path-level ones, so an operation
        parameter shadows a path parameter with the same (name, location).
        """
        extracted_params: list[ParameterInfo] = []
        seen_params: dict[
            tuple[str, str], bool
        ] = {}  # Use tuple of (name, location) as key
        all_params = (operation_params or []) + (path_item_params or [])
        for param_or_ref in all_params:
            try:
                parameter = self._resolve_ref(param_or_ref)
                if not isinstance(parameter, self.parameter_cls):
                    logger.warning(
                        f"Expected Parameter after resolving, got {type(parameter)}. Skipping."
                    )
                    continue
                # Extract parameter info - handle both 3.0 and 3.1 parameter models
                param_in = parameter.param_in  # Both use param_in
                # Handle enum or string parameter locations
                from enum import Enum

                param_in_str = (
                    param_in.value if isinstance(param_in, Enum) else param_in
                )
                param_location = self._convert_to_parameter_location(param_in_str)
                param_schema_obj = parameter.param_schema  # Both use param_schema
                # Skip duplicate parameters (same name and location)
                param_key = (parameter.name, param_in_str)
                if param_key in seen_params:
                    continue
                seen_params[param_key] = True
                # Extract schema
                param_schema_dict = {}
                if param_schema_obj:
                    # Process schema object
                    param_schema_dict = self._extract_schema_as_dict(param_schema_obj)
                    # Handle default value
                    resolved_schema = self._resolve_ref(param_schema_obj)
                    if (
                        not isinstance(resolved_schema, self.reference_cls)
                        and hasattr(resolved_schema, "default")
                        and resolved_schema.default is not None
                    ):
                        param_schema_dict["default"] = resolved_schema.default
                elif hasattr(parameter, "content") and parameter.content:
                    # Handle content-based parameters
                    first_media_type = next(iter(parameter.content.values()), None)
                    if (
                        first_media_type
                        and hasattr(first_media_type, "media_type_schema")
                        and first_media_type.media_type_schema
                    ):
                        media_schema = first_media_type.media_type_schema
                        param_schema_dict = self._extract_schema_as_dict(media_schema)
                        # Handle default value in content schema
                        resolved_media_schema = self._resolve_ref(media_schema)
                        if (
                            not isinstance(resolved_media_schema, self.reference_cls)
                            and hasattr(resolved_media_schema, "default")
                            and resolved_media_schema.default is not None
                        ):
                            param_schema_dict["default"] = resolved_media_schema.default
                # Extract explode and style properties if present
                explode = getattr(parameter, "explode", None)
                style = getattr(parameter, "style", None)
                # Create parameter info object
                param_info = ParameterInfo(
                    name=parameter.name,
                    location=param_location,
                    required=parameter.required,
                    schema=param_schema_dict,
                    description=parameter.description,
                    explode=explode,
                    style=style,
                )
                extracted_params.append(param_info)
            except Exception as e:
                param_name = getattr(
                    param_or_ref, "name", getattr(param_or_ref, "ref", "unknown")
                )
                logger.error(
                    f"Failed to extract parameter '{param_name}': {e}", exc_info=False
                )
        return extracted_params

    def _extract_request_body(self, request_body_or_ref: Any) -> RequestBodyInfo | None:
        """Extract and resolve request body information.

        Returns None when there is no body or extraction fails; external
        reference errors are re-raised.
        """
        if not request_body_or_ref:
            return None
        try:
            request_body = self._resolve_ref(request_body_or_ref)
            if not isinstance(request_body, self.request_body_cls):
                logger.warning(
                    f"Expected RequestBody after resolving, got {type(request_body)}. Returning None."
                )
                return None
            # Create request body info
            request_body_info = RequestBodyInfo(
                required=request_body.required,
                description=request_body.description,
            )
            # Extract content schemas
            if hasattr(request_body, "content") and request_body.content:
                for media_type_str, media_type_obj in request_body.content.items():
                    if (
                        media_type_obj
                        and hasattr(media_type_obj, "media_type_schema")
                        and media_type_obj.media_type_schema
                    ):
                        try:
                            schema_dict = self._extract_schema_as_dict(
                                media_type_obj.media_type_schema
                            )
                            request_body_info.content_schema[media_type_str] = (
                                schema_dict
                            )
                        except ValueError as e:
                            # Re-raise ValueError for external reference errors
                            if "External or non-local reference not supported" in str(
                                e
                            ):
                                raise
                            logger.error(
                                f"Failed to extract schema for media type '{media_type_str}': {e}"
                            )
                        except Exception as e:
                            logger.error(
                                f"Failed to extract schema for media type '{media_type_str}': {e}"
                            )
            return request_body_info
        except ValueError as e:
            # Re-raise ValueError for external reference errors
            if "External or non-local reference not supported" in str(e):
                raise
            ref_name = getattr(request_body_or_ref, "ref", "unknown")
            logger.error(
                f"Failed to extract request body '{ref_name}': {e}", exc_info=False
            )
            return None
        except Exception as e:
            ref_name = getattr(request_body_or_ref, "ref", "unknown")
            logger.error(
                f"Failed to extract request body '{ref_name}': {e}", exc_info=False
            )
            return None

    def _extract_responses(
        self, operation_responses: dict[str, Any] | None
    ) -> dict[str, ResponseInfo]:
        """Extract and resolve response information.

        Keys of the returned dict are stringified status codes. Individual
        failures are logged and skipped; external reference errors re-raise.
        """
        extracted_responses: dict[str, ResponseInfo] = {}
        if not operation_responses:
            return extracted_responses
        for status_code, resp_or_ref in operation_responses.items():
            try:
                response = self._resolve_ref(resp_or_ref)
                if not isinstance(response, self.response_cls):
                    logger.warning(
                        f"Expected Response after resolving for status code {status_code}, "
                        f"got {type(response)}. Skipping."
                    )
                    continue
                # Create response info
                resp_info = ResponseInfo(description=response.description)
                # Extract content schemas
                if hasattr(response, "content") and response.content:
                    for media_type_str, media_type_obj in response.content.items():
                        if (
                            media_type_obj
                            and hasattr(media_type_obj, "media_type_schema")
                            and media_type_obj.media_type_schema
                        ):
                            try:
                                schema_dict = self._extract_schema_as_dict(
                                    media_type_obj.media_type_schema
                                )
                                resp_info.content_schema[media_type_str] = schema_dict
                            except ValueError as e:
                                # Re-raise ValueError for external reference errors
                                if (
                                    "External or non-local reference not supported"
                                    in str(e)
                                ):
                                    raise
                                logger.error(
                                    f"Failed to extract schema for media type '{media_type_str}' "
                                    f"in response {status_code}: {e}"
                                )
                            except Exception as e:
                                logger.error(
                                    f"Failed to extract schema for media type '{media_type_str}' "
                                    f"in response {status_code}: {e}"
                                )
                extracted_responses[str(status_code)] = resp_info
            except ValueError as e:
                # Re-raise ValueError for external reference errors
                if "External or non-local reference not supported" in str(e):
                    raise
                ref_name = getattr(resp_or_ref, "ref", "unknown")
                logger.error(
                    f"Failed to extract response for status code {status_code} "
                    f"from reference '{ref_name}': {e}",
                    exc_info=False,
                )
            except Exception as e:
                ref_name = getattr(resp_or_ref, "ref", "unknown")
                logger.error(
                    f"Failed to extract response for status code {status_code} "
                    f"from reference '{ref_name}': {e}",
                    exc_info=False,
                )
        return extracted_responses

    def parse(self) -> list[HTTPRoute]:
        """Parse the OpenAPI schema into HTTP routes.

        Walks every path/operation pair, resolving parameters, request
        bodies, responses, and component schemas into HTTPRoute objects.
        Per-operation failures are logged and skipped (except external
        reference errors, which propagate).
        """
        routes: list[HTTPRoute] = []
        if not hasattr(self.openapi, "paths") or not self.openapi.paths:
            logger.warning("OpenAPI schema has no paths defined.")
            return []
        # Extract component schemas
        schema_definitions = {}
        if hasattr(self.openapi, "components") and self.openapi.components:
            components = self.openapi.components
            if hasattr(components, "schemas") and components.schemas:
                for name, schema in components.schemas.items():
                    try:
                        if isinstance(schema, self.reference_cls):
                            resolved_schema = self._resolve_ref(schema)
                            schema_definitions[name] = self._extract_schema_as_dict(
                                resolved_schema
                            )
                        else:
                            schema_definitions[name] = self._extract_schema_as_dict(
                                schema
                            )
                    except Exception as e:
                        logger.warning(
                            f"Failed to extract schema definition '{name}': {e}"
                        )
        # Process paths and operations
        for path_str, path_item_obj in self.openapi.paths.items():
            if not isinstance(path_item_obj, self.path_item_cls):
                logger.warning(
                    f"Skipping invalid path item for path '{path_str}' (type: {type(path_item_obj)})"
                )
                continue
            # Path-level parameters apply to every operation under this path.
            path_level_params = (
                path_item_obj.parameters
                if hasattr(path_item_obj, "parameters")
                else None
            )
            # Get HTTP methods from the path item class fields
            http_methods = [
                "get",
                "put",
                "post",
                "delete",
                "options",
                "head",
                "patch",
                "trace",
            ]
            for method_lower in http_methods:
                operation = getattr(path_item_obj, method_lower, None)
                if operation and isinstance(operation, self.operation_cls):
                    # Cast method to HttpMethod - safe since we only use valid HTTP methods
                    method_upper = method_lower.upper()
                    try:
                        parameters = self._extract_parameters(
                            getattr(operation, "parameters", None), path_level_params
                        )
                        request_body_info = self._extract_request_body(
                            getattr(operation, "requestBody", None)
                        )
                        responses = self._extract_responses(
                            getattr(operation, "responses", None)
                        )
                        # Only "x-*" specification extensions are preserved.
                        extensions = {}
                        if hasattr(operation, "model_extra") and operation.model_extra:
                            extensions = {
                                k: v
                                for k, v in operation.model_extra.items()
                                if k.startswith("x-")
                            }
                        route = HTTPRoute(
                            path=path_str,
                            method=method_upper,  # type: ignore[arg-type] # Known valid HTTP method
                            operation_id=getattr(operation, "operationId", None),
                            summary=getattr(operation, "summary", None),
                            description=getattr(operation, "description", None),
                            tags=getattr(operation, "tags", []) or [],
                            parameters=parameters,
                            request_body=request_body_info,
                            responses=responses,
                            schema_definitions=schema_definitions,
                            extensions=extensions,
                            openapi_version=self.openapi_version,
                        )
                        routes.append(route)
                        logger.debug(
                            f"Successfully extracted route: {method_upper} {path_str}"
                        )
                    except ValueError as op_error:
                        # Re-raise ValueError for external reference errors
                        if "External or non-local reference not supported" in str(
                            op_error
                        ):
                            raise
                        op_id = getattr(operation, "operationId", "unknown")
                        logger.error(
                            f"Failed to process operation {method_upper} {path_str} (ID: {op_id}): {op_error}",
                            exc_info=True,
                        )
                    except Exception as op_error:
                        op_id = getattr(operation, "operationId", "unknown")
                        logger.error(
                            f"Failed to process operation {method_upper} {path_str} (ID: {op_id}): {op_error}",
                            exc_info=True,
                        )
        logger.debug(f"Finished parsing. Extracted {len(routes)} HTTP routes.")
        return routes
def clean_schema_for_display(schema: JsonSchema | None) -> JsonSchema | None:
"""
Clean up a schema dictionary for display by removing internal/complex fields.
"""
if not schema or not isinstance(schema, dict):
return schema
# Make a copy to avoid modifying the input schema
cleaned = schema.copy()
# Fields commonly removed for simpler display to LLMs or users
fields_to_remove = [
"allOf",
"anyOf",
"oneOf",
"not", # Composition keywords
"nullable", # Handled by type unions usually
"discriminator",
"readOnly",
"writeOnly",
"deprecated",
"xml",
"externalDocs",
# Can be verbose, maybe remove based on flag?
# "pattern", "minLength", "maxLength",
# "minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum",
# "multipleOf", "minItems", "maxItems", "uniqueItems",
# "minProperties", "maxProperties"
]
for field in fields_to_remove:
if field in cleaned:
cleaned.pop(field)
# Recursively clean properties and items
if "properties" in cleaned:
cleaned["properties"] = {
k: clean_schema_for_display(v) for k, v in cleaned["properties"].items()
}
# Remove properties section if empty after cleaning
if not cleaned["properties"]:
cleaned.pop("properties")
if "items" in cleaned:
cleaned["items"] = clean_schema_for_display(cleaned["items"])
# Remove items section if empty after cleaning
if not cleaned["items"]:
cleaned.pop("items")
if "additionalProperties" in cleaned:
# Often verbose, can be simplified
if isinstance(cleaned["additionalProperties"], dict):
cleaned["additionalProperties"] = clean_schema_for_display(
cleaned["additionalProperties"]
)
elif cleaned["additionalProperties"] is True:
# Maybe keep 'true' or represent as 'Allows additional properties' text?
pass # Keep simple boolean for now
def generate_example_from_schema(schema: JsonSchema | None) -> Any:
"""
Generate a simple example value from a JSON schema dictionary.
Very basic implementation focusing on types.
"""
if not schema or not isinstance(schema, dict):
return "unknown" # Or None?
# Use default value if provided
if "default" in schema:
return schema["default"]
# Use first enum value if provided
if "enum" in schema and isinstance(schema["enum"], list) and schema["enum"]:
return schema["enum"][0]
# Use first example if provided
if (
"examples" in schema
and isinstance(schema["examples"], list)
and schema["examples"]
):
return schema["examples"][0]
if "example" in schema:
return schema["example"]
schema_type = schema.get("type")
if schema_type == "object":
result = {}
properties = schema.get("properties", {})
if isinstance(properties, dict):
# Generate example for first few properties or required ones? Limit complexity.
required_props = set(schema.get("required", []))
props_to_include = list(properties.keys())[
:3
] # Limit to first 3 for brevity
for prop_name in props_to_include:
if prop_name in properties:
result[prop_name] = generate_example_from_schema(
properties[prop_name]
)
# Ensure required props are present if possible
for req_prop in required_props:
if req_prop not in result and req_prop in properties:
result[req_prop] = generate_example_from_schema(
properties[req_prop]
)
return result if result else {"key": "value"} # Basic object if no props
elif schema_type == "array":
items_schema = schema.get("items")
if isinstance(items_schema, dict):
# Generate one example item
item_example = generate_example_from_schema(items_schema)
return [item_example] if item_example is not None else []
return ["example_item"] # Fallback
elif schema_type == "string":
format_type = schema.get("format")
if format_type == "date-time":
return "2024-01-01T12:00:00Z"
if format_type == "date":
return "2024-01-01"
if format_type == "email":
return "user@example.com"
if format_type == "uuid":
return "123e4567-e89b-12d3-a456-426614174000"
if format_type == "byte":
return "ZXhhbXBsZQ==" # "example" base64
return "string"
elif schema_type == "integer":
return 1
elif schema_type == "number":
return 1.5
elif schema_type == "boolean":
return True
elif schema_type == "null":
return None
# Fallback if type is unknown or missing
return "unknown_type"
def format_json_for_description(data: Any, indent: int = 2) -> str:
    """Formats Python data as a JSON string block for markdown."""
    try:
        rendered = json.dumps(data, indent=indent)
    except TypeError:
        # Non-serializable payloads fall back to a plain fenced block.
        return f"```\nCould not serialize to JSON: {data}\n```"
    return f"```json\n{rendered}\n```"
def format_description_with_responses(
    base_description: str,
    responses: dict[
        str, Any
    ],  # Changed from specific ResponseInfo type to avoid circular imports
    parameters: list[ParameterInfo] | None = None,  # Add parameters parameter
    request_body: RequestBodyInfo | None = None,  # Add request_body parameter
) -> str:
    """
    Formats the base description string with response, parameter, and request body information.

    Args:
        base_description (str): The initial description to be formatted.
        responses (dict[str, Any]): A dictionary of response information, keyed by status code.
        parameters (list[ParameterInfo] | None, optional): A list of parameter information,
            including path and query parameters. Each parameter includes details such as name,
            location, whether it is required, and a description.
        request_body (RequestBodyInfo | None, optional): Information about the request body,
            including its description, whether it is required, and its content schema.

    Returns:
        str: The formatted description string with additional details about responses, parameters,
            and the request body.
    """
    # Markdown fragments are accumulated and newline-joined at the end.
    desc_parts = [base_description]

    # Add parameter information
    if parameters:
        # Process path parameters
        path_params = [p for p in parameters if p.location == "path"]
        if path_params:
            param_section = "\n\n**Path Parameters:**"
            desc_parts.append(param_section)
            for param in path_params:
                required_marker = " (Required)" if param.required else ""
                param_desc = f"\n- **{param.name}**{required_marker}: {param.description or 'No description.'}"
                desc_parts.append(param_desc)
        # Process query parameters
        query_params = [p for p in parameters if p.location == "query"]
        if query_params:
            param_section = "\n\n**Query Parameters:**"
            desc_parts.append(param_section)
            for param in query_params:
                required_marker = " (Required)" if param.required else ""
                param_desc = f"\n- **{param.name}**{required_marker}: {param.description or 'No description.'}"
                desc_parts.append(param_desc)

    # Add request body information if present
    if request_body and request_body.description:
        req_body_section = "\n\n**Request Body:**"
        desc_parts.append(req_body_section)
        required_marker = " (Required)" if request_body.required else ""
        desc_parts.append(f"\n{request_body.description}{required_marker}")
        # Add request body property descriptions if available
        if request_body.content_schema:
            # Prefer application/json; otherwise take the first media type.
            media_type = (
                "application/json"
                if "application/json" in request_body.content_schema
                else next(iter(request_body.content_schema), None)
            )
            if media_type:
                schema = request_body.content_schema.get(media_type, {})
                if isinstance(schema, dict) and "properties" in schema:
                    desc_parts.append("\n\n**Request Properties:**")
                    for prop_name, prop_schema in schema["properties"].items():
                        if (
                            isinstance(prop_schema, dict)
                            and "description" in prop_schema
                        ):
                            required = prop_name in schema.get("required", [])
                            req_mark = " (Required)" if required else ""
                            desc_parts.append(
                                f"\n- **{prop_name}**{req_mark}: {prop_schema['description']}"
                            )

    # Add response information
    if responses:
        response_section = "\n\n**Responses:**"
        added_response_section = False
        # Determine success codes (common ones)
        success_codes = {"200", "201", "202", "204"}  # As strings
        success_status = next((s for s in success_codes if s in responses), None)
        # Process all responses
        responses_to_process = responses.items()
        for status_code, resp_info in sorted(responses_to_process):
            if not added_response_section:
                desc_parts.append(response_section)
                added_response_section = True
            status_marker = " (Success)" if status_code == success_status else ""
            desc_parts.append(
                f"\n- **{status_code}**{status_marker}: {resp_info.description or 'No description.'}"
            )
            # Process content schemas for this response
            if resp_info.content_schema:
                # Prioritize json, then take first available
                media_type = (
                    "application/json"
                    if "application/json" in resp_info.content_schema
                    else next(iter(resp_info.content_schema), None)
                )
                if media_type:
                    schema = resp_info.content_schema.get(media_type)
                    desc_parts.append(f" - Content-Type: `{media_type}`")
                    # Add response property descriptions
                    if isinstance(schema, dict):
                        # Handle array responses
                        if schema.get("type") == "array" and "items" in schema:
                            items_schema = schema["items"]
                            if (
                                isinstance(items_schema, dict)
                                and "properties" in items_schema
                            ):
                                desc_parts.append("\n - **Response Item Properties:**")
                                for prop_name, prop_schema in items_schema[
                                    "properties"
                                ].items():
                                    if (
                                        isinstance(prop_schema, dict)
                                        and "description" in prop_schema
                                    ):
                                        desc_parts.append(
                                            f"\n - **{prop_name}**: {prop_schema['description']}"
                                        )
                        # Handle object responses
                        elif "properties" in schema:
                            desc_parts.append("\n - **Response Properties:**")
                            for prop_name, prop_schema in schema["properties"].items():
                                if (
                                    isinstance(prop_schema, dict)
                                    and "description" in prop_schema
                                ):
                                    desc_parts.append(
                                        f"\n - **{prop_name}**: {prop_schema['description']}"
                                    )
                    # Generate Example
                    if schema:
                        example = generate_example_from_schema(schema)
                        if example != "unknown_type" and example is not None:
                            desc_parts.append("\n - **Example:**")
                            desc_parts.append(
                                format_json_for_description(example, indent=2)
                            )

    return "\n".join(desc_parts)
def _replace_ref_with_defs(
info: dict[str, Any], description: str | None = None
) -> dict[str, Any]:
"""
Replace openapi $ref with jsonschema $defs
Examples:
- {"type": "object", "properties": {"$ref": "#/components/schemas/..."}}
- {"$ref": "#/components/schemas/..."}
- {"items": {"$ref": "#/components/schemas/..."}}
- {"anyOf": [{"$ref": "#/components/schemas/..."}]}
- {"allOf": [{"$ref": "#/components/schemas/..."}]}
- {"oneOf": [{"$ref": "#/components/schemas/..."}]}
Args:
info: dict[str, Any]
description: str | None
Returns:
dict[str, Any]
"""
schema = info.copy()
if ref_path := schema.get("$ref"):
if isinstance(ref_path, str):
if ref_path.startswith("#/components/schemas/"):
schema_name = ref_path.split("/")[-1]
schema["$ref"] = f"#/$defs/{schema_name}"
elif not ref_path.startswith("#/"):
raise ValueError(
f"External or non-local reference not supported: {ref_path}. "
f"FastMCP only supports local schema references starting with '#/'. "
f"Please include all schema definitions within the OpenAPI document."
)
elif properties := schema.get("properties"):
if "$ref" in properties:
schema["properties"] = _replace_ref_with_defs(properties)
else:
schema["properties"] = {
prop_name: _replace_ref_with_defs(prop_schema)
for prop_name, prop_schema in properties.items()
}
elif item_schema := schema.get("items"):
schema["items"] = _replace_ref_with_defs(item_schema)
for section in ["anyOf", "allOf", "oneOf"]:
for i, item in enumerate(schema.get(section, [])):
schema[section][i] = _replace_ref_with_defs(item)
if info.get("description", description) and not schema.get("description"):
schema["description"] = description
return schema
def _make_optional_parameter_nullable(schema: dict[str, Any]) -> dict[str, Any]:
"""
Make an optional parameter schema nullable to allow None values.
For optional parameters, we need to allow null values in addition to the
specified type to handle cases where None is passed for optional parameters.
"""
# If schema already has multiple types or is already nullable, don't modify
if "anyOf" in schema or "oneOf" in schema or "allOf" in schema:
return schema
# If it's already nullable (type includes null), don't modify
if isinstance(schema.get("type"), list) and "null" in schema["type"]:
return schema
# Create a new schema that allows null in addition to the original type
if "type" in schema:
original_type = schema["type"]
if isinstance(original_type, str):
# Single type - make it a union with null
# Optimize: avoid full schema copy by building directly
nested_non_nullable_schema = {
"type": original_type,
}
nullable_schema = {}
# Define type-specific properties that should move to nested schema
type_specific_properties = set()
if original_type == "array":
# https://json-schema.org/understanding-json-schema/reference/array
type_specific_properties = {
"items",
"prefixItems",
"unevaluatedItems",
"contains",
"minContains",
"maxContains",
"minItems",
"maxItems",
"uniqueItems",
}
elif original_type == "object":
# https://json-schema.org/understanding-json-schema/reference/object
type_specific_properties = {
"properties",
"patternProperties",
"additionalProperties",
"unevaluatedProperties",
"required",
"propertyNames",
"minProperties",
"maxProperties",
}
# Efficiently distribute properties without copying the entire schema
for key, value in schema.items():
if key == "type":
continue # Already handled
elif key in type_specific_properties:
nested_non_nullable_schema[key] = value
else:
nullable_schema[key] = value
nullable_schema["anyOf"] = [nested_non_nullable_schema, {"type": "null"}]
return nullable_schema
return schema
def _add_null_to_type(schema: dict[str, Any]) -> None:
"""Add 'null' to the schema's type field or handle oneOf/anyOf/allOf constructs if not already present."""
if "type" in schema:
current_type = schema["type"]
if isinstance(current_type, str):
# Convert string type to array with null
schema["type"] = [current_type, "null"]
elif isinstance(current_type, list):
# Add null to array if not already present
if "null" not in current_type:
schema["type"] = current_type + ["null"]
elif "oneOf" in schema:
# Convert oneOf to anyOf with null type
schema["anyOf"] = schema.pop("oneOf") + [{"type": "null"}]
elif "anyOf" in schema:
# Add null type to anyOf if not already present
if not any(item.get("type") == "null" for item in schema["anyOf"]):
schema["anyOf"].append({"type": "null"})
elif "allOf" in schema:
# For allOf, wrap in anyOf with null - this means (all conditions) OR null
schema["anyOf"] = [{"allOf": schema.pop("allOf")}, {"type": "null"}]
def _handle_nullable_fields(schema: dict[str, Any] | Any) -> dict[str, Any] | Any:
    """Convert OpenAPI nullable fields to JSON Schema format: {"type": "string",
    "nullable": true} -> {"type": ["string", "null"]}.

    Handles ``nullable`` at the schema root and on direct properties. The
    input is never mutated: the previous implementation's shallow copy let
    ``prop_schema.pop("nullable")`` and ``anyOf`` appends write into dicts
    and lists shared with the caller (including ``schema_definitions``
    entries); every modified container is now copied first. When nothing
    needs converting, the original object is returned unchanged.

    Args:
        schema: A schema fragment; non-dict values pass through untouched.

    Returns:
        The converted schema (a copy), or the original object when no
        nullable fields are present.
    """
    if not isinstance(schema, dict):
        return schema
    # Check if we need to modify anything first to avoid unnecessary copying
    has_root_nullable_field = "nullable" in schema
    has_root_nullable_true = (
        has_root_nullable_field
        and schema["nullable"]
        and (
            "type" in schema
            or "oneOf" in schema
            or "anyOf" in schema
            or "allOf" in schema
        )
    )
    has_property_nullable_field = False
    if "properties" in schema:
        for prop_schema in schema["properties"].values():
            if isinstance(prop_schema, dict) and "nullable" in prop_schema:
                has_property_nullable_field = True
                break
    # If no nullable fields at all, return original schema unchanged
    if not has_root_nullable_field and not has_property_nullable_field:
        return schema
    # Only copy if we need to modify
    result = schema.copy()
    # Handle root level nullable - always remove the field, convert type if true
    if has_root_nullable_field:
        result.pop("nullable")
        if has_root_nullable_true:
            # _add_null_to_type appends to an existing "anyOf" list in
            # place; copy it first so the caller's list is not mutated
            # through the shallow copy above.
            if "anyOf" in result:
                result["anyOf"] = list(result["anyOf"])
            _add_null_to_type(result)
    # Handle properties nullable fields
    if has_property_nullable_field and "properties" in result:
        # Copy the properties map and each nullable property schema so the
        # conversion never writes into dicts shared with the caller.
        new_properties = dict(result["properties"])
        for prop_name, prop_schema in new_properties.items():
            if isinstance(prop_schema, dict) and "nullable" in prop_schema:
                prop_schema = prop_schema.copy()
                nullable_value = prop_schema.pop("nullable")
                if nullable_value and (
                    "type" in prop_schema
                    or "oneOf" in prop_schema
                    or "anyOf" in prop_schema
                    or "allOf" in prop_schema
                ):
                    if "anyOf" in prop_schema:
                        prop_schema["anyOf"] = list(prop_schema["anyOf"])
                    _add_null_to_type(prop_schema)
                new_properties[prop_name] = prop_schema
        result["properties"] = new_properties
    return result
def _combine_schemas(route: HTTPRoute) -> dict[str, Any]:
    """
    Combines parameter and request body schemas into a single schema.

    Handles parameter name collisions by adding location suffixes
    ("name__query", "name__path", ...) to the non-body parameter; body
    properties always keep their original names. Unused entries in the
    route's schema definitions are pruned from the resulting "$defs".

    Args:
        route: HTTPRoute object

    Returns:
        Combined schema dictionary
    """
    properties: dict[str, Any] = {}
    required: list[str] = []

    # First pass: collect parameter names by location, and resolve the
    # request body's properties/required list once, up front (previously
    # `body_schema` was referenced later from a separate conditional block).
    param_names_by_location: dict[str, set[str]] = {
        "path": set(),
        "query": set(),
        "header": set(),
        "cookie": set(),
    }
    for param in route.parameters:
        param_names_by_location[param.location].add(param.name)

    body_props: dict[str, Any] = {}
    body_required: list[str] = []
    if route.request_body and route.request_body.content_schema:
        content_type = next(iter(route.request_body.content_schema))
        body_schema = _replace_ref_with_defs(
            route.request_body.content_schema[content_type].copy(),
            route.request_body.description,
        )
        body_props = body_schema.get("properties", {})
        if route.request_body.required:
            body_required = body_schema.get("required", [])

    # Detect collisions: parameters that exist in both body and path/query/header
    all_non_body_params: set[str] = set()
    for location_params in param_names_by_location.values():
        all_non_body_params.update(location_params)
    colliding_params = all_non_body_params & set(body_props)

    # Second pass: add parameters, suffixing names that collide with body props
    for param in route.parameters:
        param_schema = _replace_ref_with_defs(param.schema_.copy(), param.description)
        if param.name in colliding_params:
            prop_name = f"{param.name}__{param.location}"
            # Record the location in the description so the suffix is explicable
            location_desc = f"({param.location.capitalize()} parameter)"
            original_desc = param_schema.get("description", "")
            param_schema["description"] = (
                f"{original_desc} {location_desc}" if original_desc else location_desc
            )
        else:
            prop_name = param.name
        if param.required:
            required.append(prop_name)
        # Don't make optional parameters nullable - they can simply be omitted.
        # The OpenAPI specification doesn't require optional parameters to
        # accept null values.
        properties[prop_name] = param_schema

    # Add request body properties (no suffixes for body parameters)
    properties.update(body_props)
    required.extend(body_required)

    result: dict[str, Any] = {
        "type": "object",
        "properties": properties,
        "required": required,
    }
    # NOTE: the old "additionalProperties is False" pruning was dead code -
    # `result` is built immediately above and never contains that key.

    # Attach schema definitions, pruned to those actually referenced
    # (lightweight approach - just check direct $ref usage)
    if route.schema_definitions:
        used_refs: set[str] = set()

        def _collect_refs(value: Any) -> None:
            # Walk dicts/lists, recording names referenced via "#/$defs/..."
            if isinstance(value, dict):
                ref = value.get("$ref")
                if isinstance(ref, str) and ref.startswith("#/$defs/"):
                    used_refs.add(ref.split("/")[-1])
                for child in value.values():
                    _collect_refs(child)
            elif isinstance(value, list):
                for item in value:
                    _collect_refs(item)

        # Scan the main schema (there is no $defs section yet to exclude)
        for value in result.values():
            _collect_refs(value)
        if used_refs:
            result["$defs"] = {
                name: def_schema
                for name, def_schema in route.schema_definitions.items()
                if name in used_refs
            }
    return result
def _adjust_union_types(
schema: dict[str, Any] | list[Any],
) -> dict[str, Any] | list[Any]:
"""Recursively replace 'oneOf' with 'anyOf' in schema to handle overlapping unions."""
if isinstance(schema, dict):
# Optimize: only copy if we need to modify something
has_one_of = "oneOf" in schema
needs_recursive_processing = False
# Check if we need recursive processing
for v in schema.values():
if isinstance(v, dict | list):
needs_recursive_processing = True
break
# If nothing to change, return original
if not has_one_of and not needs_recursive_processing:
return schema
# Work on a copy only when modification is needed
result = schema.copy()
if has_one_of:
result["anyOf"] = result.pop("oneOf")
# Only recurse where needed
if needs_recursive_processing:
for k, v in result.items():
if isinstance(v, dict | list):
result[k] = _adjust_union_types(v)
return result
elif isinstance(schema, list):
return [_adjust_union_types(item) for item in schema]
return schema
def extract_output_schema_from_responses(
    responses: dict[str, ResponseInfo],
    schema_definitions: dict[str, Any] | None = None,
    openapi_version: str | None = None,
) -> dict[str, Any] | None:
    """
    Extract output schema from OpenAPI responses for use as MCP tool output schema.

    This function finds the first successful response (200, 201, 202, 204) with a
    JSON-compatible content type and extracts its schema. If the schema is not an
    object type, it wraps it to comply with MCP requirements.

    Args:
        responses: Dictionary of ResponseInfo objects keyed by status code
        schema_definitions: Optional schema definitions to include in the output schema
        openapi_version: OpenAPI version string, used to optimize nullable field handling

    Returns:
        dict: MCP-compliant output schema with potential wrapping, or None if no suitable schema found
    """
    if not responses:
        return None

    # Priority order for success status codes
    success_codes = ["200", "201", "202", "204"]
    response_info = None
    for status_code in success_codes:
        if status_code in responses:
            response_info = responses[status_code]
            break
    # If no explicit success codes, fall back to any 2xx response
    if response_info is None:
        for status_code, resp_info in responses.items():
            if status_code.startswith("2"):
                response_info = resp_info
                break
    if response_info is None or not response_info.content_schema:
        return None

    # Prefer application/json, then fall back to other JSON-compatible types
    json_compatible_types = [
        "application/json",
        "application/vnd.api+json",
        "application/hal+json",
        "application/ld+json",
        "text/json",
    ]
    schema = None
    for content_type in json_compatible_types:
        if content_type in response_info.content_schema:
            schema = response_info.content_schema[content_type]
            break
    # If no JSON-compatible type found, try the first available content type
    if schema is None and response_info.content_schema:
        first_content_type = next(iter(response_info.content_schema))
        schema = response_info.content_schema[first_content_type]
        logger.debug(
            f"Using non-JSON content type for output schema: {first_content_type}"
        )
    if not schema or not isinstance(schema, dict):
        return None

    # Work on a copy so the caller's response schema is not modified
    output_schema = schema.copy()

    # If schema is a top-level $ref, resolve it before nullable handling
    if "$ref" in output_schema and schema_definitions:
        ref_path = output_schema["$ref"]
        if ref_path.startswith("#/components/schemas/"):
            schema_name = ref_path.split("/")[-1]
            if schema_name in schema_definitions:
                # Replace $ref with the actual schema definition
                output_schema = schema_definitions[schema_name].copy()

    # OpenAPI 3.0 expresses nullability via `nullable: true`; convert it to
    # JSON Schema null types to prevent "None is not of type 'string'" errors.
    # 3.1 already uses standard JSON Schema null types, so the pass is skipped.
    # Computed once here instead of re-testing the prefix per definition below.
    is_openapi_30 = bool(openapi_version and openapi_version.startswith("3.0"))
    if is_openapi_30:
        output_schema = _handle_nullable_fields(output_schema)

    # MCP requires output schemas to be objects. If this schema is not an
    # object, wrap it under a "result" key, mirroring ParsedFunction.from_function()
    if output_schema.get("type") != "object":
        output_schema = {
            "type": "object",
            "properties": {"result": output_schema},
            "required": ["result"],
            "x-fastmcp-wrap-result": True,
        }

    # Add schema definitions unless the top-level $ref was resolved inline above.
    # (Membership is checked on `schema` directly - the old `schema.copy()`
    # built a throwaway dict just to run `in` on it.)
    if schema_definitions and "$ref" not in schema:
        processed_defs = {}
        for def_name, def_schema in schema_definitions.items():
            # Nullable conversion is only needed for OpenAPI 3.0 definitions
            processed_defs[def_name] = (
                _handle_nullable_fields(def_schema) if is_openapi_30 else def_schema
            )
        output_schema["$defs"] = processed_defs

    # Use lightweight compression - prune additionalProperties and unused definitions
    if output_schema.get("additionalProperties") is False:
        output_schema.pop("additionalProperties")

    # Remove unused definitions (lightweight approach - just check direct $ref usage)
    if "$defs" in output_schema:
        used_refs: set[str] = set()

        def find_refs_in_value(value: Any) -> None:
            # Walk dicts/lists recording names referenced via "#/$defs/..."
            if isinstance(value, dict):
                if "$ref" in value and isinstance(value["$ref"], str):
                    ref = value["$ref"]
                    if ref.startswith("#/$defs/"):
                        used_refs.add(ref.split("/")[-1])
                for v in value.values():
                    find_refs_in_value(v)
            elif isinstance(value, list):
                for item in value:
                    find_refs_in_value(item)

        # Find refs in the main schema (excluding the $defs section itself)
        for key, value in output_schema.items():
            if key != "$defs":
                find_refs_in_value(value)
        if used_refs:
            output_schema["$defs"] = {
                name: def_schema
                for name, def_schema in output_schema["$defs"].items()
                if name in used_refs
            }
        else:
            output_schema.pop("$defs")

    # Adjust union types (oneOf -> anyOf) to handle overlapping unions
    output_schema = cast(dict[str, Any], _adjust_union_types(output_schema))
    return output_schema