get_schema_versions
Retrieve all schema versions for a specific subject in the Kafka Schema Registry. Use this tool for backward compatibility, or switch to the 'schema://{name}/{context}/{subject}/versions' resource for improved performance.
Instructions
Get all versions of a schema for a subject.
NOTE: This tool is maintained for backward compatibility. Consider using the 'schema://{name}/{context}/{subject}/versions' resource instead for better performance.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
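| context | No | Optional schema context | |
| registry | No | Optional registry name (ignored in single-registry mode) | |
| subject | Yes | The subject name | |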
Implementation Reference
# core_registry_tools.py:279-384 (handler) — Main handler function for the
# 'get_schema_versions' tool. Performs HTTP GET request to the Schema Registry
# API endpoint '/subjects/{subject}/versions' (with optional context prefix),
# handles single/multi-registry modes, formats response with metadata, HATEOAS
# links, and MCP protocol compliance.
def _fetch_subject_versions(client, subject: str, context: Optional[str]) -> Any:
    """Fetch the version list for ``subject`` through ``client``.

    Args:
        client: Registry client exposing ``build_context_url``, ``session``,
            ``auth`` and ``headers``.
        subject: The subject name (percent-encoded before being placed in the
            URL path).
        context: Optional schema context forwarded to ``build_context_url``.

    Returns:
        The decoded JSON body of the response, or an empty list when the
        registry answers 404 (the subject does not exist).

    Raises:
        Whatever ``response.raise_for_status()`` raises for other non-success
        status codes; the caller converts it into an error response.
    """
    # Local import keeps this block self-contained with respect to the
    # file-level import section.
    from urllib.parse import quote

    # The subject is a single path segment: encode everything, including "/",
    # so names with reserved characters cannot alter the request path.
    encoded_subject = quote(subject, safe="")
    url = client.build_context_url(f"/subjects/{encoded_subject}/versions", context)
    response = client.session.get(url, auth=client.auth, headers=client.headers)

    # Handle 404 specifically - subject doesn't exist
    if response.status_code == 404:
        return []

    response.raise_for_status()
    return response.json()


@structured_output("get_schema_versions", fallback_on_error=True)
def get_schema_versions_tool(
    subject: str,
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
    registry: Optional[str] = None,
    auth=None,
    headers=None,
    schema_registry_url: str = "",
) -> Dict[str, Any]:
    """
    Get all versions of a schema for a subject.

    Args:
        subject: The subject name
        registry_manager: Object whose ``get_registry`` returns the client to use
        registry_mode: ``"single"`` selects the default registry; any other
            value is handled as multi-registry mode
        context: Optional schema context
        registry: Optional registry name (ignored in single-registry mode)
        auth: Not used by this function; kept for interface compatibility
        headers: Not used by this function; kept for interface compatibility
        schema_registry_url: Not used by this function; kept for interface
            compatibility

    Returns:
        Dictionary containing version numbers with structured validation and
        resource links. A missing subject (HTTP 404) yields an empty
        ``versions`` list. Failures are returned as an error response rather
        than raised.
    """
    try:
        single_mode = registry_mode == "single"
        # Any non-"single" mode is reported as "multi", as in the original branches.
        mode_label = "single" if single_mode else "multi"

        if single_mode:
            client = registry_manager.get_registry()
        else:
            client = registry_manager.get_registry(registry)

        if client is None:
            if single_mode:
                message = "No default registry configured"
            else:
                message = f"Registry '{registry}' not found"
            return create_error_response(
                message,
                error_code="REGISTRY_NOT_FOUND",
                registry_mode=mode_label,
            )

        versions_list = _fetch_subject_versions(client, subject, context)

        # Convert to enhanced response format
        result: Dict[str, Any] = {
            "subject": subject,
            "versions": versions_list,
        }
        if single_mode:
            registry_name = _get_registry_name(registry_mode, registry)
        else:
            # Only the multi-registry response carries the registry name.
            registry_name = client.config.name
            result["registry"] = registry_name
        result["registry_mode"] = mode_label
        result["mcp_protocol_version"] = "2025-06-18"

        # Add resource links
        return add_links_to_response(
            result,
            "schema_versions",
            registry_name,
            subject=subject,
            context=context,
        )
    except Exception as e:
        # Tool boundary: report every failure as a structured error response.
        return create_error_response(str(e), error_code="VERSION_RETRIEVAL_FAILED", registry_mode=registry_mode)
# schema_definitions.py:154-179 (schema) — JSON Schema definition for the
# 'get_schema_versions' tool output, including subject, versions array,
# optional registry and _links for HATEOAS.
GET_SCHEMA_VERSIONS_SCHEMA = {
    "type": "object",
    "properties": {
        "subject": {
            "type": "string",
            "description": "Subject name for which versions are listed",
        },
        # Version numbers are integers starting at 1.
        "versions": {
            "type": "array",
            "items": {"type": "integer", "minimum": 1},
            "description": "List of available schema versions",
        },
        # Not listed under "required" below.
        "registry": {
            "type": "string",
            "description": "Registry name (multi-registry mode)",
        },
        # Free-form object: any link keys are accepted.
        "_links": {
            "type": "object",
            "description": "Navigation links to related resources",
            "additionalProperties": True,
        },
        # Property definitions merged in from METADATA_FIELDS (defined elsewhere).
        **METADATA_FIELDS,
    },
    # Only these two keys are mandatory; extra top-level keys are permitted.
    "required": ["subject", "versions"],
    "additionalProperties": True,
}
- schema_definitions.py:917-919 (registration) Registration of the output schema for the 'get_schema_versions' tool in the master TOOL_OUTPUT_SCHEMAS dictionary used for MCP tool validation: "get_schema_versions": GET_SCHEMA_VERSIONS_SCHEMA, "get_schema_by_id": GET_SCHEMA_BY_ID_SCHEMA, "get_subjects_by_schema_id": GET_SUBJECTS_BY_SCHEMA_ID_SCHEMA,
- statistics_tools.py:177-384 (helper)Helper function get_schema_versions imported from main MCP server module and used in statistics tools for retrieving schema versions.from kafka_schema_registry_unified_mcp import get_schema_versions versions = get_schema_versions(subject, context, registry) if isinstance(versions, dict) and "error" in versions: return create_error_response( f"Failed to get schema versions: {versions.get('error')}", error_code="SCHEMA_VERSIONS_RETRIEVAL_FAILED", registry_mode=registry_mode, ) # Get registry metadata metadata = client.get_server_metadata() result = { "registry": (client.config.name if hasattr(client.config, "name") else "default"), "context": context or "default", "subject": subject, "count": len(versions), "scope": "versions", "versions": versions, "counted_at": datetime.now().isoformat(), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Add metadata information, but preserve the scope field metadata_copy = metadata.copy() if "scope" in metadata_copy: # Preserve the simple string scope, but add server scope info separately metadata_copy["server_scope"] = metadata_copy.pop("scope") result.update(metadata_copy) return result except Exception as e: return create_error_response(str(e), error_code="VERSION_COUNT_FAILED", registry_mode=registry_mode) @structured_output("get_registry_statistics", fallback_on_error=True) def get_registry_statistics_tool( registry_manager, registry_mode: str, registry: Optional[str] = None, include_context_details: bool = True, ) -> Dict[str, Any]: """ Get comprehensive statistics about a registry. 
Args: registry: Optional registry name (ignored in single-registry mode) include_context_details: Whether to include detailed context statistics Returns: Dictionary containing registry statistics with metadata and structured validation """ try: if registry_mode == "single": client = get_default_client(registry_manager) else: client = registry_manager.get_registry(registry) if client is None: return create_error_response( f"Registry '{registry}' not found", error_code="REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) # Get all contexts contexts = client.get_contexts() if isinstance(contexts, dict) and "error" in contexts: return create_error_response( f"Failed to get contexts: {contexts.get('error')}", error_code="CONTEXTS_RETRIEVAL_FAILED", registry_mode=registry_mode, ) total_schemas = 0 total_versions = 0 context_stats = [] # Import the function here to avoid circular imports from kafka_schema_registry_unified_mcp import get_schema_versions # Get statistics for each context for context in contexts: subjects = client.get_subjects(context) if isinstance(subjects, dict) and "error" in subjects: continue context_schemas = len(subjects) context_versions = 0 # Count versions for each subject for subject in subjects: versions = get_schema_versions(subject, context, registry) if not isinstance(versions, dict): context_versions += len(versions) total_schemas += context_schemas total_versions += context_versions if include_context_details: context_stats.append( { "name": context, "subject_count": context_schemas, "schema_count": context_versions, } ) # Get default context stats default_subjects = client.get_subjects() if not isinstance(default_subjects, dict): default_schemas = len(default_subjects) default_versions = 0 for subject in default_subjects: versions = get_schema_versions(subject, None, registry) if not isinstance(versions, dict): default_versions += len(versions) total_schemas += default_schemas total_versions += default_versions if include_context_details: 
context_stats.append( { "name": "default", "subject_count": default_schemas, "schema_count": default_versions, } ) # Get registry metadata metadata = client.get_server_metadata() result = { "registry": (client.config.name if hasattr(client.config, "name") else "default"), "total_contexts": len(contexts), "total_subjects": total_schemas, "total_schemas": total_versions, "contexts": context_stats if include_context_details else None, "generated_at": datetime.now().isoformat(), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Add metadata information result.update(metadata) return result except Exception as e: return create_error_response(str(e), error_code="REGISTRY_STATISTICS_FAILED", registry_mode=registry_mode) # ===== OPTIMIZED ASYNC STATISTICS FUNCTIONS ===== async def _count_schemas_async( registry_manager, registry_mode: str, context: Optional[str] = None, registry: Optional[str] = None, ) -> Dict[str, Any]: """ Async version of count_schemas_tool with better performance. Uses parallel API calls when counting multiple contexts. Includes registry metadata information. 
""" try: if registry_mode == "single": client = get_default_client(registry_manager) else: client = registry_manager.get_registry(registry) if client is None: return {"error": f"Registry '{registry}' not found"} # Get registry metadata metadata = client.get_server_metadata() if context: # Single context - direct call subjects = client.get_subjects(context) if isinstance(subjects, dict) and "error" in subjects: return subjects result = { "registry": (client.config.name if hasattr(client.config, "name") else "default"), "context": context, "count": len(subjects), # Use 'count' to match schema "scope": "schemas", # Add scope field as string "total_schemas": len(subjects), "schemas": subjects, "counted_at": datetime.now(timezone.utc).isoformat(), } # Add metadata information, but preserve the scope field metadata_copy = metadata.copy() if "scope" in metadata_copy: # Preserve the simple string scope, but add server scope info separately metadata_copy["server_scope"] = metadata_copy.pop("scope") result.update(metadata_copy) return result else: # All contexts - parallel execution contexts = client.get_contexts() if isinstance(contexts, dict) and "error" in contexts: return contexts