find_missing_schemas
Identify missing schemas by comparing source and target Kafka Schema Registry instances to ensure consistency across environments.
Instructions
Find schemas that exist in source registry but not in target registry.
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| context | No | Optional context to limit the search | None |
| source_registry | Yes | Source registry name | |
| target_registry | Yes | Target registry name | |
Implementation Reference
# comparison_tools.py:462-600 (handler): The core handler function for the
# 'find_missing_schemas' tool. It compares subjects between source and target
# registries (optionally filtered by context), identifies missing ones in the
# target, fetches details like versions and latest schema info for each, adds
# summary statistics, resource links, and handles errors with structured
# responses.
@structured_output("find_missing_schemas", fallback_on_error=True)
async def find_missing_schemas_tool(
    source_registry: str,
    target_registry: str,
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Find schemas that exist in source registry but not in target registry.
    Only available in multi-registry mode.

    Args:
        source_registry: Source registry name
        target_registry: Target registry name
        registry_manager: Object whose ``get_registry(name)`` returns the
            client for a registry, or a falsy value when the name is unknown
        registry_mode: Current registry mode; ``"single"`` short-circuits
            with an error response
        context: Optional context to limit the search

    Returns:
        List of missing schemas with structured validation and resource links.
        On failure an error response built by ``create_error_response`` is
        returned instead of raising.
    """
    if registry_mode == "single":
        return create_error_response(
            "Finding missing schemas across registries is only available in multi-registry mode",
            details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"},
            error_code="SINGLE_REGISTRY_MODE_LIMITATION",
            registry_mode=registry_mode,
        )

    try:
        source_client = registry_manager.get_registry(source_registry)
        target_client = registry_manager.get_registry(target_registry)

        if not source_client:
            return create_error_response(
                f"Source registry '{source_registry}' not found",
                error_code="SOURCE_REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )
        if not target_client:
            return create_error_response(
                f"Target registry '{target_registry}' not found",
                error_code="TARGET_REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )

        # Get subjects based on context
        if context:
            source_subjects = set(source_client.get_subjects(context) or [])
            target_subjects = set(target_client.get_subjects(context) or [])
        else:
            source_subjects = set(source_client.get_subjects() or [])
            target_subjects = set(target_client.get_subjects() or [])

        # Find missing subjects. Sorted so that "missing_subjects" and
        # "details" come out in a stable order: iterating a set of strings
        # varies between processes because of hash randomisation.
        missing_subjects = sorted(source_subjects - target_subjects)

        details_list: List[Dict[str, Any]] = []
        result: Dict[str, Any] = {
            "source_registry": source_registry,
            "target_registry": target_registry,
            "context": context,
            "missing_subjects": missing_subjects,
            "missing_count": len(missing_subjects),
            "source_subject_count": len(source_subjects),
            "target_subject_count": len(target_subjects),
            "details": details_list,
            "timestamp": datetime.now().isoformat(),
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

        # Get details for each missing subject
        for subject in missing_subjects:
            try:
                versions = source_client.get_schema_versions(subject, context) or []
                latest_version = None
                latest_schema = None
                if versions:
                    # A non-list truthy reply cannot be max()'d; ask for "latest".
                    latest_version = max(versions) if isinstance(versions, list) else "latest"
                    latest_schema = source_client.get_schema(subject, str(latest_version), context)

                schema_is_dict = bool(latest_schema) and isinstance(latest_schema, dict)
                details_list.append(
                    {
                        "subject": subject,
                        "versions": versions,
                        "version_count": (len(versions) if isinstance(versions, list) else 0),
                        "latest_version": latest_version,
                        "latest_schema_id": (latest_schema.get("id") if schema_is_dict else None),
                        "schema_type": (
                            latest_schema.get("schemaType", "AVRO") if schema_is_dict else None
                        ),
                    }
                )
            except Exception as e:
                # If we can't get details for a subject, still include it in the list
                details_list.append(
                    {
                        "subject": subject,
                        "versions": [],
                        "version_count": 0,
                        "error": f"Failed to get subject details: {str(e)}",
                    }
                )

        # Add summary information
        result["summary"] = {
            "migration_needed": bool(missing_subjects),
            "total_versions_to_migrate": sum(detail.get("version_count", 0) for detail in details_list),
            "subjects_with_multiple_versions": sum(
                1 for detail in details_list if detail.get("version_count", 0) > 1
            ),
        }

        # Add resource links
        result = add_links_to_response(
            result,
            "comparison",
            source_registry,
            source_registry=source_registry,
            target_registry=target_registry,
        )
        return result
    except Exception as e:
        # Top-level boundary: report any failure as a structured error response.
        return create_error_response(
            str(e),
            error_code="MISSING_SCHEMA_SEARCH_FAILED",
            registry_mode=registry_mode,
        )