"""Discovery tools for EMS MCP server.
These tools enable LLMs to discover EMS systems, databases, fields, and analytics.
Discovery must be performed before querying data, as field and analytic IDs are
opaque strings that cannot be constructed manually.
"""
import logging
from collections import deque
from typing import Any
from ems_mcp.api.client import EMSAPIError, EMSNotFoundError
from ems_mcp.cache import database_cache, field_cache, make_cache_key
from ems_mcp.server import get_client, mcp
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Result reference store
# ---------------------------------------------------------------------------
# Discovery tools assign numbered references [N] to search results instead of
# displaying full opaque IDs. The agent can later call get_result_id([N, ...])
# to retrieve the actual IDs for the specific results it needs.
# Maps each reference number to a {"name": ..., "id": ...} record for that result.
_result_store: dict[int, dict[str, str]] = {}
# Reference number that will be assigned to the next stored result.
_next_ref: int = 0
# Capacity of the store; once exceeded, the lowest-numbered entries are evicted.
_STORE_MAX_SIZE: int = 500
def _store_result(name: str, result_id: str) -> int:
    """Record a search result and return the reference number assigned to it.

    Results from successive searches pile up in the module-level store so
    that any earlier reference stays resolvable. Once the store grows past
    ``_STORE_MAX_SIZE`` the lowest-numbered (oldest) entries are dropped.

    Args:
        name: Human-readable name of the result.
        result_id: The full opaque ID string.

    Returns:
        The reference number assigned to this result.
    """
    global _next_ref  # noqa: PLW0603

    assigned = _next_ref
    _next_ref = assigned + 1
    _result_store[assigned] = {"name": name, "id": result_id}

    # Trim back to capacity by removing the smallest reference numbers first.
    overflow = len(_result_store) - _STORE_MAX_SIZE
    if overflow > 0:
        for stale_ref in sorted(_result_store)[:overflow]:
            del _result_store[stale_ref]

    return assigned
def _get_stored_result(ref: int) -> dict[str, str] | None:
"""Look up a stored result by reference number.
Args:
ref: The reference number returned by ``_store_result``.
Returns:
Dict with ``name`` and ``id`` keys, or ``None`` if not found.
"""
return _result_store.get(ref)
def _reset_result_store() -> None:
    """Empty the result store and restart numbering at zero (for testing only)."""
    global _next_ref  # noqa: PLW0603

    _next_ref = 0
    _result_store.clear()
def _is_entity_type_database(database_id: str) -> bool:
    """Tell whether a database ID denotes an entity-type database.

    An ID qualifies when it contains ``[entity-type]`` and does not contain
    ``[entity-type-group]``. Such databases don't support the field search
    endpoint; their fields must be discovered via ``list_fields``
    (field-group navigation) instead.

    Args:
        database_id: The database ID to check.

    Returns:
        True if this is an entity-type database.
    """
    # Group IDs are ruled out first; they are never entity-type databases.
    if "[entity-type-group]" in database_id:
        return False
    return "[entity-type]" in database_id
def _format_ems_systems(systems: list[dict[str, Any]]) -> str:
    """Render the list of EMS systems as a human-readable bullet list.

    Args:
        systems: System dicts; ``name``, ``id`` and ``description`` are read
            from each, with placeholders used for a missing name or ID.

    Returns:
        A header line followed by one bullet per system, or a fixed
        "none found" message when the list is empty.
    """
    if not systems:
        return "No EMS systems found."

    def _bullet(system: dict[str, Any]) -> str:
        # The description is appended only when it is non-empty.
        label = system.get("name", "Unknown")
        identifier = system.get("id", "?")
        summary = system.get("description", "")
        base = f" - {label} (ID: {identifier})"
        return f"{base}: {summary}" if summary else base

    header = f"Found {len(systems)} EMS system(s):"
    return "\n".join([header, *(_bullet(entry) for entry in systems)])
