aywengo

MCP Kafka Schema Reg

export_subject

Export all versions of a subject, together with its metadata and configuration, from a Kafka Schema Registry through the MCP server, for backup or migration purposes.

Instructions

Export all versions of a subject.

Input Schema

Name              Required  Description  Default
context           No
include_config    No
include_metadata  No
include_versions  No                     all
registry          No
subject           Yes
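
As an illustration of how these inputs might be passed, the sketch below builds an MCP `tools/call` request for `export_subject`. The helper name `build_export_subject_call` and the subject `orders-value` are hypothetical and not part of the server; only the argument names come from the input schema above.

```python
import json
from typing import Any, Dict

# Argument names taken from the input schema; "subject" is the only required one.
OPTIONAL_ARGUMENTS = {
    "context",
    "include_config",
    "include_metadata",
    "include_versions",
    "registry",
}


def build_export_subject_call(subject: str, request_id: int = 1, **optional: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request (hypothetical helper)."""
    unknown = set(optional) - OPTIONAL_ARGUMENTS
    if unknown:
        raise ValueError(f"Unknown arguments: {sorted(unknown)}")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "export_subject",
            "arguments": {"subject": subject, **optional},
        },
    }


# Example: export only the latest version of a hypothetical subject.
request = build_export_subject_call("orders-value", include_versions="latest")
print(json.dumps(request, indent=2))
```

Omitted optional arguments fall back to the defaults in the handler signature, so a request that sets only `subject` should export all versions with config and metadata included.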

Implementation Reference

  • MCP tool handler function for 'export_subject'. It is decorated with @structured_output, selects the registry client, calls the core export function, and adds MCP metadata and resource links.
    @structured_output("export_subject", fallback_on_error=True)
    def export_subject_tool(
        subject: str,
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Export all versions of a subject.
    
        Args:
            subject: The subject name
            context: Optional schema context
            include_metadata: Include export metadata
            include_config: Include subject configuration
            include_versions: Which versions to include (all, latest)
            registry: Optional registry name (ignored in single-registry mode)
    
        Returns:
            Dictionary containing subject export data with structured validation and resource links
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)
    
            if client is None:
                return create_error_response(
                    "No registry configured or registry not found",
                    error_code="REGISTRY_NOT_CONFIGURED",
                    registry_mode=registry_mode,
                )
    
            result = common_export_subject(client, subject, context, include_metadata, include_config, include_versions)
    
            # Add structured output metadata
            result["registry_mode"] = registry_mode
            result["mcp_protocol_version"] = "2025-06-18"
    
            # Ensure required fields for export subject
            if "subject" not in result:
                result["subject"] = subject
            if "versions" not in result:
                result["versions"] = []
    
            # Add resource links
            registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
            result = add_links_to_response(result, "subject", registry_name, subject=subject, context=context)
    
            return result
        except Exception as e:
            return create_error_response(str(e), error_code="SUBJECT_EXPORT_FAILED", registry_mode=registry_mode)
  • JSON Schema definition for the output of the export_subject tool, defining the structure of subject versions, config, and export metadata.
    EXPORT_SUBJECT_SCHEMA = {
        "type": "object",
        "properties": {
            "subject": {"type": "string", "description": "Subject name"},
            "versions": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "version": {"type": "integer", "minimum": 1},
                        "id": {"type": "integer", "minimum": 0},
                        "schema": {
                            "oneOf": [
                                {"type": "string", "description": "The schema definition as JSON string"},
                                {"type": "object", "description": "The schema definition as JSON object"},
                            ]
                        },
                        "schemaType": {"type": "string"},
                    },
                    "required": ["version", "id", "schema"],
                },
                "description": "All versions of the subject",
            },
            "config": {"type": "object", "description": "Subject configuration"},
            "export_metadata": {
                "type": "object",
                "properties": {
                    "exported_at": {"type": "string", "format": "date-time"},
                    "total_versions": {"type": "integer", "minimum": 0},
                    "include_config": {"type": "boolean"},
                },
            },
            **METADATA_FIELDS,
        },
        "required": ["subject", "versions"],
        "additionalProperties": True,
    }
  • Registration of the export_subject output schema in the central TOOL_OUTPUT_SCHEMAS dictionary, which is used for structured output validation.
    "export_schema": EXPORT_SCHEMA_SCHEMA,
    "export_subject": EXPORT_SUBJECT_SCHEMA,
  • Core helper function implementing the subject export logic: fetches versions, retrieves schemas, includes config and metadata.
        client: RegistryClient,
        subject: str,
        context: Optional[str] = None,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
    ) -> Dict[str, Any]:
        """Export all versions of a subject."""
        try:
            # Get versions
            if include_versions == "latest":
                versions = ["latest"]
            else:
                versions_list = client.get_schema_versions(subject, context)
                if isinstance(versions_list, dict) and "error" in versions_list:
                    return versions_list
                versions = [str(v) for v in versions_list]
    
            # Get schemas for each version
            schemas = []
            for version in versions:
                schema_data = get_schema_with_metadata(client, subject, version, context)
                if "error" not in schema_data:
                    schemas.append(schema_data)
    
            result = {"subject": subject, "versions": schemas}
    
            if include_config:
                config = client.get_subject_config(subject, context)
                if "error" not in config:
                    result["config"] = config
    
            if include_metadata:
                result["metadata"] = {
                    "exported_at": datetime.now().isoformat(),
                    "registry_url": client.config.url,
                    "context": context,
                    "export_version": "1.7.0",
                }
    
            return result
        except Exception as e:
            return {"error": str(e)}
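
The export flow described above can be tried out without a running registry. The following is a minimal sketch, not the server's code: `FakeRegistryClient`, `fetch_version`, `sketch_export`, and `missing_required_fields` are hypothetical names, and the client is an in-memory stand-in. It mirrors the version selection ("latest" versus all), the silent skipping of versions that return an error, and the required fields listed in the output schema. Unlike the helper above, it stamps `exported_at` with a timezone-aware UTC time.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


class FakeRegistryClient:
    """Hypothetical in-memory stand-in for a registry client."""

    def __init__(self, schemas: Dict[int, str]):
        self._schemas = schemas  # version number -> schema string

    def get_schema_versions(self, subject: str) -> List[int]:
        return sorted(self._schemas)

    def fetch_version(self, subject: str, version: str) -> Dict[str, Any]:
        # "latest" resolves to the highest registered version number.
        number = max(self._schemas) if version == "latest" else int(version)
        if number not in self._schemas:
            return {"error": f"version {version} not found"}
        return {"version": number, "id": 100 + number, "schema": self._schemas[number]}


def sketch_export(client: FakeRegistryClient, subject: str,
                  include_versions: str = "all",
                  include_metadata: bool = True) -> Dict[str, Any]:
    """Collect the selected versions of a subject into one export dictionary."""
    if include_versions == "latest":
        versions = ["latest"]
    else:
        versions = [str(v) for v in client.get_schema_versions(subject)]

    exported = []
    for version in versions:
        data = client.fetch_version(subject, version)
        if "error" not in data:  # failed versions are skipped, not reported
            exported.append(data)

    result: Dict[str, Any] = {"subject": subject, "versions": exported}
    if include_metadata:
        result["metadata"] = {"exported_at": datetime.now(timezone.utc).isoformat()}
    return result


def missing_required_fields(export: Dict[str, Any]) -> List[str]:
    """List required fields (per the output schema) absent from an export."""
    missing = [key for key in ("subject", "versions") if key not in export]
    for index, entry in enumerate(export.get("versions", [])):
        for key in ("version", "id", "schema"):
            if key not in entry:
                missing.append(f"versions[{index}].{key}")
    return missing


client = FakeRegistryClient({1: '"string"', 2: '"int"'})
full_export = sketch_export(client, "orders-value")
latest_export = sketch_export(client, "orders-value", include_versions="latest")
print([v["version"] for v in full_export["versions"]])
print(missing_required_fields(full_export))
```

Because versions that fail to load are dropped without a trace, a caller using an export for backup may want to compare the number of exported versions with the version list before relying on it.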

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
