"""
Slot Resolution Middleware
Orchestrates the transformation of user requests from entity names to IDs.
Handles disambiguation and error scenarios.
"""
import logging
from typing import Dict, Any, Optional
from slot_resolution.config.config_loader import FieldMappingLoader
from slot_resolution.core.resolver import SlotResolver
from slot_resolution.core.models import (
SlotResolutionRequest,
ResolutionStatus,
DisambiguationRequest,
RequestTransformationResult
)
from slot_resolution.utils.exceptions import ValidationError
logger = logging.getLogger(__name__)
class SlotResolutionMiddleware:
"""
Middleware for transforming requests with entity names to requests with IDs.
This is the main entry point for slot resolution. It:
1. Loads field mappings for the module (request, problem, change, etc.)
2. Identifies fields that need resolution
3. Calls the slot resolver for each field
4. Transforms the request or returns disambiguation payload
"""
def __init__(
self,
tenant_id: str,
resolver: SlotResolver,
config_loader: Optional[FieldMappingLoader] = None
):
"""
Initialize the middleware.
Args:
tenant_id: Tenant identifier
resolver: Configured SlotResolver instance
config_loader: Field mapping configuration loader
"""
self.tenant_id = tenant_id
self.resolver = resolver
self.config_loader = config_loader or FieldMappingLoader()
logger.info(
f"SlotResolutionMiddleware initialized for tenant '{tenant_id}'"
)
async def resolve_request(
self,
request_payload: Dict[str, Any],
module: str = "request",
user_id: Optional[str] = None
) -> RequestTransformationResult:
"""
Resolve all entity name slots in a request to IDs.
Args:
request_payload: Incoming request with entity names
module: Module type (e.g., "request", "problem", "change")
user_id: Optional user identifier for audit logging
Returns:
RequestTransformationResult with transformed payload or disambiguation requests
Example:
Input:
{
"subject": "Laptop not working",
"impact": "high",
"assignee": "shivam"
}
Output (success):
{
"status": "READY",
"payload": {
"subject": "Laptop not working",
"impactId": 2,
"technicianId": 433
}
}
Output (disambiguation):
{
"status": "DISAMBIGUATION_REQUIRED",
"disambiguations": [
{
"field": "assignee",
"targetField": "technicianId",
"candidates": [...]
}
]
}
"""
try:
# Load field mappings for this module
field_mappings = self.config_loader.get_mapping_by_column(module)
logger.info(
f"Processing request for module '{module}' with "
f"{len(field_mappings)} configured fields"
)
# Initialize result containers
resolved_payload = {}
disambiguations = []
audit_trail = {}
# Process each field in the request
for field_name, field_value in request_payload.items():
# Check if this field needs resolution
if field_name not in field_mappings:
# Pass through non-entity fields
resolved_payload[field_name] = field_value
continue
mapping = field_mappings[field_name]
# If value is already an ID (integer), pass through
if isinstance(field_value, int):
resolved_payload[mapping.db_key] = field_value
audit_trail[field_name] = {
"status": "PASSED_THROUGH",
"reason": "Already an ID"
}
continue
# If value is a string (name), resolve it
if isinstance(field_value, str):
resolution_result = await self._resolve_field(
field_name=field_name,
field_value=field_value,
mapping=mapping,
user_id=user_id
)
if resolution_result["status"] == "RESOLVED":
# Success: Add resolved ID to payload
resolved_payload[mapping.db_key] = resolution_result["id"]
audit_trail[field_name] = {
"status": "RESOLVED",
"original_value": field_value,
"resolved_id": resolution_result["id"],
"resolved_name": resolution_result["name"],
"confidence": resolution_result["confidence"]
}
elif resolution_result["status"] == "MULTIPLE_MATCHES":
# Disambiguation needed
disambiguations.append(
DisambiguationRequest(
field=field_name,
target_field=mapping.db_key,
candidates=resolution_result["candidates"],
original_input=field_value
)
)
audit_trail[field_name] = {
"status": "DISAMBIGUATION_REQUIRED",
"candidate_count": len(resolution_result["candidates"])
}
elif resolution_result["status"] == "NO_MATCH":
# No match found
if mapping.required:
raise ValidationError(
f"Required field '{field_name}' could not be resolved: "
f"{resolution_result['error']}"
)
else:
# Optional field, skip it
audit_trail[field_name] = {
"status": "NO_MATCH",
"error": resolution_result["error"]
}
else:
# Other error
raise ValidationError(
f"Failed to resolve field '{field_name}': "
f"{resolution_result.get('error', 'Unknown error')}"
)
# Return result
if disambiguations:
return RequestTransformationResult(
status="DISAMBIGUATION_REQUIRED",
payload=resolved_payload,
disambiguations=disambiguations,
audit_trail=audit_trail
)
else:
return RequestTransformationResult(
status="READY",
payload=resolved_payload,
audit_trail=audit_trail
)
except Exception as e:
logger.error(f"Error during request transformation: {e}", exc_info=True)
raise
async def _resolve_field(
self,
field_name: str,
field_value: str,
mapping: Any,
user_id: Optional[str]
) -> Dict[str, Any]:
"""
Resolve a single field value to an entity ID.
Args:
field_name: Name of the field
field_value: Value to resolve
mapping: Field mapping configuration
user_id: Optional user identifier
Returns:
Dictionary with resolution result
"""
# Create resolution request
resolution_request = SlotResolutionRequest(
tenant_id=self.tenant_id,
entity_type=mapping.reference_to,
input=field_value,
context=mapping.filters,
limit=5
)
# Resolve
response = await self.resolver.resolve(
request=resolution_request,
user_id=user_id
)
# Convert response to result dictionary
if response.status == ResolutionStatus.RESOLVED:
return {
"status": "RESOLVED",
"id": response.resolved.id,
"name": response.resolved.canonical_name,
"confidence": response.resolved.confidence
}
elif response.status == ResolutionStatus.MULTIPLE_MATCHES:
return {
"status": "MULTIPLE_MATCHES",
"candidates": response.candidates
}
elif response.status == ResolutionStatus.NO_MATCH:
return {
"status": "NO_MATCH",
"error": response.error
}
else:
return {
"status": "ERROR",
"error": response.error or "Unknown error"
}