"""
Official MCP Elicitations Protocol Implementation
This module implements the official MCP elicitation protocol as specified
in the MCP documentation. It works alongside the LLM-based elicitation
to provide a hybrid approach.
Key Features:
- Official MCP elicitation/create messages
- JSON Schema validation
- Form and URL mode support
- Integration with existing LLM elicitation
- MCP protocol compliant
Usage:
- Use for structured data collection
- Use when client supports elicitation capability
- Use for sensitive data (URL mode)
- Use as fallback when LLM extraction is insufficient
"""
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import json
import uuid
from src.observability import get_logger
logger = get_logger(__name__)
class ElicitationMode(str, Enum):
"""Elicitation modes as per MCP specification."""
FORM = "form"
URL = "url"
class ElicitationAction(str, Enum):
"""Elicitation response actions."""
ACCEPT = "accept"
DECLINE = "decline"
CANCEL = "cancel"
@dataclass
class ElicitationRequest:
"""Official MCP elicitation request."""
mode: ElicitationMode
message: str
requested_schema: Optional[Dict[str, Any]] = None # For form mode
url: Optional[str] = None # For URL mode
elicitation_id: Optional[str] = None # For URL mode
def to_mcp_request(self, request_id: str) -> Dict[str, Any]:
"""Convert to MCP elicitation/create request."""
params = {
"mode": self.mode.value,
"message": self.message
}
if self.mode == ElicitationMode.FORM and self.requested_schema:
params["requestedSchema"] = self.requested_schema
elif self.mode == ElicitationMode.URL:
if not self.url:
raise ValueError("URL mode requires a URL")
if not self.elicitation_id:
raise ValueError("URL mode requires an elicitation_id")
params["url"] = self.url
params["elicitationId"] = self.elicitation_id
return {
"jsonrpc": "2.0",
"id": request_id,
"method": "elicitation/create",
"params": params
}
@dataclass
class ElicitationResponse:
"""Official MCP elicitation response."""
action: ElicitationAction
content: Optional[Dict[str, Any]] = None # For accept action with form data
@classmethod
def from_mcp_response(cls, response: Dict[str, Any]) -> "ElicitationResponse":
"""Create from MCP response."""
result = response.get("result", {})
action = result.get("action", "cancel")
content = result.get("content")
return cls(
action=ElicitationAction(action),
content=content
)
def create_vendor_form_elicitation(
missing_fields: List[str],
extracted_data: Optional[Dict[str, Any]] = None
) -> ElicitationRequest:
"""
Create an official MCP elicitation for vendor creation.
Args:
missing_fields: List of fields that need to be collected
extracted_data: Data already extracted (for defaults)
Returns:
ElicitationRequest for form mode
"""
# Build JSON Schema based on missing fields
properties = {}
required = []
field_definitions = {
"vendor_name": {
"type": "string",
"title": "Vendor Name",
"description": "The legal name of the vendor company",
"minLength": 2,
"maxLength": 100
},
"vendor_email": {
"type": "string",
"title": "Email",
"description": "Primary contact email",
"format": "email"
},
"vendor_phone": {
"type": "string",
"title": "Phone",
"description": "Primary contact phone number",
"pattern": "^[\\d\\s\\-\\(\\)\\+]+$"
},
"vendor_website": {
"type": "string",
"title": "Website",
"description": "Company website URL",
"format": "uri"
},
"vendor_domain": {
"type": "string",
"title": "Business Domain",
"description": "Industry or business domain",
"maxLength": 100
},
"description": {
"type": "string",
"title": "Description",
"description": "Brief description of the vendor's business",
"maxLength": 500
},
"location": {
"type": "string",
"title": "Location",
"description": "Physical location of the vendor",
"maxLength": 200
},
"manager_name": {
"type": "string",
"title": "Manager Name",
"description": "Name of the vendor manager",
"maxLength": 100
},
"gl_code": {
"type": "string",
"title": "GL Code",
"description": "General ledger code",
"pattern": "^[0-9A-Za-z\\-]+$"
},
"department_id": {
"type": "integer",
"title": "Department ID",
"description": "Department for categorization",
"minimum": 1
},
"billing_type_id": {
"type": "integer",
"title": "Billing Type ID",
"description": "Billing type",
"minimum": 1
},
"contract_required": {
"type": "boolean",
"title": "Contract Required",
"description": "Whether a contract is required",
"default": False
}
}
# Add missing fields to schema
for field in missing_fields:
if field in field_definitions:
properties[field] = field_definitions[field]
# Set default if we have extracted data
if extracted_data and field in extracted_data:
properties[field]["default"] = extracted_data[field]
# Mark as required if it's a critical field
critical_fields = ["vendor_name"]
if field in critical_fields:
required.append(field)
schema = {
"type": "object",
"properties": properties,
"required": required
}
return ElicitationRequest(
mode=ElicitationMode.FORM,
message=f"I need some additional information to create the vendor. Please provide the following details:",
requested_schema=schema
)
def create_sensitive_info_elicitation(
info_type: str,
redirect_url: str
) -> ElicitationRequest:
"""
Create a URL mode elicitation for sensitive information.
Args:
info_type: Type of sensitive info needed
redirect_url: URL to redirect user to
Returns:
ElicitationRequest for URL mode
"""
return ElicitationRequest(
mode=ElicitationMode.URL,
message=f"Please provide your {info_type} through the secure portal.",
url=redirect_url,
elicitation_id=str(uuid.uuid4())
)
def should_use_mcp_elicitation(
client_capabilities: Optional[Dict[str, Any]] = None,
extraction_confidence: float = 0.0,
missing_critical_fields: bool = False,
sensitive_data_needed: bool = False
) -> bool:
"""
Determine if we should use official MCP elicitation.
Args:
client_capabilities: Client's elicitation capabilities
extraction_confidence: Confidence score from LLM extraction
missing_critical_fields: Whether critical fields are missing
sensitive_data_needed: Whether sensitive data is required
Returns:
True if MCP elicitation should be used
"""
# Check if client supports elicitation
if not client_capabilities or "elicitation" not in client_capabilities:
return False
# Use MCP elicitation for sensitive data
if sensitive_data_needed:
return True
# Use MCP elicitation if LLM extraction confidence is low
if extraction_confidence < 0.7:
return True
# Use MCP elicitation if critical fields are missing
if missing_critical_fields:
return True
# Otherwise, stick with LLM extraction
return False
def merge_elicitation_data(
extracted_data: Dict[str, Any],
elicitation_data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Merge data from LLM extraction and MCP elicitation.
Args:
extracted_data: Data from LLM extraction
elicitation_data: Data from MCP elicitation
Returns:
Merged data dictionary
"""
merged = extracted_data.copy()
merged.update(elicitation_data)
return merged
def validate_elicitation_response(
response: ElicitationResponse,
expected_schema: Dict[str, Any]
) -> List[str]:
"""
Validate elicitation response against schema.
Args:
response: Elicitation response to validate
expected_schema: JSON schema to validate against
Returns:
List of validation errors
"""
errors = []
if response.action != ElicitationAction.ACCEPT:
return ["User declined the elicitation request"]
if not response.content:
return ["No data provided in response"]
# Basic validation against schema
required = expected_schema.get("required", [])
properties = expected_schema.get("properties", {})
for field in required:
if field not in response.content:
errors.append(f"Required field '{field}' is missing")
for field, value in response.content.items():
if field in properties:
field_schema = properties[field]
field_type = field_schema.get("type")
# Type validation
if field_type == "string" and not isinstance(value, str):
errors.append(f"Field '{field}' must be a string")
elif field_type == "integer" and not isinstance(value, int):
errors.append(f"Field '{field}' must be an integer")
elif field_type == "boolean" and not isinstance(value, bool):
errors.append(f"Field '{field}' must be a boolean")
# Format validation
field_format = field_schema.get("format")
if field_format == "email" and isinstance(value, str):
if "@" not in value or "." not in value:
errors.append(f"Field '{field}' must be a valid email")
elif field_format == "uri" and isinstance(value, str):
if not (value.startswith("http://") or value.startswith("https://")):
errors.append(f"Field '{field}' must be a valid URL")
return errors