guided_schema_migration
Initiate a structured workflow for migrating schemas using the Schema Migration Wizard on the MCP Kafka Schema Registry server.
Instructions
Start the Schema Migration Wizard workflow for guided schema migration
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- workflow_mcp_integration.py:244-274 (handler)The main handler function for the guided_schema_migration MCP tool. It registers the tool using @self.mcp.tool and starts the schema_migration_wizard workflow via MultiStepElicitationManager.@self.mcp.tool(description="Start the Schema Migration Wizard workflow for guided schema migration") async def guided_schema_migration() -> str: """Convenience method to start Schema Migration workflow.""" workflow_id = "schema_migration_wizard" try: request = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={}) if request: return json.dumps( { "status": "started", "workflow_id": workflow_id, "workflow_name": "Schema Migration Wizard", "request_id": request.id, "first_step": request.title, "description": request.description, "message": ( "Schema Migration Wizard started. This workflow will guide you through:\n" "1. Source and target registry selection\n" "2. Schema selection and validation\n" "3. Migration planning and execution\n" "4. Verification and rollback procedures" ), } ) else: return json.dumps({"error": "Failed to start Schema Migration workflow"}) except Exception as e: logger.error(f"Error starting Schema Migration workflow: {str(e)}") return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
- workflow_mcp_integration.py:365-399 (helper)Helper function that processes the workflow responses from the schema migration wizard to generate an execution plan for the actual migration operations.def execute_schema_migration(responses: Dict[str, Any]) -> Dict[str, Any]: """Execute schema migration based on workflow responses.""" migration_type = responses.get("migration_type") source_registry = responses.get("source_registry") target_registry = responses.get("target_registry") result = { "operation": "schema_migration", "migration_type": migration_type, "source": source_registry, "target": target_registry, "status": "pending", } # Add specific migration parameters if migration_type == "single_schema": result["schema_name"] = responses.get("schema_name") result["version"] = responses.get("version", "latest") elif migration_type == "bulk_migration": result["pattern"] = responses.get("schema_pattern") result["include_all_versions"] = responses.get("include_all_versions") result["context_filter"] = responses.get("context_filter") elif migration_type == "context_migration": result["source_context"] = responses.get("source_context") result["include_dependencies"] = responses.get("include_dependencies") # Add migration options result["options"] = { "preserve_ids": responses.get("preserve_ids") == "true", "conflict_resolution": responses.get("conflict_resolution"), "create_backup": responses.get("create_backup") == "true", "dry_run": responses.get("dry_run") == "true", } return result