bulk_schema_update
Update multiple Kafka schemas in bulk with interactive guidance, pattern matching, and support for compatibility settings, naming conventions, and metadata updates. Use dry runs for testing.
Instructions
Update schemas in bulk with interactive guidance.
Supports compatibility settings, naming conventions, and metadata updates. Pattern matching is supported (e.g., `test-*`, `deprecated-*`).
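The page does not show how patterns are matched against schema names. As a minimal sketch, assuming shell-style globbing (so that `test-*` matches any name beginning with `test-`), Python's standard `fnmatch` module illustrates the idea. The helper `select_subjects` and the subject names are hypothetical, made up for illustration.

```python
from fnmatch import fnmatchcase


def select_subjects(subjects, pattern):
    """Return the subjects whose names match a shell-style glob pattern."""
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.
    return [name for name in subjects if fnmatchcase(name, pattern)]


# Hypothetical subject names, for illustration only.
subjects = [
    "test-orders-value",
    "test-users-value",
    "prod-orders-value",
    "deprecated-audit-key",
]

print(select_subjects(subjects, "test-*"))
print(select_subjects(subjects, "deprecated-*"))
```

Running a selection like this before any update is a cheap way to check that a pattern catches only the schemas you intend.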
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
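| batch_size | No | Number of schemas to process at once | |
| dry_run | No | Preview changes without applying | |
| pattern | No | Schema name pattern (e.g., `test-*`, `prod-*`) | |
| update_type | No | Type of update to perform (`compatibility`, `naming`, or `metadata`) | compatibility |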
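To show how these parameters might be supplied, the sketch below builds an MCP `tools/call` JSON-RPC request for this tool. The helper `build_call` is hypothetical and not part of the server. Its `dry_run=True` and `batch_size=10` values mirror the defaults in the registration code listed under Implementation Reference and are passed explicitly rather than relied upon.

```python
import json


def build_call(request_id, pattern, update_type="compatibility", dry_run=True, batch_size=10):
    """Build a JSON-RPC 2.0 `tools/call` request for bulk_schema_update.

    Illustrative helper only; argument names follow the input schema above.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "bulk_schema_update",
            "arguments": {
                "pattern": pattern,
                "update_type": update_type,
                "dry_run": dry_run,
                "batch_size": batch_size,
            },
        },
    }


request = build_call(1, "test-*")
print(json.dumps(request, indent=2))
```

Keeping `dry_run` set to true on the first call lets you inspect the preview before sending the same request with it set to false.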
Implementation Reference
- bulk_operations_wizard.py:138-158 (handler) Core handler implementation for the bulk_schema_update tool. Orchestrates schema selection, update type elicitation, parameter gathering, preview generation, user confirmation, and operation execution.

  ```python
  async def _handle_bulk_schema_update(self) -> Dict[str, Any]:
      """Handle bulk schema update operations"""
      # Step 1: Select schemas
      schemas = await self._elicit_schema_selection("Select schemas to update", allow_patterns=True)

      # Step 2: Select update type
      update_type = await self._elicit_update_type()

      # Step 3: Get update parameters
      update_params = await self._elicit_update_parameters(update_type)

      # Step 4: Preview changes
      preview = await self._generate_preview(BulkOperationType.SCHEMA_UPDATE, schemas, update_params)

      # Step 5: Confirm operation
      if not await self._confirm_operation(preview):
          return {"status": "cancelled", "reason": "User cancelled operation"}

      # Step 6: Execute operation
      return await self._execute_bulk_operation(BulkOperationType.SCHEMA_UPDATE, schemas, update_params, preview)
  ```
- bulk_operations_mcp_integration.py:48-75 (registration) MCP tool registration for 'bulk_schema_update', including name, description, and input schema definition.

  ```python
  tools.append(
      Tool(
          name="bulk_schema_update",
          description=(
              "Update schemas in bulk with interactive guidance. "
              "Supports compatibility settings, naming conventions, and metadata updates. "
              "Pattern matching supported (e.g., test-*, deprecated-*)."
          ),
          inputSchema={
              "type": "object",
              "properties": {
                  "pattern": {"type": "string", "description": "Schema name pattern (e.g., 'test-*', 'prod-*')"},
                  "update_type": {
                      "type": "string",
                      "enum": ["compatibility", "naming", "metadata"],
                      "description": "Type of update to perform",
                  },
                  "dry_run": {"type": "boolean", "default": True, "description": "Preview changes without applying"},
                  "batch_size": {
                      "type": "integer",
                      "default": 10,
                      "description": "Number of schemas to process at once",
                  },
              },
              "required": ["update_type"],
          },
      )
  )
  ```
- bulk_operations_wizard.py:77-85 (registration) Internal registration of the schema update handler within the BulkOperationsWizard class.

  ```python
  def _register_handlers(self) -> Dict[BulkOperationType, Callable]:
      """Register operation handlers"""
      return {
          BulkOperationType.SCHEMA_UPDATE: self._handle_bulk_schema_update,
          BulkOperationType.MIGRATION: self._handle_bulk_migration,
          BulkOperationType.CLEANUP: self._handle_bulk_cleanup,
          BulkOperationType.CONFIGURATION: self._handle_bulk_configuration,
      }
  ```
- Tool dispatcher that handles the 'bulk_schema_update' tool call by invoking the wizard's start_wizard with the SCHEMA_UPDATE type.

  ```python
  elif tool_name == "bulk_schema_update":
      # Direct schema update with parameters
      # In real implementation, would pass parameters to wizard
      return await wizard.start_wizard(BulkOperationType.SCHEMA_UPDATE)
  ```
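The dispatcher excerpt notes that a real implementation would pass the call's parameters on to the wizard. The sketch below shows one way that forwarding could look, together with how `dry_run` and `batch_size` might shape execution. It is not the project's code: `OperationType`, `chunked`, `handle_schema_update`, `HANDLERS`, and `dispatch` are hypothetical stand-ins, and the registry call is replaced by a counter.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, List


class OperationType(Enum):  # hypothetical stand-in for BulkOperationType
    SCHEMA_UPDATE = "schema_update"


def chunked(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive batches of at most `size` elements."""
    if size < 1:
        raise ValueError("batch_size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def handle_schema_update(schemas: List[str], params: Dict[str, Any]) -> Dict[str, Any]:
    """Preview the batches, then apply them unless dry_run is set."""
    batches = chunked(schemas, params.get("batch_size", 10))
    if params.get("dry_run", True):
        # Dry run: report what would happen without changing anything.
        return {"status": "preview", "batches": batches}
    applied = 0
    for batch in batches:
        # A real implementation would call the schema registry here (assumption).
        applied += len(batch)
    return {"status": "completed", "updated": applied}


# Dispatch table keyed by operation type, similar in spirit to _register_handlers.
HANDLERS = {OperationType.SCHEMA_UPDATE: handle_schema_update}


async def dispatch(tool_name: str, schemas: List[str], params: Dict[str, Any]) -> Dict[str, Any]:
    """Route a tool call to its handler, forwarding the caller's parameters."""
    if tool_name == "bulk_schema_update":
        return await HANDLERS[OperationType.SCHEMA_UPDATE](schemas, params)
    raise ValueError(f"Unknown tool: {tool_name}")


result = asyncio.run(
    dispatch("bulk_schema_update", ["a", "b", "c"], {"dry_run": True, "batch_size": 2})
)
print(result)
```

Defaulting `dry_run` to true in the handler means a call that omits the flag can only produce a preview, which is the safer failure mode for a bulk operation.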