Glama

guided_schema_migration

Initiate a structured workflow for migrating schemas using the Schema Migration Wizard on the MCP Kafka Schema Registry server.

Instructions

Start the Schema Migration Wizard workflow for guided schema migration

Input Schema

Name | Required | Description | Default

No arguments
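
Because the tool takes no arguments, a caller only has to interpret the JSON string it returns. The sketch below is one way a client might do that. It assumes the two response shapes shown in the Implementation Reference (a "started" payload or an "error" payload). The function name summarize_wizard_start and the sample request_id and first_step values are hypothetical, chosen only for illustration.

```python
import json


def summarize_wizard_start(raw: str) -> str:
    """Turn the tool's JSON string into a one-line, human-readable summary."""
    payload = json.loads(raw)
    # The handler reports failures as {"error": "..."}.
    if "error" in payload:
        return f"failed: {payload['error']}"
    # On success it reports status "started" plus details of the first step.
    return (
        f"{payload['workflow_name']} {payload['status']}; "
        f"first step: {payload['first_step']} (request {payload['request_id']})"
    )


# Hypothetical payloads shaped like the handler's two return paths.
ok = json.dumps(
    {
        "status": "started",
        "workflow_id": "schema_migration_wizard",
        "workflow_name": "Schema Migration Wizard",
        "request_id": "req-123",  # made-up value
        "first_step": "Select registries",  # made-up value
    }
)
err = json.dumps({"error": "Failed to start Schema Migration workflow"})

print(summarize_wizard_start(ok))
print(summarize_wizard_start(err))
```

Checking for the "error" key first mirrors the handler, which never returns both "error" and "status" in the same payload.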

Implementation Reference

  • The main handler function for the guided_schema_migration MCP tool. It registers the tool using @self.mcp.tool and starts the schema_migration_wizard workflow via MultiStepElicitationManager.
    @self.mcp.tool(description="Start the Schema Migration Wizard workflow for guided schema migration")
    async def guided_schema_migration() -> str:
        """Convenience method to start Schema Migration workflow."""
        workflow_id = "schema_migration_wizard"
        try:
            request = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={})
            if request:
                return json.dumps(
                    {
                        "status": "started",
                        "workflow_id": workflow_id,
                        "workflow_name": "Schema Migration Wizard",
                        "request_id": request.id,
                        "first_step": request.title,
                        "description": request.description,
                        "message": (
                            "Schema Migration Wizard started. This workflow will guide you through:\n"
                            "1. Source and target registry selection\n"
                            "2. Schema selection and validation\n"
                            "3. Migration planning and execution\n"
                            "4. Verification and rollback procedures"
                        ),
                    }
                )
            else:
                return json.dumps({"error": "Failed to start Schema Migration workflow"})
        except Exception as e:
            logger.error(f"Error starting Schema Migration workflow: {str(e)}")
            return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
  • Helper function that processes the workflow responses from the schema migration wizard to generate an execution plan for the actual migration operations.
    def execute_schema_migration(responses: Dict[str, Any]) -> Dict[str, Any]:
        """Execute schema migration based on workflow responses."""
        migration_type = responses.get("migration_type")
        source_registry = responses.get("source_registry")
        target_registry = responses.get("target_registry")

        result = {
            "operation": "schema_migration",
            "migration_type": migration_type,
            "source": source_registry,
            "target": target_registry,
            "status": "pending",
        }

        # Add specific migration parameters
        if migration_type == "single_schema":
            result["schema_name"] = responses.get("schema_name")
            result["version"] = responses.get("version", "latest")
        elif migration_type == "bulk_migration":
            result["pattern"] = responses.get("schema_pattern")
            result["include_all_versions"] = responses.get("include_all_versions")
            result["context_filter"] = responses.get("context_filter")
        elif migration_type == "context_migration":
            result["source_context"] = responses.get("source_context")
            result["include_dependencies"] = responses.get("include_dependencies")

        # Add migration options
        result["options"] = {
            "preserve_ids": responses.get("preserve_ids") == "true",
            "conflict_resolution": responses.get("conflict_resolution"),
            "create_backup": responses.get("create_backup") == "true",
            "dry_run": responses.get("dry_run") == "true",
        }

        return result
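
To make the helper's behavior concrete, the sketch below copies execute_schema_migration from the listing above and runs it on a hypothetical set of wizard responses (the registry names "dev" and "prod" and the schema name "user-events" are invented for illustration). It shows two details that are easy to miss: version falls back to "latest" when omitted, and the option flags are compared against the string "true", so a Python boolean True is treated as false.

```python
from typing import Any, Dict


def execute_schema_migration(responses: Dict[str, Any]) -> Dict[str, Any]:
    """Copy of the helper shown above, reproduced so the example runs standalone."""
    migration_type = responses.get("migration_type")
    result: Dict[str, Any] = {
        "operation": "schema_migration",
        "migration_type": migration_type,
        "source": responses.get("source_registry"),
        "target": responses.get("target_registry"),
        "status": "pending",
    }

    if migration_type == "single_schema":
        result["schema_name"] = responses.get("schema_name")
        result["version"] = responses.get("version", "latest")
    elif migration_type == "bulk_migration":
        result["pattern"] = responses.get("schema_pattern")
        result["include_all_versions"] = responses.get("include_all_versions")
        result["context_filter"] = responses.get("context_filter")
    elif migration_type == "context_migration":
        result["source_context"] = responses.get("source_context")
        result["include_dependencies"] = responses.get("include_dependencies")

    # Flags are true only when the response is exactly the string "true".
    result["options"] = {
        "preserve_ids": responses.get("preserve_ids") == "true",
        "conflict_resolution": responses.get("conflict_resolution"),
        "create_backup": responses.get("create_backup") == "true",
        "dry_run": responses.get("dry_run") == "true",
    }
    return result


# Hypothetical wizard responses; all values are illustrative.
responses = {
    "migration_type": "single_schema",
    "source_registry": "dev",
    "target_registry": "prod",
    "schema_name": "user-events",
    "preserve_ids": "true",  # string "true" -> option enabled
    "dry_run": True,         # boolean True -> option NOT enabled
}

plan = execute_schema_migration(responses)
print(plan["version"])   # version was omitted, so the default applies
print(plan["options"])
```

Note that the returned dictionary always carries status "pending": the helper only builds a plan from the responses and does not itself contact a registry.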

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
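
The same request can be issued from Python's standard library. This is a minimal sketch: the URL comes from the curl command above, while the helper names build_request and fetch_server_info are hypothetical, and the assumption that the endpoint returns a JSON document is not confirmed by this page.

```python
import json
import urllib.request

BASE_URL = "https://glama.ai/api/mcp/v1/servers"


def build_request(owner: str, repo: str) -> urllib.request.Request:
    """Build the GET request for one server entry in the MCP directory."""
    return urllib.request.Request(f"{BASE_URL}/{owner}/{repo}", method="GET")


def fetch_server_info(owner: str, repo: str, timeout: float = 10.0):
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(owner, repo), timeout=timeout) as resp:
        return json.load(resp)


# Building the request does not touch the network.
req = build_request("aywengo", "kafka-schema-reg-mcp")
print(req.get_method(), req.full_url)

# To actually query the directory:
# info = fetch_server_info("aywengo", "kafka-schema-reg-mcp")
```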

If you have feedback or need assistance with the MCP directory API, please join our Discord server.