get_migration_status
Retrieve the detailed status of a Kafka Schema Registry migration by its migration ID, including progress, timestamps, and any error. Migration tracking is only available in multi-registry mode.
Instructions
Get detailed status of a specific migration.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| migration_id | Yes | | |
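As a rough illustration of the input above, the sketch below builds an MCP `tools/call` request that passes `migration_id` as the only argument. The helper name `build_status_request` and the placeholder ID are hypothetical, and the transport (stdio, HTTP) is left out; the table lists no `registry_mode` input, so it is assumed here that the server supplies that value itself.

```python
import json


def build_status_request(migration_id: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 request that invokes the get_migration_status tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_migration_status",
            # migration_id is the only input listed in the schema table.
            "arguments": {"migration_id": migration_id},
        },
    }


# "example-migration-id" is a placeholder, not a real task ID.
request = build_status_request("example-migration-id")
print(json.dumps(request, indent=2))
```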
Implementation Reference
- migration_tools.py:945-1016 (handler) The core handler function for the 'get_migration_status' tool. It returns an error response in single-registry mode or when the migration is not found; otherwise it fetches the task from task_manager, builds a structured status response, estimates the remaining time for running migrations, and adds HATEOAS links. The structured_output decorator enforces structured output compliance.

  ```python
  @structured_output("get_migration_status", fallback_on_error=True)
  def get_migration_status_tool(migration_id: str, registry_mode: str) -> Dict[str, Any]:
      """
      Get detailed status of a specific migration.
      Only available in multi-registry mode.

      Args:
          migration_id: The migration task ID to query

      Returns:
          Detailed migration status and progress information with structured validation and resource links
      """
      try:
          if registry_mode == "single":
              return create_error_response(
                  "Migration tracking not available in single-registry mode",
                  details={"suggestion": "Use multi-registry configuration to enable migration tracking"},
                  error_code="SINGLE_REGISTRY_MODE_LIMITATION",
                  registry_mode="single",
              )

          # Get the specific migration task
          task = task_manager.get_task(migration_id)
          if task is None:
              return create_error_response(
                  f"Migration '{migration_id}' not found",
                  error_code="MIGRATION_NOT_FOUND",
                  registry_mode=registry_mode,
              )

          migration_status = {
              "migration_id": task.id,
              "status": task.status.value,
              "progress": task.progress,
              "started_at": task.started_at,
              "completed_at": task.completed_at,
              "error": task.error,
              "result": task.result,
              "metadata": task.metadata or {},
              "registry_mode": registry_mode,
              "mcp_protocol_version": "2025-06-18",
          }

          # Add estimated time remaining if in progress
          if task.status == TaskStatus.RUNNING and task.progress > 0:
              elapsed = time.time() - (
                  datetime.fromisoformat(task.started_at).timestamp() if task.started_at else time.time()
              )
              if task.progress > 5:  # Only estimate if we have meaningful progress
                  estimated_total = elapsed / (task.progress / 100)
                  estimated_remaining = max(0, estimated_total - elapsed)
                  migration_status["estimated_remaining_seconds"] = round(estimated_remaining, 1)

          # Add resource links - extract registry names from metadata
          metadata = task.metadata or {}
          source_registry = metadata.get("source_registry", "unknown")
          target_registry = metadata.get("target_registry", "unknown")
          migration_status = add_links_to_response(
              migration_status,
              "migration",
              source_registry,
              migration_id=migration_id,
              source_registry=source_registry,
              target_registry=target_registry,
          )

          return migration_status
      except Exception as e:
          return create_error_response(str(e), error_code="MIGRATION_STATUS_FAILED", registry_mode=registry_mode)
  ```
- schema_definitions.py:1012-1012 (schema) Output schema definition for the get_migration_status tool. It references MIGRATE_SCHEMA_SCHEMA, which provides JSON Schema validation for migration status responses.

  ```python
  "get_migration_status": MIGRATE_SCHEMA_SCHEMA,
  ```
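The handler's progress estimate is a linear extrapolation: it assumes the rest of the migration proceeds at the average rate observed so far. The standalone sketch below isolates that arithmetic so it can be checked by hand. The function name `estimate_remaining_seconds` is hypothetical and not part of the project, and the `TaskStatus.RUNNING` check from the handler is left out for brevity.

```python
from typing import Optional


def estimate_remaining_seconds(
    elapsed_seconds: float, progress_percent: float
) -> Optional[float]:
    """Linearly extrapolate the remaining time from elapsed time and progress."""
    # Same threshold as the handler: no estimate at 5% progress or below,
    # because early progress values give unstable projections.
    if progress_percent <= 5:
        return None
    # Projected total duration if the average rate so far holds.
    estimated_total = elapsed_seconds / (progress_percent / 100)
    # Clamp at zero so rounding noise never yields a negative estimate.
    return round(max(0.0, estimated_total - elapsed_seconds), 1)


# 30 s elapsed at 25% progress projects a 120 s total, leaving 90 s.
print(estimate_remaining_seconds(30.0, 25))  # 90.0
```

Because the estimate depends only on the average rate, it can swing noticeably when a migration speeds up or slows down partway through, so it is best read as a rough indication.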