Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

guided_schema_evolution

Safely evolve schemas using a guided workflow that analyzes changes, suggests strategies, and coordinates consumer updates for Kafka Schema Registry.

Instructions

Start the Schema Evolution Assistant workflow. This guided workflow helps you safely evolve schemas by analyzing changes, suggesting strategies, and coordinating consumer updates.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
current_schemaNo
subjectNo

Output Schema

TableJSON Schema
NameRequiredDescriptionDefault
resultYes

Implementation Reference

  • The MCP tool handler for guided_schema_evolution. Decorated with @self.mcp.tool and implements the core logic by initializing context from inputs and starting the schema_evolution_assistant workflow via the MultiStepElicitationManager.
    @self.mcp.tool(
        description=(
            "Start the Schema Evolution Assistant workflow. "
            "This guided workflow helps you safely evolve schemas by analyzing changes, "
            "suggesting strategies, and coordinating consumer updates."
        ),
    )
    async def guided_schema_evolution(
        subject: Optional[str] = None,
        current_schema: Optional[str] = None,
    ) -> str:
        """Start the Schema Evolution Assistant workflow."""
        initial_context = {}
        if subject:
            initial_context["subject"] = subject
        if current_schema:
            try:
                initial_context["current_schema"] = json.loads(current_schema)
            except json.JSONDecodeError:
                initial_context["current_schema"] = current_schema
    
        workflow_id = "schema_evolution_assistant"
    
        try:
            request = await self.multi_step_manager.start_workflow(
                workflow_id=workflow_id, initial_context=initial_context
            )
    
            if request:
                return json.dumps(
                    {
                        "status": "started",
                        "workflow_id": workflow_id,
                        "workflow_name": "Schema Evolution Assistant",
                        "request_id": request.id,
                        "first_step": request.title,
                        "description": request.description,
                        "message": (
                            "Schema Evolution Assistant started. This workflow will guide you through:\n"
                            "1. Analyzing schema changes\n"
                            "2. Detecting breaking changes\n"
                            "3. Selecting evolution strategy\n"
                            "4. Planning consumer coordination\n"
                            "5. Setting up rollback procedures"
                        ),
                    }
                )
            else:
                return json.dumps({"error": "Failed to start Schema Evolution workflow"})
        except Exception as e:
            logger.error(f"Error starting Schema Evolution workflow: {str(e)}")
            return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
  • Helper function invoked upon workflow completion to process responses and generate a comprehensive schema evolution execution plan, capturing change analysis, strategy configurations, compatibility overrides, consumer coordination details, rollback procedures, and execution options.
    def execute_schema_evolution(responses: Dict[str, Any]) -> Dict[str, Any]:
        """Execute schema evolution based on workflow responses."""
        result = {
            "operation": "schema_evolution",
            "subject": responses.get("subject"),
            "status": "pending",
        }
    
        # Basic change information
        result["change_info"] = {
            "change_type": responses.get("change_type"),
            "description": responses.get("change_description"),
            "current_consumers": responses.get("current_consumers"),
            "production_impact": responses.get("production_impact"),
            "has_breaking_changes": responses.get("has_breaking_changes") == "true",
        }
    
        # Evolution strategy
        strategy = responses.get("evolution_strategy")
        result["evolution_strategy"] = strategy
    
        # Strategy-specific configuration
        if strategy == "multi_version_migration":
            result["migration_config"] = {
                "intermediate_versions": int(responses.get("intermediate_versions", 1)),
                "version_timeline": responses.get("version_timeline"),
                "deprecation_strategy": responses.get("deprecation_strategy"),
            }
        elif strategy == "dual_support":
            result["dual_support_config"] = {
                "support_duration": responses.get("support_duration"),
                "field_mapping": responses.get("field_mapping"),
                "conversion_logic": responses.get("conversion_logic"),
            }
        elif strategy == "gradual_migration":
            result["migration_phases"] = {
                "phase_count": responses.get("phase_count"),
                "phase_criteria": responses.get("phase_criteria"),
                "rollback_checkpoints": responses.get("rollback_checkpoints") == "true",
            }
        else:
            result["implementation"] = {
                "deployment_window": responses.get("deployment_window"),
                "validation_approach": responses.get("validation_approach"),
            }
    
        # Compatibility resolution (if breaking changes)
        change_info = result.get("change_info", {})
        if isinstance(change_info, dict) and change_info.get("has_breaking_changes"):
            result["compatibility_resolution"] = {
                "approach": responses.get("resolution_approach"),
                "override_compatibility": responses.get("compatibility_override") == "true",
                "notes": responses.get("compatibility_notes"),
            }
    
        # Consumer coordination
        result["consumer_coordination"] = {
            "notification_method": responses.get("notification_method"),
            "testing_approach": responses.get("consumer_testing"),
            "support_period": responses.get("support_period"),
        }
    
        # Rollback planning
        result["rollback_plan"] = {
            "trigger": responses.get("rollback_trigger"),
            "max_time": responses.get("rollback_time"),
            "data_handling": responses.get("data_handling"),
            "test_rollback": responses.get("rollback_testing") == "true",
        }
    
        # Final options
        result["documentation"] = {
            "generate_migration_guide": responses.get("generate_migration_guide") == "true",
            "create_runbook": responses.get("create_runbook") == "true",
            "schedule_dry_run": responses.get("schedule_dry_run") == "true",
            "evolution_notes": responses.get("evolution_notes"),
        }
    
        # Execution settings
        result["execution"] = {
            "confirmed": responses.get("final_confirmation") == "true",
            "enable_monitoring": responses.get("monitor_execution") == "true",
        }
    
        return result
  • Core analysis helper that detects and categorizes schema changes (add_field, remove_field, type changes, etc.), determines if breaking, handles Avro complex types (records, arrays, maps, unions), and supports compatibility checking used in the schema evolution workflow.
        return result
    
    
    def analyze_schema_changes(current_schema: Dict[str, Any], proposed_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It mentions the tool 'starts' a workflow and helps 'safely evolve schemas,' but doesn't clarify if this is a read-only or mutating operation, what permissions are required, whether it's interactive or batch, or what happens upon invocation (e.g., does it block other operations?). The description lacks details on error handling, rate limits, or side effects.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately concise with two sentences that efficiently state the tool's purpose and high-level functionality. It's front-loaded with the core action ('Start the Schema Evolution Assistant workflow') and avoids unnecessary details. However, it could be slightly more structured by explicitly separating purpose from outcomes.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (starting a workflow with 2 parameters) and the presence of an output schema (which reduces the need to describe return values), the description is moderately complete. It covers the 'what' but lacks critical context: no parameter explanations, no behavioral details, and no usage guidelines. The output schema helps, but the description doesn't fully compensate for the gaps in schema coverage and missing annotations.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the schema provides no parameter documentation. The description doesn't mention any parameters or their purposes. With 2 parameters ('current_schema' and 'subject'), the description fails to explain what these inputs mean, how they affect the workflow, or if they're optional (as indicated by 0 required parameters). This leaves parameters entirely undocumented.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Start the Schema Evolution Assistant workflow' with the specific function of helping 'safely evolve schemas by analyzing changes, suggesting strategies, and coordinating consumer updates.' It provides a verb ('Start') and resource ('Schema Evolution Assistant workflow') with specific outcomes. However, it doesn't explicitly distinguish itself from sibling tools like 'guided_schema_migration' or 'bulk_schema_update,' which may have overlapping domains.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It mentions the tool helps with 'safely evolving schemas,' but doesn't specify scenarios, prerequisites, or exclusions. With many sibling tools related to schema operations (e.g., 'guided_schema_migration,' 'bulk_schema_update'), the lack of comparative context leaves usage ambiguous.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server