"""
OpenAPI specification parser and loader.
"""
import json
import yaml
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse
import httpx
from pathlib import Path
from .models import (
OpenAPISpec, InfoObject, ServerInfo, ContactInfo, LicenseInfo,
APIOperation, Parameter, RequestBody, Response, SecurityScheme,
HTTPMethod
)
class OpenAPIParser:
    """Load OpenAPI documents and convert them into structured models.

    A document can be fetched from an ``http``/``https`` URL or read from a
    local file (``load_spec``) and is then turned into an ``OpenAPISpec``
    (``parse_spec``).  Only local references (``#/...``) are resolved.

    The parser owns an ``httpx.Client``; call ``close()`` or use the parser
    as a context manager to release it.
    """

    def __init__(self, timeout: int = 30):
        """Create a parser.

        Args:
            timeout: Timeout handed to the HTTP clients created by this class.
        """
        self.timeout = timeout
        # No method of this class uses this synchronous client (URL loading
        # opens a short-lived AsyncClient); the attribute is kept so existing
        # callers that reach for ``parser.client`` keep working.
        self.client = httpx.Client(timeout=timeout)

    def close(self) -> None:
        """Close the synchronous HTTP client created in ``__init__``."""
        self.client.close()

    def __enter__(self) -> "OpenAPIParser":
        """Return the parser itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the HTTP client when leaving a ``with`` block."""
        self.close()

    async def load_spec(self, spec_url: str) -> Dict[str, Any]:
        """Load an OpenAPI document from a URL or a file path.

        Args:
            spec_url: An ``http``/``https`` URL, or a path on the local
                filesystem.

        Returns:
            The deserialized document.
        """
        if self._is_url(spec_url):
            return await self._load_from_url(spec_url)
        return self._load_from_file(spec_url)

    def _is_url(self, path: str) -> bool:
        """Return True when *path* has an ``http`` or ``https`` scheme."""
        return urlparse(path).scheme in ("http", "https")

    async def _load_from_url(self, url: str) -> Dict[str, Any]:
        """Download and deserialize a document from *url*.

        The response ``Content-Type`` header picks the parser; when it names
        neither JSON nor YAML, JSON is tried first and YAML second.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error
            response.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(url)
            response.raise_for_status()

        content_type = response.headers.get("content-type", "").lower()
        if "json" in content_type:
            return response.json()
        if "yaml" in content_type or "yml" in content_type:
            return yaml.safe_load(response.text)

        # Unknown content type: try to parse as JSON first, then YAML.
        try:
            return response.json()
        except json.JSONDecodeError:
            return yaml.safe_load(response.text)

    def _load_from_file(self, file_path: str) -> Dict[str, Any]:
        """Read and deserialize a document from a local file.

        Files ending in ``.yaml``/``.yml`` are parsed as YAML, everything
        else as JSON.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"OpenAPI spec file not found: {file_path}")

        content = path.read_text(encoding="utf-8")
        if path.suffix.lower() in (".yaml", ".yml"):
            return yaml.safe_load(content)
        return json.loads(content)

    def parse_spec(self, spec_data: Dict[str, Any]) -> "OpenAPISpec":
        """Convert a deserialized OpenAPI document into an ``OpenAPISpec``.

        Sections that are absent or null (``info``, ``servers``,
        ``components``, ``paths``) are treated as empty.

        Args:
            spec_data: The document as returned by ``load_spec``.

        Returns:
            The structured specification.
        """
        # ``or {}`` also covers sections that are present but null, which is
        # what an empty YAML mapping deserializes to.
        components = spec_data.get("components") or {}

        info = self._parse_info(spec_data.get("info") or {})
        servers = self._parse_servers(spec_data.get("servers") or [])
        security_schemes = self._parse_security_schemes(components)
        operations = self._parse_operations(
            spec_data.get("paths") or {}, components
        )

        return OpenAPISpec(
            openapi_version=spec_data.get("openapi", "3.0.0"),
            info=info,
            servers=servers,
            operations=operations,
            security_schemes=security_schemes,
            security=spec_data.get("security", []),
            components=components,
            tags=spec_data.get("tags", []),
            external_docs=spec_data.get("externalDocs"),
        )

    def _parse_info(self, info_data: Dict[str, Any]) -> "InfoObject":
        """Build the ``InfoObject`` (with optional contact and license)."""
        contact_data = info_data.get("contact") or {}
        license_data = info_data.get("license") or {}

        contact = None
        if contact_data:
            contact = ContactInfo(
                name=contact_data.get("name"),
                url=contact_data.get("url"),
                email=contact_data.get("email"),
            )

        license_info = None
        if license_data:
            license_info = LicenseInfo(
                name=license_data["name"],
                url=license_data.get("url"),
            )

        return InfoObject(
            title=info_data.get("title", "API"),
            version=info_data.get("version", "1.0.0"),
            description=info_data.get("description"),
            terms_of_service=info_data.get("termsOfService"),
            contact=contact,
            license=license_info,
        )

    def _parse_servers(
        self, servers_data: List[Dict[str, Any]]
    ) -> List["ServerInfo"]:
        """Build one ``ServerInfo`` per entry of the ``servers`` list."""
        return [
            ServerInfo(
                url=server_data["url"],
                description=server_data.get("description"),
                variables=server_data.get("variables", {}),
            )
            for server_data in servers_data
        ]

    def _parse_security_schemes(
        self, components: Dict[str, Any]
    ) -> Dict[str, "SecurityScheme"]:
        """Build the ``SecurityScheme`` models found under *components*."""
        schemes_data = components.get("securitySchemes") or {}
        return {
            name: SecurityScheme(
                type=scheme_data["type"],
                description=scheme_data.get("description"),
                name=scheme_data.get("name"),
                location=scheme_data.get("in"),
                scheme=scheme_data.get("scheme"),
                bearer_format=scheme_data.get("bearerFormat"),
                flows=scheme_data.get("flows"),
            )
            for name, scheme_data in schemes_data.items()
        }

    def _parse_operations(
        self, paths: Dict[str, Any], components: Dict[str, Any]
    ) -> List["APIOperation"]:
        """Build an ``APIOperation`` for every HTTP-method key under *paths*.

        Keys of a path item that are not ``HTTPMethod`` values (``summary``,
        ``parameters``, extensions, ...) are skipped.  Parameters declared on
        the path item itself are passed on to each of its operations.
        """
        # Computed once instead of once per key of every path item.
        valid_methods = {member.value for member in HTTPMethod}

        operations = []
        for path, path_item in paths.items():
            if not isinstance(path_item, dict):
                # A null/empty path item has no operations.
                continue
            shared_parameters = path_item.get("parameters") or []
            for method, operation_data in path_item.items():
                if not isinstance(method, str):
                    continue
                method_name = method.lower()
                if method_name not in valid_methods:
                    continue
                operations.append(
                    self._parse_operation(
                        path=path,
                        method=HTTPMethod(method_name),
                        operation_data=operation_data,
                        components=components,
                        path_parameters=shared_parameters,
                    )
                )
        return operations

    def _parse_operation(
        self,
        path: str,
        method: "HTTPMethod",
        operation_data: Dict[str, Any],
        components: Dict[str, Any],
        path_parameters: Optional[List[Dict[str, Any]]] = None,
    ) -> "APIOperation":
        """Parse a single operation from the OpenAPI spec.

        Args:
            path: The path template the operation belongs to.
            method: The HTTP method of the operation.
            operation_data: The raw operation object.
            components: Mapping used to resolve ``$ref`` entries.
            path_parameters: Raw parameters declared on the enclosing path
                item.  An operation-level parameter with the same name and
                location replaces the path-level one.

        Returns:
            The structured operation.
        """
        operation_id = operation_data.get("operationId")
        if not operation_id:
            # Generate operation ID from method and path.
            clean_path = path.replace("/", "_").replace("{", "").replace("}", "")
            operation_id = f"{method.value}{clean_path}"

        # Merge path-level and operation-level parameters.  References are
        # resolved first so the (name, location) identity can be read; later
        # entries win, which lets the operation override the path item.
        merged_parameters = {}
        raw_parameters = [
            *(path_parameters or []),
            *(operation_data.get("parameters") or []),
        ]
        for param_data in raw_parameters:
            if "$ref" in param_data:
                param_data = self._resolve_ref(param_data["$ref"], components)
            merged_parameters[(param_data["name"], param_data["in"])] = param_data
        parameters = [
            self._parse_parameter(param_data, components)
            for param_data in merged_parameters.values()
        ]

        request_body = None
        if "requestBody" in operation_data:
            request_body = self._parse_request_body(
                operation_data["requestBody"], components
            )

        # Each response is told its own status code.  Unquoted YAML keys such
        # as ``200:`` deserialize to int, hence the str() for the model field;
        # the mapping key itself is kept exactly as it appears in the spec.
        responses = {
            status: self._parse_response(
                response_data, components, status_code=str(status)
            )
            for status, response_data in (
                operation_data.get("responses") or {}
            ).items()
        }

        return APIOperation(
            operation_id=operation_id,
            method=method,
            path=path,
            summary=operation_data.get("summary"),
            description=operation_data.get("description"),
            tags=operation_data.get("tags", []),
            parameters=parameters,
            request_body=request_body,
            responses=responses,
            security=operation_data.get("security", []),
            deprecated=operation_data.get("deprecated", False),
            external_docs=operation_data.get("externalDocs"),
        )

    def _parse_parameter(
        self, param_data: Dict[str, Any], components: Dict[str, Any]
    ) -> "Parameter":
        """Parse a parameter object, resolving a ``$ref`` if present.

        Raises:
            KeyError: If the parameter lacks ``name`` or ``in``.
            ValueError: If a ``$ref`` cannot be resolved.
        """
        if "$ref" in param_data:
            param_data = self._resolve_ref(param_data["$ref"], components)

        return Parameter(
            name=param_data["name"],
            param_type=param_data["in"],
            required=param_data.get("required", False),
            schema=param_data.get("schema", {}),
            description=param_data.get("description"),
            example=param_data.get("example"),
        )

    def _parse_request_body(
        self, body_data: Dict[str, Any], components: Dict[str, Any]
    ) -> "RequestBody":
        """Parse a request body object, resolving a ``$ref`` if present.

        ``application/json`` is preferred; otherwise the first declared media
        type is used.  With no content at all the content type defaults to
        ``application/json`` and the schema to ``{}``.
        """
        if "$ref" in body_data:
            body_data = self._resolve_ref(body_data["$ref"], components)

        content_type, schema = self._select_content(
            body_data.get("content") or {}, "application/json"
        )
        return RequestBody(
            content_type=content_type,
            schema=schema,
            required=body_data.get("required", False),
            description=body_data.get("description"),
        )

    def _parse_response(
        self,
        response_data: Dict[str, Any],
        components: Dict[str, Any],
        status_code: str = "200",
    ) -> "Response":
        """Parse a response object, resolving a ``$ref`` if present.

        Args:
            response_data: The raw response object.
            components: Mapping used to resolve ``$ref`` entries.
            status_code: The key under which the response is declared in the
                operation's ``responses`` mapping.  Defaults to ``"200"``,
                the value that was previously always used.

        Returns:
            The structured response; its content type is ``None`` when the
            response declares no content.
        """
        if "$ref" in response_data:
            response_data = self._resolve_ref(response_data["$ref"], components)

        content_type, schema = self._select_content(
            response_data.get("content") or {}, None
        )
        return Response(
            status_code=status_code,
            description=response_data.get("description"),
            content_type=content_type,
            schema=schema,
        )

    @staticmethod
    def _select_content(content: Dict[str, Any], default_type: Optional[str]):
        """Pick the media type and schema to use from a ``content`` mapping.

        Returns:
            A ``(content_type, schema)`` tuple: ``application/json`` when it
            is declared, else the first declared media type, else
            ``(default_type, {})`` for an empty mapping.
        """
        if "application/json" in content:
            media_type = "application/json"
        elif content:
            media_type = next(iter(content))
        else:
            return default_type, {}
        return media_type, content[media_type].get("schema", {})

    def _resolve_ref(self, ref_path: str, components: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a local ``$ref`` to the object it points at.

        Args:
            ref_path: A reference such as ``#/components/parameters/Limit``.
            components: Either the ``components`` section of the document or
                the whole document.

        Returns:
            The referenced object.

        Raises:
            ValueError: If the reference is not local (does not start with
                ``#/``) or does not lead to an existing entry.
        """
        if not ref_path.startswith("#/"):
            raise ValueError(f"External refs not supported: {ref_path}")

        # Drop the leading '#/' and undo JSON Pointer escaping ('~1' must be
        # handled before '~0').
        path_parts = [
            part.replace("~1", "/").replace("~0", "~")
            for part in ref_path[2:].split("/")
        ]

        # Standard refs are rooted at the document ("#/components/..."), while
        # callers hand in the components section itself; skip the prefix in
        # that case so the lookup starts at the right level.
        if path_parts[0] == "components" and "components" not in components:
            path_parts = path_parts[1:]

        current = components
        for part in path_parts:
            # Only descend through mappings: ``in`` on a str or list would be
            # a substring/membership test, not a key lookup.
            if not isinstance(current, dict) or part not in current:
                raise ValueError(f"Reference not found: {ref_path}")
            current = current[part]
        return current