def _format_database_group(group: dict[str, Any]) -> str:
    """Render a database-group response as readable text.

    Args:
        group: Group dict; ``name``, ``id``, ``databases`` and ``groups``
            are read from it.

    Returns:
        A header line, then a section listing databases and a section
        listing subgroups (each only when non-empty), or an "(Empty group)"
        marker when both are empty.
    """
    out = [f"Group: {group.get('name', 'Root')} (ID: {group.get('id', '[none]')})"]

    db_entries = group.get("databases", [])
    if db_entries:
        out.append(f"\nDatabases ({len(db_entries)}):")
        for entry in db_entries:
            entry_id = entry.get("id", "?")
            # Root-level entries carry name/description; nested ones use
            # pluralName/singularName instead.
            label = (
                entry.get("name")
                or entry.get("pluralName")
                or entry.get("singularName", "Unknown")
            )
            text = f" - {label} (ID: {entry_id})"
            summary = entry.get("description", "")
            if summary:
                text += f": {summary}"
            # Flag IDs that are really groups and need further navigation.
            if "[entity-type-group]" in str(entry_id):
                text += " [NOTE: this is a group ID - navigate deeper with list_databases]"
            out.append(text)

    child_groups = group.get("groups", [])
    if child_groups:
        out.append(f"\nSubgroups ({len(child_groups)}):")
        out.extend(
            f" - {child.get('name', 'Unknown')} (ID: {child.get('id', '?')})"
            for child in child_groups
        )

    if not (db_entries or child_groups):
        out.append("\n(Empty group)")

    return "\n".join(out)
def _format_field_group(group: dict[str, Any]) -> str:
    """Render a field-group response as readable text.

    Args:
        group: Field-group dict; ``name``, ``id``, ``fields`` and ``groups``
            are read from it.

    Returns:
        A header line, then a section listing fields (name, type and ID)
        and a section listing subgroups (each only when non-empty), or an
        "(Empty group)" marker when both are empty.
    """
    out = [f"Field Group: {group.get('name', 'Root')} (ID: {group.get('id', '[none]')})"]

    members = group.get("fields", [])
    if members:
        out.append(f"\nFields ({len(members)}):")
        for member in members:
            member_id = member.get("id", "?")
            title = member.get("name", "Unknown")
            kind = member.get("type", "unknown")
            # Each field takes two lines: a bullet, then its full ID.
            out.extend((f" - {title} ({kind})", f" ID: {member_id}"))

    child_groups = group.get("groups", [])
    if child_groups:
        out.append(f"\nSubgroups ({len(child_groups)}):")
        out.extend(
            f" - {child.get('name', 'Unknown')} (ID: {child.get('id', '?')})"
            for child in child_groups
        )

    if not (members or child_groups):
        out.append("\n(Empty group)")

    return "\n".join(out)
def _format_field_search_results(
    fields: list[dict[str, Any]],
    show_ids: bool = False,
) -> str:
    """Render field search hits as readable text.

    Args:
        fields: List of field dicts from the API.
        show_ids: If True, print each full ID inline (backward compat).
            If False (default), register each hit in the result store and
            print its numbered reference instead of the ID.

    Returns:
        A header plus one entry per field, or a fixed message when
        ``fields`` is empty. In reference mode a trailing hint about
        ``get_result_id`` is appended.
    """
    if not fields:
        return "No fields found matching the search criteria."

    out = [f"Found {len(fields)} field(s):"]
    for entry in fields:
        kind = entry.get("type", "unknown")
        unit_label = entry.get("units")
        descriptor = f"{kind} ({unit_label})" if unit_label else kind
        title = entry.get("name", "Unknown")
        full_id = entry.get("id", "?")

        if not show_ids:
            number = _store_result(title, full_id)
            out.append(f"\n [{number}] {title} [{descriptor}]")
            continue

        out.append(f"\n {title} [{descriptor}]")
        out.append(f" ID: {full_id}")

    if not show_ids:
        out.append("\nUse get_result_id to retrieve full IDs for query_database.")
    return "\n".join(out)
