"""
JSON Schema Generator Utility
This module provides functionality to generate JSON Schema from JSON data.
Supports different null handling strategies and nested object analysis.
"""
import json
import logging
from typing import Any, Dict, List, Union, Optional
class SchemaGenerator:
    """Generate a JSON Schema (Draft 2020-12) document from sample JSON data.

    The generator walks the sample value recursively and emits a ``type``
    for every scalar, an ``items`` schema for every array and a
    ``properties`` / ``required`` pair for every object.

    Null values are treated according to one of three strategies:

    * ``"allow"``  - a null becomes ``{"type": ["null"]}``; a property whose
      sample value is null is not listed in ``required``.
    * ``"ignore"`` - a null produces no schema at all; null properties and
      null array items are dropped from the output.
    * ``"strict"`` - a null is described as a string (with an explanatory
      ``description``) and the property is listed in ``required``.
    """

    def __init__(self):
        """Initialize the schema generator."""
        self.logger = logging.getLogger(__name__)

    def generate_schema(
        self,
        json_data: Union[Dict, str],
        null_handling: str = "allow",
        title: str = "Generated Schema",
        description: str = "Schema generated from JSON data"
    ) -> Dict[str, Any]:
        """Generate JSON Schema from JSON data.

        Args:
            json_data: JSON data. A ``str`` is parsed with ``json.loads``;
                any other value is analyzed as-is.
            null_handling: How to handle null values
                ("allow", "ignore", "strict").
            title: Schema title.
            description: Schema description.

        Returns:
            Generated JSON Schema as dictionary. When the top-level value is
            an object, ``additionalProperties`` is set to ``False`` on the
            top-level schema only (nested objects are left open).

        Raises:
            ValueError: If json_data is an invalid JSON string or
                null_handling is unknown.
        """
        # Parse JSON string if needed
        if isinstance(json_data, str):
            try:
                parsed_data = json.loads(json_data)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON string: {e}") from e
        else:
            parsed_data = json_data

        # Validate null_handling parameter
        if null_handling not in ("allow", "ignore", "strict"):
            raise ValueError(
                f"Invalid null_handling value: {null_handling}. "
                "Must be 'allow', 'ignore', or 'strict'"
            )

        # Base schema carrying the caller-supplied metadata
        schema: Dict[str, Any] = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "title": title,
            "description": description
        }

        # Merge the analyzed keywords without overwriting the metadata above:
        # the analysis of an empty array or a strict-mode null emits its own
        # "description" note, which must not replace the caller's description.
        for keyword, keyword_value in self._analyze_value(parsed_data, null_handling).items():
            schema.setdefault(keyword, keyword_value)

        # Add additionalProperties: false for object schemas
        if schema.get("type") == "object":
            schema["additionalProperties"] = False

        return schema

    def _analyze_value(self, value: Any, null_handling: str) -> Dict[str, Any]:
        """Analyze a value and generate schema properties for it.

        Args:
            value: The value to analyze.
            null_handling: How to handle null values.

        Returns:
            Schema properties for the value. May be an empty dict when the
            value is null and null_handling is "ignore".
        """
        if value is None:
            return self._handle_null_value(null_handling)
        # bool must be tested before int: bool is a subclass of int in Python.
        if isinstance(value, bool):
            return {"type": "boolean"}
        if isinstance(value, int):
            return {"type": "integer"}
        if isinstance(value, float):
            return {"type": "number"}
        if isinstance(value, str):
            return {"type": "string"}
        if isinstance(value, list):
            return self._analyze_array(value, null_handling)
        if isinstance(value, dict):
            return self._analyze_object(value, null_handling)

        # Fallback for unknown types
        return {"type": "string", "description": f"Unknown type: {type(value).__name__}"}

    def _handle_null_value(self, null_handling: str) -> Dict[str, Any]:
        """Handle null values based on the null_handling strategy.

        Args:
            null_handling: Strategy for handling nulls.

        Returns:
            ``{"type": ["null"]}`` for "allow" (and for any unrecognized
            strategy), an empty dict for "ignore", and a string schema with
            an explanatory description for "strict".
        """
        if null_handling == "ignore":
            # Empty dict - callers drop empty schemas
            return {}
        if null_handling == "strict":
            # Treat as string type (could be customized)
            return {"type": "string", "description": "Field had null value, treated as string"}
        # "allow", plus the defensive default for unknown strategies
        return {"type": ["null"]}

    def _analyze_array(self, array: List[Any], null_handling: str) -> Dict[str, Any]:
        """Analyze an array and generate schema for it.

        Args:
            array: The array to analyze.
            null_handling: How to handle null values.

        Returns:
            Schema properties for the array. ``items`` is an unconstrained
            ``{}`` when the array is empty or every item was ignored, the
            single item schema when all items share one schema, and an
            ``anyOf`` of the distinct item schemas (in first-seen order)
            otherwise.
        """
        if not array:
            return {
                "type": "array",
                "items": {},
                "description": "Empty array - no type constraints"
            }

        # De-duplicate item schemas by their canonical JSON form, keeping the
        # first occurrence of each so the output order is deterministic.
        distinct_by_key: Dict[str, Dict[str, Any]] = {}
        for item in array:
            item_schema = self._analyze_value(item, null_handling)
            if not item_schema:  # Skip empty schemas (ignored nulls)
                continue
            distinct_by_key.setdefault(json.dumps(item_schema, sort_keys=True), item_schema)

        if not distinct_by_key:
            return {
                "type": "array",
                "items": {},
                "description": "Array with only null/ignored values"
            }

        distinct_schemas = list(distinct_by_key.values())
        if len(distinct_schemas) == 1:
            # Homogeneous array - use the single item schema directly
            return {"type": "array", "items": distinct_schemas[0]}

        # Mixed types - use anyOf
        return {"type": "array", "items": {"anyOf": distinct_schemas}}

    def _analyze_object(self, obj: Dict[str, Any], null_handling: str) -> Dict[str, Any]:
        """Analyze an object and generate schema for it.

        Args:
            obj: The object to analyze.
            null_handling: How to handle null values.

        Returns:
            Schema properties for the object: ``type``, ``properties`` and,
            when at least one property is required, a sorted ``required``
            list.
        """
        properties: Dict[str, Any] = {}
        required = []
        for key, value in obj.items():
            prop_schema = self._analyze_value(value, null_handling)
            # Skip empty schemas (ignored nulls)
            if not prop_schema:
                continue
            properties[key] = prop_schema
            # "strict" requires every kept property; the other strategies
            # require only properties whose sample value is not null.
            if null_handling == "strict" or value is not None:
                required.append(key)

        schema: Dict[str, Any] = {
            "type": "object",
            "properties": properties
        }
        if required:
            schema["required"] = sorted(required)
        return schema

    def validate_generated_schema(self, schema: Dict[str, Any]) -> tuple[bool, Optional[str]]:
        """Run a basic structural check on a generated schema.

        Only the presence of ``$schema`` and ``type`` is checked, plus, for
        object schemas, that ``properties`` exists and is a dict. This is not
        a full JSON Schema meta-validation.

        Args:
            schema: The generated schema to validate.

        Returns:
            Tuple of (is_valid, error_message); error_message is None when
            the schema passes.
        """
        try:
            # Check required fields
            if "$schema" not in schema:
                return False, "Missing $schema field"
            if "type" not in schema:
                return False, "Missing type field"

            # Validate schema structure
            if schema.get("type") == "object":
                if "properties" not in schema:
                    return False, "Object type missing properties"
                if not isinstance(schema["properties"], dict):
                    return False, "Properties must be an object"
            return True, None
        except Exception as e:
            # Best-effort: a non-mapping argument is reported, not raised
            return False, f"Schema validation error: {str(e)}"