Skip to main content
Glama

compare_contexts_across_registries

Analyze and align context differences between source and target registries to ensure consistency in Kafka Schema Registry configurations.

Instructions

Compare contexts across two registries.

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| source_context | Yes | | |
| source_registry | Yes | | |
| target_context | No | | |
| target_registry | Yes | | |

Implementation Reference

  • The core handler function that executes the 'compare_contexts_across_registries' tool. It compares subjects and schema versions between the specified contexts in two registries, computes the differences (only_in_source, only_in_target, in_both, and per-subject schema version differences), and returns a structured JSON response with summary statistics and HATEOAS resource links. It reports progress via the MCP context and converts failures into structured error responses.
    @structured_output("compare_contexts_across_registries", fallback_on_error=True)
    async def compare_contexts_across_registries_tool(
        source_registry: str,
        target_registry: str,
        source_context: str,
        registry_manager,
        registry_mode: str,
        target_context: Optional[str] = None,
        context: Optional["Context"] = None,
    ) -> Dict[str, Any]:
        """
        Compare contexts across two registries.

        Only available in multi-registry mode.

        Args:
            source_registry: Source registry name
            target_registry: Target registry name
            source_context: Source context name
            registry_manager: Object whose ``get_registry(name)`` returns the
                client for a registry, or a falsy value when it is unknown
            registry_mode: Current registry mode; ``"single"`` is rejected
            target_context: Target context name (defaults to source_context)
            context: MCP Context for progress reporting

        Returns:
            Context comparison results with structured validation and resource
            links. On success the dict carries ``source``, ``target``,
            ``differences``, ``schema_differences``, ``summary``, ``timestamp``,
            ``registry_mode`` and ``mcp_protocol_version``; all subject lists are
            sorted so the output is stable across runs. On failure the result of
            ``create_error_response`` is returned instead of raising.
        """
        if registry_mode == "single":
            return create_error_response(
                "Context comparison across registries is only available in multi-registry mode",
                details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"},
                error_code="SINGLE_REGISTRY_MODE_LIMITATION",
                registry_mode=registry_mode,
            )

        try:
            # Initial setup (0-10%)
            if context:
                await context.info(
                    f"Starting context comparison: {source_registry}/{source_context} vs "
                    f"{target_registry}/{target_context or source_context}"
                )
                await context.report_progress(0.0, 100.0, "Initializing context comparison")

            source_client = registry_manager.get_registry(source_registry)
            target_client = registry_manager.get_registry(target_registry)

            if not source_client:
                return create_error_response(
                    f"Source registry '{source_registry}' not found",
                    error_code="SOURCE_REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )
            if not target_client:
                return create_error_response(
                    f"Target registry '{target_registry}' not found",
                    error_code="TARGET_REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )

            # Use source context for target if not specified
            if target_context is None:
                target_context = source_context

            if context:
                await context.report_progress(10.0, 100.0, "Registry clients initialized")

            # Get subjects from source context (10-30%)
            if context:
                await context.info(f"Fetching subjects from source context: {source_registry}/{source_context}")
                await context.report_progress(15.0, 100.0, f"Fetching subjects from {source_registry}/{source_context}")

            source_subjects = set(source_client.get_subjects(source_context) or [])

            if context:
                await context.report_progress(25.0, 100.0, f"Found {len(source_subjects)} subjects in source context")

            # Get subjects from target context (30-50%)
            if context:
                await context.info(f"Fetching subjects from target context: {target_registry}/{target_context}")
                await context.report_progress(35.0, 100.0, f"Fetching subjects from {target_registry}/{target_context}")

            target_subjects = set(target_client.get_subjects(target_context) or [])

            if context:
                await context.report_progress(45.0, 100.0, f"Found {len(target_subjects)} subjects in target context")

            # Build comparison structure (50-60%)
            if context:
                await context.report_progress(50.0, 100.0, "Building comparison structure")

            # Compute each set relation once and sort it: set iteration order
            # depends on string hashing, so unsorted lists would make the
            # response differ between runs for identical registries.
            only_in_source = sorted(source_subjects - target_subjects)
            only_in_target = sorted(target_subjects - source_subjects)
            common_subjects = sorted(source_subjects & target_subjects)

            comparison: Dict[str, Any] = {
                "source": {
                    "registry": source_registry,
                    "context": source_context,
                    "subject_count": len(source_subjects),
                    "subjects": sorted(source_subjects),
                },
                "target": {
                    "registry": target_registry,
                    "context": target_context,
                    "subject_count": len(target_subjects),
                    "subjects": sorted(target_subjects),
                },
                "differences": {
                    "only_in_source": only_in_source,
                    "only_in_target": only_in_target,
                    "in_both": common_subjects,
                },
                "timestamp": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }

            if context:
                await context.report_progress(60.0, 100.0, "Analyzing subject differences")

            # Compare schemas for common subjects (60-90%)
            schema_differences = []
            common_count = len(common_subjects)

            if context:
                await context.info(f"Comparing schemas for {common_count} common subjects")
                await context.report_progress(65.0, 100.0, f"Comparing schemas for {common_count} common subjects")

            for i, subject in enumerate(common_subjects):
                # Report progress for schema comparison; the loop body only
                # runs when common_count > 0, so the division is safe.
                if context:
                    progress = 65.0 + (i / common_count) * 20.0  # 65% to 85%
                    await context.report_progress(progress, 100.0, f"Comparing schema versions for {subject}")

                source_versions = source_client.get_schema_versions(subject, source_context) or []
                target_versions = target_client.get_schema_versions(subject, target_context) or []

                if source_versions != target_versions:
                    schema_differences.append(
                        {
                            "subject": subject,
                            "source_versions": source_versions,
                            "target_versions": target_versions,
                            # A non-list result is counted as zero versions.
                            "source_version_count": (
                                len(source_versions) if isinstance(source_versions, list) else 0
                            ),
                            "target_version_count": (
                                len(target_versions) if isinstance(target_versions, list) else 0
                            ),
                        }
                    )

            if context:
                await context.report_progress(
                    85.0, 100.0, f"Found {len(schema_differences)} subjects with version differences"
                )

            comparison["schema_differences"] = {
                "subjects_with_differences": schema_differences,
                "count": len(schema_differences),
            }

            # Add summary (90-95%)
            if context:
                await context.report_progress(90.0, 100.0, "Building comparison summary")

            comparison["summary"] = {
                "contexts_match": not only_in_source and not only_in_target and not schema_differences,
                "subjects_only_in_source": len(only_in_source),
                "subjects_only_in_target": len(only_in_target),
                "subjects_in_both": common_count,
                "schemas_with_version_differences": len(schema_differences),
            }

            # Add resource links (95-100%)
            if context:
                await context.report_progress(95.0, 100.0, "Adding resource links")

            comparison = add_links_to_response(
                comparison,
                "comparison",
                source_registry,
                source_registry=source_registry,
                target_registry=target_registry,
            )

            if context:
                await context.info("Context comparison completed successfully")
                await context.report_progress(100.0, 100.0, "Context comparison completed")

            return comparison

        except Exception as e:
            # Top-level tool boundary: report the failure to the MCP client and
            # return a structured error instead of propagating the exception.
            if context:
                await context.error(f"Context comparison failed: {str(e)}")
            return create_error_response(str(e), error_code="CONTEXT_COMPARISON_FAILED", registry_mode=registry_mode)
  • Registers the tool in the OPERATION_METADATA dictionary, indicating it is a long-running operation (LONG duration) that uses the task queue pattern for non-blocking execution.
    "compare_contexts_across_registries": { "duration": OperationDuration.LONG, "pattern": AsyncPattern.TASK_QUEUE,

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server