def _format_field_info(field: dict[str, Any]) -> str:
    """Render detailed metadata for a single field.

    Args:
        field: Field dict; ``name``, ``id``, ``type``, ``units``,
            ``description`` and ``discreteValues`` are read from it.

    Returns:
        Multi-line text with the name, type, optional units and description,
        the field ID, and up to 50 discrete value mappings when present.
    """
    out = [
        f"Field: {field.get('name', 'Unknown')}",
        f"Type: {field.get('type', 'unknown')}",
    ]

    # Optional attributes are printed only when they carry a truthy value.
    for caption, key in (("Units", "units"), ("Description", "description")):
        value = field.get(key)
        if value:
            out.append(f"{caption}: {value}")

    out.append(f"\nField ID: {field.get('id', '?')}")

    mapping = field.get("discreteValues")
    if not mapping:
        return "\n".join(out)

    # Accept the {"code": "label"} dict form by converting it to the list
    # form [{"value": code, "label": label}].
    if isinstance(mapping, dict):
        mapping = [{"value": code, "label": text} for code, text in mapping.items()]

    total = len(mapping)
    out.append(f"\nDiscrete Values ({total}):")

    # Cap the listing so very large value sets stay readable.
    shown = min(total, 50)
    for item in mapping[:shown]:
        out.append(f" {item.get('value', '?')}: {item.get('label', 'Unknown')}")
    if total > shown:
        out.append(f" ... and {total - shown} more values")

    return "\n".join(out)
def _format_analytics_search_results(
    analytics: list[dict[str, Any]],
    show_ids: bool = False,
) -> str:
    """Render analytics search hits as readable text.

    Args:
        analytics: List of analytic dicts from the API.
        show_ids: If True, print each full ID inline (backward compat).
            If False (default), register each hit in the result store and
            print its numbered reference instead of the ID.

    Returns:
        A header plus one entry per analytic (with its description when
        present), or a fixed message when ``analytics`` is empty. In
        reference mode a trailing hint about ``query_flight_analytics`` is
        appended.
    """
    if not analytics:
        return "No analytics found matching the search criteria."

    out = [f"Found {len(analytics)} analytic(s):"]
    for entry in analytics:
        full_id = entry.get("id", "?")
        title = entry.get("name", "Unknown")
        kind = entry.get("type", "unknown")
        unit_label = entry.get("units")
        summary = entry.get("description")
        descriptor = f"{kind} ({unit_label})" if unit_label else kind

        if show_ids:
            out.append(f"\n {title} [{descriptor}]")
        else:
            number = _store_result(title, full_id)
            out.append(f"\n [{number}] {title} [{descriptor}]")

        # The description line sits between the title and the optional ID.
        if summary:
            out.append(f" {summary}")
        if show_ids:
            out.append(f" ID: {full_id}")

    if not show_ids:
        out.append(
            "\nYou can pass analytic names directly to query_flight_analytics."
        )
    return "\n".join(out)
@mcp.tool
async def list_ems_systems() -> str:
    """List available EMS systems the user can access.

    Returns a list of EMS systems with their IDs, names, and descriptions.
    Use the system ID in subsequent API calls.

    Returns:
        Formatted list of available EMS systems.
    """
    api = get_client()
    try:
        available = await api.get("/api/v2/ems-systems")
    except EMSAPIError as err:
        # API failures are reported as text rather than raised to the caller.
        return f"Error listing EMS systems: {err.message}"
    return _format_ems_systems(available)
