Skip to main content
Glama
ShivamPansuriya

Dynamic Per-User Tool Generation MCP Server

resolver.py9.11 kB
""" Slot Resolver Core resolution engine that orchestrates the multi-stage resolution process. Implements the waterfall strategy: Normalization → Exact → Alias → Fuzzy → Decisioning """ import logging import time from typing import Optional from slot_resolution.core.models import ( SlotResolutionRequest, SlotResolutionResponse, ResolutionStatus, ResolvedEntity, ResolutionMethod ) from slot_resolution.core.normalizer import InputNormalizer from slot_resolution.core.decisioning import DecisionEngine from slot_resolution.services.exact_match_service import ExactMatchService from slot_resolution.services.alias_service import AliasService from slot_resolution.services.elasticsearch_service import ElasticsearchMatchingService from slot_resolution.services.cache_service import CacheService from slot_resolution.services.audit_service import AuditService from slot_resolution.utils.exceptions import ValidationError logger = logging.getLogger(__name__) class SlotResolver: """ Core slot resolution engine. Orchestrates the multi-stage resolution process: 1. Input normalization 2. Cache lookup 3. Exact match 4. Alias match 5. Elasticsearch fuzzy match 6. Decisioning logic 7. Audit logging """ def __init__( self, es_service: ElasticsearchMatchingService, exact_match_service: Optional[ExactMatchService] = None, alias_service: Optional[AliasService] = None, cache_service: Optional[CacheService] = None, audit_service: Optional[AuditService] = None, decision_engine: Optional[DecisionEngine] = None ): """ Initialize the slot resolver. Args: es_service: Elasticsearch matching service (required) exact_match_service: Exact match service (optional) alias_service: Alias service (optional) cache_service: Cache service (optional) audit_service: Audit service (optional) decision_engine: Decision engine (optional, uses default if None) """ self.es_service = es_service self.exact_match_service = exact_match_service or ExactMatchService() self.alias_service = alias_service or AliasService() self.cache_service = cache_service self.audit_service = audit_service self.decision_engine = decision_engine or DecisionEngine() logger.info("SlotResolver initialized with all services") async def resolve( self, request: SlotResolutionRequest, user_id: Optional[str] = None ) -> SlotResolutionResponse: """ Resolve a slot to an entity ID. Args: request: Slot resolution request user_id: Optional user identifier for audit logging Returns: SlotResolutionResponse with resolution result """ start_time = time.time() try: # Stage 1: Normalization normalized_query = InputNormalizer.normalize(request.input) logger.info( f"Resolving slot: tenant={request.tenant_id}, " f"entity_type={request.entity_type}, " f"input='{request.input}', normalized='{normalized_query}'" ) # Stage 2: Cache lookup if self.cache_service: cached_response = self.cache_service.get( tenant_id=request.tenant_id, entity_type=request.entity_type, normalized_query=normalized_query ) if cached_response: logger.info(f"Cache hit for '{normalized_query}'") return cached_response # Stage 3: Exact match exact_match = await self.exact_match_service.find_exact_match( entity_type=request.entity_type, normalized_query=normalized_query, tenant_id=request.tenant_id, filters=request.context ) if exact_match: logger.info(f"Exact match found for '{normalized_query}'") response = SlotResolutionResponse( status=ResolutionStatus.RESOLVED, resolved=exact_match, input_echo=request.input, normalization=normalized_query, method=ResolutionMethod.EXACT_MATCH ) self._finalize_response(request, response, start_time, user_id) return response # Stage 4: Alias match canonical_form = self.alias_service.resolve_alias( entity_type=request.entity_type, normalized_query=normalized_query ) if canonical_form: logger.info( f"Alias resolved: '{normalized_query}' -> '{canonical_form}'" ) # Retry exact match with canonical form exact_match = await self.exact_match_service.find_exact_match( entity_type=request.entity_type, normalized_query=canonical_form, tenant_id=request.tenant_id, filters=request.context ) if exact_match: response = SlotResolutionResponse( status=ResolutionStatus.RESOLVED, resolved=exact_match, input_echo=request.input, normalization=canonical_form, method=ResolutionMethod.ALIAS_MATCH ) self._finalize_response(request, response, start_time, user_id) return response # Use canonical form for fuzzy search normalized_query = canonical_form # Stage 5: Elasticsearch fuzzy match candidates = await self.es_service.fuzzy_search( entity_type=request.entity_type, query=normalized_query, filters=request.context, limit=request.limit ) # Stage 6: Decisioning response = self.decision_engine.decide( candidates=candidates, entity_type=request.entity_type, input_query=request.input, normalized_query=normalized_query, min_score_override=request.min_score, score_gap_delta_override=request.score_gap_delta ) # Finalize and return self._finalize_response(request, response, start_time, user_id) return response except ValidationError as e: logger.warning(f"Validation error: {e}") return SlotResolutionResponse( status=ResolutionStatus.INVALID_REQUEST, input_echo=request.input, normalization="", error=str(e) ) except Exception as e: logger.error(f"Error during slot resolution: {e}", exc_info=True) return SlotResolutionResponse( status=ResolutionStatus.SERVICE_ERROR, input_echo=request.input, normalization="", error=f"Internal error: {str(e)}" ) def _finalize_response( self, request: SlotResolutionRequest, response: SlotResolutionResponse, start_time: float, user_id: Optional[str] ): """ Finalize response with latency, caching, and audit logging. Args: request: Original request response: Response to finalize start_time: Request start time user_id: Optional user identifier """ # Calculate latency latency_ms = int((time.time() - start_time) * 1000) response.latency_ms = latency_ms # Cache successful resolutions if self.cache_service and response.status == ResolutionStatus.RESOLVED: self.cache_service.put( tenant_id=request.tenant_id, entity_type=request.entity_type, normalized_query=response.normalization, response=response ) # Audit log if self.audit_service: self.audit_service.log_resolution( tenant_id=request.tenant_id, user_id=user_id, entity_type=request.entity_type, input_query=request.input, response=response, latency_ms=latency_ms ) logger.info( f"Resolution completed: status={response.status.value}, " f"latency={latency_ms}ms" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ShivamPansuriya/MCP-server-Python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server