#!/usr/bin/env python3
"""
Download OpenAPI specification files and recursively resolve all external $ref references.
This script:
1. Loads an OpenAPI spec file (local or remote)
2. Finds all external $ref references (HTTP/HTTPS URLs)
3. Downloads the referenced files
4. Recursively processes downloaded files for their external refs
5. Replaces all external URLs with local relative paths
6. Optionally merges all files into a single self-contained spec
7. Validates the resulting OpenAPI spec
8. Saves the self-contained spec and all dependencies locally
"""
import copy
import json
import keyword
import sys
from pathlib import Path
from typing import Any, Dict, Optional, Set
from urllib.parse import urlparse
from urllib.request import urlopen
try:
import yaml
YAML_AVAILABLE = True
except ImportError:
YAML_AVAILABLE = False
try:
from openapi_spec_validator import validate
from openapi_spec_validator.readers import read_from_filename
VALIDATOR_AVAILABLE = True
except ImportError:
VALIDATOR_AVAILABLE = False
def is_external_ref(ref: str) -> bool:
"""Check if a $ref is an external URL."""
return ref.startswith("http://") or ref.startswith("https://")
def is_local_file_ref(ref: str) -> bool:
"""Check if a $ref points to a local file (not starting with #)."""
return not ref.startswith("#") and not is_external_ref(ref)
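# Examples of the three $ref forms these helpers distinguish (illustrative URLs):
#   "https://example.com/common.json#/components/schemas/Error"  -> external
#   "common.json#/components/schemas/Error"                      -> local file
#   "#/components/schemas/Error"                                 -> internal fragment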
def extract_refs(data: Any, refs: Set[str]) -> None:
"""Recursively extract all $ref values from the OpenAPI spec."""
if isinstance(data, dict):
if "$ref" in data:
ref_value = data["$ref"]
if is_external_ref(ref_value):
# Strip fragment (e.g., #/components/schemas/Foo)
base_url = ref_value.split("#")[0]
refs.add(base_url)
for value in data.values():
extract_refs(value, refs)
elif isinstance(data, list):
for item in data:
extract_refs(item, refs)
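# For example, extract_refs({"schema": {"$ref": "https://example.com/a.json#/X"}}, refs)
# adds "https://example.com/a.json" to refs: only whole files are downloaded,
# so the fragment is dropped here and re-attached later by replace_refs.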
def download_file(url: str, output_dir: Path, url_to_local: Dict[str, str]) -> Path:
    """Download a file from URL and save it to the output directory."""
    # Check if already downloaded
    if url in url_to_local:
        return output_dir / url_to_local[url]
    parsed = urlparse(url)
    # Fall back to a generic name for URLs whose path has no filename
    filename = Path(parsed.path).name or "ref.json"
    # Referenced files may be YAML; store them as JSON so the rest of the
    # pipeline (json.load, the *.json merge glob) can process them
    is_yaml = Path(filename).suffix.lower() in (".yaml", ".yml")
    if is_yaml:
        filename = f"{Path(filename).stem}.json"
    # Ensure unique filename
    output_path = output_dir / filename
    counter = 1
    while output_path.exists():
        stem = Path(filename).stem
        suffix = Path(filename).suffix
        output_path = output_dir / f"{stem}_{counter}{suffix}"
        counter += 1
    print(f" Downloading {url} -> {output_path.name}")
    with urlopen(url) as response:
        content = response.read()
    if is_yaml:
        if not YAML_AVAILABLE:
            raise ValueError(f"{url} appears to be YAML but PyYAML is not installed. Install with: pip install pyyaml")
        output_path.write_text(json.dumps(yaml.safe_load(content), indent=2))
    else:
        output_path.write_bytes(content)
    # Register the mapping
    url_to_local[url] = output_path.name
    return output_path
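# url_to_local maps each URL to a bare filename, so rewritten $refs become
# relative paths that resolve against output_dir.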
def replace_refs(data: Any, url_to_local: Dict[str, str]) -> Any:
"""Recursively replace external $ref URLs with local paths."""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if key == "$ref" and is_external_ref(value):
# Split URL and fragment
parts = value.split("#", 1)
base_url = parts[0]
fragment = f"#{parts[1]}" if len(parts) > 1 else ""
# Replace with local path
if base_url in url_to_local:
result[key] = url_to_local[base_url] + fragment
else:
result[key] = value # Keep original if not found
else:
result[key] = replace_refs(value, url_to_local)
return result
elif isinstance(data, list):
return [replace_refs(item, url_to_local) for item in data]
else:
return data
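# With url_to_local == {"https://example.com/a.json": "a.json"}, the ref
# "https://example.com/a.json#/components/schemas/Foo" becomes
# "a.json#/components/schemas/Foo"; URLs with no local copy are left unchanged.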
def process_file_recursively(
file_path: Path,
output_dir: Path,
url_to_local: Dict[str, str],
processed_files: Set[str],
) -> None:
"""Recursively process a file and all its external references."""
file_key = str(file_path)
if file_key in processed_files:
return
processed_files.add(file_key)
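    # Marking the file as processed before recursing keeps circular
    # references from looping forever: a revisited file hits the guard above.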
    # Load the file (everything on disk is JSON; download_file normalizes YAML)
    with open(file_path) as f:
        spec = json.load(f)
# Extract external references
refs: Set[str] = set()
extract_refs(spec, refs)
# Download and process each referenced file
for url in refs:
try:
local_path = download_file(url, output_dir, url_to_local)
# Recursively process the downloaded file
process_file_recursively(local_path, output_dir, url_to_local, processed_files)
except Exception as e:
print(f" ERROR downloading {url}: {e}")
continue
# Replace external refs with local paths
updated_spec = replace_refs(spec, url_to_local)
# Save the updated file
with open(file_path, "w") as f:
json.dump(updated_spec, f, indent=2)
def merge_components(main_spec: Dict[str, Any], file_path: Path) -> None:
"""Merge components from a referenced file into the main spec."""
with open(file_path) as f:
ref_spec = json.load(f)
# Initialize components section if it doesn't exist
if "components" not in main_spec:
main_spec["components"] = {}
# Merge each component type
if "components" in ref_spec:
for component_type, components in ref_spec["components"].items():
if component_type not in main_spec["components"]:
main_spec["components"][component_type] = {}
# Keep original component names (no prefixing)
for name, definition in components.items():
                # On a name conflict, the definition already in main_spec wins
                # (the main spec's own, or the first file merged)
if name not in main_spec["components"][component_type]:
main_spec["components"][component_type][name] = definition
def inline_local_refs(data: Any, filename_map: Dict[str, str]) -> Any:
"""Replace local file refs with internal component refs."""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if key == "$ref" and is_local_file_ref(value):
# Parse: "users.json#/components/schemas/Minified_User"
parts = value.split("#", 1)
filename = parts[0]
fragment = parts[1] if len(parts) > 1 else ""
if filename in filename_map and fragment:
# Keep the original component reference, just remove the filename part
# "users.json#/components/schemas/Minified_User" -> "#/components/schemas/Minified_User"
result[key] = f"#{fragment}"
else:
result[key] = value # Keep original if file not found
else:
result[key] = inline_local_refs(value, filename_map)
return result
elif isinstance(data, list):
return [inline_local_refs(item, filename_map) for item in data]
else:
return data
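# Dropping the filename ("users.json#/components/schemas/X" -> "#/components/schemas/X")
# is only safe because merge_components keeps original component names and paths;
# it assumes every local-file fragment points under /components.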
def patch_openapi_spec(spec: Dict[str, Any]) -> Dict[str, Any]:
"""Patch common OpenAPI spec issues that cause validation failures.
Fixes:
- anyOf with {"type": "null"} -> unwrap and add "nullable": true
- Python reserved keywords in parameter names -> append underscore
"""
def fix_anyof_null(data: Any) -> Any:
"""Recursively fix anyOf patterns with type: null."""
if isinstance(data, dict):
# Fix anyOf with type: null
if "anyOf" in data and isinstance(data["anyOf"], list):
# Check if anyOf contains {"type": "null"}
has_null = any(
isinstance(item, dict) and item.get("type") == "null"
for item in data["anyOf"]
)
if has_null:
# Remove null items
valid_items = [
item for item in data["anyOf"]
if not (isinstance(item, dict) and item.get("type") == "null")
]
# If only one valid schema remains, unwrap it
if len(valid_items) == 1:
unwrapped = valid_items[0].copy()
del data["anyOf"]
data.update(unwrapped)
else:
# Multiple valid schemas, keep anyOf with valid items
data["anyOf"] = valid_items
# Add nullable: true to indicate it can be null
data["nullable"] = True
# Recurse into all dict values
for key, value in list(data.items()):
data[key] = fix_anyof_null(value)
elif isinstance(data, list):
return [fix_anyof_null(item) for item in data]
return data
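    # Example transformation performed by fix_anyof_null:
    #   {"anyOf": [{"type": "string"}, {"type": "null"}]}
    #   -> {"type": "string", "nullable": true}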
def fix_reserved_keywords(data: Any) -> Any:
"""Recursively fix Python reserved keywords in parameter names."""
if isinstance(data, dict):
# Fix parameter names that are Python keywords
if "parameters" in data and isinstance(data["parameters"], list):
for param in data["parameters"]:
if isinstance(param, dict) and "name" in param:
param_name = param["name"]
if keyword.iskeyword(param_name):
# Append underscore to make it valid Python identifier
new_name = f"{param_name}_"
param["name"] = new_name
# Add x-original-name extension to preserve original
param["x-original-name"] = param_name
# Recurse into all dict values
for key, value in list(data.items()):
data[key] = fix_reserved_keywords(value)
elif isinstance(data, list):
return [fix_reserved_keywords(item) for item in data]
return data
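    # Example transformation performed by fix_reserved_keywords:
    #   a parameter named "from" becomes "from_", with "x-original-name": "from"
    #   recorded so downstream tooling can recover the original wire name.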
# Create a deep copy to avoid modifying the original
patched = copy.deepcopy(spec)
patched = fix_anyof_null(patched)
patched = fix_reserved_keywords(patched)
print("🔧 Applied OpenAPI spec patches (fixed anyOf with null types, Python reserved keywords)")
return patched
def validate_openapi_spec(spec_path: Path) -> None:
"""Validate an OpenAPI spec file. Exits on validation errors."""
if not VALIDATOR_AVAILABLE:
print("\n❌ OpenAPI validator not available. Install with: pip install openapi-spec-validator")
sys.exit(1)
print(f"\nValidating OpenAPI spec: {spec_path.name}")
try:
        # Read and validate the spec
        spec_dict, _spec_url = read_from_filename(str(spec_path))
        validate(spec_dict)
        print("✅ OpenAPI spec is valid!")
    except Exception as e:
        print("❌ OpenAPI validation failed:")
        print(f" {e}")
        sys.exit(1)
def merge_all_files(output_dir: Path, main_filename: str) -> Path:
"""Merge all JSON files into a single self-contained spec and clean up."""
main_path = output_dir / main_filename
with open(main_path) as f:
main_spec = json.load(f)
# Find ALL JSON files in the directory (except the main file and any merged file)
all_files = []
for json_file in output_dir.glob("*.json"):
if json_file.name != main_filename and not json_file.name.startswith("merged-"):
all_files.append(json_file.name)
# Create filename map for all files
filename_map = {filename: str(output_dir / filename) for filename in all_files}
print(f"\nMerging {len(all_files)} files into {main_filename}...")
# Merge all files into main spec
for filename in sorted(all_files):
file_path = output_dir / filename
print(f" Merging {filename}")
merge_components(main_spec, file_path)
# Replace all local file refs with internal component refs
main_spec = inline_local_refs(main_spec, filename_map)
# Apply patches to fix common OpenAPI spec issues
main_spec = patch_openapi_spec(main_spec)
# Save merged spec (using the original filename, not prefixed)
merged_path = output_dir / main_filename
with open(merged_path, "w") as f:
json.dump(main_spec, f, indent=2)
print(f"\nMerged spec saved to {merged_path}")
# Clean up: Delete all downloaded dependency files
print(f"\nCleaning up {len(all_files)} downloaded files...")
for filename in all_files:
file_path = output_dir / filename
print(f" Deleting {filename}")
file_path.unlink()
return merged_path
def process_openapi_spec(input_path: str, output_dir: Path, output_filename: Optional[str] = None) -> None:
"""Process an OpenAPI spec file and recursively download all external references."""
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
# Load the input spec
print(f"Loading OpenAPI spec from {input_path}")
if input_path.startswith("http://") or input_path.startswith("https://"):
with urlopen(input_path) as response:
content = response.read()
# Try to detect if it's YAML or JSON
try:
spec = json.loads(content)
except json.JSONDecodeError:
if YAML_AVAILABLE:
spec = yaml.safe_load(content)
else:
raise ValueError("Input appears to be YAML but PyYAML is not installed. Install with: pip install pyyaml")
        # Fall back to a generic name when the URL path has no filename
        input_filename = output_filename or Path(urlparse(input_path).path).name or "openapi.json"
else:
# Detect file format by extension
input_file_path = Path(input_path)
if input_file_path.suffix.lower() in ['.yaml', '.yml']:
if not YAML_AVAILABLE:
raise ValueError("YAML input requires PyYAML. Install with: pip install pyyaml")
with open(input_path) as f:
spec = yaml.safe_load(f)
else:
with open(input_path) as f:
spec = json.load(f)
input_filename = output_filename or Path(input_path).name
# Ensure output filename has .json extension
if output_filename and not output_filename.endswith('.json'):
input_filename = f"{Path(output_filename).stem}.json"
elif input_filename and not input_filename.endswith('.json'):
input_filename = f"{Path(input_filename).stem}.json"
# Save the initial spec
output_path = output_dir / input_filename
with open(output_path, "w") as f:
json.dump(spec, f, indent=2)
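    # The working copy is always written as JSON, whatever the input format,
    # so the recursive pass can rely on json.load throughout.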
# Track downloaded files
url_to_local: Dict[str, str] = {}
processed_files: Set[str] = set()
# Recursively process the main file and all dependencies
print(f"\nRecursively downloading and processing external references...")
process_file_recursively(output_path, output_dir, url_to_local, processed_files)
if not url_to_local:
print("\nNo external references found")
else:
print(f"\nSuccess! Processed {len(processed_files)} files")
print(f"Downloaded {len(url_to_local)} external dependencies:")
for url, local in sorted(url_to_local.items()):
print(f" {local} <- {url}")
# Merge all files into a single spec and clean up
merged_path = merge_all_files(output_dir, input_filename)
# Validate the merged spec (required - must be valid)
validate_openapi_spec(merged_path)
def main():
if len(sys.argv) < 2:
print("Usage: python download_openapi_refs.py <input-spec> [output-dir] [output-filename]")
print()
print("This script:")
print(" 1. Downloads an OpenAPI spec and all external $ref dependencies")
print(" 2. Recursively resolves nested references")
print(" 3. Merges everything into a single self-contained spec")
print(" 4. Validates the merged spec with openapi-spec-validator")
print(" 5. Cleans up temporary files")
print()
print("Arguments:")
print(" input-spec URL or local path to the OpenAPI spec")
print(" output-dir Output directory (default: ./openapi-resolved)")
print(" output-filename Output filename (default: derived from input)")
print()
print("Examples:")
print(" python download_openapi_refs.py https://example.com/api.json ./output")
print(" python download_openapi_refs.py ./local-spec.json ./resolved")
print(" python download_openapi_refs.py https://api.com/spec.json ./out openapi-spec.json")
sys.exit(1)
input_path = sys.argv[1]
output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("./openapi-resolved")
output_filename = sys.argv[3] if len(sys.argv) > 3 else None
process_openapi_spec(input_path, output_dir, output_filename)
if __name__ == "__main__":
main()