Skip to main content
Glama

MCP Kafka Schema Reg

MIT License
23
  • Apple
  • Linux
export_tools.py (22.5 kB)
#!/usr/bin/env python3
"""
Export Tools Module - Updated with Resource Linking

Handles schema export operations in various formats with structured tool output
support per MCP 2025-06-18 specification including resource linking.

Provides schema, subject, context, and global export functionality
with JSON Schema validation, type-safe responses, and HATEOAS navigation links.
"""

import json
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from resource_linking import add_links_to_response
from schema_registry_common import export_context as common_export_context
from schema_registry_common import export_global as common_export_global
from schema_registry_common import export_schema as common_export_schema
from schema_registry_common import export_subject as common_export_subject
from schema_registry_common import get_default_client
from schema_validation import (
    create_error_response,
    structured_output,
)

if TYPE_CHECKING:
    from fastmcp.server.context import Context

# Protocol revision stamped on every dictionary response produced by this module.
_MCP_PROTOCOL_VERSION = "2025-06-18"
# Version tag written into the metadata block of multi-registry exports.
_EXPORT_VERSION = "2.0.0"
# Progress is always reported on a 0-100 scale.
_PROGRESS_TOTAL = 100.0


def _get_registry_name_for_linking(registry_mode: str, client=None, registry: Optional[str] = None) -> str:
    """Return the registry name to embed in resource links.

    Resolution order: single-registry mode always yields ``"default"``;
    otherwise the name from ``client.config`` is used when the client has a
    ``config`` attribute, then the explicitly requested ``registry`` name, and
    finally the placeholder ``"unknown"``.
    """
    if registry_mode == "single":
        return "default"
    if client and hasattr(client, "config"):
        return client.config.name
    if registry:
        return registry
    return "unknown"


def _resolve_client(registry_manager, registry_mode: str, registry: Optional[str]):
    """Look up the registry client for the active mode.

    Returns whatever the underlying lookup returns; callers check for ``None``.
    """
    if registry_mode == "single":
        return get_default_client(registry_manager)
    return registry_manager.get_registry(registry)


def _build_export_metadata(client, **extra: Any) -> Dict[str, Any]:
    """Build the ``metadata`` block for a multi-registry export.

    ``extra`` entries are appended after the common fields, so key order
    matches a literal dict that lists them last.
    """
    metadata: Dict[str, Any] = {
        # NOTE(review): naive local timestamp, kept as-is so the emitted format does not change.
        "exported_at": datetime.now().isoformat(),
        "registry_url": client.config.url,
        "registry_name": client.config.name,
        "export_version": _EXPORT_VERSION,
        "registry_mode": "multi",
    }
    metadata.update(extra)
    return metadata


async def _report_progress(mcp_context: Optional["Context"], progress: float, message: str) -> None:
    """Forward a progress update to the MCP context; no-op when no context was supplied."""
    if mcp_context:
        await mcp_context.report_progress(progress, _PROGRESS_TOTAL, message)


async def _log_info(mcp_context: Optional["Context"], message: str) -> None:
    """Send an informational message to the MCP context; no-op when no context was supplied."""
    if mcp_context:
        await mcp_context.info(message)