@mcp.tool
async def list_databases(
    ems_system_id: int,
    group_id: str | None = None,
) -> str:
    """Navigate database hierarchy to find available databases.

    Databases are organized hierarchically. Call without group_id to see
    the root level, then use returned group IDs to navigate deeper.

    Common database: "FDW Flights" (Flight Data Warehouse) contains flight records.

    Args:
        ems_system_id: The EMS system ID (from list_ems_systems).
        group_id: Optional group ID to navigate into. Omit for root level.

    Returns:
        Formatted list of databases and subgroups at the specified level,
        or an error message string if the API call fails.
    """
    client = get_client()

    # Serve from cache when this group has already been fetched.
    cache_key = make_cache_key("database_group", ems_system_id, group_id or "root")
    cached = await database_cache.get(cache_key)
    if cached is not None:
        logger.debug("Using cached database group: %s", cache_key)
        return _format_database_group(cached)

    path = f"/api/v2/ems-systems/{ems_system_id}/database-groups"
    try:
        if group_id:
            # Send the group ID as a query parameter instead of concatenating
            # it into the URL, so reserved characters in the opaque ID
            # ('&', '#', '+', spaces, ...) cannot corrupt the query string.
            group = await client.get(path, params={"groupId": group_id})
        else:
            group = await client.get(path)

        await database_cache.set(cache_key, group)
        return _format_database_group(group)
    except EMSNotFoundError:
        return f"Error: Database group not found. Verify ems_system_id={ems_system_id} is valid."
    except EMSAPIError as e:
        return f"Error listing databases: {e.message}"
@mcp.tool
async def list_fields(
    ems_system_id: int,
    database_id: str,
    group_id: str | None = None,
) -> str:
    """Navigate field hierarchy within a database.

    Fields are organized into groups. Call without group_id to see root level,
    then use returned group IDs to navigate deeper.

    Args:
        ems_system_id: The EMS system ID.
        database_id: Database ID (from list_databases).
        group_id: Optional field group ID to navigate into.

    Returns:
        Formatted list of fields and field groups at the specified level,
        or an error message string if validation or the API call fails.
    """
    # Early validation: warn if this looks like a group ID, not a database ID
    if "[entity-type-group]" in database_id:
        return (
            "Error: This appears to be a database GROUP ID, not a database ID. "
            "Use list_databases with this as group_id to navigate deeper and find "
            "actual database IDs."
        )

    client = get_client()

    # Serve from cache when this group has already been fetched.
    cache_key = make_cache_key("field_group", ems_system_id, database_id, group_id or "root")
    cached = await field_cache.get(cache_key)
    if cached is not None:
        logger.debug("Using cached field group: %s", cache_key)
        return _format_field_group(cached)

    path = f"/api/v2/ems-systems/{ems_system_id}/databases/{database_id}/field-groups"
    try:
        if group_id:
            # Send the group ID as a query parameter instead of concatenating
            # it into the URL, so reserved characters in the opaque ID
            # ('&', '#', '+', spaces, ...) cannot corrupt the query string.
            group = await client.get(path, params={"groupId": group_id})
        else:
            group = await client.get(path)

        await field_cache.set(cache_key, group)
        return _format_field_group(group)
    except EMSNotFoundError:
        return (
            f"Error: Field group not found. Verify database_id='{database_id}' is valid. "
            "Use list_databases to find valid database IDs."
        )
    except EMSAPIError as e:
        if e.status_code == 405:
            return (
                f"Error: HTTP 405 Method Not Allowed. "
                f"This usually means database_id='{database_id}' is invalid. "
                "Make sure you are using a database ID (not a group ID). "
                "Use list_databases to find valid database IDs."
            )
        return f"Error listing fields: {e.message}"
