aywengo

MCP Kafka Schema Reg

check_compatibility

Verify schema compatibility with the latest version in Kafka Schema Registry to ensure data consistency and prevent integration issues.

Instructions

Check if a schema is compatible with the latest version.

Input Schema

Name               Required  Description  Default
context            No
registry           No
schema_definition  Yes
schema_type        No                     AVRO
subject            Yes
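
As an illustration of the parameters above, the following sketch builds a sample argument set and the request body derived from it. The subject name `orders-value`, the `Order` record, and the helper `build_payload` are invented for this example; only the payload shape (`schema` as a JSON string plus `schemaType`) is taken from the handler shown in the Implementation Reference.

```python
import json

# Hypothetical arguments for check_compatibility. The keys match the input
# table; the values are made up for illustration.
arguments = {
    "subject": "orders-value",
    "schema_definition": {
        "type": "record",
        "name": "Order",
        "fields": [
            {"name": "id", "type": "string"},
            # An optional field with a default, a typical additive change.
            {"name": "note", "type": ["null", "string"], "default": None},
        ],
    },
    "schema_type": "AVRO",  # the default, so it could be omitted
}


def build_payload(schema_definition, schema_type="AVRO"):
    """Hypothetical helper: wrap the schema as a JSON string plus its type."""
    return {"schema": json.dumps(schema_definition), "schemaType": schema_type}


payload = build_payload(arguments["schema_definition"], arguments["schema_type"])

# The handler serializes the payload again for the POST body, so the schema
# itself ends up JSON-encoded twice.
body = json.dumps(payload)
```

Passing `schema_definition` as a dictionary rather than a pre-serialized string matters here: the handler calls `json.dumps` on it, so a string input would be encoded a second time before the payload is built.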

Implementation Reference

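  • The core handler function `check_compatibility_tool`, which POSTs the provided schema to the registry's compatibility endpoint (`/compatibility/subjects/{subject}/versions/latest`) to test it against the latest registered version under the subject's configured compatibility level. It handles single- and multi-registry modes, normalizes the compatibility field in the response to `is_compatible`, adds metadata and resource links, validates the output through the `@structured_output` decorator, and returns a structured error response if the check fails.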
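    import json
    from typing import Any, Dict, Optional

    # structured_output, create_error_response, add_links_to_response,
    # _get_registry_name and logger are not defined in this excerpt.
    @structured_output("check_compatibility", fallback_on_error=True)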
    def check_compatibility_tool(
        subject: str,
        schema_definition: Dict[str, Any],
        registry_manager,
        registry_mode: str,
        schema_type: str = "AVRO",
        context: Optional[str] = None,
        registry: Optional[str] = None,
        auth=None,
        headers=None,
        schema_registry_url: str = "",
    ) -> Dict[str, Any]:
        """
        Check if a schema is compatible with the latest version.
    
        Args:
            subject: The subject name
            schema_definition: The schema definition to check
            schema_type: The schema type (AVRO, JSON, PROTOBUF)
            context: Optional schema context
            registry: Optional registry name (ignored in single-registry mode)
    
        Returns:
            Compatibility check result with structured validation and resource links
        """
        try:
            payload = {"schema": json.dumps(schema_definition), "schemaType": schema_type}
    
            if registry_mode == "single":
                # Single-registry mode: use secure session approach
                client = registry_manager.get_registry()
                if client is None:
                    return create_error_response(
                        "No default registry configured",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode="single",
                    )
    
                url = client.build_context_url(f"/compatibility/subjects/{subject}/versions/latest", context)
    
                response = client.session.post(url, data=json.dumps(payload), auth=client.auth, headers=client.headers)
                response.raise_for_status()
                result = response.json()
    
                # Add structured output metadata and normalize field names
                if "is_compatible" not in result:
                    if "isCompatible" in result:
                        result["is_compatible"] = result.pop("isCompatible")
                    elif "compatible" in result:
                        result["is_compatible"] = result.pop("compatible")
                    else:
                        # Fallback: set default value if no compatibility field is found
                        logger.warning(f"No compatibility field found in response: {result.keys()}")
                        result["is_compatible"] = False
    
                result["registry_mode"] = "single"
                result["mcp_protocol_version"] = "2025-06-18"
    
                # Add resource links
                registry_name = _get_registry_name(registry_mode, registry)
                result = add_links_to_response(result, "compatibility", registry_name, subject=subject, context=context)
    
                return result
            else:
                # Multi-registry mode: use client approach
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode="multi",
                    )
    
                url = client.build_context_url(f"/compatibility/subjects/{subject}/versions/latest", context)
    
                response = client.session.post(url, data=json.dumps(payload), auth=client.auth, headers=client.headers)
                response.raise_for_status()
                result = response.json()
    
                # Add structured output metadata and normalize field names
                if "is_compatible" not in result:
                    if "isCompatible" in result:
                        result["is_compatible"] = result.pop("isCompatible")
                    elif "compatible" in result:
                        result["is_compatible"] = result.pop("compatible")
                    else:
                        # Fallback: set default value if no compatibility field is found
                        logger.warning(f"No compatibility field found in response: {result.keys()}")
                        result["is_compatible"] = False
    
                result["registry"] = client.config.name
                result["registry_mode"] = "multi"
                result["mcp_protocol_version"] = "2025-06-18"
    
                # Add resource links
                result = add_links_to_response(
                    result,
                    "compatibility",
                    client.config.name,
                    subject=subject,
                    context=context,
                )
    
                return result
        except Exception as e:
            return create_error_response(str(e), error_code="COMPATIBILITY_CHECK_FAILED", registry_mode=registry_mode)
  • JSON Schema defining the expected output format for the check_compatibility tool response, including is_compatible boolean, compatibility_level, messages array, and metadata.
    CHECK_COMPATIBILITY_SCHEMA = {
        "type": "object",
        "properties": {
            "is_compatible": {
                "type": "boolean",
                "description": "Whether the schema is compatible",
            },
            "compatibility_level": {
                "type": "string",
                "enum": [
                    "BACKWARD",
                    "FORWARD",
                    "FULL",
                    "NONE",
                    "BACKWARD_TRANSITIVE",
                    "FORWARD_TRANSITIVE",
                    "FULL_TRANSITIVE",
                ],
                "description": "Compatibility level used for check",
            },
            "messages": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Detailed compatibility messages",
            },
            "registry": {
                "type": "string",
                "description": "Registry name (multi-registry mode)",
            },
            **METADATA_FIELDS,
        },
        "required": ["is_compatible"],
        "additionalProperties": True,
    }
  • The check_compatibility tool is registered/mapped to its output schema CHECK_COMPATIBILITY_SCHEMA in the central TOOL_OUTPUT_SCHEMAS dictionary used for structured validation.
    "check_compatibility": CHECK_COMPATIBILITY_SCHEMA,
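
The handler above repeats the same field-normalization logic in both the single- and multi-registry branches. As a rough sketch of that step in isolation, the helper below accepts any of the three field names the handler looks for and guarantees the `is_compatible` key that `CHECK_COMPATIBILITY_SCHEMA` requires. The name `normalize_compatibility_result` is hypothetical and does not appear in the project. Unlike the original, it returns a copy instead of mutating the response in place.

```python
from typing import Any, Dict


def normalize_compatibility_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: return a copy of result with an is_compatible key."""
    normalized = dict(result)  # shallow copy, the caller's dict stays untouched
    if "is_compatible" not in normalized:
        # Same precedence as the handler: isCompatible first, then compatible.
        for alias in ("isCompatible", "compatible"):
            if alias in normalized:
                normalized["is_compatible"] = normalized.pop(alias)
                break
        else:
            # No known field found: mirror the handler's conservative default.
            normalized["is_compatible"] = False
    return normalized


# Usage with made-up registry responses.
print(normalize_compatibility_result({"isCompatible": True}))
print(normalize_compatibility_result({"messages": ["unexpected shape"]}))
```

Defaulting to `False` when no compatibility field is present means an unrecognized response is reported as incompatible rather than silently passing, which matches the fallback in the handler.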