Skip to main content
Glama
jezweb

Australian Postcodes MCP Server

openapi.py63.3 kB
import json from typing import Any, Generic, Literal, TypeVar, cast from openapi_pydantic import ( OpenAPI, Operation, Parameter, PathItem, Reference, RequestBody, Response, Schema, ) # Import OpenAPI 3.0 models as well from openapi_pydantic.v3.v3_0 import OpenAPI as OpenAPI_30 from openapi_pydantic.v3.v3_0 import Operation as Operation_30 from openapi_pydantic.v3.v3_0 import Parameter as Parameter_30 from openapi_pydantic.v3.v3_0 import PathItem as PathItem_30 from openapi_pydantic.v3.v3_0 import Reference as Reference_30 from openapi_pydantic.v3.v3_0 import RequestBody as RequestBody_30 from openapi_pydantic.v3.v3_0 import Response as Response_30 from openapi_pydantic.v3.v3_0 import Schema as Schema_30 from pydantic import BaseModel, Field, ValidationError from fastmcp.utilities.logging import get_logger from fastmcp.utilities.types import FastMCPBaseModel logger = get_logger(__name__) # --- Intermediate Representation (IR) Definition --- # (IR models remain the same) HttpMethod = Literal[ "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE" ] ParameterLocation = Literal["path", "query", "header", "cookie"] JsonSchema = dict[str, Any] def format_array_parameter( values: list, parameter_name: str, is_query_parameter: bool = False ) -> str | list: """ Format an array parameter according to OpenAPI specifications. Args: values: List of values to format parameter_name: Name of the parameter (for error messages) is_query_parameter: If True, can return list for explode=True behavior Returns: String (comma-separated) or list (for query params with explode=True) """ # For arrays of simple types (strings, numbers, etc.), join with commas if all(isinstance(item, str | int | float | bool) for item in values): return ",".join(str(v) for v in values) # For complex types, try to create a simpler representation try: # Try to create a simple string representation formatted_parts = [] for item in values: if isinstance(item, dict): # For objects, serialize key-value pairs item_parts = [] for k, v in item.items(): item_parts.append(f"{k}:{v}") formatted_parts.append(".".join(item_parts)) else: formatted_parts.append(str(item)) return ",".join(formatted_parts) except Exception as e: param_type = "query" if is_query_parameter else "path" logger.warning( f"Failed to format complex array {param_type} parameter '{parameter_name}': {e}" ) if is_query_parameter: # For query parameters, fallback to original list return values else: # For path parameters, fallback to string representation without Python syntax # Use str.translate() for efficient character removal translation_table = str.maketrans("", "", "[]'\"") str_value = str(values).translate(translation_table) return str_value def format_deep_object_parameter( param_value: dict, parameter_name: str ) -> dict[str, str]: """ Format a dictionary parameter for deepObject style serialization. According to OpenAPI 3.0 spec, deepObject style with explode=true serializes object properties as separate query parameters with bracket notation. For example: `{"id": "123", "type": "user"}` becomes `param[id]=123&param[type]=user`. Args: param_value: Dictionary value to format parameter_name: Name of the parameter Returns: Dictionary with bracketed parameter names as keys """ if not isinstance(param_value, dict): logger.warning( f"deepObject style parameter '{parameter_name}' expected dict, got {type(param_value)}" ) return {} result = {} for key, value in param_value.items(): # Format as param[key]=value bracketed_key = f"{parameter_name}[{key}]" result[bracketed_key] = str(value) return result class ParameterInfo(FastMCPBaseModel): """Represents a single parameter for an HTTP operation in our IR.""" name: str location: ParameterLocation # Mapped from 'in' field of openapi-pydantic Parameter required: bool = False schema_: JsonSchema = Field(..., alias="schema") # Target name in IR description: str | None = None explode: bool | None = None # OpenAPI explode property for array parameters style: str | None = None # OpenAPI style property for parameter serialization class RequestBodyInfo(FastMCPBaseModel): """Represents the request body for an HTTP operation in our IR.""" required: bool = False content_schema: dict[str, JsonSchema] = Field( default_factory=dict ) # Key: media type description: str | None = None class ResponseInfo(FastMCPBaseModel): """Represents response information in our IR.""" description: str | None = None # Store schema per media type, key is media type content_schema: dict[str, JsonSchema] = Field(default_factory=dict) class HTTPRoute(FastMCPBaseModel): """Intermediate Representation for a single OpenAPI operation.""" path: str method: HttpMethod operation_id: str | None = None summary: str | None = None description: str | None = None tags: list[str] = Field(default_factory=list) parameters: list[ParameterInfo] = Field(default_factory=list) request_body: RequestBodyInfo | None = None responses: dict[str, ResponseInfo] = Field( default_factory=dict ) # Key: status code str schema_definitions: dict[str, JsonSchema] = Field( default_factory=dict ) # Store component schemas extensions: dict[str, Any] = Field(default_factory=dict) openapi_version: str | None = None # Export public symbols __all__ = [ "HTTPRoute", "ParameterInfo", "RequestBodyInfo", "ResponseInfo", "HttpMethod", "ParameterLocation", "JsonSchema", "parse_openapi_to_http_routes", "extract_output_schema_from_responses", "format_deep_object_parameter", "_handle_nullable_fields", ] # Type variables for generic parser TOpenAPI = TypeVar("TOpenAPI", OpenAPI, OpenAPI_30) TSchema = TypeVar("TSchema", Schema, Schema_30) TReference = TypeVar("TReference", Reference, Reference_30) TParameter = TypeVar("TParameter", Parameter, Parameter_30) TRequestBody = TypeVar("TRequestBody", RequestBody, RequestBody_30) TResponse = TypeVar("TResponse", Response, Response_30) TOperation = TypeVar("TOperation", Operation, Operation_30) TPathItem = TypeVar("TPathItem", PathItem, PathItem_30) def parse_openapi_to_http_routes(openapi_dict: dict[str, Any]) -> list[HTTPRoute]: """ Parses an OpenAPI schema dictionary into a list of HTTPRoute objects using the openapi-pydantic library. Supports both OpenAPI 3.0.x and 3.1.x versions. """ # Check OpenAPI version to use appropriate model openapi_version = openapi_dict.get("openapi", "") try: if openapi_version.startswith("3.0"): # Use OpenAPI 3.0 models openapi_30 = OpenAPI_30.model_validate(openapi_dict) logger.debug( f"Successfully parsed OpenAPI 3.0 schema version: {openapi_30.openapi}" ) parser = OpenAPIParser( openapi_30, Reference_30, Schema_30, Parameter_30, RequestBody_30, Response_30, Operation_30, PathItem_30, openapi_version, ) return parser.parse() else: # Default to OpenAPI 3.1 models openapi_31 = OpenAPI.model_validate(openapi_dict) logger.debug( f"Successfully parsed OpenAPI 3.1 schema version: {openapi_31.openapi}" ) parser = OpenAPIParser( openapi_31, Reference, Schema, Parameter, RequestBody, Response, Operation, PathItem, openapi_version, ) return parser.parse() except ValidationError as e: logger.error(f"OpenAPI schema validation failed: {e}") error_details = e.errors() logger.error(f"Validation errors: {error_details}") raise ValueError(f"Invalid OpenAPI schema: {error_details}") from e class OpenAPIParser( Generic[ TOpenAPI, TReference, TSchema, TParameter, TRequestBody, TResponse, TOperation, TPathItem, ] ): """Unified parser for OpenAPI schemas with generic type parameters to handle both 3.0 and 3.1.""" def __init__( self, openapi: TOpenAPI, reference_cls: type[TReference], schema_cls: type[TSchema], parameter_cls: type[TParameter], request_body_cls: type[TRequestBody], response_cls: type[TResponse], operation_cls: type[TOperation], path_item_cls: type[TPathItem], openapi_version: str, ): """Initialize the parser with the OpenAPI schema and type classes.""" self.openapi = openapi self.reference_cls = reference_cls self.schema_cls = schema_cls self.parameter_cls = parameter_cls self.request_body_cls = request_body_cls self.response_cls = response_cls self.operation_cls = operation_cls self.path_item_cls = path_item_cls self.openapi_version = openapi_version def _convert_to_parameter_location(self, param_in: str) -> ParameterLocation: """Convert string parameter location to our ParameterLocation type.""" if param_in in ["path", "query", "header", "cookie"]: return param_in # type: ignore[return-value] # Safe cast since we checked values logger.warning(f"Unknown parameter location: {param_in}, defaulting to 'query'") return "query" # type: ignore[return-value] # Safe cast to default value def _resolve_ref(self, item: Any) -> Any: """Resolves a reference to its target definition.""" if isinstance(item, self.reference_cls): ref_str = item.ref try: if not ref_str.startswith("#/"): raise ValueError( f"External or non-local reference not supported: {ref_str}" ) parts = ref_str.strip("#/").split("/") target = self.openapi for part in parts: if part.isdigit() and isinstance(target, list): target = target[int(part)] elif isinstance(target, BaseModel): # Check class fields first, then model_extra if part in target.__class__.model_fields: target = getattr(target, part, None) elif target.model_extra and part in target.model_extra: target = target.model_extra[part] else: # Special handling for components if part == "components" and hasattr(target, "components"): target = getattr(target, "components") elif hasattr(target, part): # Fallback check target = getattr(target, part, None) else: target = None # Part not found elif isinstance(target, dict): target = target.get(part) else: raise ValueError( f"Cannot traverse part '{part}' in reference '{ref_str}'" ) if target is None: raise ValueError( f"Reference part '{part}' not found in path '{ref_str}'" ) # Handle nested references if isinstance(target, self.reference_cls): return self._resolve_ref(target) return target except (AttributeError, KeyError, IndexError, TypeError, ValueError) as e: raise ValueError(f"Failed to resolve reference '{ref_str}': {e}") from e return item def _extract_schema_as_dict(self, schema_obj: Any) -> JsonSchema: """Resolves a schema and returns it as a dictionary.""" try: resolved_schema = self._resolve_ref(schema_obj) if isinstance(resolved_schema, (self.schema_cls)): # Convert schema to dictionary result = resolved_schema.model_dump( mode="json", by_alias=True, exclude_none=True ) elif isinstance(resolved_schema, dict): result = resolved_schema else: logger.warning( f"Expected Schema after resolving, got {type(resolved_schema)}. Returning empty dict." ) result = {} return _replace_ref_with_defs(result) except ValueError as e: # Re-raise ValueError for external reference errors and other validation issues if "External or non-local reference not supported" in str(e): raise logger.error(f"Failed to extract schema as dict: {e}", exc_info=False) return {} except Exception as e: logger.error(f"Failed to extract schema as dict: {e}", exc_info=False) return {} def _extract_parameters( self, operation_params: list[Any] | None = None, path_item_params: list[Any] | None = None, ) -> list[ParameterInfo]: """Extract and resolve parameters from operation and path item.""" extracted_params: list[ParameterInfo] = [] seen_params: dict[ tuple[str, str], bool ] = {} # Use tuple of (name, location) as key all_params = (operation_params or []) + (path_item_params or []) for param_or_ref in all_params: try: parameter = self._resolve_ref(param_or_ref) if not isinstance(parameter, self.parameter_cls): logger.warning( f"Expected Parameter after resolving, got {type(parameter)}. Skipping." ) continue # Extract parameter info - handle both 3.0 and 3.1 parameter models param_in = parameter.param_in # Both use param_in # Handle enum or string parameter locations from enum import Enum param_in_str = ( param_in.value if isinstance(param_in, Enum) else param_in ) param_location = self._convert_to_parameter_location(param_in_str) param_schema_obj = parameter.param_schema # Both use param_schema # Skip duplicate parameters (same name and location) param_key = (parameter.name, param_in_str) if param_key in seen_params: continue seen_params[param_key] = True # Extract schema param_schema_dict = {} if param_schema_obj: # Process schema object param_schema_dict = self._extract_schema_as_dict(param_schema_obj) # Handle default value resolved_schema = self._resolve_ref(param_schema_obj) if ( not isinstance(resolved_schema, self.reference_cls) and hasattr(resolved_schema, "default") and resolved_schema.default is not None ): param_schema_dict["default"] = resolved_schema.default elif hasattr(parameter, "content") and parameter.content: # Handle content-based parameters first_media_type = next(iter(parameter.content.values()), None) if ( first_media_type and hasattr(first_media_type, "media_type_schema") and first_media_type.media_type_schema ): media_schema = first_media_type.media_type_schema param_schema_dict = self._extract_schema_as_dict(media_schema) # Handle default value in content schema resolved_media_schema = self._resolve_ref(media_schema) if ( not isinstance(resolved_media_schema, self.reference_cls) and hasattr(resolved_media_schema, "default") and resolved_media_schema.default is not None ): param_schema_dict["default"] = resolved_media_schema.default # Extract explode and style properties if present explode = getattr(parameter, "explode", None) style = getattr(parameter, "style", None) # Create parameter info object param_info = ParameterInfo( name=parameter.name, location=param_location, required=parameter.required, schema=param_schema_dict, description=parameter.description, explode=explode, style=style, ) extracted_params.append(param_info) except Exception as e: param_name = getattr( param_or_ref, "name", getattr(param_or_ref, "ref", "unknown") ) logger.error( f"Failed to extract parameter '{param_name}': {e}", exc_info=False ) return extracted_params def _extract_request_body(self, request_body_or_ref: Any) -> RequestBodyInfo | None: """Extract and resolve request body information.""" if not request_body_or_ref: return None try: request_body = self._resolve_ref(request_body_or_ref) if not isinstance(request_body, self.request_body_cls): logger.warning( f"Expected RequestBody after resolving, got {type(request_body)}. Returning None." ) return None # Create request body info request_body_info = RequestBodyInfo( required=request_body.required, description=request_body.description, ) # Extract content schemas if hasattr(request_body, "content") and request_body.content: for media_type_str, media_type_obj in request_body.content.items(): if ( media_type_obj and hasattr(media_type_obj, "media_type_schema") and media_type_obj.media_type_schema ): try: schema_dict = self._extract_schema_as_dict( media_type_obj.media_type_schema ) request_body_info.content_schema[media_type_str] = ( schema_dict ) except ValueError as e: # Re-raise ValueError for external reference errors if "External or non-local reference not supported" in str( e ): raise logger.error( f"Failed to extract schema for media type '{media_type_str}': {e}" ) except Exception as e: logger.error( f"Failed to extract schema for media type '{media_type_str}': {e}" ) return request_body_info except ValueError as e: # Re-raise ValueError for external reference errors if "External or non-local reference not supported" in str(e): raise ref_name = getattr(request_body_or_ref, "ref", "unknown") logger.error( f"Failed to extract request body '{ref_name}': {e}", exc_info=False ) return None except Exception as e: ref_name = getattr(request_body_or_ref, "ref", "unknown") logger.error( f"Failed to extract request body '{ref_name}': {e}", exc_info=False ) return None def _extract_responses( self, operation_responses: dict[str, Any] | None ) -> dict[str, ResponseInfo]: """Extract and resolve response information.""" extracted_responses: dict[str, ResponseInfo] = {} if not operation_responses: return extracted_responses for status_code, resp_or_ref in operation_responses.items(): try: response = self._resolve_ref(resp_or_ref) if not isinstance(response, self.response_cls): logger.warning( f"Expected Response after resolving for status code {status_code}, " f"got {type(response)}. Skipping." ) continue # Create response info resp_info = ResponseInfo(description=response.description) # Extract content schemas if hasattr(response, "content") and response.content: for media_type_str, media_type_obj in response.content.items(): if ( media_type_obj and hasattr(media_type_obj, "media_type_schema") and media_type_obj.media_type_schema ): try: schema_dict = self._extract_schema_as_dict( media_type_obj.media_type_schema ) resp_info.content_schema[media_type_str] = schema_dict except ValueError as e: # Re-raise ValueError for external reference errors if ( "External or non-local reference not supported" in str(e) ): raise logger.error( f"Failed to extract schema for media type '{media_type_str}' " f"in response {status_code}: {e}" ) except Exception as e: logger.error( f"Failed to extract schema for media type '{media_type_str}' " f"in response {status_code}: {e}" ) extracted_responses[str(status_code)] = resp_info except ValueError as e: # Re-raise ValueError for external reference errors if "External or non-local reference not supported" in str(e): raise ref_name = getattr(resp_or_ref, "ref", "unknown") logger.error( f"Failed to extract response for status code {status_code} " f"from reference '{ref_name}': {e}", exc_info=False, ) except Exception as e: ref_name = getattr(resp_or_ref, "ref", "unknown") logger.error( f"Failed to extract response for status code {status_code} " f"from reference '{ref_name}': {e}", exc_info=False, ) return extracted_responses def parse(self) -> list[HTTPRoute]: """Parse the OpenAPI schema into HTTP routes.""" routes: list[HTTPRoute] = [] if not hasattr(self.openapi, "paths") or not self.openapi.paths: logger.warning("OpenAPI schema has no paths defined.") return [] # Extract component schemas schema_definitions = {} if hasattr(self.openapi, "components") and self.openapi.components: components = self.openapi.components if hasattr(components, "schemas") and components.schemas: for name, schema in components.schemas.items(): try: if isinstance(schema, self.reference_cls): resolved_schema = self._resolve_ref(schema) schema_definitions[name] = self._extract_schema_as_dict( resolved_schema ) else: schema_definitions[name] = self._extract_schema_as_dict( schema ) except Exception as e: logger.warning( f"Failed to extract schema definition '{name}': {e}" ) # Process paths and operations for path_str, path_item_obj in self.openapi.paths.items(): if not isinstance(path_item_obj, self.path_item_cls): logger.warning( f"Skipping invalid path item for path '{path_str}' (type: {type(path_item_obj)})" ) continue path_level_params = ( path_item_obj.parameters if hasattr(path_item_obj, "parameters") else None ) # Get HTTP methods from the path item class fields http_methods = [ "get", "put", "post", "delete", "options", "head", "patch", "trace", ] for method_lower in http_methods: operation = getattr(path_item_obj, method_lower, None) if operation and isinstance(operation, self.operation_cls): # Cast method to HttpMethod - safe since we only use valid HTTP methods method_upper = method_lower.upper() try: parameters = self._extract_parameters( getattr(operation, "parameters", None), path_level_params ) request_body_info = self._extract_request_body( getattr(operation, "requestBody", None) ) responses = self._extract_responses( getattr(operation, "responses", None) ) extensions = {} if hasattr(operation, "model_extra") and operation.model_extra: extensions = { k: v for k, v in operation.model_extra.items() if k.startswith("x-") } route = HTTPRoute( path=path_str, method=method_upper, # type: ignore[arg-type] # Known valid HTTP method operation_id=getattr(operation, "operationId", None), summary=getattr(operation, "summary", None), description=getattr(operation, "description", None), tags=getattr(operation, "tags", []) or [], parameters=parameters, request_body=request_body_info, responses=responses, schema_definitions=schema_definitions, extensions=extensions, openapi_version=self.openapi_version, ) routes.append(route) logger.debug( f"Successfully extracted route: {method_upper} {path_str}" ) except ValueError as op_error: # Re-raise ValueError for external reference errors if "External or non-local reference not supported" in str( op_error ): raise op_id = getattr(operation, "operationId", "unknown") logger.error( f"Failed to process operation {method_upper} {path_str} (ID: {op_id}): {op_error}", exc_info=True, ) except Exception as op_error: op_id = getattr(operation, "operationId", "unknown") logger.error( f"Failed to process operation {method_upper} {path_str} (ID: {op_id}): {op_error}", exc_info=True, ) logger.debug(f"Finished parsing. Extracted {len(routes)} HTTP routes.") return routes def clean_schema_for_display(schema: JsonSchema | None) -> JsonSchema | None: """ Clean up a schema dictionary for display by removing internal/complex fields. """ if not schema or not isinstance(schema, dict): return schema # Make a copy to avoid modifying the input schema cleaned = schema.copy() # Fields commonly removed for simpler display to LLMs or users fields_to_remove = [ "allOf", "anyOf", "oneOf", "not", # Composition keywords "nullable", # Handled by type unions usually "discriminator", "readOnly", "writeOnly", "deprecated", "xml", "externalDocs", # Can be verbose, maybe remove based on flag? # "pattern", "minLength", "maxLength", # "minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum", # "multipleOf", "minItems", "maxItems", "uniqueItems", # "minProperties", "maxProperties" ] for field in fields_to_remove: if field in cleaned: cleaned.pop(field) # Recursively clean properties and items if "properties" in cleaned: cleaned["properties"] = { k: clean_schema_for_display(v) for k, v in cleaned["properties"].items() } # Remove properties section if empty after cleaning if not cleaned["properties"]: cleaned.pop("properties") if "items" in cleaned: cleaned["items"] = clean_schema_for_display(cleaned["items"]) # Remove items section if empty after cleaning if not cleaned["items"]: cleaned.pop("items") if "additionalProperties" in cleaned: # Often verbose, can be simplified if isinstance(cleaned["additionalProperties"], dict): cleaned["additionalProperties"] = clean_schema_for_display( cleaned["additionalProperties"] ) elif cleaned["additionalProperties"] is True: # Maybe keep 'true' or represent as 'Allows additional properties' text? pass # Keep simple boolean for now def generate_example_from_schema(schema: JsonSchema | None) -> Any: """ Generate a simple example value from a JSON schema dictionary. Very basic implementation focusing on types. """ if not schema or not isinstance(schema, dict): return "unknown" # Or None? # Use default value if provided if "default" in schema: return schema["default"] # Use first enum value if provided if "enum" in schema and isinstance(schema["enum"], list) and schema["enum"]: return schema["enum"][0] # Use first example if provided if ( "examples" in schema and isinstance(schema["examples"], list) and schema["examples"] ): return schema["examples"][0] if "example" in schema: return schema["example"] schema_type = schema.get("type") if schema_type == "object": result = {} properties = schema.get("properties", {}) if isinstance(properties, dict): # Generate example for first few properties or required ones? Limit complexity. required_props = set(schema.get("required", [])) props_to_include = list(properties.keys())[ :3 ] # Limit to first 3 for brevity for prop_name in props_to_include: if prop_name in properties: result[prop_name] = generate_example_from_schema( properties[prop_name] ) # Ensure required props are present if possible for req_prop in required_props: if req_prop not in result and req_prop in properties: result[req_prop] = generate_example_from_schema( properties[req_prop] ) return result if result else {"key": "value"} # Basic object if no props elif schema_type == "array": items_schema = schema.get("items") if isinstance(items_schema, dict): # Generate one example item item_example = generate_example_from_schema(items_schema) return [item_example] if item_example is not None else [] return ["example_item"] # Fallback elif schema_type == "string": format_type = schema.get("format") if format_type == "date-time": return "2024-01-01T12:00:00Z" if format_type == "date": return "2024-01-01" if format_type == "email": return "user@example.com" if format_type == "uuid": return "123e4567-e89b-12d3-a456-426614174000" if format_type == "byte": return "ZXhhbXBsZQ==" # "example" base64 return "string" elif schema_type == "integer": return 1 elif schema_type == "number": return 1.5 elif schema_type == "boolean": return True elif schema_type == "null": return None # Fallback if type is unknown or missing return "unknown_type" def format_json_for_description(data: Any, indent: int = 2) -> str: """Formats Python data as a JSON string block for markdown.""" try: json_str = json.dumps(data, indent=indent) return f"```json\n{json_str}\n```" except TypeError: return f"```\nCould not serialize to JSON: {data}\n```" def format_description_with_responses( base_description: str, responses: dict[ str, Any ], # Changed from specific ResponseInfo type to avoid circular imports parameters: list[ParameterInfo] | None = None, # Add parameters parameter request_body: RequestBodyInfo | None = None, # Add request_body parameter ) -> str: """ Formats the base description string with response, parameter, and request body information. Args: base_description (str): The initial description to be formatted. responses (dict[str, Any]): A dictionary of response information, keyed by status code. parameters (list[ParameterInfo] | None, optional): A list of parameter information, including path and query parameters. Each parameter includes details such as name, location, whether it is required, and a description. request_body (RequestBodyInfo | None, optional): Information about the request body, including its description, whether it is required, and its content schema. Returns: str: The formatted description string with additional details about responses, parameters, and the request body. """ desc_parts = [base_description] # Add parameter information if parameters: # Process path parameters path_params = [p for p in parameters if p.location == "path"] if path_params: param_section = "\n\n**Path Parameters:**" desc_parts.append(param_section) for param in path_params: required_marker = " (Required)" if param.required else "" param_desc = f"\n- **{param.name}**{required_marker}: {param.description or 'No description.'}" desc_parts.append(param_desc) # Process query parameters query_params = [p for p in parameters if p.location == "query"] if query_params: param_section = "\n\n**Query Parameters:**" desc_parts.append(param_section) for param in query_params: required_marker = " (Required)" if param.required else "" param_desc = f"\n- **{param.name}**{required_marker}: {param.description or 'No description.'}" desc_parts.append(param_desc) # Add request body information if present if request_body and request_body.description: req_body_section = "\n\n**Request Body:**" desc_parts.append(req_body_section) required_marker = " (Required)" if request_body.required else "" desc_parts.append(f"\n{request_body.description}{required_marker}") # Add request body property descriptions if available if request_body.content_schema: media_type = ( "application/json" if "application/json" in request_body.content_schema else next(iter(request_body.content_schema), None) ) if media_type: schema = request_body.content_schema.get(media_type, {}) if isinstance(schema, dict) and "properties" in schema: desc_parts.append("\n\n**Request Properties:**") for prop_name, prop_schema in schema["properties"].items(): if ( isinstance(prop_schema, dict) and "description" in prop_schema ): required = prop_name in schema.get("required", []) req_mark = " (Required)" if required else "" desc_parts.append( f"\n- **{prop_name}**{req_mark}: {prop_schema['description']}" ) # Add response information if responses: response_section = "\n\n**Responses:**" added_response_section = False # Determine success codes (common ones) success_codes = {"200", "201", "202", "204"} # As strings success_status = next((s for s in success_codes if s in responses), None) # Process all responses responses_to_process = responses.items() for status_code, resp_info in sorted(responses_to_process): if not added_response_section: desc_parts.append(response_section) added_response_section = True status_marker = " (Success)" if status_code == success_status else "" desc_parts.append( f"\n- **{status_code}**{status_marker}: {resp_info.description or 'No description.'}" ) # Process content schemas for this response if resp_info.content_schema: # Prioritize json, then take first available media_type = ( "application/json" if "application/json" in resp_info.content_schema else next(iter(resp_info.content_schema), None) ) if media_type: schema = resp_info.content_schema.get(media_type) desc_parts.append(f" - Content-Type: `{media_type}`") # Add response property descriptions if isinstance(schema, dict): # Handle array responses if schema.get("type") == "array" and "items" in schema: items_schema = schema["items"] if ( isinstance(items_schema, dict) and "properties" in items_schema ): desc_parts.append("\n - **Response Item Properties:**") for prop_name, prop_schema in items_schema[ "properties" ].items(): if ( isinstance(prop_schema, dict) and "description" in prop_schema ): desc_parts.append( f"\n - **{prop_name}**: {prop_schema['description']}" ) # Handle object responses elif "properties" in schema: desc_parts.append("\n - **Response Properties:**") for prop_name, prop_schema in schema["properties"].items(): if ( isinstance(prop_schema, dict) and "description" in prop_schema ): desc_parts.append( f"\n - **{prop_name}**: {prop_schema['description']}" ) # Generate Example if schema: example = generate_example_from_schema(schema) if example != "unknown_type" and example is not None: desc_parts.append("\n - **Example:**") desc_parts.append( format_json_for_description(example, indent=2) ) return "\n".join(desc_parts) def _replace_ref_with_defs( info: dict[str, Any], description: str | None = None ) -> dict[str, Any]: """ Replace openapi $ref with jsonschema $defs Examples: - {"type": "object", "properties": {"$ref": "#/components/schemas/..."}} - {"$ref": "#/components/schemas/..."} - {"items": {"$ref": "#/components/schemas/..."}} - {"anyOf": [{"$ref": "#/components/schemas/..."}]} - {"allOf": [{"$ref": "#/components/schemas/..."}]} - {"oneOf": [{"$ref": "#/components/schemas/..."}]} Args: info: dict[str, Any] description: str | None Returns: dict[str, Any] """ schema = info.copy() if ref_path := schema.get("$ref"): if isinstance(ref_path, str): if ref_path.startswith("#/components/schemas/"): schema_name = ref_path.split("/")[-1] schema["$ref"] = f"#/$defs/{schema_name}" elif not ref_path.startswith("#/"): raise ValueError( f"External or non-local reference not supported: {ref_path}. " f"FastMCP only supports local schema references starting with '#/'. " f"Please include all schema definitions within the OpenAPI document." ) elif properties := schema.get("properties"): if "$ref" in properties: schema["properties"] = _replace_ref_with_defs(properties) else: schema["properties"] = { prop_name: _replace_ref_with_defs(prop_schema) for prop_name, prop_schema in properties.items() } elif item_schema := schema.get("items"): schema["items"] = _replace_ref_with_defs(item_schema) for section in ["anyOf", "allOf", "oneOf"]: for i, item in enumerate(schema.get(section, [])): schema[section][i] = _replace_ref_with_defs(item) if info.get("description", description) and not schema.get("description"): schema["description"] = description return schema def _make_optional_parameter_nullable(schema: dict[str, Any]) -> dict[str, Any]: """ Make an optional parameter schema nullable to allow None values. For optional parameters, we need to allow null values in addition to the specified type to handle cases where None is passed for optional parameters. """ # If schema already has multiple types or is already nullable, don't modify if "anyOf" in schema or "oneOf" in schema or "allOf" in schema: return schema # If it's already nullable (type includes null), don't modify if isinstance(schema.get("type"), list) and "null" in schema["type"]: return schema # Create a new schema that allows null in addition to the original type if "type" in schema: original_type = schema["type"] if isinstance(original_type, str): # Single type - make it a union with null # Optimize: avoid full schema copy by building directly nested_non_nullable_schema = { "type": original_type, } nullable_schema = {} # Define type-specific properties that should move to nested schema type_specific_properties = set() if original_type == "array": # https://json-schema.org/understanding-json-schema/reference/array type_specific_properties = { "items", "prefixItems", "unevaluatedItems", "contains", "minContains", "maxContains", "minItems", "maxItems", "uniqueItems", } elif original_type == "object": # https://json-schema.org/understanding-json-schema/reference/object type_specific_properties = { "properties", "patternProperties", "additionalProperties", "unevaluatedProperties", "required", "propertyNames", "minProperties", "maxProperties", } # Efficiently distribute properties without copying the entire schema for key, value in schema.items(): if key == "type": continue # Already handled elif key in type_specific_properties: nested_non_nullable_schema[key] = value else: nullable_schema[key] = value nullable_schema["anyOf"] = [nested_non_nullable_schema, {"type": "null"}] return nullable_schema return schema def _add_null_to_type(schema: dict[str, Any]) -> None: """Add 'null' to the schema's type field or handle oneOf/anyOf/allOf constructs if not already present.""" if "type" in schema: current_type = schema["type"] if isinstance(current_type, str): # Convert string type to array with null schema["type"] = [current_type, "null"] elif isinstance(current_type, list): # Add null to array if not already present if "null" not in current_type: schema["type"] = current_type + ["null"] elif "oneOf" in schema: # Convert oneOf to anyOf with null type schema["anyOf"] = schema.pop("oneOf") + [{"type": "null"}] elif "anyOf" in schema: # Add null type to anyOf if not already present if not any(item.get("type") == "null" for item in schema["anyOf"]): schema["anyOf"].append({"type": "null"}) elif "allOf" in schema: # For allOf, wrap in anyOf with null - this means (all conditions) OR null schema["anyOf"] = [{"allOf": schema.pop("allOf")}, {"type": "null"}] def _handle_nullable_fields(schema: dict[str, Any] | Any) -> dict[str, Any] | Any: """Convert OpenAPI nullable fields to JSON Schema format: {"type": "string", "nullable": true} -> {"type": ["string", "null"]}""" if not isinstance(schema, dict): return schema # Check if we need to modify anything first to avoid unnecessary copying has_root_nullable_field = "nullable" in schema has_root_nullable_true = ( has_root_nullable_field and schema["nullable"] and ( "type" in schema or "oneOf" in schema or "anyOf" in schema or "allOf" in schema ) ) has_property_nullable_field = False if "properties" in schema: for prop_schema in schema["properties"].values(): if isinstance(prop_schema, dict) and "nullable" in prop_schema: has_property_nullable_field = True break # If no nullable fields at all, return original schema unchanged if not has_root_nullable_field and not has_property_nullable_field: return schema # Only copy if we need to modify result = schema.copy() # Handle root level nullable - always remove the field, convert type if true if has_root_nullable_field: result.pop("nullable") if has_root_nullable_true: _add_null_to_type(result) # Handle properties nullable fields if has_property_nullable_field and "properties" in result: for prop_name, prop_schema in result["properties"].items(): if isinstance(prop_schema, dict) and "nullable" in prop_schema: nullable_value = prop_schema.pop("nullable") if nullable_value and ( "type" in prop_schema or "oneOf" in prop_schema or "anyOf" in prop_schema or "allOf" in prop_schema ): _add_null_to_type(prop_schema) return result def _combine_schemas(route: HTTPRoute) -> dict[str, Any]: """ Combines parameter and request body schemas into a single schema. Handles parameter name collisions by adding location suffixes. Args: route: HTTPRoute object Returns: Combined schema dictionary """ properties = {} required = [] # First pass: collect parameter names by location and body properties param_names_by_location = { "path": set(), "query": set(), "header": set(), "cookie": set(), } body_props = {} for param in route.parameters: param_names_by_location[param.location].add(param.name) if route.request_body and route.request_body.content_schema: content_type = next(iter(route.request_body.content_schema)) body_schema = _replace_ref_with_defs( route.request_body.content_schema[content_type].copy(), route.request_body.description, ) body_props = body_schema.get("properties", {}) # Detect collisions: parameters that exist in both body and path/query/header all_non_body_params = set() for location_params in param_names_by_location.values(): all_non_body_params.update(location_params) body_param_names = set(body_props.keys()) colliding_params = all_non_body_params & body_param_names # Add parameters with suffixes for collisions for param in route.parameters: if param.name in colliding_params: # Add suffix for non-body parameters when collision detected suffixed_name = f"{param.name}__{param.location}" if param.required: required.append(suffixed_name) # Add location info to description param_schema = _replace_ref_with_defs( param.schema_.copy(), param.description ) original_desc = param_schema.get("description", "") location_desc = f"({param.location.capitalize()} parameter)" if original_desc: param_schema["description"] = f"{original_desc} {location_desc}" else: param_schema["description"] = location_desc # Don't make optional parameters nullable - they can simply be omitted # The OpenAPI specification doesn't require optional parameters to accept null values properties[suffixed_name] = param_schema else: # No collision, use original name if param.required: required.append(param.name) param_schema = _replace_ref_with_defs( param.schema_.copy(), param.description ) # Don't make optional parameters nullable - they can simply be omitted # The OpenAPI specification doesn't require optional parameters to accept null values properties[param.name] = param_schema # Add request body properties (no suffixes for body parameters) if route.request_body and route.request_body.content_schema: for prop_name, prop_schema in body_props.items(): properties[prop_name] = prop_schema if route.request_body.required: required.extend(body_schema.get("required", [])) result = { "type": "object", "properties": properties, "required": required, } # Add schema definitions if available if route.schema_definitions: result["$defs"] = route.schema_definitions.copy() # Use lightweight compression - prune additionalProperties and unused definitions if result.get("additionalProperties") is False: result.pop("additionalProperties") # Remove unused definitions (lightweight approach - just check direct $ref usage) if "$defs" in result: used_refs = set() def find_refs_in_value(value): if isinstance(value, dict): if "$ref" in value and isinstance(value["$ref"], str): ref = value["$ref"] if ref.startswith("#/$defs/"): used_refs.add(ref.split("/")[-1]) for v in value.values(): find_refs_in_value(v) elif isinstance(value, list): for item in value: find_refs_in_value(item) # Find refs in the main schema (excluding $defs section) for key, value in result.items(): if key != "$defs": find_refs_in_value(value) # Remove unused definitions if used_refs: result["$defs"] = { name: def_schema for name, def_schema in result["$defs"].items() if name in used_refs } else: result.pop("$defs") return result def _adjust_union_types( schema: dict[str, Any] | list[Any], ) -> dict[str, Any] | list[Any]: """Recursively replace 'oneOf' with 'anyOf' in schema to handle overlapping unions.""" if isinstance(schema, dict): # Optimize: only copy if we need to modify something has_one_of = "oneOf" in schema needs_recursive_processing = False # Check if we need recursive processing for v in schema.values(): if isinstance(v, dict | list): needs_recursive_processing = True break # If nothing to change, return original if not has_one_of and not needs_recursive_processing: return schema # Work on a copy only when modification is needed result = schema.copy() if has_one_of: result["anyOf"] = result.pop("oneOf") # Only recurse where needed if needs_recursive_processing: for k, v in result.items(): if isinstance(v, dict | list): result[k] = _adjust_union_types(v) return result elif isinstance(schema, list): return [_adjust_union_types(item) for item in schema] return schema def extract_output_schema_from_responses( responses: dict[str, ResponseInfo], schema_definitions: dict[str, Any] | None = None, openapi_version: str | None = None, ) -> dict[str, Any] | None: """ Extract output schema from OpenAPI responses for use as MCP tool output schema. This function finds the first successful response (200, 201, 202, 204) with a JSON-compatible content type and extracts its schema. If the schema is not an object type, it wraps it to comply with MCP requirements. Args: responses: Dictionary of ResponseInfo objects keyed by status code schema_definitions: Optional schema definitions to include in the output schema openapi_version: OpenAPI version string, used to optimize nullable field handling Returns: dict: MCP-compliant output schema with potential wrapping, or None if no suitable schema found """ if not responses: return None # Priority order for success status codes success_codes = ["200", "201", "202", "204"] # Find the first successful response response_info = None for status_code in success_codes: if status_code in responses: response_info = responses[status_code] break # If no explicit success codes, try any 2xx response if response_info is None: for status_code, resp_info in responses.items(): if status_code.startswith("2"): response_info = resp_info break if response_info is None or not response_info.content_schema: return None # Prefer application/json, then fall back to other JSON-compatible types json_compatible_types = [ "application/json", "application/vnd.api+json", "application/hal+json", "application/ld+json", "text/json", ] schema = None for content_type in json_compatible_types: if content_type in response_info.content_schema: schema = response_info.content_schema[content_type] break # If no JSON-compatible type found, try the first available content type if schema is None and response_info.content_schema: first_content_type = next(iter(response_info.content_schema)) schema = response_info.content_schema[first_content_type] logger.debug( f"Using non-JSON content type for output schema: {first_content_type}" ) if not schema or not isinstance(schema, dict): return None # Clean and copy the schema output_schema = schema.copy() # If schema has a $ref, resolve it first before processing nullable fields if "$ref" in output_schema and schema_definitions: ref_path = output_schema["$ref"] if ref_path.startswith("#/components/schemas/"): schema_name = ref_path.split("/")[-1] if schema_name in schema_definitions: # Replace $ref with the actual schema definition output_schema = schema_definitions[schema_name].copy() # Handle OpenAPI nullable fields by converting them to JSON Schema format # This prevents "None is not of type 'string'" validation errors # Only needed for OpenAPI 3.0 - 3.1 uses standard JSON Schema null types if openapi_version and openapi_version.startswith("3.0"): output_schema = _handle_nullable_fields(output_schema) # MCP requires output schemas to be objects. If this schema is not an object, # we need to wrap it similar to how ParsedFunction.from_function() does it if output_schema.get("type") != "object": # Create a wrapped schema that contains the original schema under a "result" key wrapped_schema = { "type": "object", "properties": {"result": output_schema}, "required": ["result"], "x-fastmcp-wrap-result": True, } output_schema = wrapped_schema # Add schema definitions if available and handle nullable fields in them # Only add $defs if we didn't resolve the $ref inline above if schema_definitions and "$ref" not in schema.copy(): processed_defs = {} for def_name, def_schema in schema_definitions.items(): # Only handle nullable fields for OpenAPI 3.0 - 3.1 uses standard JSON Schema null types if openapi_version and openapi_version.startswith("3.0"): processed_defs[def_name] = _handle_nullable_fields(def_schema) else: processed_defs[def_name] = def_schema output_schema["$defs"] = processed_defs # Use lightweight compression - prune additionalProperties and unused definitions if output_schema.get("additionalProperties") is False: output_schema.pop("additionalProperties") # Remove unused definitions (lightweight approach - just check direct $ref usage) if "$defs" in output_schema: used_refs = set() def find_refs_in_value(value): if isinstance(value, dict): if "$ref" in value and isinstance(value["$ref"], str): ref = value["$ref"] if ref.startswith("#/$defs/"): used_refs.add(ref.split("/")[-1]) for v in value.values(): find_refs_in_value(v) elif isinstance(value, list): for item in value: find_refs_in_value(item) # Find refs in the main schema (excluding $defs section) for key, value in output_schema.items(): if key != "$defs": find_refs_in_value(value) # Remove unused definitions if used_refs: output_schema["$defs"] = { name: def_schema for name, def_schema in output_schema["$defs"].items() if name in used_refs } else: output_schema.pop("$defs") # Adjust union types to handle overlapping unions output_schema = cast(dict[str, Any], _adjust_union_types(output_schema)) return output_schema

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jezweb/australian-postcodes-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server