import json
from typing import Dict, Any, Union
from urllib.parse import urlparse
from jsonschema import Draft202012Validator, RefResolver
from jsonschema.exceptions import ValidationError, SchemaError
from utils import DataManager
class JSONSchemaValidator:
    """
    JSON validator against JSON Schema Draft 2020-12 with external reference support.

    External ``$ref`` targets are loaded through ``DataManager`` and memoized
    per validator instance in ``schema_cache``.
    """

    def __init__(self):
        """
        Initialize the validator.
        """
        # Maps a reference URL to the schema that was loaded for it.
        self.schema_cache = {}
        # Resolver built for the most recent validate_json() call.
        self.resolver = None
        # Created on first use so that constructing a validator does no I/O.
        self.data_manager = None

    @staticmethod
    def _failure(message: str) -> Dict[str, Any]:
        """Build the result dictionary for a single fatal error."""
        return {
            "valid": False,
            "errors": [message],
            "error_count": 1
        }

    def _resolve_schema_reference(self, ref_url: str) -> Dict[str, Any]:
        """
        Resolves a schema reference and returns the referenced schema.

        Args:
            ref_url: Schema reference URL

        Returns:
            Dictionary with the referenced schema. Fragment-only references
            (starting with '#') yield an empty dict because jsonschema
            resolves those itself.

        Raises:
            ValueError: If the reference cannot be loaded. The original
                exception is attached as ``__cause__``.

        Note:
            Every non-fragment reference is delegated to
            ``DataManager.load_data('schemas', ref_url)``; how a given URL or
            path is mapped to storage is decided by ``DataManager``, not here.
        """
        # Check if already in cache
        if ref_url in self.schema_cache:
            return self.schema_cache[ref_url]

        # Internal references (#/$defs/name, #) - jsonschema handles them automatically
        if ref_url.startswith('#'):
            return {}

        try:
            # Reuse a single DataManager instead of building one per reference.
            if self.data_manager is None:
                self.data_manager = DataManager()
            schema = self.data_manager.load_data('schemas', ref_url)
        except Exception as e:
            # Chain the cause so the underlying failure stays in the traceback.
            raise ValueError(f"Could not resolve schema reference '{ref_url}': {e}") from e

        # Save to cache
        self.schema_cache[ref_url] = schema
        return schema

    def _create_resolver(self, main_schema: Dict[str, Any]) -> "RefResolver":
        """
        Creates a resolver that loads external references through this class.

        Args:
            main_schema: Main schema

        Returns:
            Configured RefResolver

        Note:
            jsonschema looks handlers up by URI scheme, so the ``""`` key used
            here only covers references without a scheme; references with a
            scheme fall back to jsonschema's own retrieval.
            NOTE(review): RefResolver is deprecated in recent jsonschema
            releases in favour of the ``referencing`` library - confirm the
            pinned version before upgrading.
        """
        # Boolean schemas are valid in Draft 2020-12 but have no "$id".
        schema_id = main_schema.get("$id", "") if isinstance(main_schema, dict) else ""

        return RefResolver(
            base_uri=schema_id,
            referrer=main_schema,
            # Seed the store so references back to the main schema never hit a handler.
            store={schema_id: main_schema},
            # _resolve_schema_reference already short-circuits '#' references.
            handlers={"": self._resolve_schema_reference}
        )

    def validate_json(self,
                      json_data: Union[Dict[str, Any], str],
                      schema: Union[Dict[str, Any], str],
                      strict: bool = True) -> Dict[str, Any]:
        """
        Validates JSON against a schema with external reference support.

        Args:
            json_data: JSON data to validate (dict or string)
            schema: JSON Schema (dict or string)
            strict: Accepted for interface compatibility; the current
                implementation does not consult it.

        Returns:
            Dictionary with validation result:
            {
                "valid": bool,
                "errors": List[str],
                "error_count": int
            }
            Problems are reported through this dictionary; parse errors,
            schema errors and resolution errors are not raised.
        """
        # Convert strings to Python objects if necessary
        if isinstance(json_data, str):
            try:
                json_data = json.loads(json_data)
            except json.JSONDecodeError as e:
                return self._failure(f"Invalid JSON data: {e}")

        if isinstance(schema, str):
            try:
                schema = json.loads(schema)
            except json.JSONDecodeError as e:
                return self._failure(f"Invalid JSON schema: {e}")

        try:
            # Validate the schema itself first so a malformed schema is
            # reported as a schema error rather than failing while the
            # resolver is being built.
            Draft202012Validator.check_schema(schema)

            # Create custom resolver and the Draft 2020-12 validator
            self.resolver = self._create_resolver(schema)
            validator = Draft202012Validator(schema, resolver=self.resolver)

            # Validate the data, collecting every error rather than stopping at the first
            errors = [
                self._format_validation_error(error)
                for error in validator.iter_errors(json_data)
            ]
        except SchemaError as e:
            return self._failure(f"Schema error: {e.message}")
        except Exception as e:
            # Boundary of the public API: reference-resolution and any other
            # failures are reported in the result instead of propagating.
            return self._failure(f"Validation error: {str(e)}")

        return {
            "valid": not errors,
            "errors": errors,
            "error_count": len(errors)
        }

    def _format_validation_error(self, error: "ValidationError") -> str:
        """
        Formats a validation error to be more readable.

        Args:
            error: jsonschema validation error

        Returns:
            Formatted error string: the path to the offending value
            ("root" when the path is empty) followed by the error message.
        """
        path = " -> ".join(str(p) for p in error.absolute_path) if error.absolute_path else "root"
        return f"Path '{path}': {error.message}"