Skip to main content
Glama
entity.py66 kB
import logging from typing import List, Dict, Any, Optional import yaml import json import re from src.constants import ACTIVITY_CLIENT, MAX_RESULTS_LIMIT from src.env import RELTIO_TENANT from src.util.api import get_reltio_url, get_reltio_export_job_url, http_request, create_error_response, validate_connection_security from src.util.auth import get_reltio_headers from src.util.exceptions import SecurityError from src.util.models import ( EntityIdRequest, UpdateEntityAttributesRequest, MergeEntitiesRequest, RejectMatchRequest, UnmergeEntityRequest, EntityWithMatchesRequest, CreateEntitiesRequest, GetEntityParentsRequest ) from src.util.activity_log import ActivityLog from src.tools.util import ActivityLogLabel, simplify_reltio_attributes, slim_crosswalks, format_entity_matches, format_unified_entity_matches # Configure logging logger = logging.getLogger("mcp.server.reltio") def filter_entity(entity: Dict[str, Any], filter_field: Optional[Dict[str, List[str]]]) -> Dict[str, Any]: if filter_field is None: return entity def is_valid(value: Any) -> bool: return value is not None and not (isinstance(value, (str, list, dict, set)) and len(value) == 0) filtered_entity = {} for field, subfields in filter_field.items(): if field not in entity: continue value = entity[field] if not is_valid(value): continue # Handle subfield filtering for nested fields like "attributes" if isinstance(value, dict) and subfields: subvalue = { k: v for k, v in value.items() if k in subfields and is_valid(v) } if subvalue: filtered_entity[field] = subvalue elif isinstance(value, dict) and not subfields: # Include whole dict if subfields list is empty filtered_subdict = {k: v for k, v in value.items() if is_valid(v)} if filtered_subdict: filtered_entity[field] = filtered_subdict else: # For non-dict values (e.g., lists, strings, booleans) filtered_entity[field] = value return filtered_entity async def get_entity_details(entity_id: str, filter_field: Dict[str, List[str]] = None, tenant_id: str = RELTIO_TENANT) -> dict: """Get detailed information about a Reltio entity by ID Args: entity_id (str): The ID of the entity to retrieve filter_field (Dict[str, List[str]]): Optional dictionary to filter specific fields in the entity response tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the entity details Raises: Exception: If there's an error getting the entity details """ try: # Validate inputs using Pydantic model try: request = EntityIdRequest( entity_id=entity_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_entity_details: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) # Construct URL with validated entity ID url = get_reltio_url(f"entities/{request.entity_id}", "api", request.tenant_id) try: headers = get_reltio_headers() # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Make the request with timeout try: entity = http_request(url, headers=headers) except Exception as e: logger.error(f"API request error: {str(e)}") # Check if it's a 404 error (entity not found) if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found" ) return create_error_response( "SERVER_ERROR", "Failed to retrieve entity details from Reltio API" ) # Try to log activity for success try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.USER_PROFILE_VIEW.value, client_type=ACTIVITY_CLIENT, description=json.dumps({"uri":f"entities/{entity_id.split('/')[-1]}","label":entity.get("label","")}), items=[{"objectUri":f"entities/{entity_id.split('/')[-1]}"}] ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_details: {str(log_error)}") filter_entity_data=filter_entity(entity, filter_field) if filter_field else entity result={"attributes":simplify_reltio_attributes(filter_entity_data.get("attributes",{}))} if "crosswalks" in filter_entity_data: result["crosswalks"]=slim_crosswalks(filter_entity_data["crosswalks"]) return yaml.dump(result, sort_keys=False) except Exception as e: # Log the error logger.error(f"Unexpected error in get_entity_details: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity details" ) async def update_entity_attributes(entity_id: str, updates: List[Dict[str, Any]],options:str = "",always_create_dcr:bool = False,change_request_id:str = None, overwrite_default_crosswalk_value:bool = True,tenant_id: str = RELTIO_TENANT) -> dict: """Update specific attributes of an entity in Reltio Args: entity_id (str): Entity ID to update updates (List[Dict[str, Any]]): List of update operations as per Reltio API spec options (str): Optional comma-separated list of options. Available options: - sendHidden: Include hidden attributes in the response - updateAttributeUpdateDates: Update the updateDate and singleAttributeUpdateDates timestamps - addRefAttrUriToCrosswalk: Add reference attribute URIs to crosswalks during updates Example: options="sendHidden,updateAttributeUpdateDates" always_create_dcr (bool): If true, creates a DCR without updating the entity.(TO BE USED MOST OF THE TIME, SKIP IF Changes Seem minimal) change_request_id (str): If provided, all changes will be added to the DCR with this ID instead of updating the entity directly. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the API response Raises: Exception: If there's an error during the update """ try: # Validate request try: request = UpdateEntityAttributesRequest( entity_id=entity_id, updates=updates, options=options, tenant_id=tenant_id, always_create_dcr=always_create_dcr, change_request_id=change_request_id, overwrite_default_crosswalk_value=overwrite_default_crosswalk_value ) except ValueError as e: logger.warning(f"Validation error in update_entity_attributes: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid request format: {str(e)}" ) url = get_reltio_url(f"entities/{request.entity_id}/_update", "api", request.tenant_id) params={} if options and options.strip(): params["options"] = options if request.always_create_dcr: params["alwaysCreateDCR"] = request.always_create_dcr if request.change_request_id: params["changeRequestId"] = request.change_request_id if request.overwrite_default_crosswalk_value: params["overwriteDefaultCrosswalkValue"] = request.overwrite_default_crosswalk_value try: headers = get_reltio_headers() headers["Content-Type"] = "application/json" headers["Globalid"] = ACTIVITY_CLIENT validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate or security requirements not met" ) try: result = http_request(url, method="POST", headers=headers, data=request.updates,params=params if params else None) except Exception as e: logger.error(f"API request error in update_entity_attributes: {str(e)}") if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found" ) return create_error_response( "SERVER_ERROR", f"Failed to update entity attributes in Reltio API- {str(e)}" ) return yaml.dump(result, sort_keys=False) except Exception as e: logger.error(f"Unexpected error in update_entity_attributes: {str(e)}") return create_error_response( "SERVER_ERROR", f"An unexpected error occurred while updating entity attributes- {str(e)}" ) async def get_entity_matches(entity_id: str, tenant_id: str = RELTIO_TENANT, max_results: int = 25) -> dict: """Find potential matches for a specific entity with detailed comparisons Args: entity_id (str): Entity ID to find matches for tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. max_results (int): Maximum number of results to return. Default to 25 Returns: A dictionary containing the source entity and potential matches Raises: Exception: If there's an error getting the potential matches for an entity """ try: # Validate inputs using Pydantic model try: request = EntityIdRequest( entity_id=entity_id, tenant_id=tenant_id ) # Validate max_results if max_results < 1: max_results = 1 elif max_results > MAX_RESULTS_LIMIT: max_results = MAX_RESULTS_LIMIT logger.info(f"Max results limited to {MAX_RESULTS_LIMIT} for entity matches") except ValueError as e: logger.warning(f"Validation error in get_entity_matches: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Use the _transitiveMatches endpoint for a specific entity url = get_reltio_url(f"entities/{request.entity_id}/_transitiveMatches", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) params = { "deep": 1, "markMatchedValues": "true", "sort": "score", "order": "desc", "activeness": "active", "limit": max_results } # Make the request with timeout try: matches_result = http_request(url, headers=headers, params=params) except Exception as e: logger.error(f"API request error: {str(e)}") # Check if it's a 404 error (entity not found) if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found" ) return create_error_response( "SERVER_ERROR", "Failed to retrieve matches from Reltio API" ) # Check if we found any matches if not matches_result or len(matches_result) == 0: return { "message": f"No potential matches found for entity {request.entity_id}.", "matches": [] } # Get the source entity source_url = get_reltio_url(f"entities/{request.entity_id}", "api", request.tenant_id) try: source_entity = http_request(source_url, headers=headers) except Exception as e: logger.error(f"Error retrieving source entity: {str(e)}") # We still have the matches, so return those with an error message about the source return { "message": f"Found matches but could not retrieve source entity details: {str(e)}", "matches": matches_result } # Combine results result = { "source_entity": request.entity_id, "matches": format_entity_matches(matches_result) } # Try to log activity for success try: source_entity_label = source_entity.get('label', '') if isinstance(source_entity, dict) else '' await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"get_entity_matches : Successfully fetched potential matches for entity: {entity_id}, label: {source_entity_label}" ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_matches: {str(log_error)}") return yaml.dump(result, sort_keys=False) except Exception as e: # Log the error logger.error(f"Unexpected error in get_entity_matches: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity matches" ) async def get_entity_match_history(entity_id: str, tenant_id: str = RELTIO_TENANT) -> dict: """Find the match history for a specific entity Args: entity_id (str): Entity ID to find matches for tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the source entity and match history for that entity Raises: Exception: If there's an error getting the match history for an entity """ try: # Validate inputs using Pydantic model try: request = EntityIdRequest( entity_id=entity_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_entity_match_history: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Use the _crosswalkTree endpoint for a specific entity url = get_reltio_url(f"entities/{request.entity_id}/_crosswalkTree", "api", request.tenant_id) # Validate connection security try: validate_connection_security(url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) # Make the request with timeout try: match_history = http_request(url, headers=headers) except Exception as e: logger.error(f"API request error: {str(e)}") # Check if it's a 404 error (entity not found) if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found" ) return create_error_response( "SERVER_ERROR", "Failed to retrieve match history from Reltio API" ) # Check if we found any match history if not match_history or len(match_history) == 0: # Try to log activity for no results try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.USER_PROFILE_VIEW.value, client_type=ACTIVITY_CLIENT, description=json.dumps({"uri":f"entities/{entity_id.split('/')[-1]}","label":""}), items=[{"objectUri":f"entities/{entity_id.split('/')[-1]}"}] ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_match_history (no results): {str(log_error)}") return { "message": f"No match history found for entity {request.entity_id}.", "match_history": [] } # Get the source entity source_url = get_reltio_url(f"entities/{request.entity_id}", "api", request.tenant_id) try: source_entity = http_request(source_url, headers=headers) except Exception as e: logger.error(f"Error retrieving source entity: {str(e)}") # We still have the match history, so return those with an error message about the source return { "message": f"Found match history but could not retrieve source entity details: {str(e)}", "match_history": match_history } # Try to log activity for success try: crosswalk_uris = [] for cross_walk in match_history.get("crosswalks", []): crosswalk_uris.append(cross_walk.get("uri", "")) source_entity_label = source_entity.get('label', '') if isinstance(source_entity, dict) else '' await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, description=f"get_entity_match_history_tool : Successfully fetched match history for entity: {entity_id}, label: {source_entity_label}, crosswalk URIs: {crosswalk_uris}" ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_match_history: {str(log_error)}") return yaml.dump(match_history, sort_keys=False) except Exception as e: # Log the error logger.error(f"Unexpected error in get_entity_match_history: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity match history" ) async def merge_entities(entity_ids: List[str], tenant_id: str = RELTIO_TENANT) -> dict: """Merge two Reltio entities into one Args: entity_ids (List[str]): List of two entity IDs to merge tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the result of the merge operation Raises: Exception: If there's an error merging the entities """ try: # Validate inputs using Pydantic model try: request = MergeEntitiesRequest( entity_ids=entity_ids, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in merge_entities: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity IDs: {str(e)}" ) # Construct URL with validated entity IDs url = get_reltio_url(f"entities/_same", "api", request.tenant_id) try: headers = get_reltio_headers() headers["Globalid"] = ACTIVITY_CLIENT # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Prepare the payload for merging entities payload = request.entity_ids # Make the POST request try: merge_result = http_request( url, method='POST', data=payload, headers=headers ) except Exception as e: logger.error(f"API request error: {str(e)}") # Check for common errors if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"One or more entities not found" ) elif "400" in str(e): return create_error_response( "INVALID_REQUEST", f"Invalid merge request: {str(e)}" ) return create_error_response( "SERVER_ERROR", "Failed to merge entities" ) return merge_result except Exception as e: # Log the error logger.error(f"Unexpected error in merge_entities: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while merging entities" ) async def reject_entity_match(source_id: str, target_id: str, tenant_id: str = RELTIO_TENANT) -> dict: """Reject a potential match between two Reltio entities Args: source_id (str): The ID of the source entity target_id (str): The ID of the target entity to reject as a match tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the result of the reject operation Raises: Exception: If there's an error rejecting the match """ try: # Validate inputs using Pydantic model try: request = RejectMatchRequest( source_id=source_id, target_id=target_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in reject_entity_match: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) # Construct URL with validated entity IDs base_url = get_reltio_url(f"entities/{request.source_id}/_notMatch", "api", request.tenant_id) # Add the target entity URI as a query parameter params = { "uri": f"entities/{request.target_id}" } try: headers = get_reltio_headers() headers["Globalid"] = ACTIVITY_CLIENT # Validate connection security validate_connection_security(base_url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Make the POST request with URL parameters try: reject_result = http_request( base_url, method='POST', params=params, headers=headers ) except Exception as e: logger.error(f"API request error: {str(e)}") # Check for common errors if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"One or more entities not found" ) elif "400" in str(e): return create_error_response( "INVALID_REQUEST", f"Invalid reject match request: {str(e)}" ) return create_error_response( "SERVER_ERROR", "Failed to reject entity match" ) # If we reach here, the operation was successful # The API might not return any content, so create a meaningful response if not reject_result: return { "success": True, "message": f"Successfully rejected match between entities {request.source_id} and {request.target_id}" } return reject_result except Exception as e: # Log the error logger.error(f"Unexpected error in reject_entity_match: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while rejecting entity match" ) async def export_merge_tree(email_id: str, tenant_id: str = RELTIO_TENANT) -> dict: """Export the merge tree for all entities in a specific tenant. Args: email_id (str): This parameter indicates the valid email address to which the notification is sent after the export is completed. tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the scheduled export job ID and status Raises: Exception: If there's an error exporting the merge tree """ try: url = get_reltio_export_job_url(f"entities/_crosswalksTree", tenant_id) try: headers = get_reltio_headers() headers["Content-Type"] = "application/json" validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate or security requirements not met" ) payload = { "outputAsJsonArray": True } params = { "email": email_id } try: result = http_request(url, method="POST", headers=headers, data=payload, params=params) except Exception as e: logger.error(f"API request error in export_merge_tree: {str(e)}") return create_error_response( "SERVER_ERROR", "Failed to schedule export merge tree job" ) try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.ENTITY_MERGE_TREE_EXPORT.value, client_type=ACTIVITY_CLIENT, description=f"export_merge_tree_tool : Successfully scheduled export merge tree job for all entities in tenant {tenant_id}" ) except Exception as log_error: logger.error(f"Activity logging failed for export_merge_tree: {str(log_error)}") return result except Exception as e: logger.error(f"Unexpected error in export_merge_tree: {str(e)}") return create_error_response( "SERVER_ERROR", "An unexpected error occurred while Scheduling the export merge tree job" ) async def unmerge_entity_by_contributor(origin_entity_id: str, contributor_entity_id: str, tenant_id: str = RELTIO_TENANT) -> dict: """Unmerge a contributor entity from a merged entity, keeping any profiles merged beneath it intact. Args: origin_entity_id (str): The ID of the origin entity (the merged entity) contributor_entity_id (str): The ID of the contributor entity to unmerge from the origin entity tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the result of the unmerge operation with 'a' (modified origin) and 'b' (spawn) entities Raises: Exception: If there's an error during the unmerge operation """ try: # Validate inputs using Pydantic model try: request = UnmergeEntityRequest( origin_entity_id=origin_entity_id, contributor_entity_id=contributor_entity_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in unmerge_entity_by_contributor: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) # Construct URL with validated entity IDs url = get_reltio_url(f"entities/{request.origin_entity_id}/_unmerge", "api", request.tenant_id) # Add the contributor entity URI as a query parameter params = { "contributorURI": f"entities/{request.contributor_entity_id}" } try: headers = get_reltio_headers() headers["Content-Type"] = "application/json" headers["Globalid"] = ACTIVITY_CLIENT # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Make the POST request with URL parameters try: unmerge_result = http_request( url, method='POST', params=params, headers=headers ) except Exception as e: logger.error(f"API request error: {str(e)}") # Check for common errors if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"One or more entities not found" ) elif "400" in str(e): return create_error_response( "INVALID_REQUEST", f"Invalid unmerge request: {str(e)}" ) return create_error_response( "SERVER_ERROR", "Failed to unmerge entity" ) return unmerge_result except Exception as e: # Log the error logger.error(f"Unexpected error in unmerge_entity_by_contributor: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while unmerging entity" ) async def unmerge_entity_tree_by_contributor(origin_entity_id: str, contributor_entity_id: str, tenant_id: str = RELTIO_TENANT) -> dict: """Unmerge a contributor entity and all profiles merged beneath it from a merged entity. Args: origin_entity_id (str): The ID of the origin entity (the merged entity) contributor_entity_id (str): The ID of the contributor entity to unmerge from the origin entity tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the result of the unmerge operation with 'a' (modified origin) and 'b' (spawn) entities Raises: Exception: If there's an error during the unmerge operation """ try: # Validate inputs using Pydantic model try: request = UnmergeEntityRequest( origin_entity_id=origin_entity_id, contributor_entity_id=contributor_entity_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in unmerge_entity_tree_by_contributor: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) # Construct URL with validated entity IDs url = get_reltio_url(f"entities/{request.origin_entity_id}/_treeUnmerge", "api", request.tenant_id) # Add the contributor entity URI as a query parameter params = { "contributorURI": f"entities/{request.contributor_entity_id}" } try: headers = get_reltio_headers() headers["Content-Type"] = "application/json" headers["Globalid"] = ACTIVITY_CLIENT # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Make the POST request with URL parameters try: unmerge_result = http_request( url, method='POST', params=params, headers=headers ) except Exception as e: logger.error(f"API request error: {str(e)}") # Check for common errors if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"One or more entities not found" ) elif "400" in str(e): return create_error_response( "INVALID_REQUEST", f"Invalid tree unmerge request: {str(e)}" ) return create_error_response( "SERVER_ERROR", "Failed to tree unmerge entity" ) return unmerge_result except Exception as e: # Log the error logger.error(f"Unexpected error in unmerge_entity_tree_by_contributor: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while tree unmerging entity" ) async def get_entity_with_matches( entity_id: str, attributes: List[str] = None, include_match_attributes: bool = True, match_attributes: List[str] = None, match_limit: int = 5, tenant_id: str = RELTIO_TENANT ) -> dict: """Get detailed information about a Reltio entity along with its potential matches Args: entity_id (str): The ID of the entity to retrieve attributes (List[str]): Specific attributes to return for source entity. Empty/None returns all attributes include_match_attributes (bool): Whether to include full attribute details for matching entities match_attributes (List[str]): Specific attributes to return for matching entities (only used if include_match_attributes=True) match_limit (int): Maximum number of potential matches to return (1-5) tenant_id (str): Tenant ID for the Reltio environment Returns: A dictionary containing the source entity, matches, and total count Raises: Exception: If there's an error getting the entity or matches """ try: # Validate inputs using Pydantic model try: request = EntityWithMatchesRequest( entity_id=entity_id, attributes=attributes or [], include_match_attributes=include_match_attributes, match_attributes=match_attributes or [], match_limit=match_limit, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_entity_with_matches: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid request parameters: {str(e)}" ) try: headers = get_reltio_headers() except Exception as e: logger.error(f"Authentication error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Get the source entity source_url = get_reltio_url(f"entities/{request.entity_id}", "api", request.tenant_id) try: validate_connection_security(source_url, headers) except SecurityError as e: logger.error(f"Security error: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) try: source_entity = http_request(source_url, headers=headers) except Exception as e: logger.error(f"API request error getting source entity: {str(e)}") if "404" in str(e): return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found" ) return create_error_response( "SERVER_ERROR", "Failed to retrieve source entity from Reltio API" ) # Get potential matches using _transitiveMatches endpoint matches_url = get_reltio_url(f"entities/{request.entity_id}/_transitiveMatches", "api", request.tenant_id) try: validate_connection_security(matches_url, headers) except SecurityError as e: logger.error(f"Security error for matches: {str(e)}") return create_error_response( "SECURITY_ERROR", "Security requirements not met" ) params = { "deep": 1, "markMatchedValues": "true", "sort": "relevance", "order": "desc", "activeness": "active", "limit": request.match_limit } try: matches_result = http_request(matches_url, headers=headers, params=params) except Exception as e: logger.warning(f"Error retrieving matches: {str(e)}") # Continue without matches if the matches API fails matches_result = [] # Get total count of matches (separate call without limit) total_count = 0 try: total_params = { "deep": 1, "markMatchedValues": "true", "activeness": "active", "limit": 1000 # High limit to get accurate count } total_matches_result = http_request(matches_url, headers=headers, params=total_params) total_count = len(total_matches_result) if total_matches_result else 0 except Exception as e: logger.warning(f"Error getting total matches count: {str(e)}") total_count = len(matches_result) if matches_result else 0 # Fetch full entity details for matching entities if requested match_entities = {} if request.include_match_attributes and matches_result: for match in matches_result[:request.match_limit]: match_entity_id = match["object"]["uri"].split("/")[-1] try: match_entity_url = get_reltio_url(f"entities/{match_entity_id}", "api", request.tenant_id) match_entity = http_request(match_entity_url, headers=headers) # Filter match entity attributes if specified filtered_match_entity = filter_entity(match_entity, {"attributes": request.match_attributes} if request.match_attributes else None) match_entities[match["object"]["uri"]] = filtered_match_entity except Exception as e: logger.warning(f"Failed to get details for match entity {match_entity_id}: {str(e)}") # Continue with other matches even if one fails continue # Filter source entity attributes if specified filtered_source_entity = filter_entity(source_entity, {"attributes": request.attributes} if request.attributes else None) # Prepare the result result = { "source_entity": { "uri": f"entities/{request.entity_id}", "label": source_entity.get("label", ""), "attributes": simplify_reltio_attributes(filtered_source_entity.get("attributes", {})) }, "matches": format_unified_entity_matches(matches_result[:request.match_limit], match_entities), "total_matches": total_count } # Add crosswalks for source entity if present if "crosswalks" in filtered_source_entity: result["source_entity"]["crosswalks"] = slim_crosswalks(filtered_source_entity["crosswalks"]) # Log activity for success try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.POTENTIAL_MATCHES_FOUND.value, client_type=ACTIVITY_CLIENT, description=json.dumps({ "uri": f"entities/{entity_id.split('/')[-1]}", "label": source_entity.get("label", ""), "total_matches": total_count }), items=[{"objectUri": f"entities/{entity_id.split('/')[-1]}"}] ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_with_matches: {str(log_error)}") return yaml.dump(result, sort_keys=False) except Exception as e: logger.error(f"Unexpected error in get_entity_with_matches: {str(e)}") return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity with matches" ) async def create_entities(entities: List[Dict[str, Any]], return_objects: bool = False, execute_lca: bool = True, tenant_id: str = RELTIO_TENANT) -> dict: """Create one or more entities in Reltio Args: entities (List[Dict[str, Any]]): List of entity objects to create. Each entity must have a 'type' field. return_objects (bool): Whether the response contains created entities: true or false(default) execute_lca (bool): Whether to trigger all Lifecycle Actions during this request: true (default) or false tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value. Returns: A dictionary containing the creation results for each entity Raises: Exception: If there's an error creating the entities """ try: # Validate inputs using Pydantic model try: request = CreateEntitiesRequest( entities=entities, return_objects=return_objects, execute_lca=execute_lca, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in create_entities: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entities data: {str(e)}" ) # Construct URL url = get_reltio_url("entities", "api", request.tenant_id) try: headers = get_reltio_headers() headers["Globalid"] = ACTIVITY_CLIENT # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Prepare query parameters params = {} params["returnObjects"] = str(request.return_objects).lower() if not request.execute_lca: params["executeLCA"] = "false" # Make the POST request try: create_result = http_request( url, method='POST', data=request.entities, headers=headers, params=params ) except Exception as e: logger.error(f"API request error: {str(e)}") # Check for common errors if "400" in str(e): return create_error_response( "INVALID_REQUEST", f"Invalid create entities request: {str(e)}" ) elif "401" in str(e): return create_error_response( "AUTHENTICATION_ERROR", "Unauthorized - check your authentication token" ) elif "403" in str(e): return create_error_response( "AUTHORIZATION_ERROR", "Forbidden - insufficient permissions to create entities" ) return create_error_response( "SERVER_ERROR", "Failed to create entities" ) # Process the response to extract only the required fields if isinstance(create_result, list): processed_results = [] successful_count = 0 failed_count = 0 for result in create_result: processed_result = { "index": result.get("index") } # Check if the entity creation was successful if result.get("successful"): successful_count += 1 processed_result["successful"] = True # Add object details if returnObjects was true and object exists if request.return_objects and "object" in result: entity_obj = result["object"] processed_result["object"] = { "uri": entity_obj.get("uri"), "type": entity_obj.get("type"), "tags": entity_obj.get("tags"), "createdBy": entity_obj.get("createdBy"), "createdTime": entity_obj.get("createdTime"), "updatedBy": entity_obj.get("updatedBy"), "updatedTime": entity_obj.get("updatedTime"), "isFavorite": entity_obj.get("isFavorite"), "label": entity_obj.get("label"), "crosswalks": entity_obj.get("crosswalks") } # Remove None values processed_result["object"] = {k: v for k, v in processed_result["object"].items() if v is not None} else: processed_result["uri"] = result.get("uri") else: failed_count += 1 processed_result["successful"] = False # Include error information if "errors" in result: processed_result["errors"] = result["errors"] processed_results.append(processed_result) return yaml.dump(processed_results, sort_keys=False) else: # Handle unexpected response format logger.warning(f"Unexpected response format from create entities API: {type(create_result)}") return create_error_response( "UNEXPECTED_RESPONSE", "Unexpected response format from Reltio API" ) except Exception as e: # Log the error logger.error(f"Unexpected error in create_entities: {str(e)}") # Return a sanitized error response return create_error_response( "SERVER_ERROR", "An unexpected error occurred while creating entities" ) async def get_entity_hops( entity_id: str, select: str = "label,secondaryLabel,entities.attributes,relations.attributes", graph_type_uris: str = "", relation_type_uris: str = "", entity_type_uris: str = "", deep: int = 1, max_results: int = 100, activeness_enabled: bool = True, return_inactive: bool = False, filter_last_level: bool = True, return_data_anyway: bool = False, options: str = "ovOnly", tenant_id: str = RELTIO_TENANT ) -> dict: """Get entity graph (hops) for a specific entity with comprehensive filtering and traversal options Args: entity_id (str): The ID of the entity to get hops for select (str): Comma-separated list of properties to return. Defaults to "label,secondaryLabel,entities.attributes,relations.attributes" graph_type_uris (str): Comma-separated list of graph type URIs for graphs to be traversed relation_type_uris (str): Comma-separated list of relation type URIs for relations to be traversed entity_type_uris (str): Comma-separated list of entity type URIs for entities to be traversed deep (int): Limits traversing deep levels. Default is 1 max_results (int): Limits the amount of entities to be returned. Default is 100, max is 1500 activeness_enabled (bool): Flag to determine whether to return only active entities and relations. Default is True return_inactive (bool): Flag to traverse inactive entities/relationships. Default is False filter_last_level (bool): Flag to NOT count relationships from the last level. Default is True return_data_anyway (bool): Flag to return partial data when credit consumption limit is exceeded. Default is False options (str): Comma-separated list of options (sendHidden, ovOnly, nonOvOnly, sendMasked, showAppliedSurvivorshipRules) tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value Returns: A dictionary containing the entity hops (graph) details with simplified attributes Raises: Exception: If there's an error getting the entity hops """ try: # Validate inputs using Pydantic model try: request = EntityIdRequest( entity_id=entity_id, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_entity_hops: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid entity ID format: {str(e)}" ) # Construct URL with validated entity ID url = get_reltio_url(f"entities/{request.entity_id}/_hops", "api", request.tenant_id) try: headers = get_reltio_headers() # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Validate and constrain max_results if max_results < 1: max_results = 1 elif max_results > 1500: max_results = 1500 logger.info(f"Max results limited to 1500 for entity hops") # Validate and constrain deep if deep < 1: deep = 1 elif deep > 10: # Reasonable upper limit to prevent excessive traversal deep = 10 logger.info(f"Deep level limited to 10 for entity hops") # Build query parameters params = { "select": select, "deep": deep, "max": max_results, "activeness_enabled": str(activeness_enabled).lower(), "returnInactive": str(return_inactive).lower(), "filterLastLevel": str(filter_last_level).lower(), "returnDataAnyway": str(return_data_anyway).lower(), "options": options } # Add optional URI filters if provided if graph_type_uris.strip(): params["graphTypeURIs"] = graph_type_uris.strip() if relation_type_uris.strip(): params["relationTypeURIs"] = relation_type_uris.strip() if entity_type_uris.strip(): params["entityTypeURIs"] = entity_type_uris.strip() # Make the request with timeout try: hops_data = http_request(url, method='GET', headers=headers, params=params) except Exception as e: logger.error(f"API request error: {str(e)}") error_str = str(e) # Extract error message from JSON response if available error_message = "" try: json_match = re.search(r'\{.*\}', error_str) if json_match: error_json = json.loads(json_match.group()) error_message = error_json.get("errorMessage", "") except (json.JSONDecodeError, AttributeError): pass # Check for specific error codes and return appropriate responses if "404" in error_str: if not error_message: error_message = "Entity not found" return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} not found: {error_message}" ) elif "400" in error_str: if not error_message: error_message = "Invalid request parameters" return create_error_response( "INVALID_REQUEST", f"Bad request: {error_message}" ) # Generic server error for other cases if not error_message: error_message = "An error occurred while processing the request" return create_error_response( "SERVER_ERROR", f"Failed to retrieve entity hops from Reltio API: {error_message}" ) # Try to log activity for success try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.ENTITY_HOPS.value, client_type=ACTIVITY_CLIENT, description=json.dumps({ "uri": f"entities/{entity_id.split('/')[-1]}", "deep": deep, "max_results": max_results, "select": select, "graph_type_uris": graph_type_uris, "relation_type_uris": relation_type_uris, "entity_type_uris": entity_type_uris }), items=[{"objectUri": f"entities/{entity_id.split('/')[-1]}"}] ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_hops: {str(log_error)}") # Process the response to simplify attributes result = { "relations": hops_data.get("relations", []), "entities": [], "dataComplete": hops_data.get("dataComplete", True) } # Process entities and simplify their attributes for entity in hops_data.get("entities", []): processed_entity = { "URI": entity.get("uri"), "type": entity.get("type"), "label": entity.get("label"), "secondaryLabel": entity.get("secondaryLabel"), "traversedRelationsCount": entity.get("traversedRelations", 0), "untraversedRelationsCount": entity.get("untraversedRelations", 0) } # Simplify attributes using the existing function if "attributes" in entity: processed_entity["attributes"] = simplify_reltio_attributes(entity["attributes"]) # Add crosswalks if present if "crosswalks" in entity: processed_entity["crosswalks"] = slim_crosswalks(entity["crosswalks"]) result["entities"].append(processed_entity) return yaml.dump(result, sort_keys=False) except Exception as e: logger.error(f"Unexpected error in get_entity_hops: {str(e)}") return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity hops" ) async def get_entity_parents( entity_id: str, graph_type_uris: str, select: str = "uri,label,type,secondaryLabel", options: str = "", tenant_id: str = RELTIO_TENANT ) -> dict: """Get the parents of a specified entity from Reltio Args: entity_id (str): The ID of the entity to get parents for graph_type_uris (str): Comma-separated list of graph type URIs to traverse (required) for example : OrganizationHierarchy,Hierarchy select (str): Comma-separated list of properties to include in the response. Defaults to "uri,label,type,secondaryLabel" options (str): Comma-separated list of options affecting the response content: - sendHidden: Include hidden attributes in the response - ovOnly: Return only attribute values with the ov=true flag - nonOvOnly: Return only attribute values with the ov=false flag tenant_id (str): Tenant ID for the Reltio environment. Defaults to RELTIO_TENANT env value Returns: A dictionary containing: - parentPaths: Array of parent paths, where each path is an array of entities with their relations - entities: Dictionary of entity details keyed by entity URI - relations: Dictionary of relation details keyed by relation URI Raises: Exception: If there's an error getting the entity parents """ try: # Validate inputs using Pydantic model try: request = GetEntityParentsRequest( entity_id=entity_id, graph_type_uris=graph_type_uris, select=select, options=options, tenant_id=tenant_id ) except ValueError as e: logger.warning(f"Validation error in get_entity_parents: {str(e)}") return create_error_response( "VALIDATION_ERROR", f"Invalid request parameters: {str(e)}" ) # Construct URL with validated entity ID url = get_reltio_url(f"entities/{request.entity_id}/_parents", "api", request.tenant_id) try: headers = get_reltio_headers() # Validate connection security validate_connection_security(url, headers) except Exception as e: logger.error(f"Authentication or security error: {str(e)}") return create_error_response( "AUTHENTICATION_ERROR", "Failed to authenticate with Reltio API" ) # Build query parameters params = { "graphTypeURIs": request.graph_type_uris, } # Add optional parameters if provided if request.select and request.select.strip(): params["select"] = request.select.strip() if request.options and request.options.strip(): params["options"] = request.options.strip() # Make the request with timeout try: parents_data = http_request(url, method='GET', headers=headers, params=params) except Exception as e: logger.error(f"API request error: {e}") error_str = str(e) # Extract error message from JSON response if available error_message = "" error_code = None try: json_match = re.search(r'\{.*\}', error_str) if json_match: error_json = json.loads(json_match.group()) error_message = error_json.get("errorMessage", "") error_code = error_json.get("errorCode") except (json.JSONDecodeError, AttributeError): pass # Check for specific error codes and return appropriate responses if "404" in error_str or error_code == 119: if not error_message: error_message = "Entity or graph type not found" return create_error_response( "RESOURCE_NOT_FOUND", f"Entity with ID {request.entity_id} or graph type not found: {error_message}" ) elif "400" in error_str: if not error_message: error_message = "Invalid request parameters" return create_error_response( "INVALID_REQUEST", f"Bad request: {error_message}" ) # Generic server error for other cases if not error_message: error_message = "An error occurred while processing the request" return create_error_response( "SERVER_ERROR", f"Failed to retrieve entity parents from Reltio API: {error_message}" ) # Try to log activity for success try: await ActivityLog.execute_and_log_activity( tenant_id=tenant_id, label=ActivityLogLabel.USER_PROFILE_VIEW.value, client_type=ACTIVITY_CLIENT, description=json.dumps({ "uri": f"entities/{entity_id.split('/')[-1]}", "graph_type_uris": graph_type_uris, "select": select, "options": options }), items=[{"objectUri": f"entities/{entity_id.split('/')[-1]}"}] ) except Exception as log_error: logger.error(f"Activity logging failed for get_entity_parents: {str(log_error)}") # Process the response to match the actual API structure result = { "parentPaths": parents_data.get("parentPaths", []), "entities": {}, "relations": parents_data.get("relations", {}) } # Process entities from the entities object (not array) entities_data = parents_data.get("entities", {}) for entity_uri, entity_data in entities_data.items(): processed_entity = { "uri": entity_data.get("uri"), "type": entity_data.get("type"), "label": entity_data.get("label"), "secondaryLabel": entity_data.get("secondaryLabel") } # Simplify attributes using the existing function if present if "attributes" in entity_data: processed_entity["attributes"] = simplify_reltio_attributes(entity_data["attributes"]) result["entities"][entity_uri] = processed_entity return yaml.dump(result, sort_keys=False) except Exception as e: logger.error(f"Unexpected error in get_entity_parents: {str(e)}") return create_error_response( "SERVER_ERROR", "An unexpected error occurred while retrieving entity parents" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reltio-ai/reltio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server