Glama
aywengo

MCP Kafka Schema Reg

count_schemas

Count the number of schemas within a specific context or registry in Kafka Schema Registry using this MCP server tool.

Instructions

Count the number of schemas in a context or registry.

Input Schema

Name      Required  Description  Default
context   No
registry  No
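
Since both parameters are optional, a call may omit either or both. Below is a minimal sketch of the JSON-RPC `tools/call` request an MCP client might send for this tool, assuming the standard MCP request shape. The helper name `build_count_schemas_request`, the request id, and the context name `production` are illustration values, not taken from the server.

```python
import json


def build_count_schemas_request(request_id, context=None, registry=None):
    """Build a JSON-RPC 2.0 `tools/call` request for the count_schemas tool."""
    # Only include arguments the caller actually set; both are optional.
    arguments = {}
    if context is not None:
        arguments["context"] = context
    if registry is not None:
        arguments["registry"] = registry
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "count_schemas", "arguments": arguments},
    }


# Hypothetical call scoped to a context named "production".
request = build_count_schemas_request(1, context="production")
print(json.dumps(request, indent=2))
```

Omitting unset arguments, rather than sending explicit nulls, keeps the request valid whether or not the server's input schema accepts null values.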

Implementation Reference

  • Primary handler function for the 'count_schemas' MCP tool. It counts the subjects returned for a context or registry and returns a structured result merged with registry metadata.
    @structured_output("count_schemas", fallback_on_error=True)
    def count_schemas_tool(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Count the number of schemas in a context or registry.
    
        Args:
            context: Optional schema context
            registry: Optional registry name (ignored in single-registry mode)
    
        Returns:
            Dictionary containing schema count and details with registry metadata and structured validation
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode=registry_mode,
                    )
    
            subjects = client.get_subjects(context)
            if isinstance(subjects, dict) and "error" in subjects:
                return create_error_response(
                    f"Failed to get subjects: {subjects.get('error')}",
                    error_code="SUBJECTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "context": context or "default",
                "count": len(subjects),
                "scope": "schemas",
                "schemas": subjects,
                "counted_at": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Add metadata information, but preserve the scope field
            metadata_copy = metadata.copy()
            if "scope" in metadata_copy:
                # Preserve the simple string scope, but add server scope info separately
                metadata_copy["server_scope"] = metadata_copy.pop("scope")
            result.update(metadata_copy)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="SCHEMA_COUNT_FAILED", registry_mode=registry_mode)
  • JSON Schema definition (COUNT_SCHEMA) used for validating the output of the count_schemas tool.
    COUNT_SCHEMA = {
        "type": "object",
        "properties": {
            "count": {"type": "integer", "minimum": 0, "description": "Count result"},
            "scope": {
                "type": "string",
                "description": "What was counted (contexts, schemas, versions)",
            },
            "context": {
                "type": "string",
                "description": "Context name if scoped to context",
            },
            "registry": {"type": "string", "description": "Registry name"},
            **METADATA_FIELDS,
        },
        "required": ["count", "scope"],
        "additionalProperties": True,
    }
  • Async helper function used by the task queue version. Given a context, it counts that context directly; otherwise it counts every context, plus the default one, using parallel calls.
    async def _count_schemas_async(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Async version of count_schemas_tool with better performance.
        Uses parallel API calls when counting multiple contexts.
        Includes registry metadata information.
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return {"error": f"Registry '{registry}' not found"}
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            if context:
                # Single context - direct call
                subjects = client.get_subjects(context)
                if isinstance(subjects, dict) and "error" in subjects:
                    return subjects
    
                result = {
                    "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                    "context": context,
                    "count": len(subjects),  # Use 'count' to match schema
                    "scope": "schemas",  # Add scope field as string
                    "total_schemas": len(subjects),
                    "schemas": subjects,
                    "counted_at": datetime.now(timezone.utc).isoformat(),
                }
    
                # Add metadata information, but preserve the scope field
                metadata_copy = metadata.copy()
                if "scope" in metadata_copy:
                    # Preserve the simple string scope, but add server scope info separately
                    metadata_copy["server_scope"] = metadata_copy.pop("scope")
                result.update(metadata_copy)
                return result
            else:
                # All contexts - parallel execution
                contexts = client.get_contexts()
                if isinstance(contexts, dict) and "error" in contexts:
                    return contexts
    
                total_schemas = 0
                all_schemas = {}
    
                # Parallel execution for better performance
                with ThreadPoolExecutor(max_workers=5) as executor:
                    future_to_context = {executor.submit(client.get_subjects, ctx): ctx for ctx in contexts}
    
                    # Add default context
                    future_to_context[executor.submit(client.get_subjects, None)] = "default"
    
                    for future in as_completed(future_to_context):
                        ctx = future_to_context[future]
                        try:
                            subjects = future.result()
                            if not isinstance(subjects, dict):
                                all_schemas[ctx] = subjects
                                total_schemas += len(subjects)
                        except Exception as e:
                            all_schemas[ctx] = {"error": str(e)}
    
                result = {
                    "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                    "count": total_schemas,  # Use 'count' to match schema
                    "scope": "schemas",  # Add scope field as string
                    "total_schemas": total_schemas,
                    "schemas_by_context": all_schemas,
                    "contexts_analyzed": len(all_schemas),
                    "counted_at": datetime.now(timezone.utc).isoformat(),
                }
    
                # Add metadata information, but preserve the scope field
                metadata_copy = metadata.copy()
                if "scope" in metadata_copy:
                    # Preserve the simple string scope, but add server scope info separately
                    metadata_copy["server_scope"] = metadata_copy.pop("scope")
                result.update(metadata_copy)
                return result
        except Exception as e:
            return {"error": str(e)}
  • TOOL_OUTPUT_SCHEMAS mapping that registers 'count_schemas' to use COUNT_SCHEMA for structured output validation.
    "count_contexts": COUNT_SCHEMA,
    "count_schemas": COUNT_SCHEMA,
    "count_schema_versions": COUNT_SCHEMA,
  • Task queue wrapper tool that offloads count_schemas to a background task for non-blocking operation.
    @structured_output("count_schemas_task_queue", fallback_on_error=True)
    def count_schemas_task_queue_tool(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Task queue version of count_schemas for better performance on large registries.
        Returns task_id immediately for async execution.
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Create async task
            task = task_manager.create_task(
                TaskType.STATISTICS,
                metadata={
                    "operation": "count_schemas",
                    "context": context,
                    "registry": registry,
                },
            )
    
            # Start async execution
            try:
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _count_schemas_async,
                        registry_manager=registry_manager,
                        registry_mode=registry_mode,
                        context=context,
                        registry=registry,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _count_schemas_async,
                            registry_manager=registry_manager,
                            registry_mode=registry_mode,
                            context=context,
                            registry=registry,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            return {
                "message": "Schema counting started as async task",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "count_schemas",
                    "expected_duration": "medium",
                    "async_pattern": "task_queue",
                    "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                },
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
        except Exception as e:
            return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)
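
The all-contexts branch of the async helper can be tried without a live registry. The following is a minimal sketch, not the server's code: `FakeClient` and `count_all_contexts` are hypothetical names, and the subject names are made up. It mirrors the listing's behavior of skipping dict-shaped error payloads when totalling.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class FakeClient:
    """Hypothetical stand-in for a registry client; not part of the server."""

    def __init__(self, subjects_by_context):
        self._subjects = subjects_by_context

    def get_contexts(self):
        # Named contexts only; the default context is keyed by None here.
        return [ctx for ctx in self._subjects if ctx is not None]

    def get_subjects(self, context=None):
        return self._subjects.get(context, [])


def count_all_contexts(client, max_workers=5):
    """Count subjects in every named context plus the default one, in parallel."""
    total = 0
    by_context = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(client.get_subjects, ctx): ctx
            for ctx in client.get_contexts()
        }
        # The default context is queried with None and reported as "default".
        futures[executor.submit(client.get_subjects, None)] = "default"
        for future in as_completed(futures):
            ctx = futures[future]
            try:
                subjects = future.result()
            except Exception as exc:
                by_context[ctx] = {"error": str(exc)}
                continue
            if isinstance(subjects, dict):
                # Dict results are treated as error payloads and not counted.
                continue
            by_context[ctx] = subjects
            total += len(subjects)
    return {
        "count": total,
        "scope": "schemas",
        "schemas_by_context": by_context,
        "contexts_analyzed": len(by_context),
    }


client = FakeClient({None: ["orders-value"], "staging": ["users-value", "users-key"]})
summary = count_all_contexts(client)
print(summary["count"], sorted(summary["schemas_by_context"]))
```

One consequence worth noting: because error payloads are skipped silently, `contexts_analyzed` can be lower than the number of contexts queried.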
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden but offers minimal behavioral insight. It doesn't disclose whether this is a read-only operation, what permissions are required, whether it counts active vs all schemas, or how results are returned (e.g., as a number, with metadata). The phrase 'Count the number' implies a simple read operation but lacks critical details for safe invocation.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
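
On the question of how results are returned: judging by the handler listing, the tool returns a dictionary rather than a bare number, with server metadata merged in and any metadata `scope` key renamed to `server_scope`. A sketch of that shape follows. The helper names are hypothetical and every value, including the metadata contents, is invented for illustration.

```python
def merge_metadata(result, metadata):
    """Merge server metadata into a result without clobbering its 'scope'."""
    extra = dict(metadata)
    if "scope" in extra:
        # Keep the result's plain-string scope; move the server's aside.
        extra["server_scope"] = extra.pop("scope")
    merged = dict(result)
    merged.update(extra)
    return merged


def has_required_fields(result):
    """Check the two fields COUNT_SCHEMA lists as required."""
    count = result.get("count")
    count_ok = isinstance(count, int) and not isinstance(count, bool) and count >= 0
    return count_ok and isinstance(result.get("scope"), str)


# Invented sample values; real metadata depends on the registry server.
result = {
    "registry": "default",
    "context": "default",
    "count": 2,
    "scope": "schemas",
    "schemas": ["orders-value", "users-value"],
}
metadata = {"scope": {"path": []}, "version": "0.0.0-example"}

merged = merge_metadata(result, metadata)
print(merged["scope"], merged["server_scope"], has_required_fields(merged))
```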

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that front-loads the core purpose without unnecessary words. Every part of the sentence ('Count the number of schemas in a context or registry') contributes directly to understanding the tool's function, making it optimally concise for its limited content.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (2 parameters with no schema descriptions, no annotations, no output schema), the description is incomplete. It doesn't explain parameter usage, behavioral constraints, or return format, leaving significant gaps for the agent to guess. For a counting tool with undefined parameters, more context is needed to ensure correct invocation.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 0% description coverage, so the description must compensate but fails to do so. It mentions 'context or registry' but doesn't explain what these parameters represent, their format, whether both can be used simultaneously, or what happens when neither is provided (since both default to null). The description adds minimal value beyond what's inferred from parameter names.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
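
The implementation listings suggest one parameter interaction the description leaves out: the synchronous handler passes `context` straight to `get_subjects`, so an omitted context queries a single (default) scope, while the async helper fans out over every context when `context` is empty. A small sketch of that difference, assuming this reading of the listings is right; `contexts_queried` and the context names are hypothetical.

```python
def contexts_queried(variant, context=None, known_contexts=()):
    """List the context arguments passed to get_subjects for one invocation.

    variant is "sync" for the direct handler or "task_queue" for the
    async helper; known_contexts stands in for client.get_contexts().
    """
    if variant == "sync" or context:
        # One call, with the context exactly as supplied (possibly None).
        return [context]
    # Async helper without a context: each named context, then the default.
    return list(known_contexts) + [None]


print(contexts_queried("sync"))
print(contexts_queried("task_queue", known_contexts=["staging", "prod"]))
print(contexts_queried("task_queue", context="prod", known_contexts=["staging", "prod"]))
```

In both variants, `registry` only matters outside single-registry mode, where it selects the client via `registry_manager.get_registry(registry)`.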

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('Count') and resource ('number of schemas'), specifying the scope ('in a context or registry'). It distinguishes from obvious siblings like 'count_contexts' and 'count_schema_versions' by focusing on schemas, though it doesn't explicitly differentiate from tools like 'list_subjects' or 'get_schema_versions' that might also involve schema enumeration.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It doesn't mention prerequisites, when to choose 'context' vs 'registry' parameters, or how it differs from sibling tools like 'list_subjects' or 'get_schema_versions' that might provide similar counting functionality. The agent must infer usage from the tool name alone.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
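
The same request can be made from Python with the standard library. This is a sketch that assumes the endpoint returns a JSON body; the response fields are not documented here, so the result is returned as parsed without interpreting it. The function names are illustrative.

```python
import json
import urllib.request

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp"


def build_request(url=SERVER_URL):
    """Build the GET request equivalent to the curl command."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url=SERVER_URL, timeout=10):
    """Send the request and parse the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `fetch_server_info()` performs the network request; `build_request()` alone can be used to inspect the request without sending it.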

If you have feedback or need assistance with the MCP directory API, please join our Discord server.