count_schema_versions
Track and count schema versions in MCP Kafka Schema Registry by specifying the subject. Use this tool to manage and monitor schema evolution efficiently.
Instructions
Count the number of versions for a specific schema.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| context | No | ||
| registry | No | ||
| subject | Yes |
Implementation Reference
- statistics_tools.py:145-212 (handler)Core implementation of the count_schema_versions tool handler. Fetches schema versions using get_schema_versions helper, computes count, and returns structured response with metadata.@structured_output("count_schema_versions", fallback_on_error=True) def count_schema_versions_tool( subject: str, registry_manager, registry_mode: str, context: Optional[str] = None, registry: Optional[str] = None, ) -> Dict[str, Any]: """ Count the number of versions for a specific schema. Args: subject: The subject name context: Optional schema context registry: Optional registry name (ignored in single-registry mode) Returns: Dictionary containing version count and details with registry metadata and structured validation """ try: if registry_mode == "single": client = get_default_client(registry_manager) else: client = registry_manager.get_registry(registry) if client is None: return create_error_response( f"Registry '{registry}' not found", error_code="REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) # Import the function here to avoid circular imports from kafka_schema_registry_unified_mcp import get_schema_versions versions = get_schema_versions(subject, context, registry) if isinstance(versions, dict) and "error" in versions: return create_error_response( f"Failed to get schema versions: {versions.get('error')}", error_code="SCHEMA_VERSIONS_RETRIEVAL_FAILED", registry_mode=registry_mode, ) # Get registry metadata metadata = client.get_server_metadata() result = { "registry": (client.config.name if hasattr(client.config, "name") else "default"), "context": context or "default", "subject": subject, "count": len(versions), "scope": "versions", "versions": versions, "counted_at": datetime.now().isoformat(), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Add metadata information, but preserve the scope field metadata_copy = metadata.copy() if "scope" in metadata_copy: # Preserve the simple string scope, but add server scope info separately metadata_copy["server_scope"] = metadata_copy.pop("scope") result.update(metadata_copy) return result except Exception as e: return create_error_response(str(e), error_code="VERSION_COUNT_FAILED", registry_mode=registry_mode)
- schema_definitions.py:591-608 (schema)JSON Schema definition (COUNT_SCHEMA) used for output validation of the count_schema_versions tool and other count tools.COUNT_SCHEMA = { "type": "object", "properties": { "count": {"type": "integer", "minimum": 0, "description": "Count result"}, "scope": { "type": "string", "description": "What was counted (contexts, schemas, versions)", }, "context": { "type": "string", "description": "Context name if scoped to context", }, "registry": {"type": "string", "description": "Registry name"}, **METADATA_FIELDS, }, "required": ["count", "scope"], "additionalProperties": True, }
- schema_definitions.py:1016-1016 (schema)Mapping of 'count_schema_versions' tool name to its output schema (COUNT_SCHEMA) in TOOL_OUTPUT_SCHEMAS dictionary."count_schema_versions": COUNT_SCHEMA,
- task_management.py:196-198 (registration)Registration of operation metadata for count_schema_versions in task manager's OPERATION_METADATA, classifying it as QUICK DIRECT execution."count_schema_versions": { "duration": OperationDuration.QUICK, "pattern": AsyncPattern.DIRECT,