Skip to main content
Glama

MCP Kafka Schema Reg

MIT License
23
  • Apple
  • Linux
migration_tools.py51.4 kB
#!/usr/bin/env python3 """ Migration Tools Module - Updated with Resource Linking Handles schema and context migration operations between registries with structured tool output support per MCP 2025-06-18 specification including resource linking. Provides schema migration, context migration, and migration status tracking with JSON Schema validation, type-safe responses, and HATEOAS navigation links. """ import json import logging import time from datetime import datetime from typing import Any, Dict, List, Optional from resource_linking import add_links_to_response from schema_validation import ( create_error_response, structured_output, ) from task_management import TaskStatus, TaskType, task_manager # Configure logging logger = logging.getLogger(__name__) class IDPreservationError(Exception): """Exception raised when ID preservation fails and requires user confirmation.""" def __init__(self, message: str, reason: str, original_error: str = None): super().__init__(message) self.reason = reason self.original_error = original_error class MigrationConfirmationRequired(Exception): """Exception raised when migration requires user confirmation to proceed.""" def __init__(self, message: str, confirmation_details: Dict[str, Any]): super().__init__(message) self.confirmation_details = confirmation_details def _get_registry_name_for_linking(registry_mode: str, registry_name: Optional[str] = None) -> str: """Helper function to get registry name for linking.""" if registry_mode == "single": return "default" elif registry_name: return registry_name else: return "unknown" @structured_output("migrate_schema", fallback_on_error=True) def migrate_schema_tool( subject: str, source_registry: str, target_registry: str, registry_manager, registry_mode: str, dry_run: bool = False, preserve_ids: bool = True, source_context: str = ".", target_context: str = ".", versions: Optional[List[int]] = None, migrate_all_versions: bool = False, ) -> Dict[str, Any]: """ Migrate a schema from one registry to another. Only available in multi-registry mode. **MEDIUM-DURATION OPERATION** - Uses task queue pattern. This operation runs asynchronously and returns a task_id immediately. Use get_task_status(task_id) to monitor progress and get results. Args: subject: The subject name source_registry: Source registry name target_registry: Target registry name dry_run: Preview migration without executing preserve_ids: Preserve original schema IDs (requires IMPORT mode) source_context: Source context (default: ".") target_context: Target context (default: ".") versions: Optional list of specific versions to migrate migrate_all_versions: Migrate all versions instead of just latest Returns: Task information with task_id for monitoring progress (multi-registry mode) or simple result (single-registry mode) with structured validation and resource links """ try: if registry_mode == "single": return create_error_response( "Schema migration between registries not available in single-registry mode", details={"suggestion": "Use multi-registry configuration to enable cross-registry migration"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode="single", ) # Multi-registry mode: use task queue # Create migration task task = task_manager.create_task( TaskType.MIGRATION, metadata={ "operation": "migrate_schema", "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "source_context": source_context, "target_context": target_context, "migrate_all_versions": migrate_all_versions, "preserve_ids": preserve_ids, "dry_run": dry_run, }, ) # Implement basic schema migration for testing try: # Check registry connections source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: task.status = TaskStatus.FAILED task.error = f"Source registry '{source_registry}' not found" return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode="multi", ) if not target_client: task.status = TaskStatus.FAILED task.error = f"Target registry '{target_registry}' not found" return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode="multi", ) # Mark task as running task.status = TaskStatus.RUNNING task.started_at = datetime.now().isoformat() # Perform actual schema migration try: migration_result = _execute_schema_migration( subject=subject, source_client=source_client, target_client=target_client, source_context=source_context, target_context=target_context, versions=versions, migrate_all_versions=migrate_all_versions, preserve_ids=preserve_ids, dry_run=dry_run, force_without_id_preservation=False, # User confirmation required by default ) except MigrationConfirmationRequired as e: # Handle confirmation required case task.status = TaskStatus.FAILED task.error = str(e) migration_result = { "error": str(e), "error_type": "confirmation_required", "confirmation_details": e.confirmation_details, "user_prompt": ( "Migration requires user confirmation due to ID preservation failure. " "Please confirm if you want to proceed without ID preservation." ), "action_required": "Call migrate_schema_tool again with preserve_ids=False to proceed without ID preservation, or resolve the permission issue first.", } task.completed_at = datetime.now().isoformat() # Add structured output metadata migration_result.update( { "migration_id": task.id, "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "source_context": source_context, "target_context": target_context, "status": task.status.value, "dry_run": dry_run, "registry_mode": "multi", "mcp_protocol_version": "2025-06-18", } ) # Add resource links migration_result = add_links_to_response( migration_result, "migration", source_registry, migration_id=task.id, source_registry=source_registry, target_registry=target_registry, ) return migration_result # Update task with result if "error" in migration_result: task.status = TaskStatus.FAILED task.error = migration_result["error"] else: task.status = TaskStatus.COMPLETED task.progress = 100.0 task.result = migration_result task.completed_at = datetime.now().isoformat() # Add structured output metadata to result migration_result.update( { "migration_id": task.id, "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "source_context": source_context, "target_context": target_context, "status": task.status.value, "dry_run": dry_run, "registry_mode": "multi", "mcp_protocol_version": "2025-06-18", } ) # Add resource links migration_result = add_links_to_response( migration_result, "migration", source_registry, migration_id=task.id, source_registry=source_registry, target_registry=target_registry, ) return migration_result except MigrationConfirmationRequired: # Re-raise confirmation required exceptions - don't convert them to generic errors raise except Exception as e: return create_error_response( f"Migration setup failed: {str(e)}", error_code="MIGRATION_SETUP_FAILED", registry_mode="multi", ) except MigrationConfirmationRequired as e: # Handle confirmation required at the outer level - just return confirmation response return { "error": str(e), "error_type": "confirmation_required", "confirmation_details": e.confirmation_details, "user_prompt": ( "Migration requires user confirmation due to ID preservation failure. " "Please confirm if you want to proceed without ID preservation." ), "action_required": "Call confirm_migration_without_ids_tool to proceed without ID preservation, or resolve the permission issue first.", } except Exception as e: return create_error_response(str(e), error_code="MIGRATION_FAILED", registry_mode=registry_mode) @structured_output("confirm_migration_without_ids", fallback_on_error=True) def confirm_migration_without_ids_tool( subject: str, source_registry: str, target_registry: str, registry_manager, registry_mode: str, dry_run: bool = False, source_context: str = ".", target_context: str = ".", versions: Optional[List[int]] = None, migrate_all_versions: bool = False, ) -> Dict[str, Any]: """ Confirm and proceed with schema migration without ID preservation. Use this after receiving a confirmation_required error from migrate_schema_tool. **MEDIUM-DURATION OPERATION** - Uses task queue pattern. This operation runs asynchronously and returns a task_id immediately. Use get_task_status(task_id) to monitor progress and get results. Args: subject: The subject name source_registry: Source registry name target_registry: Target registry name dry_run: Preview migration without executing source_context: Source context (default: ".") target_context: Target context (default: ".") versions: Optional list of specific versions to migrate migrate_all_versions: Migrate all versions instead of just latest Returns: Task information with task_id for monitoring progress (multi-registry mode) or simple result (single-registry mode) with structured validation and resource links """ try: if registry_mode == "single": return create_error_response( "Schema migration between registries not available in single-registry mode", details={"suggestion": "Use multi-registry configuration to enable cross-registry migration"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode="single", ) # Multi-registry mode: use task queue # Create migration task task = task_manager.create_task( TaskType.MIGRATION, metadata={ "operation": "confirm_migration_without_ids", "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "source_context": source_context, "target_context": target_context, "migrate_all_versions": migrate_all_versions, "preserve_ids": False, # Explicitly disabled "dry_run": dry_run, "force_without_id_preservation": True, # Force proceed without confirmation }, ) # Implement schema migration without ID preservation try: # Check registry connections source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: task.status = TaskStatus.FAILED task.error = f"Source registry '{source_registry}' not found" return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode="multi", ) if not target_client: task.status = TaskStatus.FAILED task.error = f"Target registry '{target_registry}' not found" return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode="multi", ) # Mark task as running task.status = TaskStatus.RUNNING task.started_at = datetime.now().isoformat() # Perform actual schema migration (with force flag to skip confirmation) migration_result = _execute_schema_migration( subject=subject, source_client=source_client, target_client=target_client, source_context=source_context, target_context=target_context, versions=versions, migrate_all_versions=migrate_all_versions, preserve_ids=False, # Explicitly disabled dry_run=dry_run, force_without_id_preservation=True, # Force proceed without confirmation ) # Update task with result if "error" in migration_result: task.status = TaskStatus.FAILED task.error = migration_result["error"] else: task.status = TaskStatus.COMPLETED task.progress = 100.0 task.result = migration_result task.completed_at = datetime.now().isoformat() # Add structured output metadata to result migration_result.update( { "migration_id": task.id, "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "source_context": source_context, "target_context": target_context, "status": task.status.value, "dry_run": dry_run, "preserve_ids": False, "confirmed_without_id_preservation": True, "registry_mode": "multi", "mcp_protocol_version": "2025-06-18", } ) # Add resource links migration_result = add_links_to_response( migration_result, "migration", source_registry, migration_id=task.id, source_registry=source_registry, target_registry=target_registry, ) return migration_result except Exception as e: return create_error_response( f"Migration setup failed: {str(e)}", error_code="MIGRATION_SETUP_FAILED", registry_mode="multi", ) except Exception as e: return create_error_response(str(e), error_code="CONFIRMED_MIGRATION_FAILED", registry_mode=registry_mode) def _execute_schema_migration( subject: str, source_client, target_client, source_context: str = ".", target_context: str = ".", versions: Optional[List[int]] = None, migrate_all_versions: bool = False, preserve_ids: bool = True, dry_run: bool = False, force_without_id_preservation: bool = False, ) -> Dict[str, Any]: """ Execute actual schema migration between registries with proper sparse version preservation. Args: subject: The subject name to migrate source_client: Source registry client target_client: Target registry client source_context: Source context target_context: Target context versions: Specific versions to migrate (overrides migrate_all_versions) migrate_all_versions: Whether to migrate all versions preserve_ids: Whether to preserve schema IDs (requires IMPORT mode) dry_run: Whether to simulate without making changes force_without_id_preservation: Whether to proceed without ID preservation when it fails Returns: Migration results with counts and status """ try: logger.info(f"Starting schema migration for subject '{subject}'") logger.info(f"Source: {source_client.config.name}, Target: {target_client.config.name}") logger.info(f"Preserve IDs: {preserve_ids}, Dry run: {dry_run}") # For target operations, we may need to extract the bare subject name # if we're migrating to a different context target_subject_name = subject # If the subject has a context prefix, extract the bare subject name for target if subject.startswith(":.") and ":" in subject[2:]: # Format is :.context:subject parts = subject.split(":", 2) if len(parts) >= 3: # Extract just the subject name for target registration target_subject_name = parts[2] logger.info(f"Subject has context prefix. Full name: {subject}, bare name: {target_subject_name}") # Get subject versions from source try: if source_context and source_context != ".": source_versions_url = ( f"{source_client.config.url}/contexts/{source_context}/subjects/{subject}/versions" ) else: source_versions_url = f"{source_client.config.url}/subjects/{subject}/versions" response = source_client.session.get( source_versions_url, auth=source_client.auth, headers=source_client.headers, timeout=10, ) if response.status_code == 404: return { "total_versions": 0, "successful_migrations": 0, "failed_migrations": 0, "skipped_migrations": 0, "message": f"Subject '{subject}' not found in source registry", "dry_run": dry_run, } elif response.status_code != 200: return { "error": f"Failed to get versions from source: HTTP {response.status_code}", "total_versions": 0, "successful_migrations": 0, "failed_migrations": 0, "skipped_migrations": 0, "dry_run": dry_run, } available_versions = response.json() except Exception as e: return { "error": f"Failed to get source versions: {str(e)}", "total_versions": 0, "successful_migrations": 0, "failed_migrations": 0, "skipped_migrations": 0, "dry_run": dry_run, } # Determine which versions to migrate if versions: versions_to_migrate = [v for v in versions if v in available_versions] elif migrate_all_versions: versions_to_migrate = available_versions else: # Migrate only latest version versions_to_migrate = [max(available_versions)] if available_versions else [] if not versions_to_migrate: return { "total_versions": len(available_versions), "successful_migrations": 0, "failed_migrations": 0, "skipped_migrations": 0, "message": "No versions to migrate", "dry_run": dry_run, "debug_info": { "available_versions": available_versions, "requested_versions": versions, "migrate_all_versions": migrate_all_versions, }, } logger.info(f"Versions to migrate: {sorted(versions_to_migrate)}") # Handle IMPORT mode for ID preservation original_target_mode = None id_preservation_error = None if preserve_ids: try: # Get current target registry mode mode_response = target_client.session.get( f"{target_client.config.url}/mode", auth=target_client.auth, headers=target_client.headers, timeout=10, ) if mode_response.status_code == 200: original_target_mode = mode_response.json().get("mode", "READWRITE") # Set target registry to IMPORT mode for ID preservation if original_target_mode != "IMPORT": logger.info( f"Setting target registry to IMPORT mode for ID preservation " f"(was {original_target_mode})" ) import_response = target_client.session.put( f"{target_client.config.url}/mode", json={"mode": "IMPORT"}, auth=target_client.auth, headers={ **target_client.headers, "Content-Type": "application/vnd.schemaregistry.v1+json", }, timeout=10, ) if import_response.status_code != 200: id_preservation_error = import_response.text logger.warning(f"Failed to set IMPORT mode: {import_response.text}.") # Check if we should ask for user confirmation if not force_without_id_preservation: confirmation_details = { "subject": subject, "source_registry": source_client.config.name, "target_registry": target_client.config.name, "source_context": source_context, "target_context": target_context, "versions_to_migrate": versions_to_migrate, "preserve_ids_requested": True, "id_preservation_error": id_preservation_error, "error_reason": "Permission denied to set IMPORT mode", "options": { "continue_without_id_preservation": "Proceed with migration but schemas will get new IDs", "cancel_migration": "Cancel the migration and resolve permissions first", }, } raise MigrationConfirmationRequired( f"ID preservation failed for subject '{subject}'. " f"Cannot set target registry to IMPORT mode due to insufficient permissions. " f"Do you want to continue migration without ID preservation?", confirmation_details, ) else: logger.info( "Proceeding without ID preservation as requested (force_without_id_preservation=True)" ) preserve_ids = False else: logger.info("✅ Target registry set to IMPORT mode") else: logger.info("✅ Target registry already in IMPORT mode") else: id_preservation_error = f"HTTP {mode_response.status_code}: {mode_response.text}" logger.warning(f"Could not get target registry mode: {mode_response.text}.") if not force_without_id_preservation: confirmation_details = { "subject": subject, "source_registry": source_client.config.name, "target_registry": target_client.config.name, "source_context": source_context, "target_context": target_context, "versions_to_migrate": versions_to_migrate, "preserve_ids_requested": True, "id_preservation_error": id_preservation_error, "error_reason": "Cannot access target registry mode", "options": { "continue_without_id_preservation": "Proceed with migration but schemas will get new IDs", "cancel_migration": "Cancel the migration and check registry access", }, } raise MigrationConfirmationRequired( f"ID preservation failed for subject '{subject}'. " f"Cannot access target registry mode. " f"Do you want to continue migration without ID preservation?", confirmation_details, ) else: preserve_ids = False except MigrationConfirmationRequired: # Re-raise confirmation required exceptions raise except Exception as e: id_preservation_error = str(e) logger.warning(f"Error setting IMPORT mode: {str(e)}.") if not force_without_id_preservation: confirmation_details = { "subject": subject, "source_registry": source_client.config.name, "target_registry": target_client.config.name, "source_context": source_context, "target_context": target_context, "versions_to_migrate": versions_to_migrate, "preserve_ids_requested": True, "id_preservation_error": str(e), "error_reason": "Unexpected error setting IMPORT mode", "options": { "continue_without_id_preservation": "Proceed with migration but schemas will get new IDs", "cancel_migration": "Cancel the migration and investigate the error", }, } raise MigrationConfirmationRequired( f"ID preservation failed for subject '{subject}'. " f"Unexpected error: {str(e)}. " f"Do you want to continue migration without ID preservation?", confirmation_details, ) else: preserve_ids = False # Continue with migration... # Migrate each version successful_count = 0 failed_count = 0 skipped_count = 0 migration_details = [] try: for version in sorted(versions_to_migrate): try: logger.info(f"Processing version {version} of subject '{subject}'") if dry_run: logger.info(f"[DRY RUN] Would migrate {subject} version {version}") successful_count += 1 migration_details.append( { "version": version, "status": "simulated", "message": "Would migrate this version", } ) continue # Get schema from source registry if source_context and source_context != ".": schema_url = ( f"{source_client.config.url}/contexts/{source_context}/" f"subjects/{subject}/versions/{version}" ) else: schema_url = f"{source_client.config.url}/subjects/{subject}/versions/{version}" schema_response = source_client.session.get( schema_url, auth=source_client.auth, headers=source_client.headers, timeout=10, ) if schema_response.status_code != 200: raise Exception(f"Failed to get schema: HTTP {schema_response.status_code}") schema_data = schema_response.json() schema_definition = json.loads(schema_data["schema"]) # Register schema in target registry if target_context and target_context != ".": target_url = ( f"{target_client.config.url}/contexts/{target_context}/" f"subjects/{target_subject_name}/versions" ) else: target_url = f"{target_client.config.url}/subjects/{target_subject_name}/versions" payload = { "schema": json.dumps(schema_definition), "schemaType": schema_data.get("schemaType", "AVRO"), } # Add references if they exist if "references" in schema_data and schema_data["references"]: payload["references"] = schema_data["references"] # Try ID preservation first if requested target_id = None if preserve_ids and "id" in schema_data: # First attempt: try with ID preservation (and version preservation) payload_with_id = payload.copy() payload_with_id["id"] = schema_data["id"] # Also preserve version number in IMPORT mode payload_with_id["version"] = version target_response = target_client.session.post( target_url, json=payload_with_id, auth=target_client.auth, headers={ **target_client.headers, "Content-Type": "application/vnd.schemaregistry.v1+json", }, timeout=10, ) if target_response.status_code in [ 200, 409, ]: # Success with ID preservation target_id = ( target_response.json().get("id") if target_response.status_code == 200 else "existing" ) successful_count += 1 migration_details.append( { "version": version, "status": "migrated", "source_id": schema_data.get("id"), "target_id": target_id, "preserved_version": True, } ) continue elif target_response.status_code == 422 and "import mode" in target_response.text.lower(): # Import mode required for ID preservation - fall back to normal registration logger.warning( f"ID preservation requires IMPORT mode for version {version}, " f"falling back to normal registration" ) else: # Other error - try without ID preservation logger.warning( f"ID preservation failed for version {version}: {target_response.text}, trying without ID" ) # Fallback: register without ID preservation target_response = target_client.session.post( target_url, json=payload, auth=target_client.auth, headers={ **target_client.headers, "Content-Type": "application/vnd.schemaregistry.v1+json", }, timeout=10, ) if target_response.status_code in [ 200, 409, ]: # 409 = already exists target_id = ( target_response.json().get("id") if target_response.status_code == 200 else "existing" ) successful_count += 1 migration_details.append( { "version": version, "status": "migrated", "source_id": schema_data.get("id"), "target_id": target_id, "preserved_version": False, # ID was not preserved "note": ( "ID preservation skipped (registry not in IMPORT mode)" if preserve_ids else None ), } ) else: raise Exception( f"Failed to register schema: HTTP {target_response.status_code} - {target_response.text}" ) except Exception as e: logger.error(f"Error migrating version {version}: {e}") failed_count += 1 migration_details.append( { "version": version, "status": "failed", "error": f"Version migration error: {str(e)}", } ) finally: # Restore original target registry mode if original_target_mode and original_target_mode != "IMPORT": try: logger.info(f"Restoring target registry to original mode: {original_target_mode}") restore_response = target_client.session.put( f"{target_client.config.url}/mode", json={"mode": original_target_mode}, auth=target_client.auth, headers={ **target_client.headers, "Content-Type": "application/vnd.schemaregistry.v1+json", }, timeout=10, ) if restore_response.status_code == 200: logger.info(f"✅ Target registry restored to {original_target_mode} mode") else: logger.warning(f"Failed to restore registry mode: {restore_response.text}") except Exception as e: logger.warning(f"Error restoring registry mode: {str(e)}") logger.info(f"Migration completed for subject '{subject}'. Migrated {successful_count} versions") return { "total_versions": len(available_versions), "versions_to_migrate": len(versions_to_migrate), "successful_migrations": successful_count, "failed_migrations": failed_count, "skipped_migrations": skipped_count, "migration_details": migration_details, "dry_run": dry_run, "preserve_ids": preserve_ids, "message": f"Migrated {successful_count}/{len(versions_to_migrate)} versions successfully" + (" (dry run)" if dry_run else ""), } except MigrationConfirmationRequired: # Re-raise confirmation required exceptions - don't convert them to generic errors raise except Exception as e: logger.error(f"Error in _execute_schema_migration: {e}") return { "error": f"Migration execution failed: {str(e)}", "total_versions": 0, "successful_migrations": 0, "failed_migrations": 0, "skipped_migrations": 0, "dry_run": dry_run, } @structured_output("list_migrations", fallback_on_error=True) def list_migrations_tool(registry_mode: str) -> Dict[str, Any]: """ List all migration tasks and their status. Only available in multi-registry mode. Returns: Dictionary containing migration tasks with their status and progress, including resource links """ try: if registry_mode == "single": return create_error_response( "Migration tracking not available in single-registry mode", details={"suggestion": "Use multi-registry configuration to enable migration tracking"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode="single", ) # Get all migration-related tasks all_tasks = task_manager.list_tasks(task_type=TaskType.MIGRATION) migrations = [] for task in all_tasks: migration_info = { "migration_id": task.id, "type": task.type.value, "status": task.status.value, "created_at": task.created_at, "started_at": task.started_at, "completed_at": task.completed_at, "progress": task.progress, "error": task.error, "metadata": task.metadata or {}, } migrations.append(migration_info) # Convert to enhanced response format result = { "migrations": migrations, "total_migrations": len(migrations), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Add resource links registry_name = _get_registry_name_for_linking(registry_mode) result = add_links_to_response(result, "migrations_list", registry_name) return result except Exception as e: return create_error_response(str(e), error_code="MIGRATION_LIST_FAILED", registry_mode=registry_mode) @structured_output("get_migration_status", fallback_on_error=True) def get_migration_status_tool(migration_id: str, registry_mode: str) -> Dict[str, Any]: """ Get detailed status of a specific migration. Only available in multi-registry mode. Args: migration_id: The migration task ID to query Returns: Detailed migration status and progress information with structured validation and resource links """ try: if registry_mode == "single": return create_error_response( "Migration tracking not available in single-registry mode", details={"suggestion": "Use multi-registry configuration to enable migration tracking"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode="single", ) # Get the specific migration task task = task_manager.get_task(migration_id) if task is None: return create_error_response( f"Migration '{migration_id}' not found", error_code="MIGRATION_NOT_FOUND", registry_mode=registry_mode, ) migration_status = { "migration_id": task.id, "status": task.status.value, "progress": task.progress, "started_at": task.started_at, "completed_at": task.completed_at, "error": task.error, "result": task.result, "metadata": task.metadata or {}, "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Add estimated time remaining if in progress if task.status == TaskStatus.RUNNING and task.progress > 0: elapsed = time.time() - ( datetime.fromisoformat(task.started_at).timestamp() if task.started_at else time.time() ) if task.progress > 5: # Only estimate if we have meaningful progress estimated_total = elapsed / (task.progress / 100) estimated_remaining = max(0, estimated_total - elapsed) migration_status["estimated_remaining_seconds"] = round(estimated_remaining, 1) # Add resource links - extract registry names from metadata metadata = task.metadata or {} source_registry = metadata.get("source_registry", "unknown") target_registry = metadata.get("target_registry", "unknown") migration_status = add_links_to_response( migration_status, "migration", source_registry, migration_id=migration_id, source_registry=source_registry, target_registry=target_registry, ) return migration_status except Exception as e: return create_error_response(str(e), error_code="MIGRATION_STATUS_FAILED", registry_mode=registry_mode) @structured_output("migrate_context", fallback_on_error=True) def migrate_context_tool( source_registry: str, target_registry: str, registry_manager, registry_mode: str, context: Optional[str] = None, target_context: Optional[str] = None, preserve_ids: bool = True, dry_run: bool = True, migrate_all_versions: bool = True, ) -> Dict[str, Any]: """ Generate Docker command for migrating an entire context using the external kafka-schema-reg-migrator tool. This MCP only supports single schema migration. For context migration, use the specialized external tool. Args: source_registry: Source registry name target_registry: Target registry name context: Source context to migrate (default: ".") target_context: Target context name (defaults to source context) preserve_ids: Preserve original schema IDs (requires IMPORT mode) dry_run: Preview migration without executing migrate_all_versions: Migrate all versions or just latest Returns: Docker command and instructions for running the external migration tool with structured validation and resource links """ try: if registry_mode == "single": return create_error_response( "Context migration between registries not available in single-registry mode", details={"suggestion": "Use multi-registry configuration to enable cross-registry migration"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode="single", ) # Get registry configurations source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode="multi", ) if not target_client: return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode="multi", ) # Use default context if not specified context = context or "." target_context = target_context or context # Build environment variables for the docker command env_vars = [ f"SOURCE_SCHEMA_REGISTRY_URL={source_client.config.url}", f"DEST_SCHEMA_REGISTRY_URL={target_client.config.url}", "ENABLE_MIGRATION=true", f"DRY_RUN={str(dry_run).lower()}", f"PRESERVE_IDS={str(preserve_ids).lower()}", ] # Add authentication if available if source_client.config.user: env_vars.append(f"SOURCE_USERNAME={source_client.config.user}") if source_client.config.password: env_vars.append(f"SOURCE_PASSWORD={source_client.config.password}") if target_client.config.user: env_vars.append(f"DEST_USERNAME={target_client.config.user}") if target_client.config.password: env_vars.append(f"DEST_PASSWORD={target_client.config.password}") # Add context information if context != ".": env_vars.append(f"SOURCE_CONTEXT={context}") if target_context != ".": env_vars.append(f"DEST_CONTEXT={target_context}") # Add import mode if preserving IDs if preserve_ids: env_vars.append("DEST_IMPORT_MODE=true") # Build docker run command docker_cmd_parts = ["docker run -it --rm"] # Add environment variables for env_var in env_vars: docker_cmd_parts.append(f"-e {env_var}") # Add the image docker_cmd_parts.append("aywengo/kafka-schema-reg-migrator:latest") docker_command = " \\\n ".join(docker_cmd_parts) result = { "message": "Context migration requires the external kafka-schema-reg-migrator tool", "reason": ( "This MCP only supports single schema migration. " "For context migration, use the specialized external tool." ), "tool": "kafka-schema-reg-migrator", "documentation": "https://github.com/aywengo/kafka-schema-reg-migrator", "docker_hub": "https://hub.docker.com/r/aywengo/kafka-schema-reg-migrator", "docker_docs": "https://github.com/aywengo/kafka-schema-reg-migrator/blob/main/docs/run-in-docker.md", "migration_details": { "source": { "registry": source_registry, "url": source_client.config.url, "context": context, }, "target": { "registry": target_registry, "url": target_client.config.url, "context": target_context, }, "options": { "preserve_ids": preserve_ids, "dry_run": dry_run, "migrate_all_versions": migrate_all_versions, }, }, "docker_command": docker_command, "instructions": [ "1. Copy and run the Docker command below:", f" {docker_command}", "", "2. Monitor the migration output in your terminal", "", "3. For more advanced options, see the documentation:", " https://github.com/aywengo/kafka-schema-reg-migrator/blob/main/docs/run-in-docker.md", "", "4. Alternative: Use environment file approach:", " - Create a .env file with the environment variables", " - Run: docker run -it --rm --env-file .env aywengo/kafka-schema-reg-migrator:latest", ], "env_variables": env_vars, "warnings": [ "⚠️ This will use an external Docker container for migration", "⚠️ Ensure Docker is installed and running", ( "⚠️ " + ( "This is a DRY RUN - no actual changes will be made" if dry_run else "This will perform actual data migration" ) ), "⚠️ Review the documentation for advanced configuration options", ], "status": "completed", # For schema compatibility "source_registry": source_registry, "target_registry": target_registry, "dry_run": dry_run, "registry_mode": "multi", "mcp_protocol_version": "2025-06-18", } # Add resource links result = add_links_to_response( result, "comparison", source_registry, source_registry=source_registry, target_registry=target_registry, ) return result except Exception as e: return create_error_response(str(e), error_code="CONTEXT_MIGRATION_FAILED", registry_mode=registry_mode)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server