import json
import re
import logging
from typing import Dict, Any, Union, List, Tuple
from jsonschema import Draft202012Validator, SchemaError
from urllib.parse import urlparse
class SchemaValidator:
    """
    Utility class for validating JSON schemas and schema IDs.

    Every public validation method reports failure through its return value
    (an ``is_valid`` flag plus a human-readable message) rather than by
    raising, so callers can surface the message directly.
    """

    # Limits enforced by validate_schema_id. The matching error messages
    # spell these numbers out literally, so keep both in sync.
    _MAX_SCHEMA_ID_LENGTH = 255
    _MAX_PATH_COMPONENT_LENGTH = 50

    # Lowercase letters, digits, '_', '/', '.', '-' followed by ".json".
    # Compiled once at class creation instead of on every call.
    _SCHEMA_ID_PATTERN = re.compile(r'^[a-z0-9_/.-]+\.json$')

    # Kept as a list because its repr is embedded in error messages.
    _VALID_TYPES = ['null', 'boolean', 'object', 'array', 'number', 'string', 'integer']

    # Misspelled keyword -> intended keyword.
    _COMMON_TYPOS = {
        'proprties': 'properties',
        'requried': 'required',
        'additonalProperties': 'additionalProperties',
        'minLenght': 'minLength',
        'maxLenght': 'maxLength',
    }

    def __init__(self):
        """
        Initialize the schema validator.
        """
        self.logger = logging.getLogger(__name__)

    def validate_schema_id(self, schema_id: str) -> Tuple[bool, str]:
        """
        Validate that schema_id is a valid Linux-style relative path.

        Args:
            schema_id: The schema identifier to validate

        Returns:
            Tuple of (is_valid: bool, error_message: str). error_message is
            the empty string when the ID is valid.

        Rules:
            - Must be a non-empty string
            - Only lowercase letters, numbers, hyphens, underscores,
              forward slashes, and dots
            - Must end with .json
            - No double slashes, no '..', no leading slash
            - Directory components and the filename cannot start with
              '.' or '-'
            - Directory components are at most 50 characters
            - Maximum total length of 255 characters
        """
        if not schema_id:
            return False, "Schema ID cannot be empty"

        # A truthy non-string (e.g. an int) would otherwise crash on len().
        if not isinstance(schema_id, str):
            return False, "Schema ID must be a string"

        if len(schema_id) > self._MAX_SCHEMA_ID_LENGTH:
            return False, "Schema ID too long (maximum 255 characters)"

        if not schema_id.endswith('.json'):
            return False, "Schema ID must end with '.json'"

        if not self._SCHEMA_ID_PATTERN.match(schema_id):
            return False, "Schema ID contains invalid characters. Only lowercase letters, numbers, hyphens, underscores, forward slashes, and dots are allowed"

        # Path traversal attempts
        if '..' in schema_id:
            return False, "Schema ID cannot contain relative path components (..)"

        if '//' in schema_id:
            return False, "Schema ID cannot contain double slashes (//)"

        # A leading slash would make the ID an absolute path
        if schema_id.startswith('/'):
            return False, "Schema ID cannot start with a forward slash"

        *directories, filename = schema_id.split('/')

        for part in directories:
            if not part:
                return False, "Schema ID cannot have empty path components"
            if part.startswith(('.', '-')):
                return False, "Path components cannot start with '.' or '-'"
            if len(part) > self._MAX_PATH_COMPONENT_LENGTH:
                return False, "Path components cannot exceed 50 characters"

        # The ID ends with ".json", so the filename is never empty; it can
        # still be a bare extension, as in "dir/.json".
        if filename == '.json':
            return False, "Invalid filename"

        stem = filename[:-len('.json')]
        if stem.startswith(('.', '-')):
            return False, "Filename cannot start with '.' or '-'"

        return True, ""

    def validate_json_schema_string(self, schema_string: str) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Validate that a string contains a valid JSON Schema.

        The string must parse as a JSON object, conform to the Draft 2020-12
        meta-schema, and pass the extra checks in _validate_schema_structure.

        Args:
            schema_string: JSON schema as string

        Returns:
            Tuple of (is_valid: bool, error_message: str, parsed_schema: dict).
            parsed_schema is an empty dict whenever is_valid is False.
        """
        # Any falsy value (None, "", [], 0, ...) is reported as empty.
        if not schema_string:
            return False, "Schema string cannot be empty", {}

        if not isinstance(schema_string, str):
            return False, "Schema must be a string", {}

        try:
            schema_dict = json.loads(schema_string)
        except json.JSONDecodeError as e:
            return False, f"Invalid JSON: {e}", {}

        if not isinstance(schema_dict, dict):
            return False, "Schema must be a JSON object", {}

        # check_schema validates against the meta-schema and raises
        # SchemaError. Validating an instance against META_SCHEMA raises
        # ValidationError instead, which would bypass the handler below.
        try:
            Draft202012Validator.check_schema(schema_dict)
        except SchemaError as e:
            return False, f"Invalid JSON Schema: {e.message}", {}
        except Exception as e:
            # Deliberate catch-all: this method reports problems through its
            # return value and must not raise on unexpected library errors.
            return False, f"Schema validation error: {e}", {}

        validation_errors = self._validate_schema_structure(schema_dict)
        if validation_errors:
            return False, f"Schema structure errors: {'; '.join(validation_errors)}", {}

        return True, "", schema_dict

    def _validate_schema_structure(self, schema: Dict[str, Any]) -> List[str]:
        """
        Perform additional structural validations on the schema.

        Only the top level of the schema is inspected; nested sub-schemas
        are checked for being schema-shaped but are not descended into.

        Args:
            schema: Parsed schema dictionary

        Returns:
            List of validation error messages (empty when no problem found)
        """
        errors: List[str] = []

        if not isinstance(schema, dict):
            errors.append("Schema must be an object")
            return errors

        # $schema is recommended, not required, so this is only logged.
        if '$schema' not in schema:
            self.logger.info("Schema missing $schema field (recommended)")

        valid_types = self._VALID_TYPES
        if 'type' in schema:
            schema_type = schema['type']
            if isinstance(schema_type, str):
                if schema_type not in valid_types:
                    errors.append(f"Invalid type '{schema_type}'. Must be one of: {valid_types}")
            elif isinstance(schema_type, list):
                errors.extend(
                    f"Invalid type '{t}' in type array. Must be one of: {valid_types}"
                    for t in schema_type
                    if t not in valid_types
                )
            else:
                errors.append("Type field must be a string or array of strings")

        properties = schema.get('properties')
        if 'properties' in schema:
            if not isinstance(properties, dict):
                errors.append("Properties field must be an object")
            else:
                for prop_name, prop_schema in properties.items():
                    # Boolean sub-schemas (true / false) are valid in
                    # Draft 2020-12, so only other non-object values fail.
                    if not isinstance(prop_schema, (dict, bool)):
                        errors.append(f"Property '{prop_name}' schema must be an object")

        if 'required' in schema:
            required = schema['required']
            if not isinstance(required, list):
                errors.append("Required field must be an array")
            else:
                # Fall back to an empty mapping when 'properties' is absent
                # or malformed so the membership test below cannot raise.
                known_properties = properties if isinstance(properties, dict) else {}
                for req_field in required:
                    if not isinstance(req_field, str):
                        errors.append(f"Required field name must be a string, got: {type(req_field)}")
                    elif req_field not in known_properties:
                        # Legal in JSON Schema, but usually a mistake.
                        self.logger.warning(
                            "Required field '%s' not found in properties", req_field
                        )

        if 'additionalProperties' in schema:
            if not isinstance(schema['additionalProperties'], (bool, dict)):
                errors.append("additionalProperties must be boolean or object")

        errors.extend(
            f"Possible typo: '{typo}' should be '{correct}'"
            for typo, correct in self._COMMON_TYPOS.items()
            if typo in schema
        )

        return errors

    def get_schema_info(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract useful information from a schema.

        The schema does not need to have been validated first: a malformed
        'properties' or 'required' value is counted as empty instead of
        raising.

        Args:
            schema: Parsed schema dictionary

        Returns:
            Dictionary with schema information. The keys
            'nested_object_count' and 'array_property_count' are present
            only when the schema has a 'properties' key.
        """
        properties = schema.get('properties')
        required = schema.get('required')

        info = {
            'title': schema.get('title', 'Untitled Schema'),
            'description': schema.get('description', ''),
            'schema_version': schema.get('$schema', 'Not specified'),
            'id': schema.get('$id', ''),
            'type': schema.get('type', 'Not specified'),
            'has_properties': 'properties' in schema,
            'property_count': len(properties) if isinstance(properties, dict) else 0,
            'has_required': 'required' in schema,
            'required_count': len(required) if isinstance(required, (list, tuple)) else 0,
            'has_additional_properties': 'additionalProperties' in schema,
            'allows_additional_properties': schema.get('additionalProperties', True)
        }

        if 'properties' in schema:
            # Only object-form sub-schemas can declare a type.
            declared_types = [
                prop.get('type')
                for prop in (properties.values() if isinstance(properties, dict) else ())
                if isinstance(prop, dict)
            ]
            info['nested_object_count'] = sum(1 for t in declared_types if t == 'object')
            info['array_property_count'] = sum(1 for t in declared_types if t == 'array')

        return info