guided_schema_migration
Initiate a structured workflow for migrating schemas using the Schema Migration Wizard on the MCP Kafka Schema Registry server.
Instructions
Start the Schema Migration Wizard workflow for guided schema migration
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
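The handler listed under Implementation Reference returns `result` as a JSON-encoded string: either a payload with `"status": "started"` or one carrying an `"error"` key. A minimal sketch of how a client might interpret that string follows. The `summarize_result` helper and the sample `request_id` and `first_step` values are illustrative assumptions, not part of the server.

```python
import json


def summarize_result(raw: str) -> str:
    """Turn the tool's JSON string into a one-line summary (hypothetical helper)."""
    payload = json.loads(raw)
    if "error" in payload:
        return f"failed: {payload['error']}"
    if payload.get("status") == "started":
        return (
            f"started {payload['workflow_id']} "
            f"(request {payload['request_id']}): {payload['first_step']}"
        )
    return "unrecognized response"


# Sample payloads; request_id and first_step are made-up values for illustration.
started = json.dumps(
    {
        "status": "started",
        "workflow_id": "schema_migration_wizard",
        "workflow_name": "Schema Migration Wizard",
        "request_id": "req-1",
        "first_step": "Select registries",
    }
)
failed = json.dumps({"error": "Failed to start Schema Migration workflow"})

print(summarize_result(started))
print(summarize_result(failed))
```

Checking for the `error` key first mirrors the handler, which returns an error-only object on both failure paths.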
Implementation Reference
- workflow_mcp_integration.py:244-274 (handler): The main handler function for the guided_schema_migration MCP tool. It registers the tool using @self.mcp.tool and starts the schema_migration_wizard workflow via MultiStepElicitationManager.

      @self.mcp.tool(description="Start the Schema Migration Wizard workflow for guided schema migration")
      async def guided_schema_migration() -> str:
          """Convenience method to start Schema Migration workflow."""
          workflow_id = "schema_migration_wizard"
          try:
              request = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={})
              if request:
                  return json.dumps(
                      {
                          "status": "started",
                          "workflow_id": workflow_id,
                          "workflow_name": "Schema Migration Wizard",
                          "request_id": request.id,
                          "first_step": request.title,
                          "description": request.description,
                          "message": (
                              "Schema Migration Wizard started. This workflow will guide you through:\n"
                              "1. Source and target registry selection\n"
                              "2. Schema selection and validation\n"
                              "3. Migration planning and execution\n"
                              "4. Verification and rollback procedures"
                          ),
                      }
                  )
              else:
                  return json.dumps({"error": "Failed to start Schema Migration workflow"})
          except Exception as e:
              logger.error(f"Error starting Schema Migration workflow: {str(e)}")
              return json.dumps({"error": f"Failed to start workflow: {str(e)}"})

- workflow_mcp_integration.py:365-399 (helper): Helper function that processes the workflow responses from the schema migration wizard to generate an execution plan for the actual migration operations.

      def execute_schema_migration(responses: Dict[str, Any]) -> Dict[str, Any]:
          """Execute schema migration based on workflow responses."""
          migration_type = responses.get("migration_type")
          source_registry = responses.get("source_registry")
          target_registry = responses.get("target_registry")

          result = {
              "operation": "schema_migration",
              "migration_type": migration_type,
              "source": source_registry,
              "target": target_registry,
              "status": "pending",
          }

          # Add specific migration parameters
          if migration_type == "single_schema":
              result["schema_name"] = responses.get("schema_name")
              result["version"] = responses.get("version", "latest")
          elif migration_type == "bulk_migration":
              result["pattern"] = responses.get("schema_pattern")
              result["include_all_versions"] = responses.get("include_all_versions")
              result["context_filter"] = responses.get("context_filter")
          elif migration_type == "context_migration":
              result["source_context"] = responses.get("source_context")
              result["include_dependencies"] = responses.get("include_dependencies")

          # Add migration options
          result["options"] = {
              "preserve_ids": responses.get("preserve_ids") == "true",
              "conflict_resolution": responses.get("conflict_resolution"),
              "create_backup": responses.get("create_backup") == "true",
              "dry_run": responses.get("dry_run") == "true",
          }

          return result
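To show the shape of the plan the helper produces, the sketch below runs hypothetical wizard answers through a condensed stand-in for it (single_schema branch only). The `build_plan` name, the registry names, and the subject name are assumptions made for illustration. One behavior worth noting: the boolean options are compared against the literal string "true", so a missing or differently cased answer evaluates to False.

```python
from typing import Any, Dict


def build_plan(responses: Dict[str, Any]) -> Dict[str, Any]:
    """Condensed stand-in for execute_schema_migration (single_schema branch only)."""
    plan: Dict[str, Any] = {
        "operation": "schema_migration",
        "migration_type": responses.get("migration_type"),
        "source": responses.get("source_registry"),
        "target": responses.get("target_registry"),
        "status": "pending",
    }
    if plan["migration_type"] == "single_schema":
        plan["schema_name"] = responses.get("schema_name")
        # Falls back to "latest" when the wizard collected no version.
        plan["version"] = responses.get("version", "latest")
    plan["options"] = {
        "preserve_ids": responses.get("preserve_ids") == "true",
        "conflict_resolution": responses.get("conflict_resolution"),
        "create_backup": responses.get("create_backup") == "true",
        "dry_run": responses.get("dry_run") == "true",
    }
    return plan


# Hypothetical wizard answers; all values are invented for this example.
answers = {
    "migration_type": "single_schema",
    "source_registry": "dev",
    "target_registry": "prod",
    "schema_name": "orders-value",
    "dry_run": "true",
    "create_backup": "True",  # capitalized, so it does not match "true"
}

plan = build_plan(answers)
print(plan["version"])
print(plan["options"])
```

The plan is returned with `"status": "pending"`; nothing in the helper itself contacts a registry.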