"""
Elasticsearch Matching Service
Handles fuzzy matching using the existing elasticsearch_search_lib.
Converts search results to Candidate objects for decisioning.
"""
import logging
from typing import List, Dict, Any, Optional
from elasticsearch_search_lib import SearchClient, SearchResponse
from slot_resolution.core.models import Candidate
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ElasticsearchMatchingService:
    """
    Service for fuzzy matching using Elasticsearch.

    Integrates with the existing elasticsearch_search_lib to perform
    fuzzy searches and convert results to Candidate objects.
    """

    # Raw search scores are divided by this value and clamped to [0, 1]
    # to obtain a candidate's confidence.
    SCORE_NORMALIZATION_DIVISOR = 10.0

    def __init__(self, search_client: SearchClient):
        """
        Initialize the Elasticsearch matching service.

        Args:
            search_client: Configured SearchClient instance
        """
        self.search_client = search_client
        logger.info("ElasticsearchMatchingService initialized")

    async def fuzzy_search(
        self,
        entity_type: str,
        query: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 5
    ) -> List[Candidate]:
        """
        Perform fuzzy search for an entity.

        This method is best-effort: a failed search or any exception raised
        while searching/converting is logged and an empty list is returned.

        Args:
            entity_type: Type of entity to search (e.g., "impact", "user")
            query: Normalized search query
            filters: Optional contextual filters (e.g., {"userType": "technician"}).
                They are logged and forwarded to the conversion step, but are
                not sent to the search client and post-search filtering is
                currently disabled, so they do not affect the results.
            limit: Maximum number of results requested from the search client

        Returns:
            List of Candidate objects sorted by confidence (descending);
            empty if the search failed or raised.
        """
        try:
            logger.debug(
                "Fuzzy search: entity_type=%s, query='%s', filters=%s, limit=%s",
                entity_type, query, filters, limit
            )

            # Execute search using elasticsearch_search_lib
            search_response: SearchResponse = await self.search_client.search(
                entity_type=entity_type,
                query=query,
                limit=limit
            )

            # Check if search was successful
            if not search_response.success:
                logger.warning(
                    "Search failed for entity_type=%s, query='%s': %s",
                    entity_type, query, search_response.error
                )
                return []

            # Convert search results to candidates
            candidates = self._convert_to_candidates(
                search_response=search_response,
                entity_type=entity_type,
                filters=filters
            )
            logger.info(
                "Fuzzy search completed: found %s candidates for '%s' in %s",
                len(candidates), query, entity_type
            )
            return candidates
        except Exception as e:
            # Deliberate best-effort boundary: log with traceback, return nothing.
            logger.exception(
                "Error during fuzzy search for entity_type=%s, query='%s': %s",
                entity_type, query, e
            )
            return []

    def _convert_to_candidates(
        self,
        search_response: SearchResponse,
        entity_type: str,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Candidate]:
        """
        Convert SearchResponse to list of Candidate objects.

        Results without a usable id or name are skipped with a warning.

        Args:
            search_response: Response from elasticsearch_search_lib
            entity_type: Type of entity
            filters: Accepted for interface compatibility. Post-search
                filtering is currently disabled, so this value is unused.

        Returns:
            List of Candidate objects sorted by confidence (descending)
        """
        # NOTE: post-search filtering via _matches_filters(item.data, filters)
        # is intentionally not applied here; `filters` is currently ignored.
        candidates = []
        for item in search_response.items:
            # Extract entity ID and name
            entity_id = self._extract_entity_id(item.data, entity_type)
            canonical_name = self._extract_canonical_name(item.data, entity_type)
            if not entity_id or not canonical_name:
                logger.warning(
                    "Skipping result with missing id or name: %s", item.data
                )
                continue

            candidates.append(
                Candidate(
                    id=str(entity_id),
                    canonical_name=canonical_name,
                    entity_type=entity_type,
                    # Raw score mapped to the 0-1 range
                    confidence=self._normalize_score(item.score),
                    # Salient attributes for disambiguation
                    attributes=self._extract_salient_attributes(
                        item.data, entity_type
                    )
                )
            )

        # Sort by confidence descending
        candidates.sort(key=lambda c: c.confidence, reverse=True)
        return candidates

    @classmethod
    def _normalize_score(cls, score: Optional[float]) -> float:
        """
        Map a raw search score to a confidence in the 0-1 range.

        Args:
            score: Raw score of a search hit; may be None when the backend
                did not compute a relevance score.

        Returns:
            score / SCORE_NORMALIZATION_DIVISOR clamped to [0.0, 1.0];
            0.0 when score is None.
        """
        if score is None:
            return 0.0
        return max(0.0, min(score / cls.SCORE_NORMALIZATION_DIVISOR, 1.0))

    def _matches_filters(self, data: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """
        Check if data matches all specified filters.

        Args:
            data: Entity data from Elasticsearch
            filters: Filters to apply; keys are converted from camelCase to
                snake_case before being looked up in data

        Returns:
            True if all filters match, False otherwise
        """
        for filter_key, filter_value in filters.items():
            # Convert filter key to field name (e.g., "userType" -> "user_type")
            field_name = self._to_snake_case(filter_key)

            # Check if field exists and matches
            if field_name not in data:
                logger.debug("Filter field '%s' not found in data", field_name)
                return False
            if data[field_name] != filter_value:
                logger.debug(
                    "Filter mismatch: %s=%s != %s",
                    field_name, data[field_name], filter_value
                )
                return False
        return True

    def _extract_entity_id(self, data: Dict[str, Any], entity_type: str) -> Optional[str]:
        """
        Extract entity ID from data.

        Tries "id", "dbid", "<entity_type>_id" and "<entity_type>Id" in that
        order and returns the first usable value as a string.

        Args:
            data: Entity data
            entity_type: Type of entity, used to build the type-specific keys

        Returns:
            The ID as a string, or None if no candidate field holds a value
        """
        # Try common ID field patterns
        id_fields = [
            "id",
            "dbid",
            f"{entity_type}_id",
            f"{entity_type}Id"
        ]
        for field in id_fields:
            value = data.get(field)
            # An integer 0 is a legitimate ID even though it is falsy; other
            # falsy values (None, "", False, ...) still mean "no ID here".
            is_zero_int = isinstance(value, int) and not isinstance(value, bool)
            if value or is_zero_int:
                return str(value)
        return None

    def _extract_canonical_name(self, data: Dict[str, Any], entity_type: str) -> Optional[str]:
        """
        Extract canonical name from data.

        Tries "<entity_type>_name", "<entity_type>Name", "name" and "title"
        in that order and returns the first truthy value as a string.

        Args:
            data: Entity data
            entity_type: Type of entity, used to build the type-specific keys

        Returns:
            The name as a string, or None if no candidate field holds a value
        """
        # Try common name field patterns
        name_fields = [
            f"{entity_type}_name",
            f"{entity_type}Name",
            "name",
            "title"
        ]
        for field in name_fields:
            value = data.get(field)
            if value:
                return str(value)
        return None

    def _extract_salient_attributes(
        self,
        data: Dict[str, Any],
        entity_type: str
    ) -> Dict[str, Any]:
        """
        Extract salient attributes for disambiguation.

        Args:
            data: Entity data
            entity_type: Type of entity

        Returns:
            Dictionary of salient attributes. For "user" entities an
            "emailDomain" key is present whenever data has an "email" key;
            its value is None when the email is not a string containing "@".
        """
        attributes = {}

        # Entity-specific attribute extraction
        if entity_type == "user":
            # For users, include email domain, department
            if "email" in data:
                email = data["email"]
                # Guard against non-string values (e.g. None) so one bad
                # record cannot abort the whole conversion; the domain is
                # whatever follows the last "@".
                if isinstance(email, str) and "@" in email:
                    attributes["emailDomain"] = email.rpartition("@")[2]
                else:
                    attributes["emailDomain"] = None
            if "department" in data:
                attributes["department"] = data["department"]
            if "user_type" in data:
                attributes["userType"] = data["user_type"]
        elif entity_type in ["location", "department"]:
            # For hierarchical entities, include parent
            if "parent_name" in data:
                attributes["parent"] = data["parent_name"]
            if "parent_id" in data:
                attributes["parentId"] = data["parent_id"]
        elif entity_type in ["status", "category", "source"]:
            # For model-based entities, include model
            if "model" in data:
                attributes["model"] = data["model"]

        # Always include ID field if present
        id_key = f"{entity_type}_id"
        if id_key in data:
            attributes[f"{entity_type}Id"] = data[id_key]
        return attributes

    @staticmethod
    def _to_snake_case(camel_case: str) -> str:
        """
        Convert camelCase to snake_case.

        An underscore is inserted before every uppercase letter that is not
        at the start of the string, then the result is lowercased.
        """
        import re
        return re.sub(r'(?<!^)(?=[A-Z])', '_', camel_case).lower()