Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

bulk_schema_migration

Migrate schemas between contexts or registries with pattern-based selection, preserving schema IDs. Preview changes and enable rollback for controlled migration processes.

Instructions

Migrate schemas between contexts or registries.

Supports pattern-based selection and maintains schema IDs. Includes preview and rollback capabilities.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dry_runNo
preserve_idsNo
schema_patternNo
source_contextNo
source_registryNo
target_contextNo
target_registryNo

Implementation Reference

  • Input schema defining parameters for bulk schema migration tool, including source/target contexts/registries, pattern, and options.
    inputSchema={
        "type": "object",
        "properties": {
            "source_context": {"type": "string", "description": "Source context (default: current context)"},
            "target_context": {"type": "string", "description": "Target context for migration"},
            "source_registry": {
                "type": "string",
                "description": "Source registry (for cross-registry migration)",
            },
            "target_registry": {
                "type": "string",
                "description": "Target registry (for cross-registry migration)",
            },
            "schema_pattern": {"type": "string", "description": "Pattern to match schemas for migration"},
            "preserve_ids": {
                "type": "boolean",
                "default": True,
                "description": "Preserve schema IDs during migration",
            },
            "dry_run": {
                "type": "boolean",
                "default": True,
                "description": "Preview migration without executing",
            },
        },
  • Registers the 'bulk_schema_migration' MCP tool with name, description, and input schema.
    tools.append(
        Tool(
            name="bulk_schema_migration",
            description=(
                "Migrate schemas between contexts or registries. "
                "Supports pattern-based selection and maintains schema IDs. "
                "Includes preview and rollback capabilities."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "source_context": {"type": "string", "description": "Source context (default: current context)"},
                    "target_context": {"type": "string", "description": "Target context for migration"},
                    "source_registry": {
                        "type": "string",
                        "description": "Source registry (for cross-registry migration)",
                    },
                    "target_registry": {
                        "type": "string",
                        "description": "Target registry (for cross-registry migration)",
                    },
                    "schema_pattern": {"type": "string", "description": "Pattern to match schemas for migration"},
                    "preserve_ids": {
                        "type": "boolean",
                        "default": True,
                        "description": "Preserve schema IDs during migration",
                    },
                    "dry_run": {
                        "type": "boolean",
                        "default": True,
                        "description": "Preview migration without executing",
                    },
                },
            },
        )
    )
  • MCP tool handler dispatcher that invokes the BulkOperationsWizard for MIGRATION operation.
    elif tool_name == "bulk_schema_migration":
        # Direct migration with parameters
        return await wizard.start_wizard(BulkOperationType.MIGRATION)
  • Core implementation of bulk schema migration: elicits source/target, schemas, options; generates preview; confirms and executes via generic bulk operation executor.
    async def _handle_bulk_migration(self) -> Dict[str, Any]:
        """Handle bulk migration operations"""
        # Step 1: Select source and target
        source_target = await self._elicit_migration_source_target()  # type: ignore
    
        # Step 2: Select schemas to migrate
        schemas = await self._elicit_schema_selection(
            "Select schemas to migrate", context=source_target["source_context"]
        )
    
        # Step 3: Migration options
        migration_options = await self._elicit_migration_options()  # type: ignore
    
        # Step 4: Preview migration
        preview = await self._generate_preview(
            BulkOperationType.MIGRATION, schemas, {**source_target, **migration_options}
        )
    
        # Step 5: Confirm operation
        if not await self._confirm_operation(preview):
            return {"status": "cancelled", "reason": "User cancelled operation"}
    
        # Step 6: Execute migration
        return await self._execute_bulk_operation(
            BulkOperationType.MIGRATION, schemas, {**source_target, **migration_options}, preview
        )
  • Enum defining operation types, including MIGRATION used by the bulk_schema_migration tool.
    class BulkOperationType(Enum):
        """Supported bulk operation types"""
    
        SCHEMA_UPDATE = "schema_update"
        MIGRATION = "migration"
        CLEANUP = "cleanup"
        CONFIGURATION = "configuration"
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server