@mcp.tool
async def search_fields(
    ems_system_id: int,
    database_id: str,
    search_text: str,
    max_results: int = 50,
    show_ids: bool = False,
) -> str:
    """Search for fields by name within a database.

    Results show numbered references [N] instead of full IDs. Use
    get_result_id([N]) to retrieve the full ID when needed for query_database.

    Args:
        ems_system_id: The EMS system ID.
        database_id: Database ID (from list_databases).
        search_text: Text to search for in field names (case-insensitive partial match).
        max_results: Maximum results to return (default: 50). Values below 1
            are treated as 1.
        show_ids: If True, show full IDs inline instead of numbered references.

    Returns:
        Formatted list of matching fields with types and units, or an error
        message string if validation or the API call fails.
    """
    # Early validation: warn if this looks like a group ID, not a database ID
    if "[entity-type-group]" in database_id:
        return (
            "Error: This appears to be a database GROUP ID, not a database ID. "
            "Use list_databases with this as group_id to navigate deeper and find "
            "actual database IDs."
        )

    # Proactive check: entity-type databases don't support field search
    if _is_entity_type_database(database_id):
        return (
            f"Error: database_id='{database_id}' is an entity-type database, which does "
            "not support the field search endpoint. Use list_fields to browse fields "
            "by navigating the field group hierarchy instead."
        )

    # A negative limit would make the slices below drop results from the END
    # of the list, and zero would masquerade as "no fields found"; clamp to
    # at least one result, as search_fields_deep does.
    max_results = max(1, max_results)

    client = get_client()

    # The cache stores the full, unsliced result list per lowercased search
    # text, so different max_results values can share one entry.
    cache_key = make_cache_key("field_search", ems_system_id, database_id, search_text.lower())
    cached = await field_cache.get(cache_key)
    if cached is not None:
        logger.debug("Using cached field search: %s", cache_key)
        return _format_field_search_results(cached[:max_results], show_ids=show_ids)

    try:
        path = f"/api/v2/ems-systems/{ems_system_id}/databases/{database_id}/fields"
        params = {"text": search_text}
        fields = await client.get(path, params=params)

        # Cache the full results, then return only the requested slice.
        await field_cache.set(cache_key, fields)
        return _format_field_search_results(fields[:max_results], show_ids=show_ids)
    except EMSNotFoundError:
        return (
            f"Error: Database not found. Verify database_id='{database_id}' is valid. "
            "Use list_databases to find valid database IDs."
        )
    except EMSAPIError as e:
        if e.status_code == 405:
            return (
                f"Error: Field search failed with HTTP 405 Method Not Allowed for "
                f"database_id='{database_id}'. This may be an entity-type database "
                "that doesn't support field search, or the database ID may be invalid. "
                "Try using list_fields to browse fields by navigating the field group "
                "hierarchy, or use list_databases to verify the database ID."
            )
        return f"Error searching fields: {e.message}"
@mcp.tool
async def get_field_info(
    ems_system_id: int,
    database_id: str,
    field_id: str,
) -> str:
    """Get detailed information about a specific field.

    Returns field metadata including type, units, description, and discrete
    value mappings if applicable. Discrete fields have numeric codes that
    map to labels - use this to find the correct codes for filtering.

    Args:
        ems_system_id: The EMS system ID.
        database_id: Database ID (from list_databases).
        field_id: Field ID (from list_fields or search_fields).

    Returns:
        Detailed field information including discrete value mappings, or an
        error message string if the API call fails.
    """
    import urllib.parse

    api = get_client()

    # A cached copy short-circuits the API call entirely.
    key = make_cache_key("field_info", ems_system_id, database_id, field_id)
    hit = await field_cache.get(key)
    if hit is not None:
        logger.debug("Using cached field info: %s", key)
        return _format_field_info(hit)

    # Field IDs contain special characters, so percent-encode every one of
    # them (safe="") before placing the ID in the URL path.
    quoted_field_id = urllib.parse.quote(field_id, safe="")
    url = (
        f"/api/v2/ems-systems/{ems_system_id}/databases/{database_id}/fields/{quoted_field_id}"
    )

    try:
        info = await api.get(url)
        await field_cache.set(key, info)
        return _format_field_info(info)
    except EMSNotFoundError:
        return (
            "Error: Field not found. Verify field_id is correct. "
            "Use search_fields to find valid field IDs."
        )
    except EMSAPIError as err:
        return f"Error getting field info: {err.message}"
