Glama
aywengo

MCP Kafka Schema Reg

get_registry_statistics

Retrieve detailed statistics for a registry, including per-context details, using the MCP Kafka Schema Reg server. The results cover total contexts, subjects, and schema versions, giving a quick quantitative picture of a registry's contents.

Instructions

Get comprehensive statistics about a registry.

Input Schema

Name                     Required  Description                                               Default
include_context_details  No        Whether to include detailed per-context statistics        true
registry                 No        Optional registry name (ignored in single-registry mode)  (none)
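As a quick illustration of the parameters above, here is a sketch of a tools/call request for this tool. The argument names come from the input schema; the registry name "production" is hypothetical, and the exact JSON-RPC envelope a given MCP client sends may differ.

```python
import json

# Hypothetical MCP tools/call request for get_registry_statistics.
# "production" is a made-up registry name for illustration.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "get_registry_statistics",
        "arguments": {
            "registry": "production",         # optional; ignored in single-registry mode
            "include_context_details": True,  # optional; include per-context stats
        },
    },
}

print(json.dumps(request, indent=2))
```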

Implementation Reference

  • Primary synchronous handler implementing the core logic for get_registry_statistics tool. Fetches contexts, iterates over subjects and versions to compute totals, includes optional detailed context stats, and adds registry metadata.
    @structured_output("get_registry_statistics", fallback_on_error=True)
    def get_registry_statistics_tool(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
    ) -> Dict[str, Any]:
        """
        Get comprehensive statistics about a registry.
    
        Args:
            registry: Optional registry name (ignored in single-registry mode)
            include_context_details: Whether to include detailed context statistics
    
        Returns:
            Dictionary containing registry statistics with metadata and structured validation
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode=registry_mode,
                    )
    
            # Get all contexts
            contexts = client.get_contexts()
            if isinstance(contexts, dict) and "error" in contexts:
                return create_error_response(
                    f"Failed to get contexts: {contexts.get('error')}",
                    error_code="CONTEXTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )
    
            total_schemas = 0  # counts subjects across all contexts
            total_versions = 0  # counts schema versions across all subjects
            context_stats = []
    
            # Import the function here to avoid circular imports
            from kafka_schema_registry_unified_mcp import get_schema_versions
    
            # Get statistics for each context
            for context in contexts:
                subjects = client.get_subjects(context)
                if isinstance(subjects, dict) and "error" in subjects:
                    continue
    
                context_schemas = len(subjects)
                context_versions = 0
    
                # Count versions for each subject
                for subject in subjects:
                    versions = get_schema_versions(subject, context, registry)
                    if not isinstance(versions, dict):
                        context_versions += len(versions)
    
                total_schemas += context_schemas
                total_versions += context_versions
    
                if include_context_details:
                    context_stats.append(
                        {
                            "name": context,
                            "subject_count": context_schemas,
                            "schema_count": context_versions,
                        }
                    )
    
            # Get default context stats
            default_subjects = client.get_subjects()
            if not isinstance(default_subjects, dict):
                default_schemas = len(default_subjects)
                default_versions = 0
    
                for subject in default_subjects:
                    versions = get_schema_versions(subject, None, registry)
                    if not isinstance(versions, dict):
                        default_versions += len(versions)
    
                total_schemas += default_schemas
                total_versions += default_versions
    
                if include_context_details:
                    context_stats.append(
                        {
                            "name": "default",
                            "subject_count": default_schemas,
                            "schema_count": default_versions,
                        }
                    )
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "total_contexts": len(contexts),
                "total_subjects": total_schemas,
                "total_schemas": total_versions,
                "contexts": context_stats if include_context_details else None,
                "generated_at": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Add metadata information
            result.update(metadata)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="REGISTRY_STATISTICS_FAILED", registry_mode=registry_mode)
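The contexts → subjects → versions counting loop above can be sketched in isolation with a stub client. StubClient and its sample data are hypothetical; the real client queries a Schema Registry over HTTP and returns error dicts that the handler skips.

```python
# Minimal sketch of the aggregation logic: for each context, count its
# subjects, then sum the version lists of every subject.
class StubClient:
    # Hypothetical in-memory data: context -> subject -> version list.
    _data = {
        "orders": {"order-value": [1, 2, 3], "order-key": [1]},
        "payments": {"payment-value": [1, 2]},
    }

    def get_contexts(self):
        return list(self._data)

    def get_subjects(self, context):
        return list(self._data[context])

    def get_schema_versions(self, subject, context):
        return self._data[context][subject]

client = StubClient()
total_subjects = 0
total_versions = 0
for context in client.get_contexts():
    subjects = client.get_subjects(context)
    total_subjects += len(subjects)
    for subject in subjects:
        total_versions += len(client.get_schema_versions(subject, context))

print(total_subjects, total_versions)  # 3 subjects, 6 versions
```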
  • JSON Schema definition (REGISTRY_STATISTICS_SCHEMA) for validating the structured output of the get_registry_statistics tool, specifying properties like total_contexts, total_subjects, total_schemas, and optional per-context details.
    REGISTRY_STATISTICS_SCHEMA = {
        "type": "object",
        "properties": {
            "registry": {"type": "string", "description": "Registry name"},
            "total_contexts": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of contexts",
            },
            "total_subjects": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of subjects",
            },
            "total_schemas": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of schema versions",
            },
            "contexts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "subject_count": {"type": "integer", "minimum": 0},
                        "schema_count": {"type": "integer", "minimum": 0},
                    },
                    "required": ["name", "subject_count", "schema_count"],
                },
                "description": "Per-context statistics",
            },
            "generated_at": {
                "type": "string",
                "format": "date-time",
                "description": "When statistics were generated",
            },
            **METADATA_FIELDS,
        },
        "required": ["total_contexts", "total_subjects", "total_schemas"],
        "additionalProperties": True,
    }
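A hand-rolled sketch of the checks this schema encodes, using only the stdlib. In the server itself, validation happens inside the @structured_output decorator, and a library such as jsonschema would normally perform it; check_statistics here is purely illustrative.

```python
# Illustrative validator for the three required count fields and the
# optional registry name; mirrors REGISTRY_STATISTICS_SCHEMA's constraints.
REQUIRED_COUNTS = ("total_contexts", "total_subjects", "total_schemas")

def check_statistics(result: dict) -> list[str]:
    errors = []
    for key in REQUIRED_COUNTS:
        value = result.get(key)
        if not isinstance(value, int):
            errors.append(f"{key}: required integer missing")
        elif value < 0:
            errors.append(f"{key}: must be >= 0")
    if "registry" in result and not isinstance(result["registry"], str):
        errors.append("registry: must be a string")
    return errors

good = {"registry": "default", "total_contexts": 2, "total_subjects": 5, "total_schemas": 12}
bad = {"total_contexts": -1, "total_subjects": 5}

print(check_statistics(good))  # []
print(check_statistics(bad))
```

Because the schema sets additionalProperties to True, extra metadata keys (like registry_mode) pass through untouched.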
  • Mapping of tool name 'get_registry_statistics' to its output schema in the TOOL_OUTPUT_SCHEMAS registry.
    "get_registry_statistics": REGISTRY_STATISTICS_SCHEMA,
  • Async task queue variant of the tool for long-running operations, returns task_id immediately and uses background execution with progress updates.
    @structured_output("get_registry_statistics_task_queue", fallback_on_error=True)
    def get_registry_statistics_task_queue_tool(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
    ) -> Dict[str, Any]:
        """
        Task queue version of get_registry_statistics for better performance.
        Returns task_id immediately for async execution with progress tracking.
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Create async task
            task = task_manager.create_task(
                TaskType.STATISTICS,
                metadata={
                    "operation": "get_registry_statistics",
                    "registry": registry,
                    "include_context_details": include_context_details,
                },
            )
    
            # Start async execution
            try:
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _get_registry_statistics_async,
                        registry_manager=registry_manager,
                        registry_mode=registry_mode,
                        registry=registry,
                        include_context_details=include_context_details,
                        task_id=task.id,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _get_registry_statistics_async,
                            registry_manager=registry_manager,
                            registry_mode=registry_mode,
                            registry=registry,
                            include_context_details=include_context_details,
                            task_id=task.id,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            return {
                "message": "Registry statistics analysis started as async task",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "get_registry_statistics",
                    "expected_duration": "long",
                    "async_pattern": "task_queue",
                    "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                },
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
        except Exception as e:
            return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)
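The calling pattern this variant expects can be sketched with a toy task manager: submit work, get a task_id back immediately, then poll until it finishes. TinyTaskManager is hypothetical; the real server runs tasks through its own task_manager and exposes get_task_status() as a separate tool.

```python
# Toy illustration of the submit-then-poll task-queue pattern.
import threading
import time
import uuid

class TinyTaskManager:
    def __init__(self):
        self._tasks = {}

    def submit(self, fn, *args):
        task_id = str(uuid.uuid4())
        self._tasks[task_id] = {"status": "running", "result": None}

        def run():
            # Store the result before flipping status so pollers never
            # observe a "done" task without its result.
            self._tasks[task_id]["result"] = fn(*args)
            self._tasks[task_id]["status"] = "done"

        threading.Thread(target=run).start()
        return task_id

    def status(self, task_id):
        return self._tasks[task_id]

manager = TinyTaskManager()
task_id = manager.submit(lambda: {"total_schemas": 42})  # stand-in for the statistics work

while manager.status(task_id)["status"] != "done":
    time.sleep(0.01)

print(manager.status(task_id)["result"])  # {'total_schemas': 42}
```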
  • Async helper function used by task queue version, implements parallel execution across contexts and subjects using ThreadPoolExecutor for improved performance on large registries.
    async def _get_registry_statistics_async(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
        task_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Async version of get_registry_statistics_tool with parallel execution.
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return {"error": f"Registry '{registry}' not found"}
    
            # Update progress
            if task_id:
                task_manager.update_progress(task_id, 10.0)
    
            # Get all contexts
            contexts = client.get_contexts()
            if isinstance(contexts, dict) and "error" in contexts:
                return contexts
    
            if task_id:
                task_manager.update_progress(task_id, 20.0)
    
            total_schemas = 0
            total_versions = 0
            context_stats = []
    
            # Parallel execution for better performance
            with ThreadPoolExecutor(max_workers=8) as executor:
                # Submit all context analysis tasks
                future_to_context = {}
    
                # Add all contexts
                for context in contexts:
                    future = executor.submit(_analyze_context_parallel, client, context, registry)
                    future_to_context[future] = context
    
                # Add default context
                future = executor.submit(_analyze_context_parallel, client, None, registry)
                future_to_context[future] = "default"
    
                # Collect results
                completed = 0
                total_contexts = len(future_to_context)
    
                for future in as_completed(future_to_context):
                    context = future_to_context[future]
                    try:
                        context_result = future.result()
    
                    # Confirm the result is a dict before calling .get() on it
                    if isinstance(context_result, dict) and "error" not in context_result:
                        total_schemas += context_result.get("schemas", 0)
                        total_versions += context_result.get("versions", 0)
    
                            if include_context_details:
                                context_stats.append(
                                    {
                                        "context": context,
                                        "schemas": context_result.get("schemas", 0),
                                        "versions": context_result.get("versions", 0),
                                    }
                                )
    
                        # Update progress
                        completed += 1
                        if task_id:
                            # Progress from 20% to 90% based on context completion
                            progress = 20.0 + (completed / total_contexts) * 70.0
                            task_manager.update_progress(task_id, progress)
    
                    except Exception as e:
                        if include_context_details:
                            context_stats.append({"context": context, "error": str(e)})
    
            if task_id:
                task_manager.update_progress(task_id, 95.0)
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "total_contexts": len(contexts),
                "total_schemas": total_schemas,
                "total_versions": total_versions,
                "average_versions_per_schema": round(total_versions / max(total_schemas, 1), 2),
                "contexts": context_stats if include_context_details else None,
                "counted_at": datetime.now().isoformat(),
            }
    
            # Add metadata information
            result.update(metadata)
    
            if task_id:
                task_manager.update_progress(task_id, 100.0)
    
            return result
    
        except Exception as e:
            return {"error": str(e)}
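The fan-out and progress mapping used above, reduced to a standalone sketch. The analyze function and the context names are stand-ins for _analyze_context_parallel and real registry contexts; the 20%-to-90% progress window matches the helper's update_progress calls.

```python
# Analyze each context in a worker thread; as futures complete, map
# progress linearly from 20% to 90%.
from concurrent.futures import ThreadPoolExecutor, as_completed

def analyze(context):
    # Stand-in for the per-context subject/version counting.
    name = context or "default"
    return {"context": name, "schemas": len(name)}

contexts = ["orders", "payments", "inventory", None]  # None = default context
progress_log = []

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(analyze, c): c for c in contexts}
    completed = 0
    for future in as_completed(futures):
        result = future.result()
        completed += 1
        progress = 20.0 + (completed / len(futures)) * 70.0
        progress_log.append(progress)

print(sorted(progress_log))  # [37.5, 55.0, 72.5, 90.0]
```

Completion order is nondeterministic, which is why the real helper tracks a completed counter rather than assuming submission order.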
  • Operation metadata registration classifying get_registry_statistics as a long-running operation that uses task queue pattern for async handling.
        "get_registry_statistics": {
            "duration": OperationDuration.LONG,
            "pattern": AsyncPattern.TASK_QUEUE,
        },
    }
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It states 'Get comprehensive statistics' but doesn't clarify what 'comprehensive' means, whether this is a read-only operation, if it requires specific permissions, or what the output format might be. For a tool with zero annotation coverage, this leaves significant behavioral gaps.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence with no wasted words. It's appropriately front-loaded with the core action and resource, making it easy to parse quickly despite its brevity.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of statistics gathering, zero annotation coverage, 0% schema description coverage, and no output schema, the description is inadequate. It doesn't explain what statistics are returned, how they're formatted, or any behavioral constraints, leaving the agent with insufficient information to use the tool effectively.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 2 parameters with 0% description coverage, and the tool description adds no information about what 'registry' or 'include_context_details' mean. Without any parameter semantics in the description, the agent must rely solely on schema property names, which is insufficient for understanding parameter purposes or usage.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description 'Get comprehensive statistics about a registry' clearly states the action (get) and resource (registry statistics), making the purpose understandable. However, it lacks specificity about what 'comprehensive statistics' entails and doesn't differentiate from sibling tools like 'get_registry_info' or 'compare_registries', leaving ambiguity about scope.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. With sibling tools like 'get_registry_info', 'compare_registries', and 'list_registries', there's no indication of how this tool differs in context or when it's the appropriate choice, leaving the agent without usage direction.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
