count_schemas
Count the number of schemas within a specific context or registry in Kafka Schema Registry using this MCP server tool.
Instructions
Count the number of schemas in a context or registry.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| context | No | Optional schema context | None |
| registry | No | Optional registry name (ignored in single-registry mode) | None |
Implementation Reference
# statistics_tools.py:82-142 (handler)
# Primary handler function for the 'count_schemas' MCP tool. Counts schemas in a
# context or registry, returns structured output with metadata.
@structured_output("count_schemas", fallback_on_error=True)
def count_schemas_tool(
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
    registry: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Count the number of schemas in a context or registry.

    Args:
        registry_manager: Object used to resolve the registry client; in
            "single" mode it is passed to ``get_default_client``, otherwise
            its ``get_registry(registry)`` method is called.
        registry_mode: ``"single"`` selects the default client; any other
            value looks the client up by ``registry`` name.
        context: Optional schema context
        registry: Optional registry name (ignored in single-registry mode)

    Returns:
        Dictionary containing schema count and details with registry metadata
        and structured validation. On failure, the value produced by
        ``create_error_response`` is returned instead of raising.
    """
    # Local import keeps this block self-contained; the timestamp below is
    # UTC-aware so it agrees with the async variant of this tool.
    from datetime import timezone

    try:
        if registry_mode == "single":
            client = get_default_client(registry_manager)
        else:
            client = registry_manager.get_registry(registry)

        # Guard both modes: without a client every call below would fail with
        # an opaque AttributeError instead of a structured "not found" error.
        if client is None:
            return create_error_response(
                f"Registry '{registry}' not found",
                error_code="REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )

        subjects = client.get_subjects(context)

        # The client signals failure by returning a dict carrying an "error" key.
        if isinstance(subjects, dict) and "error" in subjects:
            return create_error_response(
                f"Failed to get subjects: {subjects.get('error')}",
                error_code="SUBJECTS_RETRIEVAL_FAILED",
                registry_mode=registry_mode,
            )

        # Get registry metadata
        metadata = client.get_server_metadata()

        result = {
            "registry": (client.config.name if hasattr(client.config, "name") else "default"),
            "context": context or "default",
            "count": len(subjects),
            "scope": "schemas",
            "schemas": subjects,
            "counted_at": datetime.now(timezone.utc).isoformat(),
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

        # Add metadata information, but preserve the scope field
        metadata_copy = metadata.copy()
        if "scope" in metadata_copy:
            # Preserve the simple string scope, but add server scope info separately
            metadata_copy["server_scope"] = metadata_copy.pop("scope")
        result.update(metadata_copy)

        return result
    except Exception as e:
        # Tool boundary: convert any unexpected failure into a structured error.
        return create_error_response(str(e), error_code="SCHEMA_COUNT_FAILED", registry_mode=registry_mode)
# schema_definitions.py:591-608 (schema)
# JSON Schema definition (COUNT_SCHEMA) used for validating the output of the
# count_schemas tool.
COUNT_SCHEMA = {
    "type": "object",
    "properties": {
        "count": {"type": "integer", "minimum": 0, "description": "Count result"},
        "scope": {
            "type": "string",
            "description": "What was counted (contexts, schemas, versions)",
        },
        "context": {
            "type": "string",
            "description": "Context name if scoped to context",
        },
        "registry": {"type": "string", "description": "Registry name"},
        # Unpacked last, so any key METADATA_FIELDS shares with the entries
        # above replaces the definition given here.
        **METADATA_FIELDS,
    },
    # Only the count and what was counted are mandatory.
    "required": ["count", "scope"],
    # Extra keys beyond the declared properties are accepted.
    "additionalProperties": True,
}
# statistics_tools.py:334-424 (helper)
# Async helper function used by task queue version for performant schema counting
# across multiple contexts using parallel execution.
async def _count_schemas_async(
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
    registry: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Async version of count_schemas_tool with better performance.
    Uses parallel API calls when counting multiple contexts.
    Includes registry metadata information.

    Args:
        registry_manager: Object used to resolve the registry client; in
            "single" mode it is passed to ``get_default_client``, otherwise
            its ``get_registry(registry)`` method is called.
        registry_mode: ``"single"`` selects the default client; any other
            value looks the client up by ``registry`` name.
        context: When truthy, only this context is counted. Otherwise every
            context returned by ``client.get_contexts()`` plus the default
            context is counted.
        registry: Optional registry name used outside single mode.

    Returns:
        A dictionary with ``count``/``total_schemas``, ``scope`` and
        ``counted_at`` plus either ``schemas`` (single context) or
        ``schemas_by_context`` and ``contexts_analyzed`` (all contexts),
        merged with the server metadata. On failure a dictionary with an
        ``"error"`` key is returned instead of raising.
    """
    try:
        if registry_mode == "single":
            client = get_default_client(registry_manager)
        else:
            client = registry_manager.get_registry(registry)

        # Guard both modes so a missing client yields a clear error payload
        # rather than an AttributeError message from the calls below.
        if client is None:
            return {"error": f"Registry '{registry}' not found"}

        # Get registry metadata
        metadata = client.get_server_metadata()

        if context:
            # Single context - direct call
            subjects = client.get_subjects(context)

            if isinstance(subjects, dict) and "error" in subjects:
                return subjects

            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "context": context,
                "count": len(subjects),  # Use 'count' to match schema
                "scope": "schemas",  # Add scope field as string
                "total_schemas": len(subjects),
                "schemas": subjects,
                "counted_at": datetime.now(timezone.utc).isoformat(),
            }
        else:
            # All contexts - parallel execution
            contexts = client.get_contexts()

            if isinstance(contexts, dict) and "error" in contexts:
                return contexts

            total_schemas = 0
            all_schemas = {}

            # Parallel execution for better performance
            with ThreadPoolExecutor(max_workers=5) as executor:
                future_to_context = {executor.submit(client.get_subjects, ctx): ctx for ctx in contexts}

                # Add default context
                future_to_context[executor.submit(client.get_subjects, None)] = "default"

                for future in as_completed(future_to_context):
                    ctx = future_to_context[future]
                    try:
                        subjects = future.result()
                        # A dict response is not a subject list (the
                        # single-context branch treats it as an error
                        # payload). Record it so the failing context stays
                        # visible instead of silently disappearing.
                        all_schemas[ctx] = subjects
                        if not isinstance(subjects, dict):
                            total_schemas += len(subjects)
                    except Exception as e:
                        all_schemas[ctx] = {"error": str(e)}

            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "count": total_schemas,  # Use 'count' to match schema
                "scope": "schemas",  # Add scope field as string
                "total_schemas": total_schemas,
                "schemas_by_context": all_schemas,
                "contexts_analyzed": len(all_schemas),
                "counted_at": datetime.now(timezone.utc).isoformat(),
            }

        # Add metadata information, but preserve the scope field
        metadata_copy = metadata.copy()
        if "scope" in metadata_copy:
            # Preserve the simple string scope, but add server scope info separately
            metadata_copy["server_scope"] = metadata_copy.pop("scope")
        result.update(metadata_copy)

        return result
    except Exception as e:
        # Helper boundary: callers expect an error payload, never an exception.
        return {"error": str(e)}