@mcp.tool
async def search_analytics(
    ems_system_id: int,
    search_text: str,
    group_id: str | None = None,
    max_results: int = 50,
    show_ids: bool = False,
) -> str:
    """Search for time-series analytics by name.

    Analytics provide time-series data for flights (altitude, airspeed, etc.).
    Results show numbered references [N] instead of full IDs. You can pass
    analytic names directly to query_flight_analytics -- raw IDs are not needed.

    Args:
        ems_system_id: The EMS system ID.
        search_text: Text to search for in analytic names.
        group_id: Optional: limit search to specific analytic group.
        max_results: Maximum results to return (default: 50). Values below 1
            are treated as 1.
        show_ids: If True, show full IDs inline instead of numbered references.

    Returns:
        Formatted list of matching analytics with types, units, and
        descriptions, or an error message string if the API call fails.
    """
    # A negative limit would make the slices below drop results from the END
    # of the list, and zero would masquerade as "no analytics found"; clamp
    # to at least one result, as search_fields_deep does.
    max_results = max(1, max_results)

    client = get_client()

    # The cache stores the full, unsliced result list, so different
    # max_results values can share one entry.
    cache_key = make_cache_key(
        "analytics_search", ems_system_id, search_text.lower(), group_id or "all"
    )
    cached = await field_cache.get(cache_key)
    if cached is not None:
        logger.debug("Using cached analytics search: %s", cache_key)
        return _format_analytics_search_results(cached[:max_results], show_ids=show_ids)

    try:
        path = f"/api/v2/ems-systems/{ems_system_id}/analytics"
        params: dict[str, str] = {"text": search_text}
        if group_id:
            params["groupId"] = group_id
        analytics = await client.get(path, params=params)

        # Cache the full results, then return only the requested slice.
        await field_cache.set(cache_key, analytics)
        return _format_analytics_search_results(analytics[:max_results], show_ids=show_ids)
    except EMSNotFoundError:
        return f"Error: EMS system {ems_system_id} not found. Use list_ems_systems to find valid system IDs."
    except EMSAPIError as e:
        return f"Error searching analytics: {e.message}"
async def _fetch_field_group(
    client: Any,
    ems_system_id: int,
    database_id: str,
    group_id: str | None,
) -> dict[str, Any]:
    """Fetch a field group from the API, with caching.

    Reuses the same cache key pattern as ``list_fields`` so cache entries
    are shared between the two code paths. Exceptions raised by the client
    are not caught here; they propagate to the caller.

    Args:
        client: The EMS API client.
        ems_system_id: The EMS system ID.
        database_id: The database ID.
        group_id: The field group ID, or None for root.

    Returns:
        The field group response dict.
    """
    cache_key = make_cache_key("field_group", ems_system_id, database_id, group_id or "root")
    cached = await field_cache.get(cache_key)
    if cached is not None:
        return cached

    path = f"/api/v2/ems-systems/{ems_system_id}/databases/{database_id}/field-groups"
    if group_id:
        # Send the group ID as a query parameter instead of concatenating it
        # into the URL, so reserved characters in the opaque ID
        # ('&', '#', '+', spaces, ...) cannot corrupt the query string.
        group = await client.get(path, params={"groupId": group_id})
    else:
        group = await client.get(path)

    await field_cache.set(cache_key, group)
    return group
