guided_disaster_recovery
Configure disaster recovery strategies for Kafka Schema Registry by initiating the setup workflow to ensure data availability and resilience.
Instructions
Start the Disaster Recovery Setup workflow for configuring DR strategies
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
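Because the tool takes no arguments, the main thing a caller deals with is the JSON string it returns. The following is a minimal sketch of interpreting that string, assuming the two response shapes produced by the handler in the Implementation Reference (a "started" payload or an "error" payload). The function name `interpret_dr_response` and the sample values are hypothetical and are not part of the server.

```python
import json
from typing import Any, Dict


def interpret_dr_response(raw: str) -> Dict[str, Any]:
    """Normalize the JSON string returned by guided_disaster_recovery (hypothetical helper)."""
    payload = json.loads(raw)
    # The handler's failure paths return a payload with a single "error" key.
    if "error" in payload:
        return {"ok": False, "reason": payload["error"]}
    # The success path carries the workflow status plus first-step details.
    return {
        "ok": payload.get("status") == "started",
        "workflow_id": payload.get("workflow_id"),
        "request_id": payload.get("request_id"),
        "first_step": payload.get("first_step"),
    }


# Hypothetical sample payloads shaped like the handler's two return paths.
started = json.dumps(
    {
        "status": "started",
        "workflow_id": "disaster_recovery_setup",
        "request_id": "req-1",  # illustrative value
        "first_step": "Example first step",  # illustrative value
    }
)
failed = json.dumps({"error": "Failed to start Disaster Recovery workflow"})

print(interpret_dr_response(started))
print(interpret_dr_response(failed))
```

Checking for the "error" key first keeps the success branch free of defensive lookups, since the handler never mixes "error" with "status".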
Implementation Reference
- workflow_mcp_integration.py:308-338 (handler)

  The primary MCP tool handler for 'guided_disaster_recovery'. This async function is decorated with @self.mcp.tool and initiates the multi-step 'disaster_recovery_setup' workflow, returning a JSON response with workflow status and first step details.

  ```python
  @self.mcp.tool(description="Start the Disaster Recovery Setup workflow for configuring DR strategies")
  async def guided_disaster_recovery() -> str:
      """Convenience method to start Disaster Recovery workflow."""
      workflow_id = "disaster_recovery_setup"
      try:
          request = await self.multi_step_manager.start_workflow(workflow_id=workflow_id, initial_context={})
          if request:
              return json.dumps(
                  {
                      "status": "started",
                      "workflow_id": workflow_id,
                      "workflow_name": "Disaster Recovery Setup",
                      "request_id": request.id,
                      "first_step": request.title,
                      "description": request.description,
                      "message": (
                          "Disaster Recovery Setup workflow started. This workflow will guide you through:\n"
                          "1. Current infrastructure assessment\n"
                          "2. Backup strategy configuration\n"
                          "3. Recovery procedures planning\n"
                          "4. Testing and validation setup"
                      ),
                  }
              )
          else:
              return json.dumps({"error": "Failed to start Disaster Recovery workflow"})
      except Exception as e:
          logger.error(f"Error starting Disaster Recovery workflow: {str(e)}")
          return json.dumps({"error": f"Failed to start workflow: {str(e)}"})
  ```
- workflow_mcp_integration.py:443-486 (helper)

  Supporting helper function that takes the completed workflow responses and constructs a detailed disaster recovery execution plan/strategy based on user inputs during the guided workflow.

  ```python
  def execute_disaster_recovery_setup(responses: Dict[str, Any]) -> Dict[str, Any]:
      """Execute disaster recovery setup based on workflow responses."""
      dr_strategy = responses.get("dr_strategy")

      result = {"operation": "disaster_recovery_setup", "strategy": dr_strategy, "status": "pending"}

      # Add strategy-specific configuration
      if dr_strategy == "active_passive":
          result["config"] = {
              "primary_registry": responses.get("primary_registry"),
              "standby_registry": responses.get("standby_registry"),
              "replication_interval": responses.get("replication_interval"),
              "failover_mode": responses.get("failover_mode"),
          }
      elif dr_strategy == "active_active":
          result["config"] = {
              "active_registries": [reg.strip() for reg in responses.get("active_registries", "").split(",")],
              "conflict_resolution": responses.get("conflict_resolution"),
              "sync_topology": responses.get("sync_topology"),
          }
      elif dr_strategy == "backup_restore":
          result["config"] = {
              "backup_schedule": responses.get("backup_schedule"),
              "backup_location": responses.get("backup_location"),
              "retention_policy": responses.get("retention_policy"),
              "encryption": responses.get("encryption") == "true",
          }
      elif dr_strategy == "multi_region":
          result["config"] = {
              "regions": [region.strip() for region in responses.get("regions", "").split(",")],
              "primary_region": responses.get("primary_region"),
              "data_residency": responses.get("data_residency") == "true",
              "cross_region_replication": responses.get("cross_region_replication"),
          }

      # Add common DR options
      result["options"] = {
          "enable_monitoring": responses.get("enable_monitoring") == "true",
          "run_dr_drill": responses.get("run_dr_drill") == "true",
          "generate_runbook": responses.get("generate_runbook") == "true",
          "initial_sync": responses.get("initial_sync") == "true",
      }

      return result
  ```
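The helper above parses workflow responses with two conventions: comma-separated strings are split and stripped, and boolean options are compared against the exact string "true". The sketch below isolates those two expressions to show how they behave on edge cases. The function names and the sample region values are hypothetical, and `split_csv_skip_empty` is an illustrative stricter variant, not something the source implements.

```python
from typing import Any, Dict, List


def split_csv_as_helper(value: str) -> List[str]:
    """Mirror the helper's expression: split on commas and strip each item."""
    return [item.strip() for item in value.split(",")]


def split_csv_skip_empty(value: str) -> List[str]:
    """Hypothetical stricter variant that drops empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def flag_as_helper(responses: Dict[str, Any], key: str) -> bool:
    """Mirror the helper's boolean check: only the exact string "true" counts."""
    return responses.get(key) == "true"


# Illustrative responses; the region names are sample values only.
responses = {"regions": "us-east-1, eu-west-1 ,ap-south-1", "run_dr_drill": "True"}

print(split_csv_as_helper(responses["regions"]))  # ['us-east-1', 'eu-west-1', 'ap-south-1']
print(split_csv_as_helper(""))                    # ['']
print(split_csv_skip_empty(""))                   # []
print(flag_as_helper(responses, "run_dr_drill"))  # False
print(flag_as_helper({"run_dr_drill": "true"}, "run_dr_drill"))  # True
```

Two consequences follow from plain Python semantics: a missing "regions" or "active_registries" response yields a list containing one empty string rather than an empty list, and any value other than lowercase "true" (including "True" or a missing key) leaves the option disabled.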