"""
Slot Resolver
Core resolution engine that orchestrates the multi-stage resolution process.
Implements the waterfall strategy: Normalization → Exact → Alias → Fuzzy → Decisioning
"""
import logging
import time
from typing import Optional
from slot_resolution.core.models import (
SlotResolutionRequest,
SlotResolutionResponse,
ResolutionStatus,
ResolvedEntity,
ResolutionMethod
)
from slot_resolution.core.normalizer import InputNormalizer
from slot_resolution.core.decisioning import DecisionEngine
from slot_resolution.services.exact_match_service import ExactMatchService
from slot_resolution.services.alias_service import AliasService
from slot_resolution.services.elasticsearch_service import ElasticsearchMatchingService
from slot_resolution.services.cache_service import CacheService
from slot_resolution.services.audit_service import AuditService
from slot_resolution.utils.exceptions import ValidationError
logger = logging.getLogger(__name__)
class SlotResolver:
"""
Core slot resolution engine.
Orchestrates the multi-stage resolution process:
1. Input normalization
2. Cache lookup
3. Exact match
4. Alias match
5. Elasticsearch fuzzy match
6. Decisioning logic
7. Audit logging
"""
def __init__(
self,
es_service: ElasticsearchMatchingService,
exact_match_service: Optional[ExactMatchService] = None,
alias_service: Optional[AliasService] = None,
cache_service: Optional[CacheService] = None,
audit_service: Optional[AuditService] = None,
decision_engine: Optional[DecisionEngine] = None
):
"""
Initialize the slot resolver.
Args:
es_service: Elasticsearch matching service (required)
exact_match_service: Exact match service (optional)
alias_service: Alias service (optional)
cache_service: Cache service (optional)
audit_service: Audit service (optional)
decision_engine: Decision engine (optional, uses default if None)
"""
self.es_service = es_service
self.exact_match_service = exact_match_service or ExactMatchService()
self.alias_service = alias_service or AliasService()
self.cache_service = cache_service
self.audit_service = audit_service
self.decision_engine = decision_engine or DecisionEngine()
logger.info("SlotResolver initialized with all services")
async def resolve(
self,
request: SlotResolutionRequest,
user_id: Optional[str] = None
) -> SlotResolutionResponse:
"""
Resolve a slot to an entity ID.
Args:
request: Slot resolution request
user_id: Optional user identifier for audit logging
Returns:
SlotResolutionResponse with resolution result
"""
start_time = time.time()
try:
# Stage 1: Normalization
normalized_query = InputNormalizer.normalize(request.input)
logger.info(
f"Resolving slot: tenant={request.tenant_id}, "
f"entity_type={request.entity_type}, "
f"input='{request.input}', normalized='{normalized_query}'"
)
# Stage 2: Cache lookup
if self.cache_service:
cached_response = self.cache_service.get(
tenant_id=request.tenant_id,
entity_type=request.entity_type,
normalized_query=normalized_query
)
if cached_response:
logger.info(f"Cache hit for '{normalized_query}'")
return cached_response
# Stage 3: Exact match
exact_match = await self.exact_match_service.find_exact_match(
entity_type=request.entity_type,
normalized_query=normalized_query,
tenant_id=request.tenant_id,
filters=request.context
)
if exact_match:
logger.info(f"Exact match found for '{normalized_query}'")
response = SlotResolutionResponse(
status=ResolutionStatus.RESOLVED,
resolved=exact_match,
input_echo=request.input,
normalization=normalized_query,
method=ResolutionMethod.EXACT_MATCH
)
self._finalize_response(request, response, start_time, user_id)
return response
# Stage 4: Alias match
canonical_form = self.alias_service.resolve_alias(
entity_type=request.entity_type,
normalized_query=normalized_query
)
if canonical_form:
logger.info(
f"Alias resolved: '{normalized_query}' -> '{canonical_form}'"
)
# Retry exact match with canonical form
exact_match = await self.exact_match_service.find_exact_match(
entity_type=request.entity_type,
normalized_query=canonical_form,
tenant_id=request.tenant_id,
filters=request.context
)
if exact_match:
response = SlotResolutionResponse(
status=ResolutionStatus.RESOLVED,
resolved=exact_match,
input_echo=request.input,
normalization=canonical_form,
method=ResolutionMethod.ALIAS_MATCH
)
self._finalize_response(request, response, start_time, user_id)
return response
# Use canonical form for fuzzy search
normalized_query = canonical_form
# Stage 5: Elasticsearch fuzzy match
candidates = await self.es_service.fuzzy_search(
entity_type=request.entity_type,
query=normalized_query,
filters=request.context,
limit=request.limit
)
# Stage 6: Decisioning
response = self.decision_engine.decide(
candidates=candidates,
entity_type=request.entity_type,
input_query=request.input,
normalized_query=normalized_query,
min_score_override=request.min_score,
score_gap_delta_override=request.score_gap_delta
)
# Finalize and return
self._finalize_response(request, response, start_time, user_id)
return response
except ValidationError as e:
logger.warning(f"Validation error: {e}")
return SlotResolutionResponse(
status=ResolutionStatus.INVALID_REQUEST,
input_echo=request.input,
normalization="",
error=str(e)
)
except Exception as e:
logger.error(f"Error during slot resolution: {e}", exc_info=True)
return SlotResolutionResponse(
status=ResolutionStatus.SERVICE_ERROR,
input_echo=request.input,
normalization="",
error=f"Internal error: {str(e)}"
)
def _finalize_response(
self,
request: SlotResolutionRequest,
response: SlotResolutionResponse,
start_time: float,
user_id: Optional[str]
):
"""
Finalize response with latency, caching, and audit logging.
Args:
request: Original request
response: Response to finalize
start_time: Request start time
user_id: Optional user identifier
"""
# Calculate latency
latency_ms = int((time.time() - start_time) * 1000)
response.latency_ms = latency_ms
# Cache successful resolutions
if self.cache_service and response.status == ResolutionStatus.RESOLVED:
self.cache_service.put(
tenant_id=request.tenant_id,
entity_type=request.entity_type,
normalized_query=response.normalization,
response=response
)
# Audit log
if self.audit_service:
self.audit_service.log_resolution(
tenant_id=request.tenant_id,
user_id=user_id,
entity_type=request.entity_type,
input_query=request.input,
response=response,
latency_ms=latency_ms
)
logger.info(
f"Resolution completed: status={response.status.value}, "
f"latency={latency_ms}ms"
)