async def _recursive_field_search(
    client: Any,
    ems_system_id: int,
    database_id: str,
    search_text: str,
    max_depth: int,
    max_results: int,
    max_groups: int,
) -> tuple[list[dict[str, Any]], int]:
    """Traverse field groups breadth-first to find fields matching search text.

    The traversal is breadth-first with one twist: a subgroup whose name
    shares a word with the search text is pushed to the FRONT of the queue,
    so promising branches are explored before the remaining siblings.

    Args:
        client: The EMS API client.
        ems_system_id: The EMS system ID.
        database_id: The database ID.
        search_text: Text to match against field names (case-insensitive partial).
        max_depth: Maximum depth to traverse.
        max_results: Maximum number of matching fields to return.
        max_groups: Hard cap on total field-group API calls to prevent timeouts.

    Returns:
        Tuple of (matching fields list, groups_visited count). Each match is
        a dict with ``name``, ``id``, ``type``, ``units`` and ``path`` keys.

    Raises:
        EMSAPIError: If the ROOT field group cannot be fetched (including
            ``EMSNotFoundError``). Failures on deeper groups are skipped so
            one bad branch does not abort the whole search.
    """
    search_lower = search_text.lower()
    search_words = set(search_lower.split())
    matches: list[dict[str, Any]] = []
    groups_visited = 0

    # Queue entries: (group_id_or_None, depth, path_parts); None is the root.
    queue: deque[tuple[str | None, int, list[str]]] = deque()
    queue.append((None, 0, []))

    while queue and len(matches) < max_results:
        if groups_visited >= max_groups:
            break

        group_id, depth, path_parts = queue.popleft()
        if depth > max_depth:
            continue

        # Every attempted fetch counts against the budget, success or not.
        groups_visited += 1
        try:
            group = await _fetch_field_group(client, ems_system_id, database_id, group_id)
        except (EMSAPIError, EMSNotFoundError):
            if group_id is None:
                # Nothing can be searched if the root itself is unreachable
                # (e.g. invalid database ID). Surface the error instead of
                # reporting a misleading "no fields found".
                raise
            continue

        group_name = group.get("name", "")
        # The root's own name is omitted from the displayed path.
        current_path = path_parts + [group_name] if group_name and depth > 0 else path_parts

        # Check fields at this level ("or" guards against explicit nulls).
        for field in group.get("fields") or []:
            if len(matches) >= max_results:
                break
            field_name = field.get("name") or ""
            if search_lower in field_name.lower():
                matches.append({
                    "name": field_name,
                    "id": field.get("id", ""),
                    "type": field.get("type", "unknown"),
                    "units": field.get("units"),
                    "path": " > ".join(current_path) if current_path else "(root)",
                })

        # Enqueue subgroups with relevance prioritization
        if depth < max_depth:
            for sub in group.get("groups") or []:
                sub_id = sub.get("id")
                if not sub_id:
                    continue
                sub_name_lower = (sub.get("name") or "").lower()
                entry = (sub_id, depth + 1, current_path)
                # Prioritize groups whose name contains a search word
                if search_words & set(sub_name_lower.split()):
                    queue.appendleft(entry)
                else:
                    queue.append(entry)

    return matches, groups_visited
def _format_deep_search_results(
    results: list[dict[str, Any]],
    search_text: str,
    groups_visited: int = 0,
    max_groups: int = 0,
    show_ids: bool = False,
) -> str:
    """Render recursive field search hits as readable text.

    Args:
        results: List of matching field dicts; each must provide ``name``,
            ``type``, ``path`` and ``id`` (``units`` is optional).
        search_text: The original search text (for display).
        groups_visited: Number of field-group API calls made.
        max_groups: The max_groups budget that was configured.
        show_ids: If True, print each full ID inline (backward compat).
            If False (default), register each hit in the result store and
            print its numbered reference instead of the ID.

    Returns:
        Formatted search results string, including a budget summary when
        both ``groups_visited`` and ``max_groups`` are positive.
    """
    # The budget summary is identical for the empty and non-empty outcomes,
    # so build it once up front.
    budget_note = ""
    if groups_visited > 0 and max_groups > 0:
        budget_note = f"\n(Searched {groups_visited} group(s), budget: {max_groups})"
        if groups_visited >= max_groups:
            budget_note += "\nBudget exhausted -- try increasing max_groups for a wider search."

    if not results:
        return f"No fields found matching '{search_text}' in deep search." + budget_note

    out = [f"Found {len(results)} field(s) matching '{search_text}':"]
    for hit in results:
        title = hit["name"]
        kind = hit["type"]
        unit_label = hit.get("units")
        location = hit["path"]
        full_id = hit["id"]
        descriptor = f"{kind} ({unit_label})" if unit_label else kind

        if show_ids:
            out.append(f"\n {title} [{descriptor}]")
            out.append(f" Path: {location}")
            out.append(f" ID: {full_id}")
        else:
            number = _store_result(title, full_id)
            out.append(f"\n [{number}] {title} [{descriptor}]")
            out.append(f" Path: {location}")

    if not show_ids:
        out.append("\nUse get_result_id to retrieve full IDs for query_database.")
    if budget_note:
        out.append(budget_note)

    return "\n".join(out)
