"""
Elicitation Manager
Production-ready elicitation management system that handles the complete
elicitation lifecycle without hardcoding specific elicitation types.
Design Principles:
- DRY: One manager for all elicitation types
- KISS: Simple, predictable API
- Dynamic: Handles any elicitation automatically
- Extensible: Easy to add new features
- Production-ready: Full error handling and state management
"""
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import json
import uuid
from datetime import datetime, timedelta
import asyncio
from src.observability import get_logger
from .elicitations_ui import DynamicFormRenderer, form_renderer
logger = get_logger(__name__)
class ElicitationStatus(str, Enum):
"""Elicitation lifecycle status."""
PENDING = "pending"
DISPLAYED = "displayed"
SUBMITTED = "submitted"
DECLINED = "declined"
CANCELLED = "cancelled"
EXPIRED = "expired"
ERROR = "error"
@dataclass
class ElicitationSession:
"""Represents an active elicitation session."""
elicitation_id: str
request: Dict[str, Any]
status: ElicitationStatus
created_at: datetime
expires_at: Optional[datetime] = None
response_data: Optional[Dict[str, Any]] = None
response_action: Optional[str] = None
error_message: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
def is_expired(self) -> bool:
"""Check if elicitation has expired."""
if self.expires_at:
return datetime.utcnow() > self.expires_at
return False
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"elicitation_id": self.elicitation_id,
"request": self.request,
"status": self.status.value,
"created_at": self.created_at.isoformat(),
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"response_data": self.response_data,
"response_action": self.response_action,
"error_message": self.error_message,
"metadata": self.metadata
}
class ElicitationManager:
"""
Production-ready elicitation manager.
Handles the complete lifecycle of MCP elicitations without hardcoding
specific elicitation types. Provides a clean API for the web chat
interface to interact with elicitations.
"""
def __init__(self, session_timeout_minutes: int = 30):
self.active_sessions: Dict[str, ElicitationSession] = {}
self.session_timeout = timedelta(minutes=session_timeout_minutes)
self.response_handlers: Dict[str, Callable] = {}
self.form_renderer = form_renderer
# Configuration
self.config = {
"max_active_sessions": 100,
"cleanup_interval_minutes": 5,
"default_session_timeout_minutes": session_timeout_minutes,
"enable_session_persistence": False
}
logger.info(
"elicitation_manager_initialized",
session_timeout_minutes=session_timeout_minutes,
config=self.config
)
async def create_elicitation(
self,
elicitation_request: Dict[str, Any],
response_handler: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Create a new elicitation session.
Args:
elicitation_request: MCP elicitation/create request
response_handler: Callback function for handling responses
Returns:
Dict containing rendered elicitation and session info
"""
try:
# Generate unique ID
elicitation_id = str(uuid.uuid4())
# Check session limit
if len(self.active_sessions) >= self.config["max_active_sessions"]:
await self._cleanup_expired_sessions()
if len(self.active_sessions) >= self.config["max_active_sessions"]:
raise Exception("Maximum active sessions reached")
# Create session
expires_at = datetime.utcnow() + self.session_timeout
session = ElicitationSession(
elicitation_id=elicitation_id,
request=elicitation_request,
status=ElicitationStatus.PENDING,
created_at=datetime.utcnow(),
expires_at=expires_at
)
# Store session
self.active_sessions[elicitation_id] = session
# Store response handler
if response_handler:
self.response_handlers[elicitation_id] = response_handler
# Render the elicitation
rendered = self.form_renderer.render_elicitation_form(
elicitation_request,
elicitation_id
)
# Update session status
session.status = ElicitationStatus.DISPLAYED
session.metadata = rendered.get("metadata", {})
logger.info(
"elicitation_created",
elicitation_id=elicitation_id,
mode=rendered.get("mode"),
field_count=rendered.get("metadata", {}).get("field_count", 0)
)
return {
"success": True,
"elicitation_id": elicitation_id,
"rendered": rendered,
"session": session.to_dict(),
"expires_at": expires_at.isoformat()
}
except Exception as e:
logger.error(
"failed_to_create_elicitation",
error=str(e),
error_type=type(e).__name__
)
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
async def handle_response(
self,
elicitation_id: str,
action: str,
data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Handle elicitation response from user.
Args:
elicitation_id: ID of the elicitation session
action: User action (accept, decline, cancel)
data: Form data (for accept action)
Returns:
Dict containing response handling result
"""
try:
# Validate session
session = self.active_sessions.get(elicitation_id)
if not session:
return {
"success": False,
"error": f"Elicitation session {elicitation_id} not found"
}
# Check if expired
if session.is_expired():
session.status = ElicitationStatus.EXPIRED
return {
"success": False,
"error": "Elicitation session has expired"
}
# Validate action
valid_actions = ["accept", "decline", "cancel"]
if action not in valid_actions:
return {
"success": False,
"error": f"Invalid action: {action}"
}
# Update session
session.status = ElicitationStatus(action)
session.response_action = action
session.response_data = data
logger.info(
"elicitation_response_handled",
elicitation_id=elicitation_id,
action=action,
has_data=bool(data)
)
# Call response handler if registered
handler = self.response_handlers.get(elicitation_id)
if handler:
try:
await handler(elicitation_id, action, data)
except Exception as e:
logger.error(
"elicitation_response_handler_error",
elicitation_id=elicitation_id,
error=str(e),
error_type=type(e).__name__
)
# Build MCP response
mcp_response = self._build_mcp_response(session)
return {
"success": True,
"elicitation_id": elicitation_id,
"action": action,
"data": data,
"mcp_response": mcp_response,
"session": session.to_dict()
}
except Exception as e:
logger.error(
"failed_to_handle_elicitation_response",
elicitation_id=elicitation_id,
error=str(e),
error_type=type(e).__name__
)
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
async def get_session(self, elicitation_id: str) -> Dict[str, Any]:
"""Get elicitation session information."""
session = self.active_sessions.get(elicitation_id)
if not session:
return {
"success": False,
"error": f"Elicitation session {elicitation_id} not found"
}
return {
"success": True,
"session": session.to_dict()
}
async def cancel_elicitation(self, elicitation_id: str) -> Dict[str, Any]:
"""Cancel an active elicitation."""
session = self.active_sessions.get(elicitation_id)
if not session:
return {
"success": False,
"error": f"Elicitation session {elicitation_id} not found"
}
session.status = ElicitationStatus.CANCELLED
# Remove response handler
self.response_handlers.pop(elicitation_id, None)
logger.info("elicitation_cancelled", elicitation_id=elicitation_id)
return {
"success": True,
"elicitation_id": elicitation_id,
"session": session.to_dict()
}
async def cleanup_expired_sessions(self) -> Dict[str, Any]:
"""Clean up expired elicitation sessions."""
expired_ids = []
for elicitation_id, session in self.active_sessions.items():
if session.is_expired():
expired_ids.append(elicitation_id)
session.status = ElicitationStatus.EXPIRED
# Remove expired sessions
for elicitation_id in expired_ids:
del self.active_sessions[elicitation_id]
self.response_handlers.pop(elicitation_id, None)
logger.info(
"elicitation_sessions_cleaned",
expired_count=len(expired_ids),
remaining_sessions=len(self.active_sessions)
)
return {
"success": True,
"expired_count": len(expired_ids),
"remaining_sessions": len(self.active_sessions),
"expired_ids": expired_ids
}
async def get_active_sessions(self) -> Dict[str, Any]:
"""Get all active elicitation sessions."""
sessions = {}
for elicitation_id, session in self.active_sessions.items():
sessions[elicitation_id] = session.to_dict()
return {
"success": True,
"sessions": sessions,
"count": len(sessions)
}
def _build_mcp_response(self, session: ElicitationSession) -> Dict[str, Any]:
"""Build MCP protocol response from session."""
response = {
"jsonrpc": "2.0",
"id": session.request.get("id"),
"result": {
"action": session.response_action
}
}
# Add content for accept action
if session.response_action == "accept" and session.response_data:
response["result"]["content"] = session.response_data
return response
async def validate_elicitation_request(
self,
elicitation_request: Dict[str, Any]
) -> Dict[str, Any]:
"""
Validate elicitation request before processing.
Args:
elicitation_request: MCP elicitation/create request
Returns:
Dict containing validation result
"""
try:
# Check basic structure
if not isinstance(elicitation_request, dict):
return {
"valid": False,
"error": "Request must be a dictionary"
}
# Check method
if elicitation_request.get("method") != "elicitation/create":
return {
"valid": False,
"error": "Invalid method, expected 'elicitation/create'"
}
# Check params
params = elicitation_request.get("params", {})
if not isinstance(params, dict):
return {
"valid": False,
"error": "Params must be a dictionary"
}
# Check mode
mode = params.get("mode", "form")
if mode not in ["form", "url"]:
return {
"valid": False,
"error": "Invalid mode, must be 'form' or 'url'"
}
# Check message
if not params.get("message"):
return {
"valid": False,
"error": "Message is required"
}
# Mode-specific validation
if mode == "form":
schema = params.get("requestedSchema")
if not schema:
return {
"valid": False,
"error": "requestedSchema is required for form mode"
}
if not isinstance(schema, dict):
return {
"valid": False,
"error": "requestedSchema must be a dictionary"
}
elif mode == "url":
url = params.get("url")
if not url:
return {
"valid": False,
"error": "url is required for URL mode"
}
# Basic URL validation
if not (url.startswith("http://") or url.startswith("https://")):
return {
"valid": False,
"error": "url must start with http:// or https://"
}
return {
"valid": True,
"mode": mode,
"message": params.get("message")
}
except Exception as e:
logger.error(
"elicitation_validation_error",
error=str(e),
error_type=type(e).__name__
)
return {
"valid": False,
"error": f"Validation error: {str(e)}"
}
async def register_response_handler(
self,
elicitation_id: str,
handler: Callable
) -> Dict[str, Any]:
"""Register a response handler for an elicitation."""
session = self.active_sessions.get(elicitation_id)
if not session:
return {
"success": False,
"error": f"Elicitation session {elicitation_id} not found"
}
self.response_handlers[elicitation_id] = handler
return {
"success": True,
"elicitation_id": elicitation_id
}
def get_stats(self) -> Dict[str, Any]:
"""Get elicitation manager statistics."""
status_counts = {}
for session in self.active_sessions.values():
status = session.status.value
status_counts[status] = status_counts.get(status, 0) + 1
return {
"active_sessions": len(self.active_sessions),
"registered_handlers": len(self.response_handlers),
"status_distribution": status_counts,
"config": self.config,
"session_timeout_minutes": self.session_timeout.total_seconds() / 60
}
# Global instance for easy access
elicitation_manager = ElicitationManager()