
Dynamic Per-User Tool Generation MCP Server

elasticsearch_service.py (9.1 kB)
""" Elasticsearch Matching Service Handles fuzzy matching using the existing elasticsearch_search_lib. Converts search results to Candidate objects for decisioning. """ import logging from typing import List, Dict, Any, Optional from elasticsearch_search_lib import SearchClient, SearchResponse from slot_resolution.core.models import Candidate logger = logging.getLogger(__name__) class ElasticsearchMatchingService: """ Service for fuzzy matching using Elasticsearch. Integrates with the existing elasticsearch_search_lib to perform fuzzy searches and convert results to Candidate objects. """ def __init__(self, search_client: SearchClient): """ Initialize the Elasticsearch matching service. Args: search_client: Configured SearchClient instance """ self.search_client = search_client logger.info("ElasticsearchMatchingService initialized") async def fuzzy_search( self, entity_type: str, query: str, filters: Optional[Dict[str, Any]] = None, limit: int = 5 ) -> List[Candidate]: """ Perform fuzzy search for an entity. Args: entity_type: Type of entity to search (e.g., "impact", "user") query: Normalized search query filters: Optional contextual filters (e.g., {"userType": "technician"}) limit: Maximum number of candidates to return Returns: List of Candidate objects sorted by confidence (descending) """ try: logger.debug( f"Fuzzy search: entity_type={entity_type}, query='{query}', " f"filters={filters}, limit={limit}" ) # Execute search using elasticsearch_search_lib search_response: SearchResponse = await self.search_client.search( entity_type=entity_type, query=query, limit=limit ) # Check if search was successful if not search_response.success: logger.warning( f"Search failed for entity_type={entity_type}, query='{query}': " f"{search_response.error}" ) return [] # Convert search results to candidates candidates = self._convert_to_candidates( search_response=search_response, entity_type=entity_type, filters=filters ) logger.info( f"Fuzzy search completed: found {len(candidates)} candidates " f"for '{query}' in {entity_type}" ) return candidates except Exception as e: logger.error( f"Error during fuzzy search for entity_type={entity_type}, " f"query='{query}': {e}", exc_info=True ) return [] def _convert_to_candidates( self, search_response: SearchResponse, entity_type: str, filters: Optional[Dict[str, Any]] = None ) -> List[Candidate]: """ Convert SearchResponse to list of Candidate objects. 
Args: search_response: Response from elasticsearch_search_lib entity_type: Type of entity filters: Optional filters to apply post-search Returns: List of Candidate objects """ candidates = [] for item in search_response.items: # Apply filters if specified # if filters and not self._matches_filters(item.data, filters): # logger.debug( # f"Filtering out candidate {item.data.get('id')} " # f"due to filter mismatch: {filters}" # ) # continue # Extract entity ID and name entity_id = self._extract_entity_id(item.data, entity_type) canonical_name = self._extract_canonical_name(item.data, entity_type) if not entity_id or not canonical_name: logger.warning( f"Skipping result with missing id or name: {item.data}" ) continue # Normalize score to 0-1 range # Elasticsearch scores are typically 0-10+, we normalize to 0-1 confidence = min(item.score / 10.0, 1.0) # Extract salient attributes for disambiguation attributes = self._extract_salient_attributes(item.data, entity_type) candidate = Candidate( id=str(entity_id), canonical_name=canonical_name, entity_type=entity_type, confidence=confidence, attributes=attributes ) candidates.append(candidate) # Sort by confidence descending candidates.sort(key=lambda c: c.confidence, reverse=True) return candidates def _matches_filters(self, data: Dict[str, Any], filters: Dict[str, Any]) -> bool: """ Check if data matches all specified filters. Args: data: Entity data from Elasticsearch filters: Filters to apply Returns: True if all filters match, False otherwise """ for filter_key, filter_value in filters.items(): # Convert filter key to field name (e.g., "userType" -> "user_type") field_name = self._to_snake_case(filter_key) # Check if field exists and matches if field_name not in data: logger.debug(f"Filter field '{field_name}' not found in data") return False if data[field_name] != filter_value: logger.debug( f"Filter mismatch: {field_name}={data[field_name]} " f"!= {filter_value}" ) return False return True def _extract_entity_id(self, data: Dict[str, Any], entity_type: str) -> Optional[str]: """Extract entity ID from data.""" # Try common ID field patterns id_fields = [ "id", "dbid", f"{entity_type}_id", f"{entity_type}Id" ] for field in id_fields: if field in data and data[field]: return str(data[field]) return None def _extract_canonical_name(self, data: Dict[str, Any], entity_type: str) -> Optional[str]: """Extract canonical name from data.""" # Try common name field patterns name_fields = [ f"{entity_type}_name", f"{entity_type}Name", "name", "title" ] for field in name_fields: if field in data and data[field]: return str(data[field]) return None def _extract_salient_attributes( self, data: Dict[str, Any], entity_type: str ) -> Dict[str, Any]: """ Extract salient attributes for disambiguation. 
Args: data: Entity data entity_type: Type of entity Returns: Dictionary of salient attributes """ attributes = {} # Entity-specific attribute extraction if entity_type == "user": # For users, include email domain, department if "email" in data: email = data["email"] attributes["emailDomain"] = email.split("@")[1] if "@" in email else None if "department" in data: attributes["department"] = data["department"] if "user_type" in data: attributes["userType"] = data["user_type"] elif entity_type in ["location", "department"]: # For hierarchical entities, include parent if "parent_name" in data: attributes["parent"] = data["parent_name"] if "parent_id" in data: attributes["parentId"] = data["parent_id"] elif entity_type in ["status", "category", "source"]: # For model-based entities, include model if "model" in data: attributes["model"] = data["model"] # Always include ID field if present if f"{entity_type}_id" in data: attributes[f"{entity_type}Id"] = data[f"{entity_type}_id"] return attributes @staticmethod def _to_snake_case(camel_case: str) -> str: """Convert camelCase to snake_case.""" import re return re.sub(r'(?<!^)(?=[A-Z])', '_', camel_case).lower()
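A minimal usage sketch of the service follows. It only relies on what the file above shows (the SearchClient/SearchResponse interface and the fuzzy_search signature); how SearchClient is configured and the import path for ElasticsearchMatchingService are assumptions, and the entity type and query values are purely illustrative.

import asyncio

from elasticsearch_search_lib import SearchClient
# Assumed import path; adjust to wherever the module lives in this project.
from slot_resolution.services.elasticsearch_service import ElasticsearchMatchingService


async def main() -> None:
    # Hypothetical: SearchClient construction/configuration is not shown in this file.
    search_client = SearchClient()
    service = ElasticsearchMatchingService(search_client)

    # Fuzzy-match a free-text value against the "user" entity type.
    candidates = await service.fuzzy_search(
        entity_type="user",
        query="jon smith",
        limit=5
    )

    # Candidates come back sorted by confidence (highest first).
    for candidate in candidates:
        print(candidate.canonical_name, candidate.confidence, candidate.attributes)


asyncio.run(main())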
