get_registry_statistics
Retrieve comprehensive statistics for a registry, optionally including per-context details, using the MCP Kafka Schema Registry server. The result reports total contexts, subjects, and schema versions, plus registry metadata, for quick analysis of registry contents.
Instructions
Get comprehensive statistics about a registry.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| include_context_details | No | Whether to include detailed per-context statistics | `true` |
| registry | No | Optional registry name (ignored in single-registry mode) | |
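
For orientation, a minimal example of the arguments a client might send for this tool; the registry name is hypothetical and is ignored when the server runs in single-registry mode:

```python
# Illustrative tool-call arguments matching the input schema above.
# "production" is a hypothetical registry name; both fields are optional.
arguments = {
    "registry": "production",
    "include_context_details": True,
}
```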
Implementation Reference
- statistics_tools.py:214-329 (handler): Primary synchronous handler implementing the core logic for the get_registry_statistics tool. Fetches contexts, iterates over subjects and versions to compute totals, includes optional detailed context stats, and adds registry metadata.

```python
@structured_output("get_registry_statistics", fallback_on_error=True)
def get_registry_statistics_tool(
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    include_context_details: bool = True,
) -> Dict[str, Any]:
    """
    Get comprehensive statistics about a registry.

    Args:
        registry: Optional registry name (ignored in single-registry mode)
        include_context_details: Whether to include detailed context statistics

    Returns:
        Dictionary containing registry statistics with metadata and structured validation
    """
    try:
        if registry_mode == "single":
            client = get_default_client(registry_manager)
        else:
            client = registry_manager.get_registry(registry)
            if client is None:
                return create_error_response(
                    f"Registry '{registry}' not found",
                    error_code="REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )

        # Get all contexts
        contexts = client.get_contexts()
        if isinstance(contexts, dict) and "error" in contexts:
            return create_error_response(
                f"Failed to get contexts: {contexts.get('error')}",
                error_code="CONTEXTS_RETRIEVAL_FAILED",
                registry_mode=registry_mode,
            )

        total_schemas = 0
        total_versions = 0
        context_stats = []

        # Import the function here to avoid circular imports
        from kafka_schema_registry_unified_mcp import get_schema_versions

        # Get statistics for each context
        for context in contexts:
            subjects = client.get_subjects(context)
            if isinstance(subjects, dict) and "error" in subjects:
                continue

            context_schemas = len(subjects)
            context_versions = 0

            # Count versions for each subject
            for subject in subjects:
                versions = get_schema_versions(subject, context, registry)
                if not isinstance(versions, dict):
                    context_versions += len(versions)

            total_schemas += context_schemas
            total_versions += context_versions

            if include_context_details:
                context_stats.append(
                    {
                        "name": context,
                        "subject_count": context_schemas,
                        "schema_count": context_versions,
                    }
                )

        # Get default context stats
        default_subjects = client.get_subjects()
        if not isinstance(default_subjects, dict):
            default_schemas = len(default_subjects)
            default_versions = 0

            for subject in default_subjects:
                versions = get_schema_versions(subject, None, registry)
                if not isinstance(versions, dict):
                    default_versions += len(versions)

            total_schemas += default_schemas
            total_versions += default_versions

            if include_context_details:
                context_stats.append(
                    {
                        "name": "default",
                        "subject_count": default_schemas,
                        "schema_count": default_versions,
                    }
                )

        # Get registry metadata
        metadata = client.get_server_metadata()

        result = {
            "registry": (client.config.name if hasattr(client.config, "name") else "default"),
            "total_contexts": len(contexts),
            "total_subjects": total_schemas,
            "total_schemas": total_versions,
            "contexts": context_stats if include_context_details else None,
            "generated_at": datetime.now().isoformat(),
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

        # Add metadata information
        result.update(metadata)

        return result

    except Exception as e:
        return create_error_response(str(e), error_code="REGISTRY_STATISTICS_FAILED", registry_mode=registry_mode)
```
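
The handler returns a plain dictionary, so callers can read the totals directly. A minimal sketch of a direct invocation, assuming the module imports as statistics_tools, an already initialized registry_manager, and multi-registry mode; the registry name "production" and the error check are illustrative assumptions:

```python
from statistics_tools import get_registry_statistics_tool  # assumed module path

def summarize_registry(registry_manager):
    # The MCP server normally injects registry_manager and registry_mode;
    # only registry and include_context_details are exposed to clients.
    stats = get_registry_statistics_tool(
        registry_manager,
        registry_mode="multi",
        registry="production",  # hypothetical registry name
        include_context_details=True,
    )
    if "error" in stats:  # assumes error responses carry an "error" field
        raise RuntimeError(stats["error"])
    return stats["total_contexts"], stats["total_subjects"], stats["total_schemas"]
```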
- schema_definitions.py:611-652 (schema): JSON Schema definition (REGISTRY_STATISTICS_SCHEMA) for validating the structured output of the get_registry_statistics tool, specifying properties like total_contexts, total_subjects, total_schemas, and optional per-context details.

```python
REGISTRY_STATISTICS_SCHEMA = {
    "type": "object",
    "properties": {
        "registry": {"type": "string", "description": "Registry name"},
        "total_contexts": {
            "type": "integer",
            "minimum": 0,
            "description": "Total number of contexts",
        },
        "total_subjects": {
            "type": "integer",
            "minimum": 0,
            "description": "Total number of subjects",
        },
        "total_schemas": {
            "type": "integer",
            "minimum": 0,
            "description": "Total number of schema versions",
        },
        "contexts": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "subject_count": {"type": "integer", "minimum": 0},
                    "schema_count": {"type": "integer", "minimum": 0},
                },
                "required": ["name", "subject_count", "schema_count"],
            },
            "description": "Per-context statistics",
        },
        "generated_at": {
            "type": "string",
            "format": "date-time",
            "description": "When statistics were generated",
        },
        **METADATA_FIELDS,
    },
    "required": ["total_contexts", "total_subjects", "total_schemas"],
    "additionalProperties": True,
}
```
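
Because the output schema is a standard JSON Schema document, a result can also be validated outside the @structured_output decorator. A minimal sketch, assuming the third-party jsonschema package is installed and that schema_definitions imports cleanly (so METADATA_FIELDS resolves); the sample values are illustrative only:

```python
from jsonschema import ValidationError, validate  # pip install jsonschema

from schema_definitions import REGISTRY_STATISTICS_SCHEMA  # assumed module path

# Illustrative result carrying only the required fields.
sample = {
    "total_contexts": 2,
    "total_subjects": 14,
    "total_schemas": 37,
}

try:
    validate(instance=sample, schema=REGISTRY_STATISTICS_SCHEMA)
    print("result matches REGISTRY_STATISTICS_SCHEMA")
except ValidationError as err:
    print(f"structured output invalid: {err.message}")
```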
- schema_definitions.py:1017 (schema): Mapping of the tool name "get_registry_statistics" to its output schema in the TOOL_OUTPUT_SCHEMAS registry.

```python
"get_registry_statistics": REGISTRY_STATISTICS_SCHEMA,
```
- statistics_tools.py:643-716 (helper): Async task queue variant of the tool for long-running operations; returns a task_id immediately and uses background execution with progress updates.

```python
@structured_output("get_registry_statistics_task_queue", fallback_on_error=True)
def get_registry_statistics_task_queue_tool(
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    include_context_details: bool = True,
) -> Dict[str, Any]:
    """
    Task queue version of get_registry_statistics for better performance.

    Returns task_id immediately for async execution with progress tracking.

    Returns:
        Task information with task_id for monitoring progress with structured validation
    """
    try:
        # Create async task
        task = task_manager.create_task(
            TaskType.STATISTICS,
            metadata={
                "operation": "get_registry_statistics",
                "registry": registry,
                "include_context_details": include_context_details,
            },
        )

        # Start async execution
        try:
            asyncio.create_task(
                task_manager.execute_task(
                    task,
                    _get_registry_statistics_async,
                    registry_manager=registry_manager,
                    registry_mode=registry_mode,
                    registry=registry,
                    include_context_details=include_context_details,
                    task_id=task.id,
                )
            )
        except RuntimeError:
            # No running event loop, use thread pool
            def run_task():
                asyncio.run(
                    task_manager.execute_task(
                        task,
                        _get_registry_statistics_async,
                        registry_manager=registry_manager,
                        registry_mode=registry_mode,
                        registry=registry,
                        include_context_details=include_context_details,
                        task_id=task.id,
                    )
                )

            thread = threading.Thread(target=run_task)
            thread.start()

        return {
            "message": "Registry statistics analysis started as async task",
            "task_id": task.id,
            "task": task.to_dict(),
            "operation_info": {
                "operation": "get_registry_statistics",
                "expected_duration": "long",
                "async_pattern": "task_queue",
                "guidance": "Long-running operation. Returns task_id immediately. Use get_task_status() to monitor progress.",
                "registry_mode": registry_mode,
            },
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

    except Exception as e:
        return create_error_response(str(e), error_code="TASK_CREATION_FAILED", registry_mode=registry_mode)
```
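
Callers of the task-queue variant receive a task_id and poll for completion, as the guidance string suggests. A rough sketch of that pattern, assuming a get_task_status(task_id) helper as referenced above; the status fields checked here ("status", "progress", "result") are assumptions for illustration:

```python
import time

def collect_stats_async(registry_manager):
    # Start the long-running analysis; returns immediately with a task_id.
    started = get_registry_statistics_task_queue_tool(
        registry_manager,
        registry_mode="multi",
        registry="production",  # hypothetical registry name
    )
    task_id = started["task_id"]

    # Poll until the background task finishes.
    while True:
        status = get_task_status(task_id)  # referenced in the tool guidance; shape assumed
        if status.get("status") in ("completed", "failed"):
            return status.get("result", status)
        print(f"progress: {status.get('progress', 0):.0f}%")
        time.sleep(1)
```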
- statistics_tools.py:499-606 (helper): Async helper function used by the task queue version; implements parallel execution across contexts and subjects using ThreadPoolExecutor for improved performance on large registries.

```python
async def _get_registry_statistics_async(
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    include_context_details: bool = True,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Async version of get_registry_statistics_tool with parallel execution.
    """
    try:
        if registry_mode == "single":
            client = get_default_client(registry_manager)
        else:
            client = registry_manager.get_registry(registry)
            if client is None:
                return {"error": f"Registry '{registry}' not found"}

        # Update progress
        if task_id:
            task_manager.update_progress(task_id, 10.0)

        # Get all contexts
        contexts = client.get_contexts()
        if isinstance(contexts, dict) and "error" in contexts:
            return contexts

        if task_id:
            task_manager.update_progress(task_id, 20.0)

        total_schemas = 0
        total_versions = 0
        context_stats = []

        # Parallel execution for better performance
        with ThreadPoolExecutor(max_workers=8) as executor:
            # Submit all context analysis tasks
            future_to_context = {}

            # Add all contexts
            for context in contexts:
                future = executor.submit(_analyze_context_parallel, client, context, registry)
                future_to_context[future] = context

            # Add default context
            future = executor.submit(_analyze_context_parallel, client, None, registry)
            future_to_context[future] = "default"

            # Collect results
            completed = 0
            total_contexts = len(future_to_context)

            for future in as_completed(future_to_context):
                context = future_to_context[future]
                try:
                    context_result = future.result()
                    if not isinstance(context_result, dict) or "error" not in context_result:
                        total_schemas += context_result.get("schemas", 0)
                        total_versions += context_result.get("versions", 0)

                        if include_context_details:
                            context_stats.append(
                                {
                                    "context": context,
                                    "schemas": context_result.get("schemas", 0),
                                    "versions": context_result.get("versions", 0),
                                }
                            )

                    # Update progress
                    completed += 1
                    if task_id:
                        # Progress from 20% to 90% based on context completion
                        progress = 20.0 + (completed / total_contexts) * 70.0
                        task_manager.update_progress(task_id, progress)

                except Exception as e:
                    if include_context_details:
                        context_stats.append({"context": context, "error": str(e)})

        if task_id:
            task_manager.update_progress(task_id, 95.0)

        # Get registry metadata
        metadata = client.get_server_metadata()

        result = {
            "registry": (client.config.name if hasattr(client.config, "name") else "default"),
            "total_contexts": len(contexts),
            "total_schemas": total_schemas,
            "total_versions": total_versions,
            "average_versions_per_schema": round(total_versions / max(total_schemas, 1), 2),
            "contexts": context_stats if include_context_details else None,
            "counted_at": datetime.now().isoformat(),
        }

        # Add metadata information
        result.update(metadata)

        if task_id:
            task_manager.update_progress(task_id, 100.0)

        return result

    except Exception as e:
        return {"error": str(e)}
```
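
The per-context worker _analyze_context_parallel is not reproduced in this reference. From the way _get_registry_statistics_async consumes its result, it appears to return either an error dict or a dict with "schemas" and "versions" counts; the following is a hypothetical sketch of that contract, not the actual implementation:

```python
def _analyze_context_parallel(client, context, registry):
    """Hypothetical sketch of the worker's contract, inferred from how its
    result is consumed above; the real code lives in statistics_tools.py."""
    from kafka_schema_registry_unified_mcp import get_schema_versions

    subjects = client.get_subjects(context)
    if isinstance(subjects, dict) and "error" in subjects:
        return subjects  # error dict; the caller skips it

    versions = 0
    for subject in subjects:
        subject_versions = get_schema_versions(subject, context, registry)
        if not isinstance(subject_versions, dict):
            versions += len(subject_versions)

    return {"schemas": len(subjects), "versions": versions}
```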
- task_management.py:200-204 (registration): Operation metadata registration classifying get_registry_statistics as a long-running operation that uses the task queue pattern for async handling.

```python
    "get_registry_statistics": {
        "duration": OperationDuration.LONG,
        "pattern": AsyncPattern.TASK_QUEUE,
    },
}
```