Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

get_registry_statistics

Retrieve detailed statistics for a registry, including context details, using the MCP Kafka Schema Reg server. Analyze registry data efficiently for informed decision-making.

Instructions

Get comprehensive statistics about a registry.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
include_context_detailsNo
registryNo

Implementation Reference

  • Primary synchronous handler implementing the core logic for get_registry_statistics tool. Fetches contexts, iterates over subjects and versions to compute totals, includes optional detailed context stats, and adds registry metadata.
    @structured_output("get_registry_statistics", fallback_on_error=True)
    def get_registry_statistics_tool(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
    ) -> Dict[str, Any]:
        """
        Get comprehensive statistics about a registry.
    
        Args:
            registry: Optional registry name (ignored in single-registry mode)
            include_context_details: Whether to include detailed context statistics
    
        Returns:
            Dictionary containing registry statistics with metadata and structured validation
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return create_error_response(
                        f"Registry '{registry}' not found",
                        error_code="REGISTRY_NOT_FOUND",
                        registry_mode=registry_mode,
                    )
    
            # Get all contexts
            contexts = client.get_contexts()
            if isinstance(contexts, dict) and "error" in contexts:
                return create_error_response(
                    f"Failed to get contexts: {contexts.get('error')}",
                    error_code="CONTEXTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )
    
            total_schemas = 0
            total_versions = 0
            context_stats = []
    
            # Import the function here to avoid circular imports
            from kafka_schema_registry_unified_mcp import get_schema_versions
    
            # Get statistics for each context
            for context in contexts:
                subjects = client.get_subjects(context)
                if isinstance(subjects, dict) and "error" in subjects:
                    continue
    
                context_schemas = len(subjects)
                context_versions = 0
    
                # Count versions for each subject
                for subject in subjects:
                    versions = get_schema_versions(subject, context, registry)
                    if not isinstance(versions, dict):
                        context_versions += len(versions)
    
                total_schemas += context_schemas
                total_versions += context_versions
    
                if include_context_details:
                    context_stats.append(
                        {
                            "name": context,
                            "subject_count": context_schemas,
                            "schema_count": context_versions,
                        }
                    )
    
            # Get default context stats
            default_subjects = client.get_subjects()
            if not isinstance(default_subjects, dict):
                default_schemas = len(default_subjects)
                default_versions = 0
    
                for subject in default_subjects:
                    versions = get_schema_versions(subject, None, registry)
                    if not isinstance(versions, dict):
                        default_versions += len(versions)
    
                total_schemas += default_schemas
                total_versions += default_versions
    
                if include_context_details:
                    context_stats.append(
                        {
                            "name": "default",
                            "subject_count": default_schemas,
                            "schema_count": default_versions,
                        }
                    )
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "total_contexts": len(contexts),
                "total_subjects": total_schemas,
                "total_schemas": total_versions,
                "contexts": context_stats if include_context_details else None,
                "generated_at": datetime.now().isoformat(),
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            # Add metadata information
            result.update(metadata)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="REGISTRY_STATISTICS_FAILED", registry_mode=registry_mode)
  • JSON Schema definition (REGISTRY_STATISTICS_SCHEMA) for validating the structured output of the get_registry_statistics tool, specifying properties like total_contexts, total_subjects, total_schemas, and optional per-context details.
    REGISTRY_STATISTICS_SCHEMA = {
        "type": "object",
        "properties": {
            "registry": {"type": "string", "description": "Registry name"},
            "total_contexts": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of contexts",
            },
            "total_subjects": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of subjects",
            },
            "total_schemas": {
                "type": "integer",
                "minimum": 0,
                "description": "Total number of schema versions",
            },
            "contexts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "subject_count": {"type": "integer", "minimum": 0},
                        "schema_count": {"type": "integer", "minimum": 0},
                    },
                    "required": ["name", "subject_count", "schema_count"],
                },
                "description": "Per-context statistics",
            },
            "generated_at": {
                "type": "string",
                "format": "date-time",
                "description": "When statistics were generated",
            },
            **METADATA_FIELDS,
        },
        "required": ["total_contexts", "total_subjects", "total_schemas"],
        "additionalProperties": True,
    }
  • Mapping of tool name 'get_registry_statistics' to its output schema in the TOOL_OUTPUT_SCHEMAS registry.
    "get_registry_statistics": REGISTRY_STATISTICS_SCHEMA,
  • Async task queue variant of the tool for long-running operations, returns task_id immediately and uses background execution with progress updates.
    @structured_output("get_registry_statistics_task_queue", fallback_on_error=True)
    def get_registry_statistics_task_queue_tool(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
    ) -> Dict[str, Any]:
        """
        Task queue version of get_registry_statistics for better performance.
        Returns task_id immediately for async execution with progress tracking.
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Create async task
            task = task_manager.create_task(
                TaskType.STATISTICS,
                metadata={
                    "operation": "get_registry_statistics",
                    "registry": registry,
                    "include_context_details": include_context_details,
                },
            )
    
            # Start async execution
            try:
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _get_registry_statistics_async,
                        registry_manager=registry_manager,
                        registry_mode=registry_mode,
                        registry=registry,
                        include_context_details=include_context_details,
                        task_id=task.id,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _get_registry_statistics_async,
                            registry_manager=registry_manager,
                            registry_mode=registry_mode,
                            registry=registry,
                            include_context_details=include_context_details,
                            task_id=task.id,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            return {
                "message": "Registry statistics analysis started as async task",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "get_registry_statistics",
                    "expected_duration": "long",
                    "async_pattern": "task_queue",
                    "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                },
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
        except Exception as e:
            return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)
  • Async helper function used by task queue version, implements parallel execution across contexts and subjects using ThreadPoolExecutor for improved performance on large registries.
    async def _get_registry_statistics_async(
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_context_details: bool = True,
        task_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Async version of get_registry_statistics_tool with parallel execution.
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
                if client is None:
                    return {"error": f"Registry '{registry}' not found"}
    
            # Update progress
            if task_id:
                task_manager.update_progress(task_id, 10.0)
    
            # Get all contexts
            contexts = client.get_contexts()
            if isinstance(contexts, dict) and "error" in contexts:
                return contexts
    
            if task_id:
                task_manager.update_progress(task_id, 20.0)
    
            total_schemas = 0
            total_versions = 0
            context_stats = []
    
            # Parallel execution for better performance
            with ThreadPoolExecutor(max_workers=8) as executor:
                # Submit all context analysis tasks
                future_to_context = {}
    
                # Add all contexts
                for context in contexts:
                    future = executor.submit(_analyze_context_parallel, client, context, registry)
                    future_to_context[future] = context
    
                # Add default context
                future = executor.submit(_analyze_context_parallel, client, None, registry)
                future_to_context[future] = "default"
    
                # Collect results
                completed = 0
                total_contexts = len(future_to_context)
    
                for future in as_completed(future_to_context):
                    context = future_to_context[future]
                    try:
                        context_result = future.result()
    
                        if not isinstance(context_result, dict) or "error" not in context_result:
                            total_schemas += context_result.get("schemas", 0)
                            total_versions += context_result.get("versions", 0)
    
                            if include_context_details:
                                context_stats.append(
                                    {
                                        "context": context,
                                        "schemas": context_result.get("schemas", 0),
                                        "versions": context_result.get("versions", 0),
                                    }
                                )
    
                        # Update progress
                        completed += 1
                        if task_id:
                            # Progress from 20% to 90% based on context completion
                            progress = 20.0 + (completed / total_contexts) * 70.0
                            task_manager.update_progress(task_id, progress)
    
                    except Exception as e:
                        if include_context_details:
                            context_stats.append({"context": context, "error": str(e)})
    
            if task_id:
                task_manager.update_progress(task_id, 95.0)
    
            # Get registry metadata
            metadata = client.get_server_metadata()
    
            result = {
                "registry": (client.config.name if hasattr(client.config, "name") else "default"),
                "total_contexts": len(contexts),
                "total_schemas": total_schemas,
                "total_versions": total_versions,
                "average_versions_per_schema": round(total_versions / max(total_schemas, 1), 2),
                "contexts": context_stats if include_context_details else None,
                "counted_at": datetime.now().isoformat(),
            }
    
            # Add metadata information
            result.update(metadata)
    
            if task_id:
                task_manager.update_progress(task_id, 100.0)
    
            return result
    
        except Exception as e:
            return {"error": str(e)}
  • Operation metadata registration classifying get_registry_statistics as a long-running operation that uses task queue pattern for async handling.
        "get_registry_statistics": {
            "duration": OperationDuration.LONG,
            "pattern": AsyncPattern.TASK_QUEUE,
        },
    }
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server