compare_contexts_across_registries
Analyze and align context differences between source and target registries to ensure consistency in Kafka Schema Registry configurations.
Instructions
Compare contexts across two registries.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
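| source_context | Yes | Source context name | |
| source_registry | Yes | Source registry name | |
| target_context | No | Target context name (defaults to source_context) | |
| target_registry | Yes | Target registry name | |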
Implementation Reference
# comparison_tools.py:269-460 (handler)
# The core handler function that executes the 'compare_contexts_across_registries'
# tool. It compares subjects and schema versions between specified contexts in two
# registries, computes differences (only_in_source, only_in_target, in_both, schema
# differences), generates a structured JSON response with summary statistics,
# progress reporting via MCP context, error handling, and adds HATEOAS resource links.
@structured_output("compare_contexts_across_registries", fallback_on_error=True)
async def compare_contexts_across_registries_tool(
    source_registry: str,
    target_registry: str,
    source_context: str,
    registry_manager,
    registry_mode: str,
    target_context: Optional[str] = None,
    context: Optional["Context"] = None,
) -> Dict[str, Any]:
    """
    Compare contexts across two registries.
    Only available in multi-registry mode.

    Subject lists in the response are sorted so that repeated comparisons of
    the same registries produce identical output.

    Args:
        source_registry: Source registry name
        target_registry: Target registry name
        source_context: Source context name
        registry_manager: Object whose ``get_registry(name)`` returns the client
            for a registry, or a falsy value when the registry is unknown
        registry_mode: Current registry mode; ``"single"`` yields an error response
        target_context: Target context name (defaults to source_context)
        context: MCP Context for progress reporting

    Returns:
        Context comparison results with structured validation and resource links,
        or an error response if a registry is missing or the comparison fails
    """
    if registry_mode == "single":
        return create_error_response(
            "Context comparison across registries is only available in multi-registry mode",
            details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"},
            error_code="SINGLE_REGISTRY_MODE_LIMITATION",
            registry_mode=registry_mode,
        )

    async def _info(message: str) -> None:
        # Log an informational message when an MCP context was supplied.
        if context:
            await context.info(message)

    async def _progress(percent: float, message: str) -> None:
        # Report progress out of 100 when an MCP context was supplied.
        if context:
            await context.report_progress(percent, 100.0, message)

    try:
        # Initial setup (0-10%)
        await _info(
            f"Starting context comparison: {source_registry}/{source_context} vs "
            f"{target_registry}/{target_context or source_context}"
        )
        await _progress(0.0, "Initializing context comparison")

        source_client = registry_manager.get_registry(source_registry)
        target_client = registry_manager.get_registry(target_registry)

        if not source_client:
            return create_error_response(
                f"Source registry '{source_registry}' not found",
                error_code="SOURCE_REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )
        if not target_client:
            return create_error_response(
                f"Target registry '{target_registry}' not found",
                error_code="TARGET_REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )

        # Use source context for target if not specified
        if target_context is None:
            target_context = source_context

        await _progress(10.0, "Registry clients initialized")

        # Get subjects from source context (10-30%)
        await _info(f"Fetching subjects from source context: {source_registry}/{source_context}")
        await _progress(15.0, f"Fetching subjects from {source_registry}/{source_context}")
        source_subjects = set(source_client.get_subjects(source_context) or [])
        await _progress(25.0, f"Found {len(source_subjects)} subjects in source context")

        # Get subjects from target context (30-50%)
        await _info(f"Fetching subjects from target context: {target_registry}/{target_context}")
        await _progress(35.0, f"Fetching subjects from {target_registry}/{target_context}")
        target_subjects = set(target_client.get_subjects(target_context) or [])
        await _progress(45.0, f"Found {len(target_subjects)} subjects in target context")

        # Build comparison structure (50-60%)
        await _progress(50.0, "Building comparison structure")

        # Compute each set relation once; sort because set iteration order is
        # arbitrary and would otherwise make the response differ between runs.
        only_in_source = sorted(source_subjects - target_subjects)
        only_in_target = sorted(target_subjects - source_subjects)
        common_subjects = sorted(source_subjects & target_subjects)

        comparison: Dict[str, Any] = {
            "source": {
                "registry": source_registry,
                "context": source_context,
                "subject_count": len(source_subjects),
                "subjects": sorted(source_subjects),
            },
            "target": {
                "registry": target_registry,
                "context": target_context,
                "subject_count": len(target_subjects),
                "subjects": sorted(target_subjects),
            },
            "differences": {
                "only_in_source": only_in_source,
                "only_in_target": only_in_target,
                "in_both": common_subjects,
            },
            # NOTE(review): naive local time; kept as-is so the timestamp format
            # seen by existing consumers does not change.
            "timestamp": datetime.now().isoformat(),
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

        await _progress(60.0, "Analyzing subject differences")

        # Compare schemas for common subjects (60-90%)
        common_count = len(common_subjects)
        schema_differences = []

        await _info(f"Comparing schemas for {common_count} common subjects")
        await _progress(65.0, f"Comparing schemas for {common_count} common subjects")

        for index, subject in enumerate(common_subjects):
            # Spread the per-subject progress across 65% to 85%.
            await _progress(
                65.0 + (index / common_count) * 20.0,
                f"Comparing schema versions for {subject}",
            )

            source_versions = source_client.get_schema_versions(subject, source_context) or []
            target_versions = target_client.get_schema_versions(subject, target_context) or []

            if source_versions != target_versions:
                schema_differences.append(
                    {
                        "subject": subject,
                        "source_versions": source_versions,
                        "target_versions": target_versions,
                        # A non-list result is counted as zero versions.
                        "source_version_count": (
                            len(source_versions) if isinstance(source_versions, list) else 0
                        ),
                        "target_version_count": (
                            len(target_versions) if isinstance(target_versions, list) else 0
                        ),
                    }
                )

        await _progress(85.0, f"Found {len(schema_differences)} subjects with version differences")

        comparison["schema_differences"] = {
            "subjects_with_differences": schema_differences,
            "count": len(schema_differences),
        }

        # Add summary (90-95%)
        await _progress(90.0, "Building comparison summary")

        comparison["summary"] = {
            "contexts_match": not (only_in_source or only_in_target or schema_differences),
            "subjects_only_in_source": len(only_in_source),
            "subjects_only_in_target": len(only_in_target),
            "subjects_in_both": len(common_subjects),
            "schemas_with_version_differences": len(schema_differences),
        }

        # Add resource links (95-100%)
        await _progress(95.0, "Adding resource links")

        comparison = add_links_to_response(
            comparison,
            "comparison",
            source_registry,
            source_registry=source_registry,
            target_registry=target_registry,
        )

        await _info("Context comparison completed successfully")
        await _progress(100.0, "Context comparison completed")

        return comparison
    except Exception as e:
        # Tool boundary: report the failure to the client as a structured error
        # response instead of letting the exception propagate.
        if context:
            await context.error(f"Context comparison failed: {e}")
        return create_error_response(
            str(e), error_code="CONTEXT_COMPARISON_FAILED", registry_mode=registry_mode
        )
- task_management.py:162-164 (registration) Registers the tool in the OPERATION_METADATA dictionary, indicating it is a long-running operation (LONG duration) that uses the task queue pattern for non-blocking execution. "compare_contexts_across_registries": { "duration": OperationDuration.LONG, "pattern": AsyncPattern.TASK_QUEUE,