Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

count_schemas

Count the number of schemas within a specific context or registry in Kafka Schema Registry using this MCP server tool.

Instructions

Count the number of schemas in a context or registry.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
contextNo
registryNo

Implementation Reference

  • Primary handler function for the 'count_schemas' MCP tool. Counts schemas in a context or registry, returns structured output with metadata.
    @structured_output("count_schemas", fallback_on_error=True)
    def count_schemas_tool(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Count the number of schemas in a context or registry.
    
        Args:
            context: Optional schema context
            registry: Optional registry name (ignored in single-registry mode)
    
        Returns:
            Dictionary containing schema count and details with registry metadata and structured validation
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode=registry_mode,
                    )
    
            subjects = client.get_subjects(context)
            if isinstance(subjects, dict) and "error" in subjects:
                return create_error_response(
                    f"Failed to get subjects: {subjects.get('error')}",
                    error_code="SUBJECTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "context": context or "default",
                "count": len(subjects),
                "scope": "schemas",
                "schemas": subjects,
                "counted_at": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Add metadata information, but preserve the scope field
            metadata_copy = metadata.copy()
            if "scope" in metadata_copy:
                # Preserve the simple string scope, but add server scope info separately
                metadata_copy["server_scope"] = metadata_copy.pop("scope")
            result.update(metadata_copy)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="SCHEMA_COUNT_FAILED", registry_mode=registry_mode)
  • JSON Schema definition (COUNT_SCHEMA) used for validating the output of the count_schemas tool.
    COUNT_SCHEMA = {
        "type": "object",
        "properties": {
            "count": {"type": "integer", "minimum": 0, "description": "Count result"},
            "scope": {
                "type": "string",
                "description": "What was counted (contexts, schemas, versions)",
            },
            "context": {
                "type": "string",
                "description": "Context name if scoped to context",
            },
            "registry": {"type": "string", "description": "Registry name"},
            **METADATA_FIELDS,
        },
        "required": ["count", "scope"],
        "additionalProperties": True,
    }
  • Async helper function used by task queue version for performant schema counting across multiple contexts using parallel execution.
    async def _count_schemas_async(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Async version of count_schemas_tool with better performance.
        Uses parallel API calls when counting multiple contexts.
        Includes registry metadata information.
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return {"error": f"Registry '{registry}' not found"}
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            if context:
                # Single context - direct call
                subjects = client.get_subjects(context)
                if isinstance(subjects, dict) and "error" in subjects:
                    return subjects
    
                result = {
                    "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                    "context": context,
                    "count": len(subjects),  # Use 'count' to match schema
                    "scope": "schemas",  # Add scope field as string
                    "total_schemas": len(subjects),
                    "schemas": subjects,
                    "counted_at": datetime.now(timezone.utc).isoformat(),
                }
    
                # Add metadata information, but preserve the scope field
                metadata_copy = metadata.copy()
                if "scope" in metadata_copy:
                    # Preserve the simple string scope, but add server scope info separately
                    metadata_copy["server_scope"] = metadata_copy.pop("scope")
                result.update(metadata_copy)
                return result
            else:
                # All contexts - parallel execution
                contexts = client.get_contexts()
                if isinstance(contexts, dict) and "error" in contexts:
                    return contexts
    
                total_schemas = 0
                all_schemas = {}
    
                # Parallel execution for better performance
                with ThreadPoolExecutor(max_workers=5) as executor:
                    future_to_context = {executor.submit(client.get_subjects, ctx): ctx for ctx in contexts}
    
                    # Add default context
                    future_to_context[executor.submit(client.get_subjects, None)] = "default"
    
                    for future in as_completed(future_to_context):
                        ctx = future_to_context[future]
                        try:
                            subjects = future.result()
                            if not isinstance(subjects, dict):
                                all_schemas[ctx] = subjects
                                total_schemas += len(subjects)
                        except Exception as e:
                            all_schemas[ctx] = {"error": str(e)}
    
                result = {
                    "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                    "count": total_schemas,  # Use 'count' to match schema
                    "scope": "schemas",  # Add scope field as string
                    "total_schemas": total_schemas,
                    "schemas_by_context": all_schemas,
                    "contexts_analyzed": len(all_schemas),
                    "counted_at": datetime.now(timezone.utc).isoformat(),
                }
    
                # Add metadata information, but preserve the scope field
                metadata_copy = metadata.copy()
                if "scope" in metadata_copy:
                    # Preserve the simple string scope, but add server scope info separately
                    metadata_copy["server_scope"] = metadata_copy.pop("scope")
                result.update(metadata_copy)
                return result
        except Exception as e:
            return {"error": str(e)}
  • TOOL_OUTPUT_SCHEMAS mapping that registers 'count_schemas' to use COUNT_SCHEMA for structured output validation.
    "count_contexts": COUNT_SCHEMA,
    "count_schemas": COUNT_SCHEMA,
    "count_schema_versions": COUNT_SCHEMA,
  • Task queue wrapper tool that offloads count_schemas to background task for non-blocking operation.
    @structured_output("count_schemas_task_queue", fallback_on_error=True)
    def count_schemas_task_queue_tool(
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Task queue version of count_schemas for better performance on large registries.
        Returns task_id immediately for async execution.
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Create async task
            task = task_manager.create_task(
                TaskType.STATISTICS,
                metadata={
                    "operation": "count_schemas",
                    "context": context,
                    "registry": registry,
                },
            )
    
            # Start async execution
            try:
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _count_schemas_async,
                        registry_manager=registry_manager,
                        registry_mode=registry_mode,
                        context=context,
                        registry=registry,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _count_schemas_async,
                            registry_manager=registry_manager,
                            registry_mode=registry_mode,
                            context=context,
                            registry=registry,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            return {
                "message": "Schema counting started as async task",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "count_schemas",
                    "expected_duration": "medium",
                    "async_pattern": "task_queue",
                    "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                },
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
        except Exception as e:
            return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server