Skip to main content
Glama
match.py28.4 kB
import logging import yaml import json from src.constants import ACTIVITY_CLIENT from src.env import RELTIO_TENANT from src.util.api import get_reltio_url, http_request, create_error_response, validate_connection_security from src.util.auth import get_reltio_headers from src.util.exceptions import SecurityError from src.util.models import MatchScoreRequest, ConfidenceLevelRequest, GetTotalMatchesRequest, GetMatchFacetsRequest, UnifiedMatchRequest, GetPotentialMatchApisRequest from src.util.activity_log import ActivityLog from src.tools.util import ActivityLogLabel # Configure logging logger = logging.getLogger("mcp.server.reltio") async def find_matches_by_match_score(start_match_score: int = 0, end_match_score: int = 100, entity_type: str = "Individual", tenant_id: str = RELTIO_TENANT, max_results: int = 10, offset: int = 0) -> dict: """Find all potential matches by match score range Args: start_match_score (int): Minimum match score to filter matches. Default to 0. end_match_score (int): Maximum match score to filter matches. Default to 100. entity_type (str): Entity type to filter by. Default to 'Individual'. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. max_results (int): Maximum number of results to return. Default to 25. Returns: A dictionary containing the search results Raises: Exception: If there's an error getting the matches """ try: # Validate inputs using Pydantic model try: request = MatchScoreRequest( start_match_score=start_match_score, end_match_score=end_match_score, entity_type=entity_type, tenant_id=tenant_id, max_results=min(max_results, 10), offset=offset ) except ValueError as e: logger.warning(f"Validation error in find_matches_by_match_score: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Special URL construction specifically for search endpoint url = get_reltio_url("entities/_search", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) # Use exact string from Postman to ensure identical formatting but with validated inputs filter_expression = f"(range(potentialMatches.matchScore,{request.start_match_score},{request.end_match_score}) and equals(type,'configuration/entityTypes/{request.entity_type}')" # Build the payload to exactly match the Postman request payload = { "filter": filter_expression, "select": "uri,label,type,relevanceScores", "max": min(request.max_results, 10), "offset": request.offset, "scoreEnabled": False, "options": "ovOnly", "activeness": "active" } try: result = http_request(url, method='POST', headers=headers, data=payload) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve matches: {str(e)}" ) try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"find_matches_by_match_score_tool : Successfully fetched potential matches for entity type {request.entity_type} with match score between {request.start_match_score} and {request.end_match_score}" ) except Exception as log_error: logger.error(f"Activity logging failed for find_matches_by_match_score: {str(log_error)}") # Return appropriate response based on results if result and len(result) > 0: result=[{"uri":match["uri"],"label":match["label"],"type":match["type"]} for match in result] return yaml.dump(result,sort_keys=False) else: return { "message": f"No potential matches found for entity type {request.entity_type} with match score between {request.start_match_score} and {request.end_match_score}.", "results": [] } except Exception as e: # Log the error logger.error(f"Unexpected error in find_matches_by_match_score: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while searching for matches" ) async def find_matches_by_confidence(confidence_level: str = "Low confidence", entity_type: str = "Individual", tenant_id: str = RELTIO_TENANT, max_results: int = 10, offset: int = 0) -> dict: """Find all potential matches by confidence level Args: confidence_level (str): Confidence level for matches (e.g., 'Strong matches', 'Medium confidence', 'Low confidence', 'High confidence', 'Super strong matches'). Default to 'Low confidence'. entity_type (str): Entity type to filter by. Default to 'Individual'. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. max_results (int): Maximum number of results to return. Default to 25. Returns: A dictionary containing the search results Raises: Exception: If there's an error getting the matches """ try: # Validate inputs using Pydantic model try: request = ConfidenceLevelRequest( confidence_level=confidence_level, entity_type=entity_type, tenant_id=tenant_id, max_results=min(max_results, 10), offset=offset ) except ValueError as e: logger.warning(f"Validation error in find_matches_by_confidence: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Special URL construction specifically for search endpoint url = get_reltio_url("entities/_search", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) filter_expression = f"(equals(type,'configuration/entityTypes/{request.entity_type}') and equals(relevanceScores.actionLabel,'{request.confidence_level}')) and equals(type,'configuration/entityTypes/{request.entity_type}')" # Build the payload to exactly match the Postman request payload = { "filter": filter_expression, "select": "uri,label,type,relevanceScores", "max": min(request.max_results, 10), "offset": request.offset, "scoreEnabled": False, "options": "ovOnly", "activeness": "active" } try: result = http_request(url, method='POST', headers=headers, data=payload) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve matches: {str(e)}" ) try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"find_matches_by_confidence_tool : Successfully fetched potential matches for entity type {request.entity_type} with confidence level {request.confidence_level}" ) except Exception as log_error: logger.error(f"Activity logging failed for find_matches_by_confidence: {str(log_error)}") # Return appropriate response based on results if result and len(result) > 0: result=[{"uri":match["uri"],"label":match["label"],"type":match["type"]} for match in result] return yaml.dump(result,sort_keys=False) else: return { "message": f"No potential matches found for entity type {request.entity_type} with confidence level {request.confidence_level}.", "results": [] } except Exception as e: # Log the error logger.error(f"Unexpected error in find_matches_by_confidence: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while searching for matches by confidence" ) async def get_total_matches(min_matches: int = 0, tenant_id: str = RELTIO_TENANT) -> dict: """Get the total count of potential matches in the tenant Args: min_matches (int): Minimum number of matches to filter by. Returns total count of entities with greater than this many matches. Default to 0. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the total count of potential matches Raises: Exception: If there's an error getting the total matches count """ try: # Validate inputs using Pydantic model try: request = GetTotalMatchesRequest( min_matches=min_matches, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_total_matches: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Construct URL for the total endpoint url = get_reltio_url("entities/_total", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) # Build the payload for the total request filter_expression = f"(gt(matches,'{request.min_matches}'))" payload = { "filter": filter_expression, "options": "searchByOv,ovOnly", "activeness": "active" } try: result = http_request(url, method='POST', headers=headers, data=payload) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve total matches count: {str(e)}" ) # Return the total count if result and "total" in result: try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"get_total_matches : Found {result['total']} entities with more than {request.min_matches} potential matches." ) except Exception as log_error: logger.error(f"Activity logging failed for get_total_matches: {str(log_error)}") return { "total": result["total"], "min_matches": request.min_matches, "message": f"Found {result['total']} entities with more than {request.min_matches} potential matches." } else: return { "error": "RESPONSE_ERROR", "message": "API response did not contain a total count", "details": result } except Exception as e: # Log the error logger.error(f"Unexpected error in get_total_matches: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while getting total matches count" ) async def get_total_matches_by_entity_type(min_matches: int = 0, tenant_id: str = RELTIO_TENANT) -> dict: """Get the facet counts of potential matches by entity type Args: min_matches (int): Minimum number of matches to filter by. Returns facet counts of entities with greater than this many matches. Default to 0. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the counts of potential matches by entity type Raises: Exception: If there's an error getting the match facets """ try: # Validate inputs using Pydantic model try: request = GetMatchFacetsRequest( min_matches=min_matches, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_total_matches_by_entity_type: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Construct the facets URL with query parameters # Need to escape % character in the filter query parameter filter_param = f"(gt(matches,'{request.min_matches}'))" # Using params dict for proper URL encoding params = { "activeness": "active", "filter": filter_param, "options": "searchByOv,ovOnly" } url = get_reltio_url("entities/_facets", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) # Fixed payload for facets payload = [{"fieldName": "type", "pageSize": 101, "pageNo": 1}] try: result = http_request(url, method='POST', headers=headers, data=payload, params=params) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve match facets: {str(e)}" ) # Return the facet counts if result and "type" in result: try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"get_total_matches_by_entity_type : Found entities by type with more than {request.min_matches} potential matches." ) except Exception as log_error: logger.error(f"Activity logging failed for get_total_matches_by_entity_type: {str(log_error)}") return { "type_counts": result["type"], "min_matches": request.min_matches, "message": f"Found entities by type with more than {request.min_matches} potential matches." } else: return { "error": "RESPONSE_ERROR", "message": "API response did not contain facet counts", "details": result } except Exception as e: # Log the error logger.error(f"Unexpected error in get_total_matches_by_entity_type: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while getting match facets by entity type" ) async def find_potential_matches(search_type: str = "match_rule", filter: str = "", entity_type: str = "Individual", tenant_id: str = RELTIO_TENANT, max_results: int = 10, offset: int = 0, search_filters: str = "") -> dict: """Unified function to find all potential matches by match rule, score range, or confidence level Args: search_type (str): Type of search - 'match_rule' (default), 'score', or 'confidence' filter (str): Filter value based on search_type: - For 'match_rule': match rule ID (e.g., 'BaseRule05') - For 'score': comma-separated start,end range (e.g., '50,100') - For 'confidence': confidence level (e.g., 'High confidence') entity_type (str): Entity type to filter by. Default to 'Individual'. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. max_results (int): Maximum number of results to return. Default to 10, and is capped at 10. offset (int): Starting index for paginated results. Use 0 for the first page. search_filters (str): Additional search filters to apply (e.g., 'equals(attributes.FirstName,'John')') Returns: A dictionary containing the search results Raises: Exception: If there's an error getting the matches """ try: # Validate inputs using Pydantic model try: request = UnifiedMatchRequest( search_type=search_type, filter=filter, entity_type=entity_type, tenant_id=tenant_id, max_results=min(max_results, 10), offset=offset, search_filters=search_filters ) except ValueError as e: logger.warning(f"Validation error in find_potential_matches: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Special URL construction specifically for search endpoint url = get_reltio_url("entities/_search", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) # Build filter expression based on search type if request.search_type == "match_rule": # For match rule: equals(matchRules,<match_rule_id>) match_filter = request.filter.split('/')[-1] primary_filter = f"equals(matchRules,'configuration/entityTypes/{request.entity_type}/matchGroups/{match_filter}')" elif request.search_type == "score": # For score: range(potentialMatches.matchScore,start,end) start_score, end_score = request.filter.split(',') start_score = int(start_score.strip()) end_score = int(end_score.strip()) primary_filter = f"range(relevanceScores.relevance,{(start_score)/100},{(end_score)/100})" elif request.search_type == "confidence": # For confidence: equals(relevanceScores.actionLabel,'confidence_level') primary_filter = f"equals(relevanceScores.actionLabel,'{request.filter}')" # Combine primary filter with entity type filter entity_type_filter = f"equals(type,'configuration/entityTypes/{request.entity_type}')" # Combine all filters filter_parts = [primary_filter, entity_type_filter] # Add additional search filters if provided if request.search_filters.strip(): filter_parts.append(request.search_filters.strip()) # Create final filter expression with proper parentheses filter_expression = f"({' and '.join(filter_parts)})" # Build the payload to match the pattern from existing functions payload = { "filter": filter_expression, "select": "uri,label,type,relevanceScores", "max": min(request.max_results, 10), "offset": request.offset, "scoreEnabled": False, "options": "ovOnly", "activeness": "active" } try: result = http_request(url, method='POST', headers=headers, data=payload) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve matches: {str(e)}" ) try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.USER_SEARCH.value, client_type=ACTIVITY_CLIENT, description=json.dumps({ "activity": { "query": f"filter={payload['filter']}", "search_type": request.search_type }, "version": '2.0' }) ) except Exception as log_error: logger.error(f"Activity logging failed for find_potential_matches: {str(log_error)}") # Return appropriate response based on results if result and len(result) > 0: result = [{"uri": match["uri"], "label": match["label"], "type": match["type"]} for match in result] return yaml.dump(result, sort_keys=False) else: return { "message": f"No potential matches found for entity type {request.entity_type} with {request.search_type} filter '{request.filter}'.", "results": [] } except Exception as e: # Log the error logger.error(f"Unexpected error in find_potential_matches: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while searching for matches" ) async def get_potential_match_apis(min_matches: int = 0, tenant_id: str = RELTIO_TENANT) -> dict: """Get total, entity-level, and match-rule-level counts of potential matches in the tenant Args: min_matches (int): Minimum number of matches to filter by. Returns total count of entities with greater than this many matches. Default to 0. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing total, entity-level, and match-rule-level counts Raises: Exception: If there's an error getting the potential matches counts """ try: # Validate inputs using Pydantic model try: request = GetPotentialMatchApisRequest( min_matches=min_matches, tenant_id=tenant_id ) except Exception as e: logger.error(f"Validation error in get_potential_match_apis: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid input parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) url = get_reltio_url("entities/_facets", "api", request.tenant_id) try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) filter_expression = f"(gt(matches,'{request.min_matches}'))" params = { "filter": filter_expression, "facet": "matchRules,type", "activeness": "active", "options": "searchByOv,ovOnly" } try: result = http_request(url, headers=headers, params=params) except Exception as e: logger.error(f"API request error: {str(e)}") return create_error_response( "SERVER_ERROR", f"Failed to retrieve potential match APIs: {str(e)}" ) # Process the result to calculate sum of type values if result and "type" in result: type_dict = result["type"] if isinstance(type_dict, dict): # Calculate sum of all values in the type dictionary type_sum = sum(type_dict.values()) # Add the sum to the result result["total_matches"] = type_sum try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.POTENTIAL_MATCHES_FOUND.value, client_type=ACTIVITY_CLIENT, description=f"get_potential_matches_stats_tool : Found total potential match with type and match rules with more than {request.min_matches} matches." ) except Exception as log_error: logger.error(f"Activity logging failed for get_potential_match_apis: {str(log_error)}") else: return { "error": "RESPONSE_ERROR", "message": "API response did not contain facet counts", "details": result } return result except Exception as e: logger.error(f"Unexpected error in get_potential_match_apis: {str(e)}") return create_error_response( "SERVER_ERROR", "An unexpected error occurred while getting potential match APIs" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reltio-ai/reltio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server