Skip to main content
Glama

guided_context_reorganization

Initiate schema reorganization across contexts to streamline Kafka Schema Registry management, ensuring structured and efficient schema alignment.

Instructions

Start the Context Reorganization workflow for reorganizing schemas across contexts

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault

No arguments

Implementation Reference

  • Primary handler and registration (via @self.mcp.tool decorator) for the guided_context_reorganization tool. Starts the multi-step context_reorganization workflow.
    @self.mcp.tool(description="Start the Context Reorganization workflow for reorganizing schemas across contexts") async def guided_context_reorganization() -> str: """Convenience method to start Context Reorganization workflow.""" workflow_id = "context_reorganization" try: request = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={}) if request: return json.dumps( { "status": "started", "workflow_id": workflow_id, "workflow_name": "Context Reorganization", "request_id": request.id, "first_step": request.title, "description": request.description, "message": ( "Context Reorganization workflow started. This workflow will guide you through:\n" "1. Current context analysis\n" "2. Target context design\n" "3. Schema migration planning\n" "4. Context restructuring execution" ), } ) else: return json.dumps({"error": "Failed to start Context Reorganization workflow"}) except Exception as e: logger.error(f"Error starting Context Reorganization workflow: {str(e)}") return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
  • Helper function that converts workflow responses into a structured execution plan for context reorganization strategies including merge, split, rename, and restructure.
    def execute_context_reorganization(responses: Dict[str, Any]) -> Dict[str, Any]: """Execute context reorganization based on workflow responses.""" strategy = responses.get("strategy") result = {"operation": "context_reorganization", "strategy": strategy, "status": "pending"} # Add strategy-specific parameters if strategy == "merge": result["source_contexts"] = [ctx.strip() for ctx in responses.get("source_contexts", "").split(",")] result["target_context"] = responses.get("target_context") result["handle_duplicates"] = responses.get("handle_duplicates") elif strategy == "split": result["source_context"] = responses.get("source_context") result["split_criteria"] = responses.get("split_criteria") result["target_contexts"] = [ctx.strip() for ctx in responses.get("target_contexts", "").split(",")] result["split_rules"] = responses.get("split_rules") elif strategy == "rename": rename_mappings: Dict[str, str] = {} mappings_str = responses.get("rename_mappings", "") if mappings_str and isinstance(mappings_str, str): mappings = [mapping.strip() for mapping in mappings_str.split(",")] for mapping in mappings: if ":" in mapping and isinstance(mapping, str): old, new = mapping.split(":", 1) if old and new: rename_mappings[old.strip()] = new.strip() result["rename_mappings"] = rename_mappings elif strategy == "restructure": result["structure_definition"] = responses.get("structure_definition") result["migration_strategy"] = responses.get("migration_strategy") # Add common options result["options"] = { "backup_first": responses.get("backup_first") == "true", "test_mode": responses.get("test_mode") == "true", "generate_report": responses.get("generate_report") == "true", } return result

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server