Skip to main content
Glama

MCP Kafka Schema Reg

MIT License
23
  • Apple
  • Linux
elicitation.py33.3 kB
#!/usr/bin/env python3 """ Elicitation Module for Interactive Workflows This module implements the MCP 2025-06-18 elicitation capability, allowing tools to interactively request missing information from users. This enables more intelligent schema operations where the server can guide users through complex workflows. Key Features: - Multiple elicitation types (text, choice, confirmation, form) - Multi-round conversations support - Timeout handling and validation - Schema field definition assistance - Migration preference collection - Compatibility resolution guidance - Schema evolution assistant MCP 2025-06-18 Compliance: - Implements elicitation request/response protocol - Supports structured elicitation with multiple field types - Provides fallback mechanisms for non-supporting clients """ import asyncio import logging from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from enum import Enum from typing import Any, Dict, List, Optional from uuid import uuid4 logger = logging.getLogger(__name__) class ElicitationType(Enum): """Types of elicitation requests supported.""" TEXT = "text" CHOICE = "choice" CONFIRMATION = "confirmation" FORM = "form" MULTI_FIELD = "multi_field" class ElicitationPriority(Enum): """Priority levels for elicitation requests.""" HIGH = "high" MEDIUM = "medium" LOW = "low" @dataclass class ElicitationField: """Represents a single field in an elicitation request.""" name: str type: str label: Optional[str] = None description: Optional[str] = None required: bool = True default: Optional[Any] = None options: Optional[List[str]] = None validation: Optional[Dict[str, Any]] = None placeholder: Optional[str] = None @dataclass class ElicitationRequest: """Represents an elicitation request to the user.""" id: str = field(default_factory=lambda: str(uuid4())) type: ElicitationType = ElicitationType.TEXT title: str = "Information Required" description: Optional[str] = None fields: List[ElicitationField] = field(default_factory=list) priority: ElicitationPriority = ElicitationPriority.MEDIUM timeout_seconds: int = 300 # 5 minutes default allow_multiple: bool = False context: Optional[Dict[str, Any]] = None created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) expires_at: Optional[datetime] = None def __post_init__(self): """Calculate expiration time if not set.""" if self.expires_at is None: self.expires_at = self.created_at + timedelta(seconds=self.timeout_seconds) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for MCP protocol.""" return { "id": self.id, "type": self.type.value, "title": self.title, "description": self.description, "fields": [ { "name": f.name, "type": f.type, "label": f.label or f.name.replace("_", " ").title(), "description": f.description, "required": f.required, "default": f.default, "options": f.options, "validation": f.validation, "placeholder": f.placeholder, } for f in self.fields ], "priority": self.priority.value, "timeout_seconds": self.timeout_seconds, "allow_multiple": self.allow_multiple, "context": self.context, "created_at": self.created_at.isoformat(), "expires_at": self.expires_at.isoformat() if self.expires_at else None, } def is_expired(self) -> bool: """Check if the elicitation request has expired.""" if self.expires_at is None: return False return datetime.now(timezone.utc) > self.expires_at @dataclass class ElicitationResponse: """Represents a user's response to an elicitation request.""" request_id: str values: Dict[str, Any] timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) complete: bool = True metadata: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "request_id": self.request_id, "values": self.values, "timestamp": self.timestamp.isoformat(), "complete": self.complete, "metadata": self.metadata, } class ElicitationManager: """Manages elicitation requests and responses.""" def __init__(self): self.pending_requests: Dict[str, ElicitationRequest] = {} self.responses: Dict[str, ElicitationResponse] = {} self.timeout_tasks: Dict[str, asyncio.Task] = {} async def create_request(self, request: ElicitationRequest) -> str: """Create a new elicitation request.""" self.pending_requests[request.id] = request # Set up timeout handling if request.timeout_seconds > 0: self.timeout_tasks[request.id] = asyncio.create_task( self._handle_timeout(request.id, request.timeout_seconds) ) logger.info(f"Created elicitation request {request.id}: {request.title}") return request.id async def submit_response(self, response: ElicitationResponse) -> bool: """Submit a response to an elicitation request.""" if response.request_id not in self.pending_requests: logger.warning(f"No pending request found for ID {response.request_id}") return False request = self.pending_requests[response.request_id] if request.is_expired(): logger.warning(f"Request {response.request_id} has expired") return False # Validate response if not self._validate_response(request, response): logger.warning(f"Invalid response for request {response.request_id}") return False # Store response and clean up self.responses[response.request_id] = response self._cleanup_request(response.request_id) logger.info(f"Received response for elicitation request {response.request_id}") return True async def wait_for_response( self, request_id: str, timeout: Optional[float] = None ) -> Optional[ElicitationResponse]: """Wait for a response to an elicitation request.""" if request_id not in self.pending_requests: return None request = self.pending_requests[request_id] effective_timeout = timeout or request.timeout_seconds start_time = datetime.now(timezone.utc) while datetime.now(timezone.utc) - start_time < timedelta(seconds=effective_timeout): if request_id in self.responses: return self.responses[request_id] if request.is_expired(): logger.warning(f"Request {request_id} expired while waiting") break await asyncio.sleep(0.1) # Check every 100ms return None def get_request(self, request_id: str) -> Optional[ElicitationRequest]: """Get an elicitation request by ID.""" return self.pending_requests.get(request_id) def get_response(self, request_id: str) -> Optional[ElicitationResponse]: """Get a response by request ID.""" return self.responses.get(request_id) def list_pending_requests(self) -> List[ElicitationRequest]: """List all pending elicitation requests.""" return list(self.pending_requests.values()) def cancel_request(self, request_id: str) -> bool: """Cancel a pending elicitation request.""" if request_id in self.pending_requests: self._cleanup_request(request_id) logger.info(f"Cancelled elicitation request {request_id}") return True return False async def elicit(self, request: ElicitationRequest) -> Optional[Dict[str, Any]]: """ Perform elicitation with fallback support. This method acts as a wrapper around the existing elicitation system, providing a simple interface for bulk operations and other tools. Args: request: The elicitation request to process Returns: Dictionary containing the elicited values, or None if failed """ try: # Create and register the request request_id = await self.create_request(request) # Use the existing elicit_with_fallback function response = await elicit_with_fallback(request) if response and response.complete: logger.info(f"Elicitation successful for request {request_id}") return response.values else: logger.warning(f"Elicitation failed or incomplete for request {request_id}") return None except Exception as e: logger.error(f"Error in elicitation: {str(e)}") return None async def _handle_timeout(self, request_id: str, timeout_seconds: int): """Handle request timeout.""" await asyncio.sleep(timeout_seconds) if request_id in self.pending_requests: logger.warning(f"Elicitation request {request_id} timed out") self._cleanup_request(request_id) def _cleanup_request(self, request_id: str): """Clean up a completed or cancelled request.""" self.pending_requests.pop(request_id, None) if request_id in self.timeout_tasks: self.timeout_tasks[request_id].cancel() del self.timeout_tasks[request_id] def _validate_response(self, request: ElicitationRequest, response: ElicitationResponse) -> bool: """Validate a response against the request requirements.""" # Check required fields for req_field in request.fields: if req_field.required and req_field.name not in response.values: logger.warning(f"Missing required field: {req_field.name}") return False # Validate field types and constraints if req_field.name in response.values: value = response.values[req_field.name] if req_field.options and value not in req_field.options: logger.warning(f"Invalid option for field {req_field.name}: {value}") return False # Additional validation based on field type if req_field.type == "email" and "@" not in str(value): logger.warning(f"Invalid email format for field {req_field.name}: {value}") return False return True # Global elicitation manager instance elicitation_manager = ElicitationManager() # Helper functions for common elicitation patterns def create_schema_field_elicitation( context: Optional[str] = None, existing_fields: Optional[List[str]] = None ) -> ElicitationRequest: """Create an elicitation request for schema field definitions.""" fields = [ ElicitationField( name="field_name", type="text", label="Field Name", description="Name of the schema field", required=True, placeholder="e.g., user_id, email, timestamp", ), ElicitationField( name="field_type", type="choice", label="Field Type", description="Data type for the field", required=True, options=[ "string", "int", "long", "float", "double", "boolean", "bytes", "array", "record", ], default="string", ), ElicitationField( name="nullable", type="choice", label="Nullable", description="Can this field be null?", required=True, options=["true", "false"], default="false", ), ElicitationField( name="default_value", type="text", label="Default Value", description="Default value for the field (optional)", required=False, placeholder="Leave empty for no default", ), ElicitationField( name="documentation", type="text", label="Documentation", description="Description of what this field represents", required=False, placeholder="Brief description of the field purpose", ), ] context_info = {"existing_fields": existing_fields or []} if context: context_info["schema_context"] = context return ElicitationRequest( type=ElicitationType.FORM, title="Define Schema Field", description="Please provide details for the new schema field", fields=fields, allow_multiple=True, context=context_info, timeout_seconds=600, # 10 minutes for schema design ) def create_migration_preferences_elicitation( source_registry: str, target_registry: str, context: Optional[str] = None ) -> ElicitationRequest: """Create an elicitation request for migration preferences.""" fields = [ ElicitationField( name="preserve_ids", type="choice", label="Preserve Schema IDs", description="Should schema IDs be preserved during migration?", required=True, options=["true", "false"], default="true", ), ElicitationField( name="migrate_all_versions", type="choice", label="Migrate All Versions", description="Migrate all schema versions or just the latest?", required=True, options=["true", "false"], default="false", ), ElicitationField( name="conflict_resolution", type="choice", label="Conflict Resolution", description="How to handle conflicts if schema already exists in target?", required=True, options=["skip", "overwrite", "merge", "prompt"], default="prompt", ), ElicitationField( name="batch_size", type="text", label="Batch Size", description="Number of schemas to migrate in each batch", required=False, default="10", validation={"min": 1, "max": 100}, ), ElicitationField( name="dry_run", type="choice", label="Dry Run", description="Perform a dry run first to preview changes?", required=True, options=["true", "false"], default="true", ), ] return ElicitationRequest( type=ElicitationType.FORM, title="Migration Preferences", description=f"Configure migration from {source_registry} to {target_registry}", fields=fields, context={ "source_registry": source_registry, "target_registry": target_registry, "context": context, }, timeout_seconds=300, ) def create_compatibility_resolution_elicitation(subject: str, compatibility_errors: List[str]) -> ElicitationRequest: """Create an elicitation request for compatibility issue resolution.""" fields = [ ElicitationField( name="resolution_strategy", type="choice", label="Resolution Strategy", description="How would you like to resolve the compatibility issues?", required=True, options=[ "modify_schema", "change_compatibility_level", "add_default_values", "make_fields_optional", "skip_registration", ], ), ElicitationField( name="compatibility_level", type="choice", label="New Compatibility Level", description="Set a different compatibility level for this subject", required=False, options=["BACKWARD", "FORWARD", "FULL", "NONE"], default="BACKWARD", ), ElicitationField( name="notes", type="text", label="Notes", description="Additional notes about this compatibility decision", required=False, placeholder="Explain the reasoning for this change", ), ] return ElicitationRequest( type=ElicitationType.FORM, title="Resolve Compatibility Issues", description=f"Schema for subject '{subject}' has compatibility issues that need resolution", fields=fields, context={"subject": subject, "compatibility_errors": compatibility_errors}, timeout_seconds=300, ) def create_context_metadata_elicitation(context_name: str) -> ElicitationRequest: """Create an elicitation request for context metadata.""" fields = [ ElicitationField( name="description", type="text", label="Context Description", description="What is this context used for?", required=False, placeholder="e.g., User service schemas, Payment processing events", ), ElicitationField( name="owner", type="text", label="Owner", description="Team or person responsible for this context", required=False, placeholder="e.g., data-platform-team", ), ElicitationField( name="environment", type="choice", label="Environment", description="What environment is this context for?", required=False, options=["development", "staging", "production", "testing"], default="development", ), ElicitationField( name="tags", type="text", label="Tags", description="Comma-separated tags for organization", required=False, placeholder="e.g., microservice, events, user-data", ), ] return ElicitationRequest( type=ElicitationType.FORM, title="Context Metadata", description=f"Please provide metadata for the new context '{context_name}'", fields=fields, context={"context_name": context_name}, timeout_seconds=300, ) def create_export_preferences_elicitation(operation_type: str) -> ElicitationRequest: """Create an elicitation request for export preferences.""" fields = [ ElicitationField( name="format", type="choice", label="Export Format", description="Which format would you like for the export?", required=True, options=["json", "avro_idl", "yaml", "csv"], default="json", ), ElicitationField( name="include_metadata", type="choice", label="Include Metadata", description="Include schema metadata in the export?", required=True, options=["true", "false"], default="true", ), ElicitationField( name="include_versions", type="choice", label="Version Inclusion", description="Which schema versions to include?", required=True, options=["latest", "all", "specific"], default="latest", ), ElicitationField( name="compression", type="choice", label="Compression", description="Compress the export file?", required=False, options=["none", "gzip", "zip"], default="none", ), ] return ElicitationRequest( type=ElicitationType.FORM, title="Export Preferences", description=f"Configure {operation_type} export settings", fields=fields, context={"operation": operation_type}, timeout_seconds=300, ) def create_migrate_schema_elicitation( subject: str, source_registry: str, target_registry: str, schema_exists_in_target: bool = False, existing_versions: Optional[List[int]] = None, context: Optional[str] = None, ) -> ElicitationRequest: """Create an elicitation request for migrate schema preferences.""" fields = [] # If schema exists in target, ask about replacement and backup if schema_exists_in_target: fields.extend( [ ElicitationField( name="replace_existing", type="choice", label="Replace Existing Schema", description=( f"Schema '{subject}' already exists in target registry '{target_registry}'. " "Should it be replaced?" ), required=True, options=["true", "false"], default="false", ), ElicitationField( name="backup_before_replace", type="choice", label="Backup Before Replace", description="Should existing schema be backed up (exported) before replacement?", required=False, options=["true", "false"], default="true", ), ] ) # Always ask about ID preservation fields.append( ElicitationField( name="preserve_ids", type="choice", label="Preserve Schema IDs", description="Should schema IDs be preserved during migration? (Requires IMPORT mode on target registry)", required=True, options=["true", "false"], default="true", ) ) # Always ask about post-migration comparison fields.append( ElicitationField( name="compare_after_migration", type="choice", label="Compare After Migration", description="Should schemas be compared after migration to verify successful transfer?", required=True, options=["true", "false"], default="true", ) ) # Additional migration preferences fields.extend( [ ElicitationField( name="migrate_all_versions", type="choice", label="Migrate All Versions", description="Migrate all schema versions or just the latest?", required=True, options=["true", "false"], default="false", ), ElicitationField( name="dry_run", type="choice", label="Dry Run First", description="Perform a dry run to preview changes before actual migration?", required=True, options=["true", "false"], default="true", ), ] ) # Build description based on whether schema exists if schema_exists_in_target: description = ( f"Schema '{subject}' already exists in target registry '{target_registry}' " f"with versions {existing_versions}. Configure migration preferences." ) else: description = f"Configure migration of schema '{subject}' from '{source_registry}' to '{target_registry}'" return ElicitationRequest( type=ElicitationType.FORM, title="Schema Migration Preferences", description=description, fields=fields, context={ "subject": subject, "source_registry": source_registry, "target_registry": target_registry, "schema_exists_in_target": schema_exists_in_target, "existing_versions": existing_versions, "context": context, }, timeout_seconds=300, ) def create_schema_evolution_elicitation( subject: str, current_schema: Dict[str, Any], proposed_changes: List[Dict[str, Any]], compatibility_mode: str = "BACKWARD", ) -> ElicitationRequest: """Create an elicitation request for schema evolution guidance. This helps users understand the impact of schema changes and guides them through safe evolution strategies. Args: subject: The schema subject name current_schema: The current schema definition proposed_changes: List of proposed changes with their impact compatibility_mode: Current compatibility mode setting """ # Analyze the proposed changes to determine available strategies change_types = set() for change in proposed_changes: change_types.add(change.get("type", "unknown")) # Build strategy options based on change types strategy_options = [] if "add_field" in change_types: strategy_options.extend( [ "add_optional_field", "add_required_with_default", ] ) if "remove_field" in change_types: strategy_options.append("deprecate_field") if "modify_field" in change_types: strategy_options.extend( [ "change_type_safely", ] ) # Always include these options strategy_options.extend( [ "create_new_version", "create_new_subject", ] ) fields = [ ElicitationField( name="evolution_strategy", type="choice", label="Evolution Strategy", description="How should we handle this schema change?", required=True, options=strategy_options, default=strategy_options[0] if strategy_options else "create_new_version", ), ElicitationField( name="consumer_notification", type="choice", label="Consumer Notification", description="How should we notify consumers about this change?", required=True, options=[ "automatic_notification", "manual_review", "changelog_entry", "no_notification", ], default="automatic_notification", ), ElicitationField( name="rollback_strategy", type="choice", label="Rollback Strategy", description="If issues arise, how should we handle rollback?", required=True, options=[ "automatic_rollback", "manual_decision", "dual_support", "no_rollback", ], default="manual_decision", ), ElicitationField( name="testing_approach", type="choice", label="Testing Approach", description="How should this schema change be tested?", required=True, options=[ "staging_first", "canary_deployment", "parallel_testing", "direct_production", ], default="staging_first", ), ElicitationField( name="migration_timeline", type="choice", label="Migration Timeline", description="When should consumers migrate to the new schema?", required=True, options=[ "immediate", "1_week", "2_weeks", "1_month", "coordinated_date", ], default="2_weeks", ), ElicitationField( name="documentation_notes", type="text", label="Documentation Notes", description="Additional notes about this evolution (migration guide, breaking changes, etc.)", required=False, placeholder="e.g., Field X type is being changed from int to string...", ), ] # Build a description with change summary change_summary = "\n".join( [ f"- {change.get('type', 'Unknown')}: {change.get('description', 'No description')}" for change in proposed_changes[:5] # Limit to first 5 changes ] ) if len(proposed_changes) > 5: change_summary += f"\n... and {len(proposed_changes) - 5} more changes" description = ( f"Schema '{subject}' has proposed changes that need evolution planning:\n\n" f"{change_summary}\n\n" f"Current compatibility mode: {compatibility_mode}" ) return ElicitationRequest( type=ElicitationType.FORM, title="Schema Evolution Assistant", description=description, fields=fields, context={ "subject": subject, "current_schema": current_schema, "proposed_changes": proposed_changes, "compatibility_mode": compatibility_mode, "change_types": list(change_types), }, timeout_seconds=600, # 10 minutes for planning priority=ElicitationPriority.HIGH, # Schema evolution is critical ) # Mock elicitation function for non-supporting clients async def mock_elicit(request: ElicitationRequest) -> Optional[ElicitationResponse]: """ Mock elicitation function that provides default responses for non-supporting clients. This ensures graceful degradation when elicitation is not available. """ logger.info(f"Mock elicitation for request: {request.title}") # Generate reasonable defaults based on request type and context values = {} for req_field in request.fields: if req_field.default is not None: values[req_field.name] = req_field.default elif req_field.type == "choice" and req_field.options: values[req_field.name] = req_field.options[0] # Use first option as default elif req_field.type == "text": values[req_field.name] = req_field.placeholder or "" elif req_field.type == "confirmation": values[req_field.name] = "false" # Conservative default else: values[req_field.name] = None # Override with context-specific intelligent defaults if request.context: if "schema_context" in request.context: # For schema fields, provide sensible defaults if "field_type" in values: values["field_type"] = "string" if "nullable" in values: values["nullable"] = "false" elif "migration" in request.context: # For migration, use safe defaults if "preserve_ids" in values: values["preserve_ids"] = "true" if "dry_run" in values: values["dry_run"] = "true" elif "evolution" in request.title.lower(): # For schema evolution, use conservative defaults if "evolution_strategy" in values: # Prefer safe strategies if "add_optional_field" in values.get("evolution_strategy", []): values["evolution_strategy"] = "add_optional_field" else: values["evolution_strategy"] = "create_new_version" if "rollback_strategy" in values: values["rollback_strategy"] = "manual_decision" if "testing_approach" in values: values["testing_approach"] = "staging_first" return ElicitationResponse( request_id=request.id, values=values, complete=True, metadata={"source": "mock_fallback", "auto_generated": True}, ) # Integration function to check if elicitation is supported def is_elicitation_supported() -> bool: """ Check if the current MCP client supports elicitation. This can be extended to check client capabilities. """ # For now, assume elicitation is supported # In a real implementation, this would check the client's declared capabilities return True async def elicit_with_fallback( request: ElicitationRequest, ) -> Optional[ElicitationResponse]: """ Attempt elicitation with fallback to defaults if not supported. """ if is_elicitation_supported(): # In a real implementation, this would use the MCP elicitation protocol # For now, we'll simulate with the mock function logger.info(f"Elicitation not yet implemented, using fallback for: {request.title}") return await mock_elicit(request) else: logger.info(f"Client doesn't support elicitation, using defaults for: {request.title}") return await mock_elicit(request)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server