# Copyright [2025] [IBM]
# Licensed under the Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
# See the LICENSE file in the project root for license information.
import json
from typing import Literal
from app.services.constants import (
CONNECTIONS_BASE_ENDPOINT,
PROJECTS_BASE_ENDPOINT,
CATALOGS_BASE_ENDPOINT,
SPACES_BASE_ENDPOINT,
ASSET_TYPE_BASE_ENDPOINT,
GS_BASE_ENDPOINT,
DATASOURCE_TYPES_BASE_ENDPOINT,
JSON_PLUS_UTF8_ACCEPT_TYPE,
EN_LANGUAGE_ACCEPT_TYPE
)
from app.shared.exceptions.base import ServiceError
from app.shared.utils.helpers import get_closest_match, get_project_or_space_type_based_on_context, append_context_to_url
from app.shared.utils.tool_helper_service import tool_helper_service
from app.core.auth import get_bss_account_id
from app.core.settings import settings
METADATA_ARTIFACT_TYPE = "metadata.artifact_type"
METADATA_NAME = "metadata.name"
ENTITY_ASSETS_PROJECT_ID = "entity.assets.project_id"
ENTITY_ASSETS_CATALOG_ID = "entity.assets.catalog_id"
ARTIFACT_TYPE_CATEGORY = "category"
ARTIFACT_TYPE_DATA_ASSET = "data_asset"
CATEGORY_UNCATEGORIZED = "uncategorized"
async def find_project_id(project_name: str) -> str:
"""
Find id of project based on project name.
Args:
project_name (str): The name of the project which is used to find a project id.
Returns:
uuid.UUID: Unique identifier of the project.
"""
params = {"limit": 100}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + PROJECTS_BASE_ENDPOINT,
params=params,
)
projects = [
{"name": project["entity"]["name"], "id": project["metadata"]["guid"]}
for project in response.get("resources", {})
]
result_id = get_closest_match(projects, project_name)
if result_id:
return result_id
else:
raise ServiceError(
f"find_project_id failed to find any projects with the name '{project_name}'"
)
async def find_connection_id(connection_name: str, project_id: str) -> str:
"""
Find id of connection based on connection name.
Args:
connection_name (str): The name of the connection which is used to find a connection id,
project_id (uuid.UUID): The unique identifier of the project
Returns:
uuid.UUID: Unique identifier of the project.
"""
params = {"project_id": project_id}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + CONNECTIONS_BASE_ENDPOINT,
params=params,
)
connections = [
{
"name": connection["entity"]["name"],
"id": connection["metadata"]["asset_id"],
}
for connection in response.get("resources", {})
]
result_id = get_closest_match(connections, connection_name)
if result_id:
return result_id
else:
raise ServiceError(
f"find_connection_id failed to find any connections with the name '{connection_name}'"
)
async def is_project_exist_by_name(project_name: str):
"""
Check for project name exist or not
Args:
project_name (str): The name of the project to create
Returns:
bool: True/False
str: Project type i.e. df/cpdaas/wx etc.
str: Project id
"""
params = {"limit": 100}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + PROJECTS_BASE_ENDPOINT,
params=params
)
projects = [
{"name": project["entity"]["name"], "type": project["entity"]["type"], "id": project["metadata"]["guid"]}
for project in response.get("resources", {})
]
#check for exact project name
for proj_name in projects:
if proj_name["name"] == project_name:
return True, proj_name["type"], proj_name["id"]
return False,"",""
async def find_catalog_id(catalog_name: str) -> str:
"""
Find id of catalog based on catalog name.
Args:
catalog_name (str): The name of the catalog which is used to find a catalog id.
Returns:
uuid.UUID: Unique identifier of the catalog.
"""
params = {"limit": 1, "name": catalog_name}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + CATALOGS_BASE_ENDPOINT,
params=params
)
result_id = None
for catalog in response.get("catalogs", []):
result_id = catalog["metadata"]["guid"]
if result_id:
return result_id
else:
raise ServiceError(
f"find_catalog_id failed to find any catalog with the name '{catalog_name}'"
)
async def get_platform_assets_catalog_id() -> str:
"""
Find id of the Platform Assets Catalog attached
to current user's account.
Returns:
uuid.UUID: Unique identifier of the Platform Assets catalog.
"""
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + CATALOGS_BASE_ENDPOINT + "/ibm-global-catalog"
)
result_id = response.get("metadata", {}).get("guid", None)
if result_id:
return result_id
else:
raise ServiceError(
"get_platform_assets_catalog_id failed to find the platform assets catalog"
)
def _build_container_from_response(
response: dict, container_type: str, id_field: str = "guid"
):
"""
Build a Container object from API response.
Args:
response: API response dictionary
container_type: Type of container ("project", "catalog", "space")
id_field: Field name for ID in metadata ("guid" or "id")
Returns:
Container object
"""
from app.services.search.models.container import Container, ContainerType
container_id = response.get("metadata", {}).get(id_field, "")
name = response.get("entity", {}).get("name", "")
if container_type == "project":
url = append_context_to_url(
f"{tool_helper_service.ui_base_url}/projects/{container_id}/overview",
settings.di_context
)
return Container(
id=container_id,
name=name,
type=ContainerType.PROJECT,
url=url
)
elif container_type == "space":
url = append_context_to_url(
f"{tool_helper_service.ui_base_url}/ml-runtime/spaces/{container_id}",
settings.di_context
)
return Container(
id=container_id,
name=name,
type=ContainerType.SPACE,
url=url
)
else: # catalog
url = append_context_to_url(
f"{tool_helper_service.ui_base_url}/data/catalogs/{container_id}",
settings.di_context
)
return Container(
id=container_id,
name=name,
type=ContainerType.CATALOG,
url=url
)
async def find_asset_container_by_id(
container_id: str, container_type: str
):
"""
Find container based on its id.
Args:
container_id: The ID of the container
container_type: The type of the container - "project", "catalog", or "space"
Returns:
Container object with the given id
Raises:
ServiceError: If the container is not found
"""
if container_type == "project":
params = {"bss_account_id": await get_bss_account_id()}
project_type = get_project_or_space_type_based_on_context()
if project_type:
params["type"] = project_type
response = await tool_helper_service.execute_get_request(
url=f"{tool_helper_service.base_url}{PROJECTS_BASE_ENDPOINT}/{container_id}",
params=params,
)
return _build_container_from_response(response, container_type, "guid")
elif container_type == "space":
response = await tool_helper_service.execute_get_request(
url=f"{tool_helper_service.base_url}{SPACES_BASE_ENDPOINT}/{container_id}",
)
return _build_container_from_response(response, container_type, "id")
else: # catalog or default
response = await tool_helper_service.execute_get_request(
url=f"{tool_helper_service.base_url}{CATALOGS_BASE_ENDPOINT}/{container_id}",
)
return _build_container_from_response(response, container_type, "guid")
async def find_asset_container_by_name(
container_name: str, container_type: str
):
"""
Find container based on its name using fuzzy matching.
Args:
container_name: The name of the container
container_type: The type of the container - "project", "catalog", or "space"
Returns:
Container object with the given name
Raises:
ServiceError: If the container is not found
"""
# Import here to avoid circular dependency
from app.services.search.tools.list_containers import _list_asset_containers
from app.services.search.models.container import ContainerType
# Convert string to ContainerType enum
container_type_enum = ContainerType(container_type)
containers = await _list_asset_containers(container_type_enum)
if not containers:
raise ServiceError(f"No {container_type}s found")
# Create list of name-id pairs for fuzzy matching
containers_names_ids = [
{"name": container.name, "id": container.id} for container in containers
]
# Find closest match
result_id = get_closest_match(containers_names_ids, container_name)
if result_id:
# Return the matching container
for container in containers:
if container.id == result_id:
return container
raise ServiceError(
f"Couldn't find any {container_type} with the name '{container_name}'"
)
async def find_asset_id(
asset_name: str, container_id: str, container_type: str
) -> str:
"""
Find id of asset based on asset name.
Args:
asset_name (str): Name of the asset.
catalog_id (str): ID of the to find the asset in.
container_type (str): Type of container (project/catalog) to find the asset in.
Returns:
uuid.UUID: Unique identifier of the asset.
"""
params = {
container_type + "_id": container_id,
"hide_deprecated_response_fields": True,
}
payload = {"query": "*:*"}
response = await tool_helper_service.execute_post_request(
url=str(tool_helper_service.base_url) + ASSET_TYPE_BASE_ENDPOINT + "/asset/search",
params=params,
json=payload,
)
result_id = None
if response["total_rows"] > 0:
asset_list = [
{"name": asset["metadata"]["name"], "id": asset["metadata"]["asset_id"]}
for asset in response["results"]
]
result_id = get_closest_match(asset_list, asset_name)
if result_id:
return result_id
else:
raise ServiceError(
f"find_asset_id failed to find any asset with the name '{asset_name}'"
)
async def find_datasource_type_asset_id(datasource_type: str) -> str:
"""
Find the asset ID for a datasource type by its name.
Searches for a datasource type matching the provided name or label and returns its ID.
The search is case-insensitive and matches partial names.
Args:
datasource_type (str): The name or label of the datasource type (e.g., "db2", "postgresql").
Returns:
str: The asset ID (UUID) of the matching datasource type, or empty string if not found.
"""
headers = {
"accept": JSON_PLUS_UTF8_ACCEPT_TYPE,
"Accept-Language": EN_LANGUAGE_ACCEPT_TYPE,
}
params = {
"offset": 0,
"limit": 100,
"connection_properties": False,
"interaction_properties": False,
"discovery": False,
"actions": False,
"generate_transitive_conditions": False,
"show_data_source_definitions_only": False,
"show_data_source_definition_section": False
}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + DATASOURCE_TYPES_BASE_ENDPOINT,
headers=headers,
params=params
)
total_types = response.get("total_count")
offset = params["offset"]
datasource_type = datasource_type.lower()
while offset <= total_types:
for resource in response.get('resources', []):
datasource_type_name = resource['entity']['name'].lower()
datasource_type_label = resource['entity']['label'].lower()
if datasource_type == datasource_type_name or datasource_type == datasource_type_label:
return resource['metadata']['asset_id']
offset += 100
params["offset"] += offset
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + DATASOURCE_TYPES_BASE_ENDPOINT,
headers=headers,
params=params
)
raise ServiceError(
f"find_datasource_type_asset_id failed to find any datasource type with the name '{datasource_type}'"
)
async def get_datasource_type_name(datasource_type_id: str) -> str:
"""
Get the display name of a datasource type from its ID.
Retrieves the human-readable label for a datasource type using its unique identifier.
Args:
datasource_type_id (str): The unique identifier (UUID) of the datasource type.
Returns:
str: The display name/label of the datasource type.
"""
headers = {
"accept": JSON_PLUS_UTF8_ACCEPT_TYPE,
"Accept-Language": EN_LANGUAGE_ACCEPT_TYPE,
}
params = {
"generate_transitive_conditions": False,
"show_data_source_definition_section": False
}
response = await tool_helper_service.execute_get_request(
url=f'{str(tool_helper_service.base_url)}{DATASOURCE_TYPES_BASE_ENDPOINT}/{datasource_type_id}',
headers=headers,
params=params,
)
result = response.get("entity", {}).get("label", "")
if result:
return result
else:
raise ServiceError(
f"get_datasource_type_name failed to find any datasource type with id '{datasource_type_id}'"
)
async def find_metadata_enrichment_id(
metadata_enrichment_name: str, project_id: str
) -> str:
"""
Find ID of metadata enrichment based on metadata enrichment name.
Args:
metadata_enrichment_name (str): The name of the metadata enrichment that you want to execute.
project_id (uuid.UUID): The ID of the project in which you want to execute a metadata enrichment.
Returns:
str: The unique identifier of the metadata enrichment.
Raises:
ToolProcessFailedError: If the metadata enrichment asset is not found.
"""
post_url = (
tool_helper_service.base_url + "/v2/asset_types/metadata_enrichment_area/search"
)
query_params = {
"project_id": project_id,
}
payload = {"query": f'metadata_enrichment_area.name:"{metadata_enrichment_name}"'}
response = await tool_helper_service.execute_post_request(
url=post_url,
params=query_params,
json=payload,
)
result_id = None
list_of_results = response.get("results", [])
for metadata_enrichment in list_of_results:
result_id = metadata_enrichment.get("metadata", {}).get("asset_id", None)
if result_id:
return result_id
else:
raise ServiceError(
f"The metadata enrichment asset was not found with the name:'{metadata_enrichment_name}'"
)
async def find_asset_id_exact_match(
asset_name: str,
container_id: str,
container_type: Literal["catalog", "project"] = "project",
artifact_type: str = "data_asset",
) -> str:
"""
Find id of asset in specified project based on asset name.
Args:
asset_name (str): The name of the asset.
container_id (str): UUID of the project or catalog containing the asset.
container_type (Literal["project", "catalog"]): Type of container - either "project" or "catalog".
artifact_type (str): The artifact type of the asset
Returns:
str: Unique identifier of the asset
"""
if container_type == "catalog":
query_container = ENTITY_ASSETS_CATALOG_ID
else:
query_container = ENTITY_ASSETS_PROJECT_ID
query_params = {
"query": f"metadata.name:{asset_name} AND {query_container}:{container_id}"
}
response = await tool_helper_service.execute_get_request(
url=str(tool_helper_service.base_url) + GS_BASE_ENDPOINT,
params=query_params,
)
asset_id = None
for row in response.get("rows", []):
metadata = row["metadata"]
if (
metadata["artifact_type"] == artifact_type
and metadata["name"] == asset_name
):
asset_id = row["artifact_id"]
break
if asset_id:
return asset_id
else:
raise ServiceError(
f"Couldn't find any datasets with the name '{asset_name}' in {container_type} '{container_id}'"
)
def confirm_list_str(list_or_str: list[str] | str) -> list[str]:
"""
Convert a string or list input into a list of strings.
This utility function normalizes input that can be either a string or a list of strings
into a consistent list format. It handles multiple string formats including JSON arrays
and single values.
Processing logic:
- If input is already a list: returns it unchanged
- If input is a string:
1. Attempts to parse as JSON (with single quotes converted to double quotes)
2. If JSON parsing succeeds and result is a list: returns the parsed list
3. If JSON parsing succeeds but result is not a list: wraps it in a list
4. If JSON parsing fails: wraps the original string in a list
Args:
list_or_str (list[str] | str): The input which can be either:
- A list of strings (returned as-is)
- A JSON-formatted string representing a list (e.g., '["item1", "item2"]')
- A single string value (wrapped in a list)
Returns:
list[str]: A list of strings. Always returns a list, even for single string inputs.
Examples:
>>> get_list_from_str(["a", "b", "c"])
["a", "b", "c"]
>>> get_list_from_str('["item1", "item2"]')
["item1", "item2"]
>>> get_list_from_str("single_value")
["single_value"]
>>> get_list_from_str("['x', 'y', 'z']")
["x", "y", "z"]
"""
if isinstance(list_or_str, str):
try:
parsed = json.loads(list_or_str.replace("'", '"'))
if isinstance(parsed, list):
list_or_str = parsed
else:
list_or_str = [parsed]
except json.JSONDecodeError:
list_or_str = [list_or_str]
return list_or_str
async def find_category_id(category_name: str) -> str:
"""
Find id of category based on category name
Args:
category_name (str): Name of the category
Returns:
str: Category id of the category.
"""
must_match = [
{"match": {METADATA_ARTIFACT_TYPE: ARTIFACT_TYPE_CATEGORY}},
{"match": {METADATA_NAME: category_name}},
]
response = await tool_helper_service.execute_post_request(
url=str(tool_helper_service.base_url) + GS_BASE_ENDPOINT,
json={"query": {"bool": {"must": must_match}}},
)
result_id = None
for row in response.get("rows", []):
metadata = row["metadata"]
if metadata["artifact_type"] == "category" and (
metadata["name"] == category_name
or (
metadata["name"] == f"[{CATEGORY_UNCATEGORIZED}]"
and category_name == CATEGORY_UNCATEGORIZED
)
):
entity = row["entity"]
result_id = entity["artifacts"]["artifact_id"]
break
if result_id:
return result_id
else:
raise ServiceError(
f"Couldn't find any categories with the name '{category_name}'"
)