Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

export_context

Export all subjects within a specified context, including metadata, configurations, and versions, for Kafka Schema Registry management and analysis.

Instructions

Export all subjects within a context.

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| context | Yes | | |
| include_config | No | | |
| include_metadata | No | | |
| include_versions | No | | all |
| registry | No | | |

Implementation Reference

  • Primary MCP tool handler implementing export_context with progress reporting, registry mode handling (single/multi), error handling, resource linking, and delegation to core common_export_context function.
    @structured_output("export_context", fallback_on_error=True)
    async def export_context_tool(
        context: str,
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
        mcp_context: Optional["Context"] = None,
    ) -> Dict[str, Any]:
        """
        Export every subject of a context as a single payload.

        When registry_mode is "single" the export is delegated to
        common_export_context using the default client. For any other mode the
        client is looked up by name on registry_manager and the payload is
        assembled here, one subject at a time. Info and progress messages are
        sent through mcp_context whenever one is supplied.

        Args:
            context: The context name
            registry_manager: Object used to resolve registry clients
            registry_mode: "single" selects the default-client path; any other
                value selects the named-registry path
            registry: Optional registry name; used to look up the client outside
                single mode and passed on when resolving the name for resource links
            include_metadata: Include export metadata
            include_config: Include configuration data
            include_versions: Which versions to include (all, latest)
            mcp_context: MCP Context for progress reporting

        Returns:
            The export dictionary with resource links added, or the value of
            create_error_response when the client cannot be resolved, the
            subject listing reports an error, or an exception is raised.
        """

        async def _say(message: str) -> None:
            # Info messages are only emitted when a context was supplied.
            if mcp_context:
                await mcp_context.info(message)

        async def _advance(percent: float, message: str) -> None:
            # Progress is always reported against a fixed total of 100.0.
            if mcp_context:
                await mcp_context.report_progress(percent, 100.0, message)

        try:
            # Initial setup (0-10%)
            await _say(f"Starting context export: {context}")
            await _advance(0.0, "Initializing context export")

            if registry_mode == "single":
                # Single-registry mode: the shared export function does the work.
                default_client = get_default_client(registry_manager)
                if default_client is None:
                    return create_error_response(
                        "No default registry configured",
                        error_code="REGISTRY_NOT_CONFIGURED",
                        registry_mode="single",
                    )

                await _advance(5.0, "Using default registry client")

                export = common_export_context(
                    default_client, context, include_metadata, include_config, include_versions
                )
                export["registry_mode"] = "single"
                export["mcp_protocol_version"] = "2025-06-18"

                # Add resource links
                link_name = _get_registry_name_for_linking(registry_mode, default_client, registry)
                export = add_links_to_response(export, "context", link_name, context=context)

                await _say("Context export completed successfully")
                await _advance(100.0, "Context export completed")
                return export

            # Any other mode: resolve the registry by name and build the export here.
            named_client = registry_manager.get_registry(registry)
            if named_client is None:
                return create_error_response(
                    f"Registry '{registry}' not found",
                    error_code="REGISTRY_NOT_FOUND",
                    registry_mode="multi",
                )

            await _advance(10.0, f"Registry client '{registry}' initialized")

            # Get all subjects in context (10-25%)
            await _say(f"Fetching subjects from context: {context}")
            await _advance(15.0, f"Fetching subjects from context '{context}'")

            subject_names = named_client.get_subjects(context)
            if isinstance(subject_names, dict) and "error" in subject_names:
                return create_error_response(
                    f"Failed to get subjects for context '{context}': {subject_names.get('error')}",
                    error_code="CONTEXT_SUBJECTS_RETRIEVAL_FAILED",
                    registry_mode=registry_mode,
                )

            # The subject count is only computed when progress is actually reported.
            subject_total = 0
            if mcp_context:
                subject_total = len(subject_names)
                await mcp_context.report_progress(25.0, 100.0, f"Found {subject_total} subjects in context")
                await mcp_context.info(f"Exporting {subject_total} subjects")
                await mcp_context.report_progress(
                    30.0, 100.0, f"Starting export of {subject_total} subjects"
                )

            # Export each subject (25-70%)
            exported = []
            for position, subject_name in enumerate(subject_names, start=1):
                if mcp_context and subject_total > 0:
                    # Spread the 30%..70% band evenly over the subjects.
                    percent = 30.0 + ((position - 1) / subject_total) * 40.0
                    await mcp_context.report_progress(
                        percent, 100.0, f"Exporting subject {position}/{subject_total}: {subject_name}"
                    )

                outcome = export_subject_tool(
                    subject_name,
                    registry_manager,
                    registry_mode,
                    context,
                    include_metadata,
                    include_config,
                    include_versions,
                    registry,
                )
                # Subjects whose export carries an "error" key are left out.
                if "error" in outcome:
                    continue
                exported.append(outcome)

            await _advance(70.0, f"Exported {len(exported)} subjects successfully")

            # Build result structure (70-80%)
            await _advance(75.0, "Building export result structure")

            export = {
                "context": context,
                "subjects": exported,
                "subject_count": len(exported),
                "registry": named_client.config.name,
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }

            # Add configuration data if requested (80-90%)
            if include_config:
                await _advance(80.0, "Fetching context configuration")

                # Each value is stored only when it carries no "error" key.
                for result_key, method_name in (
                    ("global_config", "get_global_config"),
                    ("global_mode", "get_mode"),
                ):
                    fetched = getattr(named_client, method_name)(context)
                    if "error" not in fetched:
                        export[result_key] = fetched

                await _advance(85.0, "Context configuration added")
            else:
                await _advance(85.0, "Skipping context configuration")

            # Add metadata if requested (90-95%)
            if include_metadata:
                await _advance(90.0, "Adding export metadata")

                from datetime import datetime

                export["metadata"] = {
                    "exported_at": datetime.now().isoformat(),
                    "registry_url": named_client.config.url,
                    "registry_name": named_client.config.name,
                    "export_version": "2.0.0",
                    "registry_mode": "multi",
                }

            # Add resource links (95-100%)
            await _advance(95.0, "Adding resource links")

            link_name = _get_registry_name_for_linking(registry_mode, named_client, registry)
            export = add_links_to_response(export, "context", link_name, context=context)

            await _say("Context export completed successfully")
            await _advance(100.0, "Context export completed")
            return export
        except Exception as exc:
            # Top-level boundary: report the failure and answer with an error payload.
            reason = str(exc)
            if mcp_context:
                await mcp_context.error(f"Context export failed: {reason}")
            return create_error_response(reason, error_code="CONTEXT_EXPORT_FAILED", registry_mode=registry_mode)
  • Core helper function containing the fundamental export logic: lists the subjects in the given context, exports each subject in turn via export_subject (versions and config), and assembles the context-level result with optional global config, mode, and metadata.
    def export_context(
        client: RegistryClient,
        context: str,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
    ) -> Dict[str, Any]:
        """Export all subjects within a context.

        Args:
            client: Registry client used to list subjects and read config/mode.
            context: The context name.
            include_metadata: When true, add a "metadata" entry with the export
                timestamp, the registry URL and the export version.
            include_config: When true, add "global_config" and "global_mode"
                for the context, each only if it carries no "error" key.
            include_versions: Passed through to export_subject for every subject.

        Returns:
            A dictionary with "context" and "subjects" plus the optional entries
            above. Subjects whose export contains an "error" key are omitted.
            On failure, a dictionary with a single "error" key is returned.
        """
        try:
            # Get all subjects in context
            subjects_list = client.get_subjects(context)

            # The MCP handler for this export guards against get_subjects
            # handing back an error payload instead of a list. Without the same
            # guard here, the loop below would iterate the payload's keys as if
            # they were subject names and report an empty, seemingly successful
            # export.
            if isinstance(subjects_list, dict) and "error" in subjects_list:
                return {
                    "error": f"Failed to get subjects for context '{context}': {subjects_list.get('error')}"
                }

            # Export each subject
            subjects_data = []
            for subject in subjects_list:
                subject_export = export_subject(
                    client,
                    subject,
                    context,
                    include_metadata,
                    include_config,
                    include_versions,
                )
                # Failed subject exports are skipped rather than aborting the run.
                if "error" not in subject_export:
                    subjects_data.append(subject_export)

            result = {"context": context, "subjects": subjects_data}

            if include_config:
                global_config = client.get_global_config(context)
                if "error" not in global_config:
                    result["global_config"] = global_config

                global_mode = client.get_mode(context)
                if "error" not in global_mode:
                    result["global_mode"] = global_mode

            if include_metadata:
                result["metadata"] = {
                    "exported_at": datetime.now().isoformat(),
                    "registry_url": client.config.url,
                    "export_version": "1.7.0",
                }

            return result
        except Exception as e:
            # Callers receive failures as an {"error": ...} dictionary.
            return {"error": str(e)}
  • JSON Schema output validation definition for the export_context tool (noted as complex structure allowing additional properties).
    "export_context": {
        "type": "object",
        "additionalProperties": True,
    },  # Complex export structure
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.