@structured_output("export_schema", fallback_on_error=True)
def export_schema_tool(
    subject: str,
    registry_manager,
    registry_mode: str,
    version: str = "latest",
    context: Optional[str] = None,
    format: str = "json",
    registry: Optional[str] = None,
) -> Union[Dict[str, Any], str]:
    """
    Export a single schema in the specified format.

    Args:
        subject: The subject name
        registry_manager: Object used to look up the registry client
        registry_mode: "single" uses the default client; any other value looks
            up ``registry`` on the manager
        version: The schema version (default: latest)
        context: Optional schema context
        format: Export format (json, avro_idl)
        registry: Optional registry name (ignored in single-registry mode)

    Returns:
        Exported schema data with structured validation and resource links.
        Dictionary results are enriched with metadata, required fields and
        links; non-dictionary results are returned untouched. Any exception
        is converted into an error response with code SCHEMA_EXPORT_FAILED.
    """
    try:
        client = _resolve_client(registry_manager, registry_mode, registry)
        if client is None:
            return create_error_response(
                "No registry configured or registry not found",
                error_code="REGISTRY_NOT_CONFIGURED",
                registry_mode=registry_mode,
            )

        result = common_export_schema(client, subject, version, context, format)

        if isinstance(result, dict):
            # Add structured output metadata
            result["registry_mode"] = registry_mode
            result["mcp_protocol_version"] = _MCP_PROTOCOL_VERSION

            # Ensure required fields for export schema (existing values win)
            result.setdefault("subject", subject)
            result.setdefault("version", version if version != "latest" else 1)
            result.setdefault("format", format)

            # Ensure content field is present - this is required by the schema
            if "content" not in result:
                if "schema" not in result:
                    result["content"] = ""
                elif format == "json":
                    result["content"] = json.dumps(result["schema"], indent=2)
                else:
                    result["content"] = str(result["schema"])

            # Add resource links for dictionary results
            registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
            result = add_links_to_response(
                result,
                "schema",
                registry_name,
                subject=subject,
                version=version,
                context=context,
            )

        return result
    except Exception as e:
        return create_error_response(str(e), error_code="SCHEMA_EXPORT_FAILED", registry_mode=registry_mode)


