"""
Schema Management Handlers for JSON Schema MCP Server
This module contains handlers for schema management operations (CRUD).
"""
import json
import logging
from typing import Any, Dict
from mcp.types import CallToolResult, TextContent
from tools.SchemaGenerator import SchemaGenerator
# Configure logger at module level
logger = logging.getLogger(__name__)
async def handle_generate_schema(
    arguments: Dict[str, Any],
    validator,
    data_manager,
    schema_validator,
) -> CallToolResult:
    """
    Generate a JSON Schema from sample JSON data and save it under '.schemas'.

    Args:
        arguments: Tool arguments. Reads 'schema_id' and 'json_data' (both
            required) and 'null_handling' (optional, defaults to "allow").
        validator: Unused here; every handler in this module takes the same
            four parameters.
        data_manager: Storage object; schema_exists() and save_data() are
            called on it.
        schema_validator: Provides validate_schema_id() and get_schema_info().

    Returns:
        A CallToolResult holding one text item: a success summary, or a
        message describing the validation, conflict, or save failure.
        Exceptions raised inside the main body are logged and reported as
        "Internal error: ...".
    """

    def _reply(text: str) -> CallToolResult:
        # Every outcome, success or failure, is a single text content item.
        # NOTE(review): failures are returned as ordinary results; confirm
        # whether clients expect CallToolResult's error flag to be set.
        return CallToolResult(content=[TextContent(type="text", text=text)])

    schema_id = arguments.get("schema_id")
    json_data = arguments.get("json_data")
    null_handling = arguments.get("null_handling", "allow")

    # json_data is compared with None (not truthiness) so that falsy samples
    # such as {} or [] are still accepted as input.
    if not schema_id or json_data is None:
        return _reply("Error: Required parameters 'schema_id' and 'json_data'")

    try:
        valid_id, id_error = schema_validator.validate_schema_id(schema_id)
        if not valid_id:
            return _reply(f"Invalid Schema ID: {id_error}")

        # A ValueError from the generator is reported as bad input rather
        # than as an internal error.
        try:
            generated_schema = SchemaGenerator().generate_schema(
                json_data=json_data,
                null_handling=null_handling,
                title=f"Generated Schema for {schema_id}",
                description=f"Schema automatically generated from JSON data for {schema_id}",
            )
        except ValueError as e:
            return _reply(f"Invalid JSON data: {e}")

        # This tool never overwrites an existing schema.
        if data_manager.schema_exists(".schemas", schema_id):
            return _reply(
                f"Conflict: Schema '{schema_id}' already exists. "
                "Use the 'add_update_schema' tool to update it if needed."
            )

        save_result = data_manager.save_data(
            ".schemas", schema_id, generated_schema, save_to_db=True, save_to_file=True
        )
        if not save_result["success"]:
            # .get() so a result without an 'errors' entry yields the
            # "Unknown error" fallback instead of a KeyError.
            errors = save_result.get("errors")
            error_details = "; ".join(errors) if errors else "Unknown error"
            return _reply(f"Error saving generated schema: {error_details}")

        schema_info = schema_validator.get_schema_info(generated_schema)
        return _reply(
            "\n".join(
                [
                    "Schema generated and saved successfully",
                    "Generated schema details:",
                    f"- ID: {schema_id}",
                    f"- Title: {schema_info['title']}",
                    f"- Type: {schema_info['type']}",
                    f"- Schema version: {schema_info['schema_version']}",
                    f"- Properties: {schema_info['property_count']}",
                    f"- Required fields: {schema_info['required_count']}",
                    f"- Null handling: {null_handling}",
                    "The generated JSON schema is available using the 'get_schema' "
                    f"tool with schema_id: {schema_id}",
                ]
            )
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error in generate_schema: %s", e)
        return _reply(f"Internal error: {e}")
async def handle_add_update_schema(
    arguments: Dict[str, Any],
    validator,
    data_manager,
    schema_validator,
) -> CallToolResult:
    """
    Add a JSON schema to the '.schemas' collection, or update an existing one.

    Args:
        arguments: Tool arguments. Reads 'schema_id' and 'schema_content'
            (both required) and 'update_if_exists' (optional, defaults to
            False).
        validator: Unused here; every handler in this module takes the same
            four parameters.
        data_manager: Storage object; schema_exists() and save_data() are
            called on it.
        schema_validator: Provides validate_schema_id(),
            validate_json_schema_string() and get_schema_info().

    Returns:
        A CallToolResult holding one text item: a success summary, or a
        message describing the validation, conflict, or save failure.
        Exceptions raised inside the main body are logged and reported as
        "Internal error: ...".
    """

    def _reply(text: str) -> CallToolResult:
        # Every outcome, success or failure, is a single text content item.
        # NOTE(review): failures are returned as ordinary results; confirm
        # whether clients expect CallToolResult's error flag to be set.
        return CallToolResult(content=[TextContent(type="text", text=text)])

    schema_id = arguments.get("schema_id")
    schema_content = arguments.get("schema_content")
    update_if_exists = arguments.get("update_if_exists", False)

    if not schema_id or not schema_content:
        return _reply("Error: Required parameters 'schema_id' and 'schema_content'")

    try:
        valid_id, id_error = schema_validator.validate_schema_id(schema_id)
        if not valid_id:
            return _reply(f"Invalid Schema ID: {id_error}")

        # The validator also hands back the parsed schema, which is what
        # gets stored.
        valid_schema, schema_error, parsed_schema = (
            schema_validator.validate_json_schema_string(schema_content)
        )
        if not valid_schema:
            return _reply(f"Invalid JSON Schema: {schema_error}")

        # Overwriting an existing schema requires explicit opt-in.
        schema_exists = data_manager.schema_exists(".schemas", schema_id)
        if schema_exists and not update_if_exists:
            return _reply(
                f"Conflict: Schema '{schema_id}' already exists. "
                "Use update_if_exists=true to update it."
            )

        save_result = data_manager.save_data(
            ".schemas", schema_id, parsed_schema, save_to_db=True, save_to_file=True
        )
        if not save_result["success"]:
            # .get() so a result without an 'errors' entry yields the
            # "Unknown error" fallback instead of a KeyError.
            errors = save_result.get("errors")
            error_details = "; ".join(errors) if errors else "Unknown error"
            return _reply(f"Error saving schema: {error_details}")

        schema_info = schema_validator.get_schema_info(parsed_schema)
        # Past the conflict guard, an existing schema implies
        # update_if_exists was set, so schema_exists alone picks the verb.
        action = "updated" if schema_exists else "added"
        return _reply(
            "\n".join(
                [
                    f"Schema {action} successfully",
                    "Schema details:",
                    f"- ID: {schema_id}",
                    f"- Title: {schema_info['title']}",
                    f"- Type: {schema_info['type']}",
                    f"- Schema version: {schema_info['schema_version']}",
                    f"- Properties: {schema_info['property_count']}",
                    f"- Required fields: {schema_info['required_count']}",
                    "The JSON schema is available using the 'get_schema' "
                    f"tool with schema_id: {schema_id}",
                ]
            )
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error in add_update_schema: %s", e)
        return _reply(f"Internal error: {e}")
async def handle_delete_schema(
    arguments: Dict[str, Any],
    validator,
    data_manager,
    schema_validator,
) -> CallToolResult:
    """
    Delete a JSON schema from the '.schemas' collection.

    Args:
        arguments: Tool arguments. Reads 'schema_id' (required) and
            'confirm_deletion' (must be truthy for the deletion to run).
        validator: Unused here; every handler in this module takes the same
            four parameters.
        data_manager: Storage object; schema_exists(), load_data() and
            delete_data() are called on it.
        schema_validator: Provides validate_schema_id() and get_schema_info().

    Returns:
        A CallToolResult holding one text item. Deletion counts as
        successful when either 'database_success' or 'file_success' is set
        in the delete result; any reported errors are then appended as
        warnings. Exceptions raised inside the main body are logged and
        reported as "Internal error: ...".
    """

    def _reply(text: str) -> CallToolResult:
        # Every outcome, success or failure, is a single text content item.
        # NOTE(review): failures are returned as ordinary results; confirm
        # whether clients expect CallToolResult's error flag to be set.
        return CallToolResult(content=[TextContent(type="text", text=text)])

    schema_id = arguments.get("schema_id")
    confirm_deletion = arguments.get("confirm_deletion", False)

    if not schema_id:
        return _reply("Error: Required parameter 'schema_id'")
    if not confirm_deletion:
        return _reply(
            "Error: Must confirm deletion by setting 'confirm_deletion=true'. "
            "This operation is irreversible."
        )

    try:
        valid_id, id_error = schema_validator.validate_schema_id(schema_id)
        if not valid_id:
            return _reply(f"Invalid Schema ID: {id_error}")

        if not data_manager.schema_exists(".schemas", schema_id):
            return _reply(f"Schema '{schema_id}' does not exist in the .schemas collection")

        # The summary only decorates the response, so it is best-effort: an
        # unreadable schema must still be deletable. The lines are formatted
        # before the delete so a malformed summary cannot turn a completed
        # deletion into an "Internal error" reply.
        try:
            existing_schema = data_manager.load_data(".schemas", schema_id)
            schema_info = schema_validator.get_schema_info(existing_schema)
            detail_lines = [
                f"- Title: {schema_info['title']}",
                f"- Type: {schema_info['type']}",
                f"- Schema version: {schema_info['schema_version']}",
                f"- Properties: {schema_info['property_count']}",
            ]
        except Exception:
            logger.warning(
                "Could not summarize schema '%s' before deletion; deleting without details",
                schema_id,
                exc_info=True,
            )
            detail_lines = []

        delete_result = data_manager.delete_data(
            ".schemas", schema_id, delete_from_db=True, delete_from_file=True
        )
        errors = delete_result.get("errors")
        deleted_somewhere = delete_result.get("database_success", False) or delete_result.get(
            "file_success", False
        )
        if not deleted_somewhere:
            error_details = "; ".join(errors) if errors else "Unknown error"
            return _reply(f"Error deleting schema: {error_details}")

        lines = [
            "Schema deleted successfully",
            "Deleted schema details:",
            f"- ID: {schema_id}",
            *detail_lines,
        ]
        if errors:
            # Partial success: one store removed the schema, another
            # reported a problem.
            lines.append(f"- Warnings: {'; '.join(errors)}")
        return _reply("\n".join(lines))
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error in delete_schema: %s", e)
        return _reply(f"Internal error: {e}")
async def handle_get_schema(
    arguments: Dict[str, Any],
    validator,
    data_manager,
    schema_validator,
) -> CallToolResult:
    """
    Return the stored content of a JSON schema from the '.schemas' collection.

    Args:
        arguments: Tool arguments. Reads 'schema_id' (required).
        validator: Unused here; every handler in this module takes the same
            four parameters.
        data_manager: Storage object; schema_exists() and load_data() are
            called on it.
        schema_validator: Provides validate_schema_id().

    Returns:
        A CallToolResult holding one text item: the loaded schema serialized
        as indented JSON with no surrounding commentary, or an error
        message. Exceptions raised inside the main body are logged and
        reported as "Internal error: ...".
    """

    def _reply(text: str) -> CallToolResult:
        # Every outcome, success or failure, is a single text content item.
        # NOTE(review): failures are returned as ordinary results; confirm
        # whether clients expect CallToolResult's error flag to be set.
        return CallToolResult(content=[TextContent(type="text", text=text)])

    schema_id = arguments.get("schema_id")
    if not schema_id:
        return _reply("Error: Required parameter 'schema_id'")

    try:
        valid_id, id_error = schema_validator.validate_schema_id(schema_id)
        if not valid_id:
            return _reply(f"Invalid Schema ID: {id_error}")

        if not data_manager.schema_exists(".schemas", schema_id):
            return _reply(f"Schema '{schema_id}' does not exist in the .schemas collection")

        schema_data = data_manager.load_data(".schemas", schema_id)
        # ensure_ascii=False keeps non-ASCII characters readable instead of
        # emitting \uXXXX escapes.
        return _reply(json.dumps(schema_data, indent=2, ensure_ascii=False))
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error in get_schema: %s", e)
        return _reply(f"Internal error: {e}")
async def handle_list_schemas(
    arguments: Dict[str, Any],
    validator,
    data_manager,
    schema_validator,
) -> CallToolResult:
    """
    List the IDs of all schemas reported by the data manager.

    Args:
        arguments: Unused; this tool reads no arguments.
        validator: Unused here; every handler in this module takes the same
            four parameters.
        data_manager: Storage object; list_schemas() is called on it and is
            read as a dict with 'success', 'schema_ids', 'count' and
            optional 'message' / 'error' entries.
        schema_validator: Unused here.

    Returns:
        A CallToolResult holding one text item: the formatted list, an
        "empty" notice, or an error description. Exceptions raised inside
        the body are logged and reported as "Error interno: ...".

    NOTE(review): this handler's messages are in Spanish while the other
    handlers in this module respond in English. The text is left unchanged
    because clients may match on it.
    """

    def _reply(text: str) -> CallToolResult:
        # Every outcome, success or failure, is a single text content item.
        # NOTE(review): failures are returned as ordinary results; confirm
        # whether clients expect CallToolResult's error flag to be set.
        return CallToolResult(content=[TextContent(type="text", text=text)])

    try:
        schema_list = data_manager.list_schemas()

        if not schema_list["success"]:
            error_msg = schema_list.get("error", "Error desconocido")
            return _reply(
                "\n".join(
                    [
                        "Error al listar schemas",
                        f"Error: {error_msg}",
                        "No se pudieron obtener los schemas desde ninguna fuente disponible.",
                        "Verifique que la base de datos esté disponible o que exista el directorio .schemas",
                    ]
                )
            )

        count = schema_list["count"]
        if count == 0:
            message = schema_list.get("message", "No schemas found")
            return _reply(
                "\n".join(
                    [
                        "Lista de Schemas Disponibles",
                        f"{message}",
                        f"Total de schemas: {count}",
                        "Para agregar un nuevo schema, use la herramienta 'add_update_schema'.",
                    ]
                )
            )

        # One bullet line per schema ID.
        schema_list_text = "\n".join(f" • {schema_id}" for schema_id in schema_list["schema_ids"])
        return _reply(
            "\n".join(
                [
                    "Lista de Schemas Disponibles",
                    f"Total de schemas: {count}",
                    "Schemas disponibles:",
                    schema_list_text,
                    "Para obtener un schema específico, use 'get_schema' con el schema_id deseado.",
                ]
            )
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error en list_schemas: %s", e)
        return _reply(f"Error interno: {e}")