export_subject

Exports all versions of a subject from a Kafka Schema Registry through the MCP server, optionally with the subject's configuration and export metadata, for backup or migration.

Instructions

Export all versions of a subject.

Input Schema

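| Name             | Required | Description | Default |
|------------------|----------|-------------|---------|
| context          | No       |             |         |
| include_config   | No       |             |         |
| include_metadata | No       |             |         |
| include_versions | No       |             | all     |
| registry         | No       |             |         |
| subject          | Yes      |             |         |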
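
To make the input schema concrete, here is a minimal sketch of how a client might assemble the arguments for this tool as an MCP `tools/call` JSON-RPC request. The helper name `build_export_subject_call`, the subject `orders-value`, and the request id are illustrative assumptions, not part of the server. The accepted `include_versions` values (`all`, `latest`) are taken from the handler docstring in the Implementation Reference.

```python
import json
from typing import Any, Dict

# Optional parameters listed in the input schema (besides include_versions).
OPTIONAL_ARGUMENTS = {"context", "include_config", "include_metadata", "registry"}


def build_export_subject_call(
    subject: str,
    include_versions: str = "all",
    request_id: int = 1,
    **optional: Any,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for export_subject (hypothetical helper)."""
    if not subject:
        raise ValueError("subject is required")
    if include_versions not in ("all", "latest"):
        raise ValueError("include_versions must be 'all' or 'latest'")
    unknown = set(optional) - OPTIONAL_ARGUMENTS
    if unknown:
        raise ValueError(f"unknown arguments: {sorted(unknown)}")

    arguments: Dict[str, Any] = {"subject": subject, "include_versions": include_versions}
    # Only send optional arguments that were actually provided.
    arguments.update({key: value for key, value in optional.items() if value is not None})

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "export_subject", "arguments": arguments},
    }


# Example: export only the latest version of a hypothetical subject, without its config.
request = build_export_subject_call("orders-value", include_versions="latest", include_config=False)
print(json.dumps(request, indent=2))
```

How the request is sent (stdio, HTTP, or an MCP client library) depends on the client and is left out here.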

Implementation Reference

  • MCP tool handler function for 'export_subject'. Decorated with @structured_output, handles registry selection, calls core export function, adds MCP metadata and resource links.
    @structured_output("export_subject", fallback_on_error=True)
    def export_subject_tool(
        subject: str,
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
        registry: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Export all versions of a subject.

        Args:
            subject: The subject name
            context: Optional schema context
            include_metadata: Include export metadata
            include_config: Include subject configuration
            include_versions: Which versions to include (all, latest)
            registry: Optional registry name (ignored in single-registry mode)

        Returns:
            Dictionary containing subject export data with structured validation and resource links
        """
        try:
            if registry_mode == "single":
                client = get_default_client(registry_manager)
            else:
                client = registry_manager.get_registry(registry)

            if client is None:
                return create_error_response(
                    "No registry configured or registry not found",
                    error_code="REGISTRY_NOT_CONFIGURED",
                    registry_mode=registry_mode,
                )

            result = common_export_subject(client, subject, context, include_metadata, include_config, include_versions)

            # Add structured output metadata
            result["registry_mode"] = registry_mode
            result["mcp_protocol_version"] = "2025-06-18"

            # Ensure required fields for export subject
            if "subject" not in result:
                result["subject"] = subject
            if "versions" not in result:
                result["versions"] = []

            # Add resource links
            registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
            result = add_links_to_response(result, "subject", registry_name, subject=subject, context=context)

            return result
        except Exception as e:
            return create_error_response(str(e), error_code="SUBJECT_EXPORT_FAILED", registry_mode=registry_mode)
  • JSON Schema definition for the output of export_subject tool, defining structure for subject versions, config, and metadata.
    EXPORT_SUBJECT_SCHEMA = {
        "type": "object",
        "properties": {
            "subject": {"type": "string", "description": "Subject name"},
            "versions": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "version": {"type": "integer", "minimum": 1},
                        "id": {"type": "integer", "minimum": 0},
                        "schema": {
                            "oneOf": [
                                {"type": "string", "description": "The schema definition as JSON string"},
                                {"type": "object", "description": "The schema definition as JSON object"},
                            ]
                        },
                        "schemaType": {"type": "string"},
                    },
                    "required": ["version", "id", "schema"],
                },
                "description": "All versions of the subject",
            },
            "config": {"type": "object", "description": "Subject configuration"},
            "export_metadata": {
                "type": "object",
                "properties": {
                    "exported_at": {"type": "string", "format": "date-time"},
                    "total_versions": {"type": "integer", "minimum": 0},
                    "include_config": {"type": "boolean"},
                },
            },
            **METADATA_FIELDS,
        },
        "required": ["subject", "versions"],
        "additionalProperties": True,
    }
  • Registration of export_subject output schema in the central TOOL_OUTPUT_SCHEMAS dictionary used for structured output validation.
    "export_schema": EXPORT_SCHEMA_SCHEMA,
    "export_subject": EXPORT_SUBJECT_SCHEMA,
  • Core helper function implementing the subject export logic: fetches versions, retrieves schemas, includes config and metadata.
        client: RegistryClient,
        subject: str,
        context: Optional[str] = None,
        include_metadata: bool = True,
        include_config: bool = True,
        include_versions: str = "all",
    ) -> Dict[str, Any]:
        """Export all versions of a subject."""
        try:
            # Get versions
            if include_versions == "latest":
                versions = ["latest"]
            else:
                versions_list = client.get_schema_versions(subject, context)
                if isinstance(versions_list, dict) and "error" in versions_list:
                    return versions_list
                versions = [str(v) for v in versions_list]

            # Get schemas for each version
            schemas = []
            for version in versions:
                schema_data = get_schema_with_metadata(client, subject, version, context)
                if "error" not in schema_data:
                    schemas.append(schema_data)

            result = {"subject": subject, "versions": schemas}

            if include_config:
                config = client.get_subject_config(subject, context)
                if "error" not in config:
                    result["config"] = config

            if include_metadata:
                result["metadata"] = {
                    "exported_at": datetime.now().isoformat(),
                    "registry_url": client.config.url,
                    "context": context,
                    "export_version": "1.7.0",
                }

            return result
        except Exception as e:
            return {"error": str(e)}
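
Two points from the snippets above can be tried in isolation: how `include_versions` chooses which versions are fetched, and which fields the output schema requires. The sketch below is a simplified illustration, not the server's code. `FakeRegistryClient`, `select_versions`, and `check_export_payload` are hypothetical names, the fake client stands in for the real `RegistryClient`, and the check covers only the `required` fields and basic type constraints rather than full JSON Schema validation.

```python
from typing import Any, Dict, List, Optional


class FakeRegistryClient:
    """Hypothetical in-memory stand-in for a registry client (not the real RegistryClient)."""

    def __init__(self, versions_by_subject: Dict[str, List[int]]):
        self._versions = versions_by_subject

    def get_schema_versions(self, subject: str, context: Optional[str] = None) -> List[int]:
        return sorted(self._versions[subject])


def select_versions(client: Any, subject: str, include_versions: str = "all",
                    context: Optional[str] = None) -> List[str]:
    """Mirror the version-selection branch of the core helper (error handling omitted)."""
    if include_versions == "latest":
        return ["latest"]
    # Any value other than "latest" falls through to the full version list.
    return [str(v) for v in client.get_schema_versions(subject, context)]


def _is_int(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


def check_export_payload(payload: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the required fields look right."""
    problems: List[str] = []
    if not isinstance(payload.get("subject"), str):
        problems.append("subject must be a string")
    versions = payload.get("versions")
    if not isinstance(versions, list):
        problems.append("versions must be an array")
        return problems
    for i, entry in enumerate(versions):
        if not isinstance(entry, dict):
            problems.append(f"versions[{i}] must be an object")
            continue
        missing = [key for key in ("version", "id", "schema") if key not in entry]
        if missing:
            problems.append(f"versions[{i}] is missing {', '.join(missing)}")
            continue
        if not (_is_int(entry["version"]) and entry["version"] >= 1):
            problems.append(f"versions[{i}].version must be an integer >= 1")
        if not (_is_int(entry["id"]) and entry["id"] >= 0):
            problems.append(f"versions[{i}].id must be an integer >= 0")
        if not isinstance(entry["schema"], (str, dict)):
            problems.append(f"versions[{i}].schema must be a string or an object")
    return problems


# Hypothetical subject with three registered versions.
client = FakeRegistryClient({"orders-value": [2, 1, 3]})
all_versions = select_versions(client, "orders-value")
latest_only = select_versions(client, "orders-value", "latest")

good = {"subject": "orders-value", "versions": [{"version": 1, "id": 10, "schema": {"type": "string"}}]}
bad = {"subject": "orders-value", "versions": [{"version": 0, "id": 10, "schema": "{}"}]}
print(all_versions, latest_only)
print(check_export_payload(good), check_export_payload(bad))
```

Note that in the core helper, as in this sketch, only the literal value `latest` is special-cased; any other `include_versions` value behaves like `all`.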

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.