export_subject
Export all versions of a subject, together with its metadata and configuration, from Kafka Schema Registry through the MCP server, for backup or migration purposes.
Instructions
Export all versions of a subject.
Input Schema
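| Name | Required | Description | Default |
|---|---|---|---|
| context | No | Optional schema context | |
| include_config | No | Include subject configuration | true |
| include_metadata | No | Include export metadata | true |
| include_versions | No | Which versions to include (all, latest) | all |
| registry | No | Optional registry name (ignored in single-registry mode) | |
| subject | Yes | The subject name | |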
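
As a rough illustration of how these parameters fit together, the sketch below builds a JSON-RPC `tools/call` request of the kind an MCP client sends to invoke a tool. The helper name `build_export_subject_request`, the subject `orders-value`, and the request id are placeholders invented for this example. Only `subject` is required; the other arguments are shown with the defaults from the handler signature, and the client-side check on `include_versions` is an assumption based on the two values the docstring lists.

```python
import json
from typing import Any, Dict, Optional


def build_export_subject_request(
    subject: str,
    context: Optional[str] = None,
    include_metadata: bool = True,
    include_config: bool = True,
    include_versions: str = "all",
    registry: Optional[str] = None,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for export_subject (illustrative helper)."""
    # Assumption: reject values other than the two named in the tool's docstring.
    if include_versions not in ("all", "latest"):
        raise ValueError("include_versions must be 'all' or 'latest'")

    arguments: Dict[str, Any] = {
        "subject": subject,
        "include_metadata": include_metadata,
        "include_config": include_config,
        "include_versions": include_versions,
    }
    # Optional arguments are omitted when unset so the server applies its defaults.
    if context is not None:
        arguments["context"] = context
    if registry is not None:
        arguments["registry"] = registry

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "export_subject", "arguments": arguments},
    }


# "orders-value" is a made-up subject name.
request = build_export_subject_request("orders-value", include_versions="latest")
print(json.dumps(request, indent=2))
```

In single-registry mode the `registry` argument is ignored, so leaving it out, as above, is the simplest choice.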
Implementation Reference
- export_tools.py:120-177 (handler): MCP tool handler function for 'export_subject'. Decorated with @structured_output, it handles registry selection, calls the core export function, and adds MCP metadata and resource links.

      @structured_output("export_subject", fallback_on_error=True)
      def export_subject_tool(
          subject: str,
          registry_manager,
          registry_mode: str,
          context: Optional[str] = None,
          include_metadata: bool = True,
          include_config: bool = True,
          include_versions: str = "all",
          registry: Optional[str] = None,
      ) -> Dict[str, Any]:
          """
          Export all versions of a subject.

          Args:
              subject: The subject name
              context: Optional schema context
              include_metadata: Include export metadata
              include_config: Include subject configuration
              include_versions: Which versions to include (all, latest)
              registry: Optional registry name (ignored in single-registry mode)

          Returns:
              Dictionary containing subject export data with structured validation and resource links
          """
          try:
              if registry_mode == "single":
                  client = get_default_client(registry_manager)
              else:
                  client = registry_manager.get_registry(registry)

              if client is None:
                  return create_error_response(
                      "No registry configured or registry not found",
                      error_code="REGISTRY_NOT_CONFIGURED",
                      registry_mode=registry_mode,
                  )

              result = common_export_subject(client, subject, context, include_metadata, include_config, include_versions)

              # Add structured output metadata
              result["registry_mode"] = registry_mode
              result["mcp_protocol_version"] = "2025-06-18"

              # Ensure required fields for export subject
              if "subject" not in result:
                  result["subject"] = subject
              if "versions" not in result:
                  result["versions"] = []

              # Add resource links
              registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
              result = add_links_to_response(result, "subject", registry_name, subject=subject, context=context)

              return result
          except Exception as e:
              return create_error_response(str(e), error_code="SUBJECT_EXPORT_FAILED", registry_mode=registry_mode)

- schema_definitions.py:517-553 (schema): JSON Schema definition for the output of the export_subject tool, defining the structure for subject versions, config, and metadata.

      EXPORT_SUBJECT_SCHEMA = {
          "type": "object",
          "properties": {
              "subject": {"type": "string", "description": "Subject name"},
              "versions": {
                  "type": "array",
                  "items": {
                      "type": "object",
                      "properties": {
                          "version": {"type": "integer", "minimum": 1},
                          "id": {"type": "integer", "minimum": 0},
                          "schema": {
                              "oneOf": [
                                  {"type": "string", "description": "The schema definition as JSON string"},
                                  {"type": "object", "description": "The schema definition as JSON object"},
                              ]
                          },
                          "schemaType": {"type": "string"},
                      },
                      "required": ["version", "id", "schema"],
                  },
                  "description": "All versions of the subject",
              },
              "config": {"type": "object", "description": "Subject configuration"},
              "export_metadata": {
                  "type": "object",
                  "properties": {
                      "exported_at": {"type": "string", "format": "date-time"},
                      "total_versions": {"type": "integer", "minimum": 0},
                      "include_config": {"type": "boolean"},
                  },
              },
              **METADATA_FIELDS,
          },
          "required": ["subject", "versions"],
          "additionalProperties": True,
      }

- schema_definitions.py:993-994 (registration): Registration of the export_subject output schema in the central TOOL_OUTPUT_SCHEMAS dictionary used for structured output validation.

      "export_schema": EXPORT_SCHEMA_SCHEMA,
      "export_subject": EXPORT_SUBJECT_SCHEMA,

- schema_registry_common.py:1215-1258 (helper): Core helper function implementing the subject export logic: fetches versions, retrieves schemas, and includes config and metadata.

          client: RegistryClient,
          subject: str,
          context: Optional[str] = None,
          include_metadata: bool = True,
          include_config: bool = True,
          include_versions: str = "all",
      ) -> Dict[str, Any]:
          """Export all versions of a subject."""
          try:
              # Get versions
              if include_versions == "latest":
                  versions = ["latest"]
              else:
                  versions_list = client.get_schema_versions(subject, context)
                  if isinstance(versions_list, dict) and "error" in versions_list:
                      return versions_list
                  versions = [str(v) for v in versions_list]

              # Get schemas for each version
              schemas = []
              for version in versions:
                  schema_data = get_schema_with_metadata(client, subject, version, context)
                  if "error" not in schema_data:
                      schemas.append(schema_data)

              result = {"subject": subject, "versions": schemas}

              if include_config:
                  config = client.get_subject_config(subject, context)
                  if "error" not in config:
                      result["config"] = config

              if include_metadata:
                  result["metadata"] = {
                      "exported_at": datetime.now().isoformat(),
                      "registry_url": client.config.url,
                      "context": context,
                      "export_version": "1.7.0",
                  }

              return result
          except Exception as e:
              return {"error": str(e)}
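
To make the helper's control flow easier to follow, here is a minimal, self-contained sketch of the same steps run against an in-memory stand-in for the registry client. `FakeRegistryClient`, its sample data, its `get_schema` method, and `export_subject_sketch` are hypothetical names invented for this example. The real helper works with a `RegistryClient` and `get_schema_with_metadata`, whose exact behavior may differ.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


class FakeRegistryClient:
    """In-memory stand-in for a registry client (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        # Made-up subject with two registered versions.
        self._schemas: Dict[str, Dict[int, Dict[str, Any]]] = {
            "orders-value": {
                1: {"id": 10, "schema": '{"type": "string"}', "schemaType": "AVRO"},
                2: {"id": 11, "schema": '{"type": "long"}', "schemaType": "AVRO"},
            }
        }

    def get_schema_versions(self, subject: str, context: Optional[str] = None) -> List[int]:
        return sorted(self._schemas.get(subject, {}))

    def get_schema(self, subject: str, version: str, context: Optional[str] = None) -> Dict[str, Any]:
        versions = self._schemas.get(subject, {})
        if not versions:
            return {"error": f"subject {subject!r} not found"}
        number = max(versions) if version == "latest" else int(version)
        if number not in versions:
            return {"error": f"version {version} not found"}
        return {"version": number, **versions[number]}

    def get_subject_config(self, subject: str, context: Optional[str] = None) -> Dict[str, Any]:
        return {"compatibility": "BACKWARD"}


def export_subject_sketch(
    client: FakeRegistryClient,
    subject: str,
    context: Optional[str] = None,
    include_metadata: bool = True,
    include_config: bool = True,
    include_versions: str = "all",
) -> Dict[str, Any]:
    # Step 1: decide which versions to fetch.
    if include_versions == "latest":
        versions = ["latest"]
    else:
        versions = [str(v) for v in client.get_schema_versions(subject, context)]

    # Step 2: fetch each version, skipping any that come back as an error payload.
    schemas = []
    for version in versions:
        data = client.get_schema(subject, version, context)
        if "error" not in data:
            schemas.append(data)

    result: Dict[str, Any] = {"subject": subject, "versions": schemas}

    # Step 3: optionally attach the subject config and export metadata.
    if include_config:
        config = client.get_subject_config(subject, context)
        if "error" not in config:
            result["config"] = config
    if include_metadata:
        result["metadata"] = {"exported_at": datetime.now().isoformat(), "context": context}
    return result


client = FakeRegistryClient()
full = export_subject_sketch(client, "orders-value")
latest = export_subject_sketch(client, "orders-value", include_versions="latest", include_config=False)
print(len(full["versions"]), [v["version"] for v in latest["versions"]])  # prints: 2 [2]
```

Note that the quoted helper stores its export details under the `metadata` key, whereas the output schema describes an `export_metadata` object. Because the schema sets `additionalProperties` to true and requires only `subject` and `versions`, a result carrying `metadata` still passes validation.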