@mcp.tool
async def search_fields_deep(
    ems_system_id: int,
    database_id: str,
    search_text: str,
    max_depth: int = 5,
    max_results: int = 20,
    max_groups: int = 50,
    show_ids: bool = False,
) -> str:
    """Recursively search for fields by name across all field groups.

    Unlike search_fields, this works on ALL databases including entity-type
    databases that don't support the field search endpoint. It traverses the
    field group hierarchy using breadth-first search, matching fields by name.

    This is slower than search_fields because it makes multiple API calls
    (one per group visited). Use search_fields first when possible.

    Results show numbered references [N] instead of full IDs. Use
    get_result_id([N]) to retrieve the full ID when needed for query_database.

    Args:
        ems_system_id: The EMS system ID.
        database_id: Database ID (from list_databases).
        search_text: Text to search for in field names (case-insensitive partial match).
        max_depth: Maximum depth to traverse (default: 5, max: 10).
        max_results: Maximum matching fields to return (default: 20, max: 50).
        max_groups: Maximum field-group API calls to make (default: 50, max: 200).
            At ~200ms per call, 50 groups takes ~10s worst case.
        show_ids: If True, show full IDs inline instead of numbered references.

    Returns:
        Matching fields with their names, types, and group paths.
    """
    needle = search_text.strip() if search_text else ""
    if not needle:
        return "Error: search_text cannot be empty."

    # Force each limit into its documented range of 1..cap.
    max_depth = max(1, min(max_depth, 10))
    max_results = max(1, min(max_results, 50))
    max_groups = max(1, min(max_groups, 200))

    if "[entity-type-group]" in database_id:
        return (
            "Error: This appears to be a database GROUP ID, not a database ID. "
            "Use list_databases with this as group_id to navigate deeper and find "
            "actual database IDs."
        )

    api = get_client()
    try:
        found, visited = await _recursive_field_search(
            api, ems_system_id, database_id, needle,
            max_depth, max_results, max_groups,
        )
        return _format_deep_search_results(
            found, needle, visited, max_groups,
            show_ids=show_ids,
        )
    except EMSNotFoundError:
        return (
            f"Error: Database not found. Verify database_id='{database_id}' is valid. "
            "Use list_databases to find valid database IDs."
        )
    except EMSAPIError as err:
        return f"Error during deep field search: {err.message}"
@mcp.tool
async def get_result_id(
    result_numbers: list[int],
) -> str:
    """Retrieve full IDs for numbered search results.

    Discovery tools (search_fields, search_fields_deep, search_analytics)
    assign reference numbers [N] to results instead of showing full IDs.
    Use this tool to get the actual IDs when needed (e.g., for
    query_database field_id parameters).

    Args:
        result_numbers: Reference numbers from search results (e.g., [1, 3, 5]).

    Returns:
        The name and full ID for each requested result, followed by a note
        listing any reference numbers that could not be resolved.
    """
    if not result_numbers:
        return "Error: result_numbers cannot be empty."

    lines: list[str] = []
    not_found: list[int] = []
    for ref in result_numbers:
        entry = _get_stored_result(ref)
        if entry is None:
            not_found.append(ref)
            continue
        lines.append(f"[{ref}] {entry['name']}")
        lines.append(f" ID: {entry['id']}")

    if not_found:
        note = (
            f"Not found: {not_found}. These may have been evicted or never existed. "
            "Re-run the search to get fresh references."
        )
        # Separate the note from resolved entries with a blank line, but do
        # not start the whole response with one when nothing was resolved.
        lines.append(f"\n{note}" if lines else note)

    # result_numbers is non-empty here, so at least one line was produced.
    return "\n".join(lines)