Skip to main content
Glama

MCP Kafka Schema Reg

MIT License
23
  • Apple
  • Linux
comparison_tools.py24.9 kB
#!/usr/bin/env python3 """ Comparison Tools Module - Updated with Resource Linking Handles registry comparison operations with structured tool output support per MCP 2025-06-18 specification including resource linking. Provides registry comparison, context comparison, and missing schema detection with JSON Schema validation, type-safe responses, and HATEOAS navigation links. """ from datetime import datetime from typing import TYPE_CHECKING, Any, Dict, List, Optional from resource_linking import add_links_to_response from schema_validation import ( create_error_response, structured_output, ) if TYPE_CHECKING: from fastmcp.server.context import Context @structured_output("compare_registries", fallback_on_error=True) async def compare_registries_tool( source_registry: str, target_registry: str, registry_manager, registry_mode: str, include_contexts: bool = True, include_configs: bool = True, context: Optional["Context"] = None, ) -> Dict[str, Any]: """ Compare two Schema Registry instances and show differences. Only available in multi-registry mode. Args: source_registry: Source registry name target_registry: Target registry name include_contexts: Include context comparison include_configs: Include configuration comparison context: MCP Context for progress reporting Returns: Comparison results with structured validation and resource links, or error if in single-registry mode """ if registry_mode == "single": return create_error_response( "Registry comparison is only available in multi-registry mode", details={"suggestion": "Set REGISTRY_MODE=multi to enable registry comparison"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode=registry_mode, ) try: # Initial setup (0-10%) if context: await context.info(f"Starting registry comparison: {source_registry} vs {target_registry}") await context.report_progress(0.0, 100.0, "Initializing registry comparison") source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) if not target_client: return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) if context: await context.report_progress(10.0, 100.0, "Registry clients initialized") comparison: Dict[str, Any] = { "source_registry": source_registry, "target_registry": target_registry, "timestamp": datetime.now().isoformat(), "differences": {}, "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Compare subjects (10-30%) if context: await context.info(f"Fetching subjects from source registry: {source_registry}") await context.report_progress(15.0, 100.0, f"Fetching subjects from {source_registry}") source_subjects = set(source_client.get_subjects() or []) if context: await context.report_progress(20.0, 100.0, f"Found {len(source_subjects)} subjects in source registry") if context: await context.info(f"Fetching subjects from target registry: {target_registry}") await context.report_progress(25.0, 100.0, f"Fetching subjects from {target_registry}") target_subjects = set(target_client.get_subjects() or []) if context: await context.report_progress(30.0, 100.0, f"Found {len(target_subjects)} subjects in target registry") # Build subject comparison (30-35%) if context: await context.report_progress(30.0, 100.0, "Building subject comparison") comparison["differences"]["subjects"] = { "only_in_source": list(source_subjects - target_subjects), "only_in_target": list(target_subjects - source_subjects), "in_both": list(source_subjects & target_subjects), "source_total": len(source_subjects), "target_total": len(target_subjects), "common_count": len(source_subjects & target_subjects), } if context: await context.report_progress(35.0, 100.0, "Subject comparison completed") # Compare schemas for common subjects (35-55%) common_subjects = source_subjects & target_subjects schema_differences = [] if context: await context.info(f"Comparing schemas for {len(common_subjects)} common subjects") await context.report_progress(40.0, 100.0, f"Comparing schemas for {len(common_subjects)} common subjects") if common_subjects: for i, subject in enumerate(common_subjects): # Report progress for schema comparison if context and len(common_subjects) > 0: progress = 40.0 + (i / len(common_subjects)) * 15.0 # 40% to 55% await context.report_progress(progress, 100.0, f"Comparing schema versions for {subject}") source_versions = source_client.get_schema_versions(subject) or [] target_versions = target_client.get_schema_versions(subject) or [] if source_versions != target_versions: schema_differences.append( { "subject": subject, "source_versions": source_versions, "target_versions": target_versions, "source_version_count": (len(source_versions) if isinstance(source_versions, list) else 0), "target_version_count": (len(target_versions) if isinstance(target_versions, list) else 0), } ) if context: await context.report_progress( 55.0, 100.0, f"Found {len(schema_differences)} subjects with schema differences" ) comparison["differences"]["schemas"] = { "subjects_with_differences": schema_differences, "subjects_with_differences_count": len(schema_differences), } # Compare contexts if requested (55-70%) if include_contexts: if context: await context.info("Comparing contexts between registries") await context.report_progress(60.0, 100.0, "Fetching contexts from source registry") source_contexts = set(source_client.get_contexts() or []) if context: await context.report_progress(65.0, 100.0, f"Found {len(source_contexts)} contexts in source registry") target_contexts = set(target_client.get_contexts() or []) if context: await context.report_progress(70.0, 100.0, f"Found {len(target_contexts)} contexts in target registry") comparison["differences"]["contexts"] = { "only_in_source": list(source_contexts - target_contexts), "only_in_target": list(target_contexts - source_contexts), "in_both": list(source_contexts & target_contexts), "source_total": len(source_contexts), "target_total": len(target_contexts), "common_count": len(source_contexts & target_contexts), } else: if context: await context.report_progress(70.0, 100.0, "Skipping context comparison") # Compare configurations if requested (70-85%) if include_configs: if context: await context.info("Comparing global configurations") await context.report_progress(75.0, 100.0, "Fetching global configurations") source_config = source_client.get_global_config() target_config = target_client.get_global_config() # Remove registry-specific fields for comparison source_config_clean = {k: v for k, v in source_config.items() if k not in ["registry", "error"]} target_config_clean = {k: v for k, v in target_config.items() if k not in ["registry", "error"]} if context: await context.report_progress(80.0, 100.0, "Analyzing configuration differences") comparison["differences"]["global_config"] = { "source": source_config, "target": target_config, "match": source_config_clean == target_config_clean, "differences": { k: { "source": source_config_clean.get(k), "target": target_config_clean.get(k), } for k in set(source_config_clean.keys()) | set(target_config_clean.keys()) if source_config_clean.get(k) != target_config_clean.get(k) }, } if context: await context.report_progress(85.0, 100.0, "Configuration comparison completed") else: if context: await context.report_progress(85.0, 100.0, "Skipping configuration comparison") # Add summary statistics (85-95%) if context: await context.report_progress(90.0, 100.0, "Building comparison summary") comparison["summary"] = { "subjects_only_in_source": len(comparison["differences"]["subjects"]["only_in_source"]), "subjects_only_in_target": len(comparison["differences"]["subjects"]["only_in_target"]), "subjects_in_both": len(comparison["differences"]["subjects"]["in_both"]), "schemas_with_differences": len(schema_differences), "registries_match": ( len(comparison["differences"]["subjects"]["only_in_source"]) == 0 and len(comparison["differences"]["subjects"]["only_in_target"]) == 0 and len(schema_differences) == 0 ), } # Add resource links (95-100%) if context: await context.report_progress(95.0, 100.0, "Adding resource links") comparison = add_links_to_response( comparison, "comparison", source_registry, source_registry=source_registry, target_registry=target_registry, ) if context: await context.info("Registry comparison completed successfully") await context.report_progress(100.0, 100.0, "Registry comparison completed") return comparison except Exception as e: if context: await context.error(f"Registry comparison failed: {str(e)}") return create_error_response(str(e), error_code="REGISTRY_COMPARISON_FAILED", registry_mode=registry_mode) @structured_output("compare_contexts_across_registries", fallback_on_error=True) async def compare_contexts_across_registries_tool( source_registry: str, target_registry: str, source_context: str, registry_manager, registry_mode: str, target_context: Optional[str] = None, context: Optional["Context"] = None, ) -> Dict[str, Any]: """ Compare contexts across two registries. Only available in multi-registry mode. Args: source_registry: Source registry name target_registry: Target registry name source_context: Source context name target_context: Target context name (defaults to source_context) context: MCP Context for progress reporting Returns: Context comparison results with structured validation and resource links """ if registry_mode == "single": return create_error_response( "Context comparison across registries is only available in multi-registry mode", details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode=registry_mode, ) try: # Initial setup (0-10%) if context: await context.info( f"Starting context comparison: {source_registry}/{source_context} vs " f"{target_registry}/{target_context or source_context}" ) await context.report_progress(0.0, 100.0, "Initializing context comparison") source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) if not target_client: return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) # Use source context for target if not specified if target_context is None: target_context = source_context if context: await context.report_progress(10.0, 100.0, "Registry clients initialized") # Get subjects from source context (10-30%) if context: await context.info(f"Fetching subjects from source context: {source_registry}/{source_context}") await context.report_progress(15.0, 100.0, f"Fetching subjects from {source_registry}/{source_context}") source_subjects = set(source_client.get_subjects(source_context) or []) if context: await context.report_progress(25.0, 100.0, f"Found {len(source_subjects)} subjects in source context") # Get subjects from target context (30-50%) if context: await context.info(f"Fetching subjects from target context: {target_registry}/{target_context}") await context.report_progress(35.0, 100.0, f"Fetching subjects from {target_registry}/{target_context}") target_subjects = set(target_client.get_subjects(target_context) or []) if context: await context.report_progress(45.0, 100.0, f"Found {len(target_subjects)} subjects in target context") # Build comparison structure (50-60%) if context: await context.report_progress(50.0, 100.0, "Building comparison structure") comparison: Dict[str, Any] = { "source": { "registry": source_registry, "context": source_context, "subject_count": len(source_subjects), "subjects": list(source_subjects), }, "target": { "registry": target_registry, "context": target_context, "subject_count": len(target_subjects), "subjects": list(target_subjects), }, "differences": { "only_in_source": list(source_subjects - target_subjects), "only_in_target": list(target_subjects - source_subjects), "in_both": list(source_subjects & target_subjects), }, "timestamp": datetime.now().isoformat(), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } if context: await context.report_progress(60.0, 100.0, "Analyzing subject differences") # Compare schemas for common subjects (60-90%) common_subjects = source_subjects & target_subjects schema_differences = [] if context: await context.info(f"Comparing schemas for {len(common_subjects)} common subjects") await context.report_progress(65.0, 100.0, f"Comparing schemas for {len(common_subjects)} common subjects") if common_subjects: for i, subject in enumerate(common_subjects): # Report progress for schema comparison if context and len(common_subjects) > 0: progress = 65.0 + (i / len(common_subjects)) * 20.0 # 65% to 85% await context.report_progress(progress, 100.0, f"Comparing schema versions for {subject}") source_versions = source_client.get_schema_versions(subject, source_context) or [] target_versions = target_client.get_schema_versions(subject, target_context) or [] if source_versions != target_versions: schema_differences.append( { "subject": subject, "source_versions": source_versions, "target_versions": target_versions, "source_version_count": (len(source_versions) if isinstance(source_versions, list) else 0), "target_version_count": (len(target_versions) if isinstance(target_versions, list) else 0), } ) if context: await context.report_progress( 85.0, 100.0, f"Found {len(schema_differences)} subjects with version differences" ) comparison["schema_differences"] = { "subjects_with_differences": schema_differences, "count": len(schema_differences), } # Add summary (90-95%) if context: await context.report_progress(90.0, 100.0, "Building comparison summary") comparison["summary"] = { "contexts_match": ( len(comparison["differences"]["only_in_source"]) == 0 and len(comparison["differences"]["only_in_target"]) == 0 and len(schema_differences) == 0 ), "subjects_only_in_source": len(comparison["differences"]["only_in_source"]), "subjects_only_in_target": len(comparison["differences"]["only_in_target"]), "subjects_in_both": len(comparison["differences"]["in_both"]), "schemas_with_version_differences": len(schema_differences), } # Add resource links (95-100%) if context: await context.report_progress(95.0, 100.0, "Adding resource links") comparison = add_links_to_response( comparison, "comparison", source_registry, source_registry=source_registry, target_registry=target_registry, ) if context: await context.info("Context comparison completed successfully") await context.report_progress(100.0, 100.0, "Context comparison completed") return comparison except Exception as e: if context: await context.error(f"Context comparison failed: {str(e)}") return create_error_response(str(e), error_code="CONTEXT_COMPARISON_FAILED", registry_mode=registry_mode) @structured_output("find_missing_schemas", fallback_on_error=True) async def find_missing_schemas_tool( source_registry: str, target_registry: str, registry_manager, registry_mode: str, context: Optional[str] = None, ) -> Dict[str, Any]: """ Find schemas that exist in source registry but not in target registry. Only available in multi-registry mode. Args: source_registry: Source registry name target_registry: Target registry name context: Optional context to limit the search Returns: List of missing schemas with structured validation and resource links """ if registry_mode == "single": return create_error_response( "Finding missing schemas across registries is only available in multi-registry mode", details={"suggestion": "Set REGISTRY_MODE=multi to enable this feature"}, error_code="SINGLE_REGISTRY_MODE_LIMITATION", registry_mode=registry_mode, ) try: source_client = registry_manager.get_registry(source_registry) target_client = registry_manager.get_registry(target_registry) if not source_client: return create_error_response( f"Source registry '{source_registry}' not found", error_code="SOURCE_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) if not target_client: return create_error_response( f"Target registry '{target_registry}' not found", error_code="TARGET_REGISTRY_NOT_FOUND", registry_mode=registry_mode, ) # Get subjects based on context if context: source_subjects = set(source_client.get_subjects(context) or []) target_subjects = set(target_client.get_subjects(context) or []) else: source_subjects = set(source_client.get_subjects() or []) target_subjects = set(target_client.get_subjects() or []) # Find missing subjects missing_subjects = source_subjects - target_subjects result: Dict[str, Any] = { "source_registry": source_registry, "target_registry": target_registry, "context": context, "missing_subjects": list(missing_subjects), "missing_count": len(missing_subjects), "source_subject_count": len(source_subjects), "target_subject_count": len(target_subjects), "details": [], "timestamp": datetime.now().isoformat(), "registry_mode": registry_mode, "mcp_protocol_version": "2025-06-18", } # Ensure details is treated as a list details_list: List[Dict[str, Any]] = result["details"] # Get details for each missing subject for subject in missing_subjects: try: versions = source_client.get_schema_versions(subject, context) or [] latest_schema = None if versions: latest_version = max(versions) if isinstance(versions, list) else "latest" latest_schema = source_client.get_schema(subject, str(latest_version), context) details_list.append( { "subject": subject, "versions": versions, "version_count": (len(versions) if isinstance(versions, list) else 0), "latest_version": latest_version if versions else None, "latest_schema_id": ( latest_schema.get("id") if latest_schema and isinstance(latest_schema, dict) else None ), "schema_type": ( latest_schema.get("schemaType", "AVRO") if latest_schema and isinstance(latest_schema, dict) else None ), } ) except Exception as e: # If we can't get details for a subject, still include it in the list details_list.append( { "subject": subject, "versions": [], "version_count": 0, "error": f"Failed to get subject details: {str(e)}", } ) # Update result with processed details result["details"] = details_list # Add summary information result["summary"] = { "migration_needed": len(missing_subjects) > 0, "total_versions_to_migrate": sum(detail.get("version_count", 0) for detail in details_list), "subjects_with_multiple_versions": len( [detail for detail in details_list if detail.get("version_count", 0) > 1] ), } # Add resource links result = add_links_to_response( result, "comparison", source_registry, source_registry=source_registry, target_registry=target_registry, ) return result except Exception as e: return create_error_response( str(e), error_code="MISSING_SCHEMA_SEARCH_FAILED", registry_mode=registry_mode, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server