- schema_definitions.py:1014-1016 (registration)TOOL_OUTPUT_SCHEMAS mapping that registers 'count_schemas' to use COUNT_SCHEMA for structured output validation."count_contexts": COUNT_SCHEMA, "count_schemas": COUNT_SCHEMA, "count_schema_versions": COUNT_SCHEMA,
# statistics_tools.py:426-497 (helper)
# Task queue wrapper tool that offloads count_schemas to background task for
# non-blocking operation.
@structured_output("count_schemas_task_queue", fallback_on_error=True)
def count_schemas_task_queue_tool(
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
    registry: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Task queue version of count_schemas for better performance on large registries.
    Returns task_id immediately for async execution.

    Args:
        registry_manager: Forwarded unchanged to ``_count_schemas_async``.
        registry_mode: Forwarded to ``_count_schemas_async`` and echoed in
            the response.
        context: Optional schema context to count; recorded in the task metadata.
        registry: Optional registry name; recorded in the task metadata.

    Returns:
        Task information with task_id for monitoring progress with structured
        validation. If the task cannot be created or started, the value
        produced by ``create_error_response`` is returned instead of raising.
    """
    try:
        # Create async task
        task = task_manager.create_task(
            TaskType.STATISTICS,
            metadata={
                "operation": "count_schemas",
                "context": context,
                "registry": registry,
            },
        )

        def make_execution():
            # Build the coroutine lazily so exactly one is created, on
            # whichever execution path ends up running it.
            return task_manager.execute_task(
                task,
                _count_schemas_async,
                registry_manager=registry_manager,
                registry_mode=registry_mode,
                context=context,
                registry=registry,
            )

        # Probe for a running loop BEFORE creating the coroutine. Passing a
        # freshly created coroutine to asyncio.create_task() with no running
        # loop raises RuntimeError and leaves that coroutine never awaited.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None:
            # Start async execution on the current event loop
            loop.create_task(make_execution())
        else:
            # No running event loop, use a thread with its own loop
            def run_task():
                asyncio.run(make_execution())

            thread = threading.Thread(target=run_task)
            thread.start()

        return {
            "message": "Schema counting started as async task",
            "task_id": task.id,
            "task": task.to_dict(),
            "operation_info": {
                "operation": "count_schemas",
                "expected_duration": "medium",
                "async_pattern": "task_queue",
                "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                "registry_mode": registry_mode,
            },
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }
    except Exception as e:
        # Tool boundary: convert any unexpected failure into a structured error.
        return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)