@structured_output("export_subject", fallback_on_error=True)
def export_subject_tool(
    subject: str,
    registry_manager,
    registry_mode: str,
    context: Optional[str] = None,
    include_metadata: bool = True,
    include_config: bool = True,
    include_versions: str = "all",
    registry: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Export all versions of a subject.

    Args:
        subject: The subject name
        registry_manager: Object used to look up the registry client
        registry_mode: "single" uses the default client; any other value looks
            up ``registry`` on the manager
        context: Optional schema context
        include_metadata: Include export metadata
        include_config: Include subject configuration
        include_versions: Which versions to include (all, latest)
        registry: Optional registry name (ignored in single-registry mode)

    Returns:
        Dictionary containing subject export data with structured validation
        and resource links. Any exception is converted into an error response
        with code SUBJECT_EXPORT_FAILED.
    """
    try:
        client = _resolve_client(registry_manager, registry_mode, registry)
        if client is None:
            return create_error_response(
                "No registry configured or registry not found",
                error_code="REGISTRY_NOT_CONFIGURED",
                registry_mode=registry_mode,
            )

        result = common_export_subject(client, subject, context, include_metadata, include_config, include_versions)

        # Add structured output metadata
        result["registry_mode"] = registry_mode
        result["mcp_protocol_version"] = _MCP_PROTOCOL_VERSION

        # Ensure required fields for export subject (existing values win)
        result.setdefault("subject", subject)
        result.setdefault("versions", [])

        # Add resource links
        registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
        return add_links_to_response(result, "subject", registry_name, subject=subject, context=context)
    except Exception as e:
        return create_error_response(str(e), error_code="SUBJECT_EXPORT_FAILED", registry_mode=registry_mode)


async def _export_context_single(
    context: str,
    registry_manager,
    registry_mode: str,
    registry: Optional[str],
    include_metadata: bool,
    include_config: bool,
    include_versions: str,
    mcp_context: Optional["Context"],
) -> Dict[str, Any]:
    """Single-registry context export: delegate to the common export function."""
    client = get_default_client(registry_manager)
    if client is None:
        return create_error_response(
            "No default registry configured",
            error_code="REGISTRY_NOT_CONFIGURED",
            registry_mode="single",
        )

    await _report_progress(mcp_context, 5.0, "Using default registry client")

    result = common_export_context(client, context, include_metadata, include_config, include_versions)
    result["registry_mode"] = "single"
    result["mcp_protocol_version"] = _MCP_PROTOCOL_VERSION

    # Add resource links
    registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
    result = add_links_to_response(result, "context", registry_name, context=context)

    await _log_info(mcp_context, "Context export completed successfully")
    await _report_progress(mcp_context, 100.0, "Context export completed")
    return result


async def _export_context_multi(
    context: str,
    registry_manager,
    registry_mode: str,
    registry: Optional[str],
    include_metadata: bool,
    include_config: bool,
    include_versions: str,
    mcp_context: Optional["Context"],
) -> Dict[str, Any]:
    """Multi-registry context export: export every subject of the context through the named client."""
    client = registry_manager.get_registry(registry)
    if client is None:
        return create_error_response(
            f"Registry '{registry}' not found",
            error_code="REGISTRY_NOT_FOUND",
            registry_mode="multi",
        )

    await _report_progress(mcp_context, 10.0, f"Registry client '{registry}' initialized")

    # Get all subjects in context (10-25%)
    await _log_info(mcp_context, f"Fetching subjects from context: {context}")
    await _report_progress(mcp_context, 15.0, f"Fetching subjects from context '{context}'")

    subjects_list = client.get_subjects(context)
    if isinstance(subjects_list, dict) and "error" in subjects_list:
        return create_error_response(
            f"Failed to get subjects for context '{context}': {subjects_list.get('error')}",
            error_code="CONTEXT_SUBJECTS_RETRIEVAL_FAILED",
            registry_mode=registry_mode,
        )

    subject_total = len(subjects_list)
    await _report_progress(mcp_context, 25.0, f"Found {subject_total} subjects in context")

    # Export each subject (25-70%)
    await _log_info(mcp_context, f"Exporting {subject_total} subjects")
    await _report_progress(mcp_context, 30.0, f"Starting export of {subject_total} subjects")

    subjects_data = []
    for i, subject in enumerate(subjects_list):
        # Spread the per-subject updates linearly across 30% to 70%.
        await _report_progress(
            mcp_context,
            30.0 + (i / subject_total) * 40.0,
            f"Exporting subject {i + 1}/{subject_total}: {subject}",
        )

        subject_export = export_subject_tool(
            subject,
            registry_manager,
            registry_mode,
            context,
            include_metadata,
            include_config,
            include_versions,
            registry,
        )
        # NOTE(review): subjects whose export carries an "error" key are
        # dropped without being reported; only successful exports are kept.
        if "error" not in subject_export:
            subjects_data.append(subject_export)

    await _report_progress(mcp_context, 70.0, f"Exported {len(subjects_data)} subjects successfully")

    # Build result structure (70-80%)
    await _report_progress(mcp_context, 75.0, "Building export result structure")

    result: Dict[str, Any] = {
        "context": context,
        "subjects": subjects_data,
        "subject_count": len(subjects_data),
        "registry": client.config.name,
        "registry_mode": registry_mode,
        "mcp_protocol_version": _MCP_PROTOCOL_VERSION,
    }

    # Add configuration data if requested (80-90%)
    if include_config:
        await _report_progress(mcp_context, 80.0, "Fetching context configuration")

        global_config = client.get_global_config(context)
        if "error" not in global_config:
            result["global_config"] = global_config

        global_mode = client.get_mode(context)
        if "error" not in global_mode:
            result["global_mode"] = global_mode

        await _report_progress(mcp_context, 85.0, "Context configuration added")
    else:
        await _report_progress(mcp_context, 85.0, "Skipping context configuration")

    # Add metadata if requested (90-95%)
    if include_metadata:
        await _report_progress(mcp_context, 90.0, "Adding export metadata")
        result["metadata"] = _build_export_metadata(client)

    # Add resource links (95-100%)
    await _report_progress(mcp_context, 95.0, "Adding resource links")

    registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
    result = add_links_to_response(result, "context", registry_name, context=context)

    await _log_info(mcp_context, "Context export completed successfully")
    await _report_progress(mcp_context, 100.0, "Context export completed")
    return result


@structured_output("export_context", fallback_on_error=True)
async def export_context_tool(
    context: str,
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    include_metadata: bool = True,
    include_config: bool = True,
    include_versions: str = "all",
    mcp_context: Optional["Context"] = None,
) -> Dict[str, Any]:
    """
    Export all subjects within a context.

    Args:
        context: The context name
        registry_manager: Object used to look up the registry client
        registry_mode: "single" delegates to the common export function; any
            other value exports subject by subject through the named registry
        registry: Optional registry name (ignored in single-registry mode)
        include_metadata: Include export metadata
        include_config: Include configuration data
        include_versions: Which versions to include (all, latest)
        mcp_context: MCP Context for progress reporting

    Returns:
        Dictionary containing context export data with structured validation
        and resource links. Any exception is reported to ``mcp_context`` (when
        given) and converted into an error response with code
        CONTEXT_EXPORT_FAILED.
    """
    try:
        # Initial setup (0-10%)
        await _log_info(mcp_context, f"Starting context export: {context}")
        await _report_progress(mcp_context, 0.0, "Initializing context export")

        export = _export_context_single if registry_mode == "single" else _export_context_multi
        return await export(
            context,
            registry_manager,
            registry_mode,
            registry,
            include_metadata,
            include_config,
            include_versions,
            mcp_context,
        )
    except Exception as e:
        if mcp_context:
            await mcp_context.error(f"Context export failed: {e!s}")
        return create_error_response(str(e), error_code="CONTEXT_EXPORT_FAILED", registry_mode=registry_mode)


async def _export_global_single(
    registry_manager,
    registry_mode: str,
    registry: Optional[str],
    include_metadata: bool,
    include_config: bool,
    include_versions: str,
    mcp_context: Optional["Context"],
) -> Dict[str, Any]:
    """Single-registry global export: delegate to the common export function."""
    client = get_default_client(registry_manager)
    if client is None:
        return create_error_response(
            "No default registry configured",
            error_code="REGISTRY_NOT_CONFIGURED",
            registry_mode="single",
        )

    await _report_progress(mcp_context, 5.0, "Using default registry client")

    result = common_export_global(client, include_metadata, include_config, include_versions)
    result["registry_mode"] = "single"
    result["mcp_protocol_version"] = _MCP_PROTOCOL_VERSION

    # Add resource links
    registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
    result = add_links_to_response(result, "registry", registry_name)

    await _log_info(mcp_context, "Global export completed successfully")
    await _report_progress(mcp_context, 100.0, "Global export completed")
    return result


async def _export_global_multi(
    registry_manager,
    registry_mode: str,
    registry: Optional[str],
    include_metadata: bool,
    include_config: bool,
    include_versions: str,
    mcp_context: Optional["Context"],
) -> Dict[str, Any]:
    """Multi-registry global export: export every listed context plus the default context."""
    client = registry_manager.get_registry(registry)
    if client is None:
        return create_error_response(
            f"Registry '{registry}' not found",
            error_code="REGISTRY_NOT_FOUND",
            registry_mode="multi",
        )

    await _report_progress(mcp_context, 10.0, f"Registry client '{registry}' initialized")

    # Get all contexts (10-20%)
    await _log_info(mcp_context, f"Fetching contexts from registry: {registry}")
    await _report_progress(mcp_context, 15.0, f"Fetching contexts from registry '{registry}'")

    contexts_list = client.get_contexts()
    if isinstance(contexts_list, dict) and "error" in contexts_list:
        return create_error_response(
            f"Failed to get contexts: {contexts_list.get('error')}",
            error_code="CONTEXTS_RETRIEVAL_FAILED",
            registry_mode=registry_mode,
        )

    context_total = len(contexts_list)
    await _report_progress(mcp_context, 20.0, f"Found {context_total} contexts in registry")

    # Export each context (20-70%)
    await _log_info(mcp_context, f"Exporting {context_total} contexts")
    await _report_progress(mcp_context, 25.0, f"Starting export of {context_total} contexts")

    contexts_data = []
    for i, context in enumerate(contexts_list):
        # Spread the per-context updates linearly across 25% to 65%.
        await _report_progress(
            mcp_context,
            25.0 + (i / context_total) * 40.0,
            f"Exporting context {i + 1}/{context_total}: {context}",
        )

        context_export = await export_context_tool(
            context,
            registry_manager,
            registry_mode,
            registry,
            include_metadata,
            include_config,
            include_versions,
            None,  # Don't pass mcp_context to avoid nested progress reporting
        )
        # NOTE(review): contexts whose export carries an "error" key are
        # dropped without being reported; only successful exports are kept.
        if "error" not in context_export:
            contexts_data.append(context_export)

    await _report_progress(mcp_context, 65.0, f"Exported {len(contexts_data)} contexts successfully")

    # Export default context (65-70%); it is addressed by the empty context name
    await _report_progress(mcp_context, 68.0, "Exporting default context")

    default_export = await export_context_tool(
        "",
        registry_manager,
        registry_mode,
        registry,
        include_metadata,
        include_config,
        include_versions,
        None,  # Don't pass mcp_context to avoid nested progress reporting
    )

    await _report_progress(mcp_context, 70.0, "Default context export completed")

    # Build result structure (70-80%)
    await _report_progress(mcp_context, 75.0, "Building global export result structure")

    result: Dict[str, Any] = {
        "contexts": contexts_data,
        "contexts_count": len(contexts_data),
        "default_context": (default_export if "error" not in default_export else None),
        "registry": client.config.name,
        "registry_mode": registry_mode,
        "mcp_protocol_version": _MCP_PROTOCOL_VERSION,
    }

    # Add configuration data if requested (80-90%)
    if include_config:
        await _report_progress(mcp_context, 80.0, "Fetching global configuration")

        global_config = client.get_global_config()
        if "error" not in global_config:
            result["global_config"] = global_config

        global_mode = client.get_mode()
        if "error" not in global_mode:
            result["global_mode"] = global_mode

        await _report_progress(mcp_context, 85.0, "Global configuration added")
    else:
        await _report_progress(mcp_context, 85.0, "Skipping global configuration")

    # Add metadata if requested (90-95%)
    if include_metadata:
        await _report_progress(mcp_context, 90.0, "Adding export metadata")
        # The totals cover the listed contexts only; the default context export is not counted.
        result["metadata"] = _build_export_metadata(
            client,
            total_contexts=len(contexts_data),
            total_subjects=sum(len(ctx.get("subjects", [])) for ctx in contexts_data),
        )

    # Add resource links (95-100%)
    await _report_progress(mcp_context, 95.0, "Adding resource links")

    registry_name = _get_registry_name_for_linking(registry_mode, client, registry)
    result = add_links_to_response(result, "registry", registry_name)

    await _log_info(mcp_context, "Global export completed successfully")
    await _report_progress(mcp_context, 100.0, "Global export completed")
    return result


@structured_output("export_global", fallback_on_error=True)
async def export_global_tool(
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    include_metadata: bool = True,
    include_config: bool = True,
    include_versions: str = "all",
    mcp_context: Optional["Context"] = None,
) -> Dict[str, Any]:
    """
    Export all contexts and schemas from a registry.

    Args:
        registry_manager: Object used to look up the registry client
        registry_mode: "single" delegates to the common export function; any
            other value exports context by context through the named registry
        registry: Optional registry name (ignored in single-registry mode)
        include_metadata: Include export metadata
        include_config: Include configuration data
        include_versions: Which versions to include (all, latest)
        mcp_context: MCP Context for progress reporting

    Returns:
        Dictionary containing global export data with structured validation
        and resource links. Any exception is reported to ``mcp_context`` (when
        given) and converted into an error response with code
        GLOBAL_EXPORT_FAILED.
    """
    try:
        # Initial setup (0-10%)
        await _log_info(mcp_context, f"Starting global export from registry: {registry or 'default'}")
        await _report_progress(mcp_context, 0.0, "Initializing global export")

        export = _export_global_single if registry_mode == "single" else _export_global_multi
        return await export(
            registry_manager,
            registry_mode,
            registry,
            include_metadata,
            include_config,
            include_versions,
            mcp_context,
        )
    except Exception as e:
        if mcp_context:
            await mcp_context.error(f"Global export failed: {e!s}")
        return create_error_response(str(e), error_code="GLOBAL_EXPORT_FAILED", registry_mode=registry_mode)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.