bulk_schema_cleanup
Clean up Kafka schemas in bulk with safety checks, including active consumer detection. Supports test schema cleanup, deprecated schema removal, and version purging with configurable options.
Instructions
Clean up schemas in bulk with safety checks.
Detects active consumers and provides options for handling them. Supports test schema cleanup, deprecated schema removal, and version purging.
Input Schema
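| Name | Required | Description | Default |
|---|---|---|---|
| check_consumers | No | Check for active consumers before cleanup | True |
| cleanup_type | No | Type of cleanup to perform: `test`, `deprecated`, `old_versions`, or `pattern` | test |
| force | No | Force cleanup even with active consumers (dangerous) | False |
| keep_versions | No | Number of recent versions to keep | 3 |
| pattern | No | Custom pattern for cleanup (if cleanup_type is 'pattern') | |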
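
A minimal sketch of how a caller might prepare arguments before invoking the tool. The helper `normalize_cleanup_args` is hypothetical and not part of the project. The enum values and the defaults for `keep_versions`, `check_consumers`, and `force` come from the JSON schema quoted under Implementation Reference, and the `test` default for `cleanup_type` comes from the table above. Treating `pattern` as mandatory when `cleanup_type` is `pattern` is an assumption based on the parameter's description.

```python
from typing import Any, Dict

# Enum values copied from the tool's published input schema.
CLEANUP_TYPES = ("test", "deprecated", "old_versions", "pattern")

# Defaults as listed on this page; "test" is taken from the parameter table.
DEFAULTS: Dict[str, Any] = {
    "cleanup_type": "test",
    "keep_versions": 3,
    "check_consumers": True,
    "force": False,
}


def normalize_cleanup_args(args: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: fill in defaults and reject malformed input."""
    merged = {**DEFAULTS, **args}
    cleanup_type = merged["cleanup_type"]
    if cleanup_type not in CLEANUP_TYPES:
        raise ValueError(
            f"cleanup_type must be one of {CLEANUP_TYPES}, got {cleanup_type!r}"
        )
    # Assumption: a pattern-based cleanup is meaningless without a pattern.
    if cleanup_type == "pattern" and not merged.get("pattern"):
        raise ValueError("pattern is required when cleanup_type is 'pattern'")
    return merged


if __name__ == "__main__":
    # Purge old versions, keeping the five most recent ones.
    print(normalize_cleanup_args({"cleanup_type": "old_versions", "keep_versions": 5}))
```

Leaving `check_consumers` at `True` and `force` at `False` keeps the consumer safety check in place; the schema itself labels `force` as dangerous.
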
Implementation Reference
- bulk_operations_wizard.py:186-217 (handler)
  Core handler for bulk schema cleanup operations. Orchestrates elicitation of cleanup type, items, options, preview, consumer checks, confirmation, and execution.

  ```python
  async def _handle_bulk_cleanup(self) -> Dict[str, Any]:
      """Handle bulk cleanup operations"""
      # Step 1: Select cleanup type
      cleanup_type = await self._elicit_cleanup_type()

      # Step 2: Select items to clean up
      items = await self._elicit_cleanup_items(cleanup_type)

      # Step 3: Cleanup options
      cleanup_options = await self._elicit_cleanup_options(cleanup_type)

      # Step 4: Preview cleanup
      preview = await self._generate_preview(
          BulkOperationType.CLEANUP, items, {"cleanup_type": cleanup_type, **cleanup_options}
      )

      # Step 5: Safety check for active consumers
      if preview.consumer_impact:
          action = await self._elicit_consumer_impact_action(preview.consumer_impact)
          if action == "cancel":
              return {"status": "cancelled", "reason": "Active consumers detected"}
          cleanup_options["consumer_action"] = action

      # Step 6: Confirm operation
      if not await self._confirm_operation(preview, extra_warnings=True):
          return {"status": "cancelled", "reason": "User cancelled operation"}

      # Step 7: Execute cleanup
      return await self._execute_bulk_operation(
          BulkOperationType.CLEANUP, items, {"cleanup_type": cleanup_type, **cleanup_options}, preview
      )
  ```
- Input schema validation for the bulk_schema_cleanup tool, defining parameters like cleanup_type, pattern, keep_versions, check_consumers, and force.

  ```python
  inputSchema={
      "type": "object",
      "properties": {
          "cleanup_type": {
              "type": "string",
              "enum": ["test", "deprecated", "old_versions", "pattern"],
              "description": "Type of cleanup to perform",
          },
          "pattern": {
              "type": "string",
              "description": "Custom pattern for cleanup (if cleanup_type is 'pattern')",
          },
          "keep_versions": {
              "type": "integer",
              "default": 3,
              "description": "Number of recent versions to keep",
          },
          "check_consumers": {
              "type": "boolean",
              "default": True,
              "description": "Check for active consumers before cleanup",
          },
          "force": {
              "type": "boolean",
              "default": False,
              "description": "Force cleanup even with active consumers (dangerous)",
          },
      },
      "required": ["cleanup_type"],
  },
  ```
- bulk_operations_mcp_integration.py:78-117 (registration)
  MCP tool registration for bulk_schema_cleanup, including name, description, and schema.

  ```python
  tools.append(
      Tool(
          name="bulk_schema_cleanup",
          description=(
              "Clean up schemas in bulk with safety checks. "
              "Detects active consumers and provides options for handling them. "
              "Supports test schema cleanup, deprecated schema removal, and version purging."
          ),
          inputSchema={
              "type": "object",
              "properties": {
                  "cleanup_type": {
                      "type": "string",
                      "enum": ["test", "deprecated", "old_versions", "pattern"],
                      "description": "Type of cleanup to perform",
                  },
                  "pattern": {
                      "type": "string",
                      "description": "Custom pattern for cleanup (if cleanup_type is 'pattern')",
                  },
                  "keep_versions": {
                      "type": "integer",
                      "default": 3,
                      "description": "Number of recent versions to keep",
                  },
                  "check_consumers": {
                      "type": "boolean",
                      "default": True,
                      "description": "Check for active consumers before cleanup",
                  },
                  "force": {
                      "type": "boolean",
                      "default": False,
                      "description": "Force cleanup even with active consumers (dangerous)",
                  },
              },
              "required": ["cleanup_type"],
          },
      )
  )
  ```
- MCP dispatch handler for bulk_schema_cleanup tool call, delegates to BulkOperationsWizard.start_wizard with CLEANUP type.

  ```python
  elif tool_name == "bulk_schema_cleanup":
      # Direct cleanup with parameters
      return await wizard.start_wizard(BulkOperationType.CLEANUP)
  ```
- bulk_operations_wizard.py:77-85 (helper)
  Registers the cleanup handler (_handle_bulk_cleanup) for BulkOperationType.CLEANUP in the wizard's operation handlers dictionary.

  ```python
  def _register_handlers(self) -> Dict[BulkOperationType, Callable]:
      """Register operation handlers"""
      return {
          BulkOperationType.SCHEMA_UPDATE: self._handle_bulk_schema_update,
          BulkOperationType.MIGRATION: self._handle_bulk_migration,
          BulkOperationType.CLEANUP: self._handle_bulk_cleanup,
          BulkOperationType.CONFIGURATION: self._handle_bulk_configuration,
      }
  ```
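
The references above describe a registry-based dispatch: a tool call reaches `start_wizard`, which presumably looks up the handler registered for the operation type, and the cleanup handler stops early if the user cancels after active consumers are detected. The following is a self-contained toy model of that pattern, not the project's code. `MiniWizard`, `OperationType`, the constructor arguments, and the `"completed"` and `"error"` result shapes are all hypothetical stand-ins; the body of the real `start_wizard` is not shown on this page, and the elicitation, preview, and confirmation steps are omitted.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[], Awaitable[Dict[str, Any]]]


class OperationType(Enum):
    """Stand-in for BulkOperationType (only two members modelled)."""
    CLEANUP = "cleanup"
    MIGRATION = "migration"


class MiniWizard:
    """Toy stand-in for BulkOperationsWizard with canned user answers."""

    def __init__(self, consumers_active: bool, user_action: str) -> None:
        self._consumers_active = consumers_active  # simulated preview result
        self._user_action = user_action            # simulated elicitation answer
        self._handlers = self._register_handlers()

    def _register_handlers(self) -> Dict[OperationType, Handler]:
        # Same idea as the registry in the source: operation type -> bound method.
        return {OperationType.CLEANUP: self._handle_cleanup}

    async def start_wizard(self, op: OperationType) -> Dict[str, Any]:
        # Assumed behaviour: look up the handler and await it.
        handler = self._handlers.get(op)
        if handler is None:
            return {"status": "error", "reason": f"No handler for {op.value}"}
        return await handler()

    async def _handle_cleanup(self) -> Dict[str, Any]:
        options: Dict[str, Any] = {"cleanup_type": "test"}
        # Mirrors the consumer safety check: cancelling aborts before execution.
        if self._consumers_active:
            if self._user_action == "cancel":
                return {"status": "cancelled", "reason": "Active consumers detected"}
            options["consumer_action"] = self._user_action
        return {"status": "completed", "options": options}


if __name__ == "__main__":
    wizard = MiniWizard(consumers_active=True, user_action="cancel")
    print(asyncio.run(wizard.start_wizard(OperationType.CLEANUP)))
```

Keeping the handlers in a dictionary keyed by operation type means a new bulk operation only needs one more registry entry, and the dispatch code stays unchanged.
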