guided_context_reorganization
Initiate schema reorganization across contexts to streamline Kafka Schema Registry management, ensuring structured and efficient schema alignment.
Instructions
Start the Context Reorganization workflow for reorganizing schemas across contexts
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
# workflow_mcp_integration.py:276-306 (handler) Primary handler and registration (via @self.mcp.tool decorator) for the guided_context_reorganization tool. Starts the multi-step context_reorganization workflow.
@self.mcp.tool(description="Start the Context Reorganization workflow for reorganizing schemas across contexts")
async def guided_context_reorganization() -> str:
    """Start the "context_reorganization" workflow and report the outcome as JSON.

    Returns:
        A JSON-encoded string. When the workflow manager returns a truthy
        request, the payload has ``status`` "started" together with the
        workflow id and name, the request's id, title and description, and
        an overview message. When the manager returns a falsy value, or any
        exception is raised, the payload contains a single ``error`` key.
    """
    workflow_id = "context_reorganization"
    try:
        started = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={})

        # Guard clause: the manager handed back nothing usable.
        if not started:
            return json.dumps({"error": "Failed to start Context Reorganization workflow"})

        overview = (
            "Context Reorganization workflow started. This workflow will guide you through:\n"
            "1. Current context analysis\n"
            "2. Target context design\n"
            "3. Schema migration planning\n"
            "4. Context restructuring execution"
        )
        payload = {
            "status": "started",
            "workflow_id": workflow_id,
            "workflow_name": "Context Reorganization",
            "request_id": started.id,
            "first_step": started.title,
            "description": started.description,
            "message": overview,
        }
        return json.dumps(payload)
    except Exception as exc:
        # Tool boundary: failures are logged and reported to the caller as a
        # JSON error payload instead of propagating.
        logger.error(f"Error starting Context Reorganization workflow: {str(exc)}")
        return json.dumps({"error": f"Failed to start workflow: {str(exc)}"})
# workflow_mcp_integration.py:402-440 (helper) Helper function that converts workflow responses into a structured execution plan for context reorganization strategies including merge, split, rename, and restructure.
def _split_context_names(value: Any) -> list:
    """Return the stripped, non-empty names found in a comma-separated value."""
    if isinstance(value, str):
        candidates = value.split(",")
    elif isinstance(value, (list, tuple)):
        # Tolerate callers that already supply a sequence of names.
        candidates = [item for item in value if isinstance(item, str)]
    else:
        # None or any other type: nothing to split (previously AttributeError).
        return []
    return [name.strip() for name in candidates if name.strip()]


def _parse_rename_mappings(value: Any) -> Dict[str, str]:
    """Parse "old:new,old2:new2" into a dict, skipping malformed entries."""
    parsed: Dict[str, str] = {}
    if not isinstance(value, str):
        return parsed
    for entry in value.split(","):
        # Split on the first colon only, so the new name may itself contain ":".
        old, separator, new = entry.partition(":")
        old, new = old.strip(), new.strip()
        if separator and old and new:
            parsed[old] = new
    return parsed


def _is_enabled(value: Any) -> bool:
    """Interpret a workflow flag: boolean True or the string "true" (any case)."""
    if isinstance(value, bool):
        return value
    return isinstance(value, str) and value.strip().lower() == "true"


def execute_context_reorganization(responses: Dict[str, Any]) -> Dict[str, Any]:
    """Build a context reorganization plan from workflow responses.

    Args:
        responses: Mapping of workflow field names to the collected answers.
            The ``strategy`` entry selects which strategy-specific fields
            are copied into the plan ("merge", "split", "rename" or
            "restructure"); any other value adds no strategy fields.

    Returns:
        A dict with ``operation``, ``strategy`` and ``status`` ("pending"),
        the strategy-specific parameters, and an ``options`` dict of the
        boolean flags ``backup_first``, ``test_mode`` and ``generate_report``.

    Notes:
        Comma-separated context lists are stripped and blank entries are
        dropped, so a missing or empty value yields ``[]`` rather than
        ``[""]``. Flags are enabled by boolean ``True`` or by the string
        "true" in any letter case.
    """
    strategy = responses.get("strategy")
    result: Dict[str, Any] = {
        "operation": "context_reorganization",
        "strategy": strategy,
        "status": "pending",
    }

    # Add strategy-specific parameters
    if strategy == "merge":
        result["source_contexts"] = _split_context_names(responses.get("source_contexts"))
        result["target_context"] = responses.get("target_context")
        result["handle_duplicates"] = responses.get("handle_duplicates")
    elif strategy == "split":
        result["source_context"] = responses.get("source_context")
        result["split_criteria"] = responses.get("split_criteria")
        result["target_contexts"] = _split_context_names(responses.get("target_contexts"))
        result["split_rules"] = responses.get("split_rules")
    elif strategy == "rename":
        result["rename_mappings"] = _parse_rename_mappings(responses.get("rename_mappings"))
    elif strategy == "restructure":
        result["structure_definition"] = responses.get("structure_definition")
        result["migration_strategy"] = responses.get("migration_strategy")

    # Add common options
    result["options"] = {
        "backup_first": _is_enabled(responses.get("backup_first")),
        "test_mode": _is_enabled(responses.get("test_mode")),
        "generate_report": _is_enabled(responses.get("generate_report")),
    }

    return result