Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

get_schema_versions

Retrieve all schema versions for a specific subject in the Kafka Schema Registry. Use this tool for backward compatibility, or switch to the 'schema://{name}/{context}/{subject}/versions' resource for improved performance.

Instructions

Get all versions of a schema for a subject.

NOTE: This tool is maintained for backward compatibility. Consider using the 'schema://{name}/{context}/{subject}/versions' resource instead for better performance.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| context | No | | |
| registry | No | | |
| subject | Yes | | |

Implementation Reference

  • Main handler function for the 'get_schema_versions' tool. Performs HTTP GET request to the Schema Registry API endpoint '/subjects/{subject}/versions' (with optional context prefix), handles single/multi-registry modes, formats response with metadata, HATEOAS links, and MCP protocol compliance.
    def _fetch_subject_versions(client, subject: str, context: Optional[str]) -> Any:
        """
        Fetch the version list for ``subject`` through a registry client.

        Issues a GET against the client's context-aware URL for
        ``/subjects/{subject}/versions`` using the client's own session,
        auth and headers.

        Args:
            client: Registry client exposing ``build_context_url``, ``session``,
                ``auth`` and ``headers``
            subject: The subject name
            context: Optional schema context passed to ``build_context_url``

        Returns:
            The decoded JSON body, or an empty list when the registry answers 404

        Raises:
            Whatever ``response.raise_for_status()`` raises for other error statuses
        """
        # NOTE(review): the subject is interpolated into the path as-is; confirm that
        # build_context_url (or the caller) URL-encodes subjects with reserved characters.
        url = client.build_context_url(f"/subjects/{subject}/versions", context)
        response = client.session.get(url, auth=client.auth, headers=client.headers)

        # A 404 is reported as "no versions" rather than as a failure.
        if response.status_code == 404:
            return []

        response.raise_for_status()
        return response.json()


    @structured_output("get_schema_versions", fallback_on_error=True)
    def get_schema_versions_tool(
        subject: str,
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
        auth=None,
        headers=None,
        schema_registry_url: str = "",
    ) -> Dict[str, Any]:
        """
        Get all versions of a schema for a subject.

        Args:
            subject: The subject name
            registry_manager: Object whose ``get_registry`` returns a registry client
                (called without arguments in single-registry mode, with ``registry``
                otherwise)
            registry_mode: ``"single"`` selects single-registry handling; any other
                value is handled as multi-registry
            context: Optional schema context
            registry: Optional registry name (ignored in single-registry mode)
            auth: Accepted for interface compatibility; not read by this function
            headers: Accepted for interface compatibility; not read by this function
            schema_registry_url: Accepted for interface compatibility; not read by
                this function

        Returns:
            Dictionary containing version numbers with structured validation and
            resource links. On failure, the dictionary produced by
            ``create_error_response`` (``REGISTRY_NOT_FOUND`` when no client is
            available, ``VERSION_RETRIEVAL_FAILED`` for any other exception).
        """
        try:
            single_mode = registry_mode == "single"

            if single_mode:
                client = registry_manager.get_registry()
                if client is None:
                    return create_error_response(
                        "No default registry configured",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode="single",
                    )
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode="multi",
                    )

            versions_list = _fetch_subject_versions(client, subject, context)

            # Keys are inserted in the same order in both modes; only multi-registry
            # responses carry the "registry" entry.
            result: Dict[str, Any] = {
                "subject": subject,
                "versions": versions_list,
            }
            if single_mode:
                link_registry_name = _get_registry_name(registry_mode, registry)
            else:
                link_registry_name = client.config.name
                result["registry"] = link_registry_name
            result["registry_mode"] = "single" if single_mode else "multi"
            result["mcp_protocol_version"] = "2025-06-18"

            # Add resource links
            return add_links_to_response(
                result,
                "schema_versions",
                link_registry_name,
                subject=subject,
                context=context,
            )
        except Exception as e:
            # Tool boundary: report every failure as a structured error response.
            return create_error_response(str(e), error_code="VERSION_RETRIEVAL_FAILED", registry_mode=registry_mode)
  • JSON Schema definition for the 'get_schema_versions' tool output, including subject, versions array, optional registry and _links for HATEOAS.
    # JSON Schema for the output of the 'get_schema_versions' tool.
    # Only "subject" and "versions" are required; extra keys are permitted at the
    # top level ("additionalProperties": True).
    GET_SCHEMA_VERSIONS_SCHEMA = {
        "type": "object",
        "properties": {
            "subject": {
                "type": "string",
                "description": "Subject name for which versions are listed",
            },
            # Version numbers are integers starting at 1; an empty array is valid.
            "versions": {
                "type": "array",
                "items": {"type": "integer", "minimum": 1},
                "description": "List of available schema versions",
            },
            # Optional: not listed under "required" below.
            "registry": {
                "type": "string",
                "description": "Registry name (multi-registry mode)",
            },
            # Free-form object; the shape of individual links is not constrained here.
            "_links": {
                "type": "object",
                "description": "Navigation links to related resources",
                "additionalProperties": True,
            },
            # Shared property definitions merged in from METADATA_FIELDS (defined elsewhere).
            **METADATA_FIELDS,
        },
        "required": ["subject", "versions"],
        "additionalProperties": True,
    }
  • Registration of the output schema for 'get_schema_versions' tool in the master TOOL_OUTPUT_SCHEMAS dictionary used for MCP tool validation.
    "get_schema_versions": GET_SCHEMA_VERSIONS_SCHEMA,
    "get_schema_by_id": GET_SCHEMA_BY_ID_SCHEMA,
    "get_subjects_by_schema_id": GET_SUBJECTS_BY_SCHEMA_ID_SCHEMA,
  • Helper function get_schema_versions imported from main MCP server module and used in statistics tools for retrieving schema versions.
            from kafka_schema_registry_unified_mcp import get_schema_versions
    
            versions = get_schema_versions(subject, context, registry)
            if isinstance(versions, dict) and "error" in versions:
                return create_error_response(
                    f"Failed to get schema versions: {versions.get('error')}",
                    error_code="SCHEMA_VERSIONS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "context": context or "default",
                "subject": subject,
                "count": len(versions),
                "scope": "versions",
                "versions": versions,
                "counted_at": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Add metadata information, but preserve the scope field
            metadata_copy = metadata.copy()
            if "scope" in metadata_copy:
                # Preserve the simple string scope, but add server scope info separately
                metadata_copy["server_scope"] = metadata_copy.pop("scope")
            result.update(metadata_copy)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="VERSION_COUNT_FAILED", registry_mode=registry_mode)
    
    
    def _count_versions_for_subjects(subjects, context, registry, fetch_versions) -> int:
        """
        Sum the number of schema versions across ``subjects`` in one context.

        Args:
            subjects: Iterable of subject names
            context: Context passed through to ``fetch_versions`` (``None`` for the
                default context)
            registry: Registry name passed through to ``fetch_versions``
            fetch_versions: Callable invoked as ``fetch_versions(subject, context, registry)``

        Returns:
            Total number of versions. Subjects whose lookup returns a dict are
            skipped so that one failed lookup does not abort the whole count.
        """
        version_total = 0
        for subject in subjects:
            versions = fetch_versions(subject, context, registry)
            if not isinstance(versions, dict):
                version_total += len(versions)
        return version_total


    @structured_output("get_registry_statistics", fallback_on_error=True)
    def get_registry_statistics_tool(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
    ) -> Dict[str, Any]:
        """
        Get comprehensive statistics about a registry.

        Subjects and versions are counted for every context returned by the client
        and then for the default context (``get_subjects()`` without arguments).

        Args:
            registry_manager: Source of the registry client
            registry_mode: ``"single"`` uses the default client; any other value looks
                up ``registry`` on the manager
            registry: Optional registry name (ignored in single-registry mode)
            include_context_details: Whether to include detailed context statistics

        Returns:
            Dictionary containing registry statistics with metadata and structured
            validation. ``"contexts"`` is ``None`` when ``include_context_details`` is
            false. On failure, the dictionary produced by ``create_error_response``.
        """
        # Local import so the timestamp below can be timezone-aware regardless of
        # what the module imports from ``datetime``.
        from datetime import timezone

        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode=registry_mode,
                    )

            # Get all contexts
            contexts = client.get_contexts()
            if isinstance(contexts, dict) and "error" in contexts:
                return create_error_response(
                    f"Failed to get contexts: {contexts.get('error')}",
                    error_code="CONTEXTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )

            # Import the function here to avoid circular imports
            from kafka_schema_registry_unified_mcp import get_schema_versions

            # One (name, subject count, version count) entry per context that was counted.
            per_context = []

            for context in contexts:
                subjects = client.get_subjects(context)
                # Best effort: a context whose subject listing failed is left out.
                if isinstance(subjects, dict) and "error" in subjects:
                    continue
                per_context.append(
                    (
                        context,
                        len(subjects),
                        _count_versions_for_subjects(subjects, context, registry, get_schema_versions),
                    )
                )

            # Get default context stats
            default_subjects = client.get_subjects()
            if not isinstance(default_subjects, dict):
                per_context.append(
                    (
                        "default",
                        len(default_subjects),
                        _count_versions_for_subjects(default_subjects, None, registry, get_schema_versions),
                    )
                )

            total_subjects = sum(subject_count for _, subject_count, _ in per_context)
            total_versions = sum(version_count for _, _, version_count in per_context)

            context_stats = None
            if include_context_details:
                context_stats = [
                    {
                        "name": name,
                        "subject_count": subject_count,
                        "schema_count": version_count,
                    }
                    for name, subject_count, version_count in per_context
                ]

            # Get registry metadata
            metadata = client.get_server_metadata()

            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "total_contexts": len(contexts),
                "total_subjects": total_subjects,
                "total_schemas": total_versions,
                "contexts": context_stats,
                # Timezone-aware UTC timestamp, matching _count_schemas_async.
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }

            # Add metadata information
            result.update(metadata)

            return result
        except Exception as e:
            # Tool boundary: report every failure as a structured error response.
            return create_error_response(str(e), error_code="REGISTRY_STATISTICS_FAILED", registry_mode=registry_mode)
    
    
    # ===== OPTIMIZED ASYNC STATISTICS FUNCTIONS =====
    
    
    async def _count_schemas_async(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Async version of count_schemas_tool with better performance.
        Uses parallel API calls when counting multiple contexts.
        Includes registry metadata information.
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return {"error": f"Registry '{registry}' not found"}
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            if context:
                # Single context - direct call
                subjects = client.get_subjects(context)
                if isinstance(subjects, dict) and "error" in subjects:
                    return subjects
    
                result = {
                    "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                    "context": context,
                    "count": len(subjects),  # Use 'count' to match schema
                    "scope": "schemas",  # Add scope field as string
                    "total_schemas": len(subjects),
                    "schemas": subjects,
                    "counted_at": datetime.now(timezone.utc).isoformat(),
                }
    
                # Add metadata information, but preserve the scope field
                metadata_copy = metadata.copy()
                if "scope" in metadata_copy:
                    # Preserve the simple string scope, but add server scope info separately
                    metadata_copy["server_scope"] = metadata_copy.pop("scope")
                result.update(metadata_copy)
                return result
            else:
                # All contexts - parallel execution
                contexts = client.get_contexts()
                if isinstance(contexts, dict) and "error" in contexts:
                    return contexts
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.