Glama
aywengo

MCP Kafka Schema Reg

find_missing_schemas

Identify missing schemas by comparing source and target Kafka Schema Registry instances to ensure consistency across environments.

Instructions

Find schemas that exist in source registry but not in target registry.

Input Schema

Name             Required  Description  Default
context          No
source_registry  Yes
target_registry  Yes
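As an illustration of the input schema above, the arguments for a call might look like the following sketch. Only the three parameter names and their required/optional status come from the table; the registry names `development` and `production` and the helper `validate_arguments` are hypothetical and not part of the server.

```python
from typing import Any, Dict, List

# Parameter names and required/optional status follow the input schema table.
REQUIRED = ("source_registry", "target_registry")
OPTIONAL = ("context",)


def validate_arguments(args: Dict[str, Any]) -> List[str]:
    """Return a list of problems with the argument dict (empty if it looks valid)."""
    problems = [f"missing required parameter: {name}" for name in REQUIRED if not args.get(name)]
    unknown = sorted(set(args) - set(REQUIRED) - set(OPTIONAL))
    problems += [f"unknown parameter: {name}" for name in unknown]
    return problems


# Hypothetical registry names, for illustration only.
example_arguments = {
    "source_registry": "development",
    "target_registry": "production",
    # "context" is optional; omit it to skip context filtering.
}
```

Checking arguments before the call is a design choice of this sketch, not something the tool requires.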

Implementation Reference

  • The core handler function for the 'find_missing_schemas' tool. It compares subjects between source and target registries (optionally filtered by context), identifies missing ones in the target, fetches details like versions and latest schema info for each, adds summary statistics, resource links, and handles errors with structured responses.
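    from datetime import datetime
    from typing import Any, Dict, List, Optional

    # structured_output, create_error_response and add_links_to_response are
    # project helpers defined elsewhere in the repository.
    @structured_output("find_missing_schemas", fallback_on_error=True)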
    async def find_missing_schemas_tool(
        source_registry: str,
        target_registry: str,
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Find schemas that exist in source registry but not in target registry.
        Only available in multi-registry mode.
    
        Args:
            source_registry: Source registry name
            target_registry: Target registry name
            context: Optional context to limit the search
    
        Returns:
            List of missing schemas with structured validation and resource links
        """
        if registry_mode == "single":
            return create_error_response(
                "Finding missing schemas across registries is only available in multi-registry mode",
                details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"},
                error_code="SINGLE_REGISTRY_MODE_LIMITATION",
                registry_mode=registry_mode,
            )
    
        try:
            source_client = registry_manager.get_registry(source_registry)
            target_client = registry_manager.get_registry(target_registry)
    
            if not source_client:
                return create_error_response(
                    f"Source registry '{source_registry}' not found",
                    error_code="SOURCE_REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )
            if not target_client:
                return create_error_response(
                    f"Target registry '{target_registry}' not found",
                    error_code="TARGET_REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )
    
            # Get subjects based on context
            if context:
                source_subjects = set(source_client.get_subjects(context) or [])
                target_subjects = set(target_client.get_subjects(context) or [])
            else:
                source_subjects = set(source_client.get_subjects() or [])
                target_subjects = set(target_client.get_subjects() or [])
    
            # Find missing subjects
            missing_subjects = source_subjects - target_subjects
    
            result: Dict[str, Any] = {
                "source_registry": source_registry,
                "target_registry": target_registry,
                "context": context,
                "missing_subjects": list(missing_subjects),
                "missing_count": len(missing_subjects),
                "source_subject_count": len(source_subjects),
                "target_subject_count": len(target_subjects),
                "details": [],
                "timestamp": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Ensure details is treated as a list
            details_list: List[Dict[str, Any]] = result["details"]
    
            # Get details for each missing subject
            for subject in missing_subjects:
                try:
                    versions = source_client.get_schema_versions(subject, context) or []
                    latest_schema = None
    
                    if versions:
                        latest_version = max(versions) if isinstance(versions, list) else "latest"
                        latest_schema = source_client.get_schema(subject, str(latest_version), context)
    
                    details_list.append(
                        {
                            "subject": subject,
                            "versions": versions,
                            "version_count": (len(versions) if isinstance(versions, list) else 0),
                            "latest_version": latest_version if versions else None,
                            "latest_schema_id": (
                                latest_schema.get("id") if latest_schema and isinstance(latest_schema, dict) else None
                            ),
                            "schema_type": (
                                latest_schema.get("schemaType", "AVRO")
                                if latest_schema and isinstance(latest_schema, dict)
                                else None
                            ),
                        }
                    )
                except Exception as e:
                    # If we can't get details for a subject, still include it in the list
                    details_list.append(
                        {
                            "subject": subject,
                            "versions": [],
                            "version_count": 0,
                            "error": f"Failed to get subject details: {str(e)}",
                        }
                    )
    
            # Update result with processed details
            result["details"] = details_list
    
            # Add summary information
            result["summary"] = {
                "migration_needed": len(missing_subjects) > 0,
                "total_versions_to_migrate": sum(detail.get("version_count", 0) for detail in details_list),
                "subjects_with_multiple_versions": len(
                    [detail for detail in details_list if detail.get("version_count", 0) > 1]
                ),
            }
    
            # Add resource links
            result = add_links_to_response(
                result,
                "comparison",
                source_registry,
                source_registry=source_registry,
                target_registry=target_registry,
            )
    
            return result
    
        except Exception as e:
            return create_error_response(
                str(e),
                error_code="MISSING_SCHEMA_SEARCH_FAILED",
                registry_mode=registry_mode,
            )
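The core of the handler above is a set difference between the subject lists of the two registries, followed by a per-subject version count and a summary. A minimal, self-contained sketch of that idea is shown below. `StubRegistryClient` and `find_missing` are hypothetical stand-ins written for this example, not the project's classes, and the subject names are made up. Unlike the handler, the sketch sorts the missing subjects so the output order is deterministic, and it omits contexts, schema lookups, and error handling.

```python
from typing import Any, Dict, List, Optional


class StubRegistryClient:
    """Hypothetical in-memory stand-in for a registry client (not the project's class)."""

    def __init__(self, subjects: Dict[str, List[int]]) -> None:
        self._subjects = subjects  # subject name -> list of version numbers

    def get_subjects(self, context: Optional[str] = None) -> List[str]:
        return list(self._subjects)

    def get_schema_versions(self, subject: str, context: Optional[str] = None) -> List[int]:
        return self._subjects[subject]


def find_missing(source: StubRegistryClient, target: StubRegistryClient) -> Dict[str, Any]:
    """Report subjects present in source but absent from target."""
    source_subjects = set(source.get_subjects() or [])
    target_subjects = set(target.get_subjects() or [])
    # Set difference: everything in source that target lacks.
    missing = sorted(source_subjects - target_subjects)
    details = [
        {"subject": s, "version_count": len(source.get_schema_versions(s) or [])}
        for s in missing
    ]
    return {
        "missing_subjects": missing,
        "missing_count": len(missing),
        "summary": {
            "migration_needed": len(missing) > 0,
            "total_versions_to_migrate": sum(d["version_count"] for d in details),
            "subjects_with_multiple_versions": sum(1 for d in details if d["version_count"] > 1),
        },
    }


# Made-up subjects for illustration.
source = StubRegistryClient({"orders-value": [1, 2, 3], "users-value": [1], "payments-value": [1, 2]})
target = StubRegistryClient({"users-value": [1]})
report = find_missing(source, target)
```

Here `report["missing_subjects"]` contains the two subjects that exist only in the source stub. Subjects that exist only in the target are not reported, because the comparison runs in one direction.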
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It states the tool finds missing schemas but does not describe how it operates—e.g., whether it performs a read-only comparison, requires authentication, has rate limits, or returns results in a specific format. For a tool with zero annotation coverage, this lack of detail is a significant gap.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that front-loads the core purpose without unnecessary words. It directly states what the tool does, making it easy to understand at a glance, with zero waste or redundancy.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of comparing registries, lack of annotations, no output schema, and low schema description coverage, the description is incomplete. It does not address behavioral aspects like safety, performance, or result format, nor does it provide usage context or parameter details. This leaves significant gaps for an AI agent to invoke the tool correctly.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
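Since the review notes that the tool has no output schema, the shape of a successful result can be sketched from the keys the handler assigns in the implementation reference. The `TypedDict` classes below are an illustration and not part of the project. The value types are assumptions (for example, that schema IDs and version numbers are integers), the sample values are made up, and any fields added by `add_links_to_response` are omitted because their shape is not shown in the source.

```python
from typing import List, Optional, TypedDict, Union


class SubjectDetail(TypedDict, total=False):
    # total=False: the handler's error branch only sets a subset of these keys.
    subject: str
    versions: List[int]
    version_count: int
    latest_version: Union[int, str, None]  # the handler may fall back to "latest"
    latest_schema_id: Optional[int]  # assumed to be an integer ID
    schema_type: Optional[str]
    error: str


class Summary(TypedDict):
    migration_needed: bool
    total_versions_to_migrate: int
    subjects_with_multiple_versions: int


class MissingSchemasResult(TypedDict):
    source_registry: str
    target_registry: str
    context: Optional[str]
    missing_subjects: List[str]
    missing_count: int
    source_subject_count: int
    target_subject_count: int
    details: List[SubjectDetail]
    timestamp: str
    registry_mode: str
    mcp_protocol_version: str
    summary: Summary


# Made-up values, for illustration only.
example_result: MissingSchemasResult = {
    "source_registry": "development",
    "target_registry": "production",
    "context": None,
    "missing_subjects": ["orders-value"],
    "missing_count": 1,
    "source_subject_count": 2,
    "target_subject_count": 1,
    "details": [{"subject": "orders-value", "versions": [1, 2], "version_count": 2}],
    "timestamp": "2025-01-01T00:00:00",
    "registry_mode": "multi",
    "mcp_protocol_version": "2025-06-18",
    "summary": {
        "migration_needed": True,
        "total_versions_to_migrate": 2,
        "subjects_with_multiple_versions": 1,
    },
}
```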

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description implies parameters for 'source_registry' and 'target_registry' but does not add meaning beyond what the input schema provides. With 0% schema description coverage, the schema titles ('Source Registry', 'Target Registry', 'Context') are basic, and the description does not explain what these parameters represent (e.g., registry names, URLs) or the optional 'context' parameter's purpose. It compensates minimally, aligning with the baseline for low coverage.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Find schemas that exist in source registry but not in target registry.' It specifies the verb ('find') and resources ('schemas'), and distinguishes the operation as a comparison between two registries. However, it does not explicitly differentiate from sibling tools like 'compare_registries' or 'bulk_schema_migration', which may have overlapping functions, preventing a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It lacks information on prerequisites, such as whether registries need to be connected or accessible, and does not mention sibling tools like 'compare_registries' or 'bulk_schema_migration' that might serve similar purposes. This omission leaves the agent without context for tool selection.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
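The same request can be sketched in Python with only the standard library. The URL is the one from the curl command above. The helper names `server_url` and `fetch_server` are hypothetical, and the sketch assumes the endpoint returns a JSON body, which is not stated on this page.

```python
import json
import urllib.request
from typing import Any

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory URL for one server, matching the curl example."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> Any:
    """GET the server record and parse the body as JSON (assumed format; needs network access)."""
    with urllib.request.urlopen(server_url(owner, name), timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server("aywengo", "kafka-schema-reg-mcp")` would issue the same GET request as the curl command.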

If you have feedback or need assistance with the MCP directory API, please join our Discord server.