Skip to main content
Glama

guided_schema_evolution

Safely evolve schemas using a guided workflow that analyzes changes, suggests strategies, and coordinates consumer updates for Kafka Schema Registry.

Instructions

Start the Schema Evolution Assistant workflow. This guided workflow helps you safely evolve schemas by analyzing changes, suggesting strategies, and coordinating consumer updates.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
current_schemaNo
subjectNo

Implementation Reference

  • The MCP tool handler for guided_schema_evolution. Decorated with @self.mcp.tool and implements the core logic by initializing context from inputs and starting the schema_evolution_assistant workflow via the MultiStepElicitationManager.
    @self.mcp.tool( description=( "Start the Schema Evolution Assistant workflow. " "This guided workflow helps you safely evolve schemas by analyzing changes, " "suggesting strategies, and coordinating consumer updates." ), ) async def guided_schema_evolution( subject: Optional[str] = None, current_schema: Optional[str] = None, ) -> str: """Start the Schema Evolution Assistant workflow.""" initial_context = {} if subject: initial_context["subject"] = subject if current_schema: try: initial_context["current_schema"] = json.loads(current_schema) except json.JSONDecodeError: initial_context["current_schema"] = current_schema workflow_id = "schema_evolution_assistant" try: request = await self.multi_step_manager.start_workflow( workflow_id=workflow_id, initial_context=initial_context ) if request: return json.dumps( { "status": "started", "workflow_id": workflow_id, "workflow_name": "Schema Evolution Assistant", "request_id": request.id, "first_step": request.title, "description": request.description, "message": ( "Schema Evolution Assistant started. This workflow will guide you through:\n" "1. Analyzing schema changes\n" "2. Detecting breaking changes\n" "3. Selecting evolution strategy\n" "4. Planning consumer coordination\n" "5. Setting up rollback procedures" ), } ) else: return json.dumps({"error": "Failed to start Schema Evolution workflow"}) except Exception as e: logger.error(f"Error starting Schema Evolution workflow: {str(e)}") return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
  • Helper function invoked upon workflow completion to process responses and generate a comprehensive schema evolution execution plan, capturing change analysis, strategy configurations, compatibility overrides, consumer coordination details, rollback procedures, and execution options.
    def execute_schema_evolution(responses: Dict[str, Any]) -> Dict[str, Any]: """Execute schema evolution based on workflow responses.""" result = { "operation": "schema_evolution", "subject": responses.get("subject"), "status": "pending", } # Basic change information result["change_info"] = { "change_type": responses.get("change_type"), "description": responses.get("change_description"), "current_consumers": responses.get("current_consumers"), "production_impact": responses.get("production_impact"), "has_breaking_changes": responses.get("has_breaking_changes") == "true", } # Evolution strategy strategy = responses.get("evolution_strategy") result["evolution_strategy"] = strategy # Strategy-specific configuration if strategy == "multi_version_migration": result["migration_config"] = { "intermediate_versions": int(responses.get("intermediate_versions", 1)), "version_timeline": responses.get("version_timeline"), "deprecation_strategy": responses.get("deprecation_strategy"), } elif strategy == "dual_support": result["dual_support_config"] = { "support_duration": responses.get("support_duration"), "field_mapping": responses.get("field_mapping"), "conversion_logic": responses.get("conversion_logic"), } elif strategy == "gradual_migration": result["migration_phases"] = { "phase_count": responses.get("phase_count"), "phase_criteria": responses.get("phase_criteria"), "rollback_checkpoints": responses.get("rollback_checkpoints") == "true", } else: result["implementation"] = { "deployment_window": responses.get("deployment_window"), "validation_approach": responses.get("validation_approach"), } # Compatibility resolution (if breaking changes) change_info = result.get("change_info", {}) if isinstance(change_info, dict) and change_info.get("has_breaking_changes"): result["compatibility_resolution"] = { "approach": responses.get("resolution_approach"), "override_compatibility": responses.get("compatibility_override") == "true", "notes": responses.get("compatibility_notes"), } # Consumer coordination result["consumer_coordination"] = { "notification_method": responses.get("notification_method"), "testing_approach": responses.get("consumer_testing"), "support_period": responses.get("support_period"), } # Rollback planning result["rollback_plan"] = { "trigger": responses.get("rollback_trigger"), "max_time": responses.get("rollback_time"), "data_handling": responses.get("data_handling"), "test_rollback": responses.get("rollback_testing") == "true", } # Final options result["documentation"] = { "generate_migration_guide": responses.get("generate_migration_guide") == "true", "create_runbook": responses.get("create_runbook") == "true", "schedule_dry_run": responses.get("schedule_dry_run") == "true", "evolution_notes": responses.get("evolution_notes"), } # Execution settings result["execution"] = { "confirmed": responses.get("final_confirmation") == "true", "enable_monitoring": responses.get("monitor_execution") == "true", } return result
  • Core analysis helper that detects and categorizes schema changes (add_field, remove_field, type changes, etc.), determines if breaking, handles Avro complex types (records, arrays, maps, unions), and supports compatibility checking used in the schema evolution workflow.
    return result def analyze_schema_changes(current_schema: Dict[str, Any], proposed_schema: Dict[str, Any]) -> List[Dict[str, Any]]:

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server