"""
User resolution business logic.
This module contains the core logic for resolving user names/emails
to UUIDs, with support for fuzzy matching, verification, and disambiguation.
"""
import logging
import re
from typing import Any

from src.database import (
    search_users,
    get_pool,
    get_user_by_uuid as db_get_user_by_uuid,
    get_team_members_by_manager,
    get_managers_of_user,
    search_users_in_team,
)
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
async def resolve_user_impl(name_or_email: str) -> dict[str, Any]:
    """
    Resolve a name or email to one user, or explain why that was not possible.

    Candidates come from ``search_users``; the first candidate is treated as
    the best match. The ``action`` key of the result tells the caller what
    happened:

    - ``"resolved"``: the best match has confidence >= 0.95 and no other
      candidate reaches 0.8.
    - ``"verify"``: exactly one candidate was found, but not confidently.
    - ``"disambiguate"``: several candidates remain (at most 5 are listed).
    - ``"not_found"``: the search returned nothing.
    - ``"error"``: the input was blank, or the search raised an exception.

    Args:
        name_or_email: The user's name or email to resolve

    Returns:
        Dictionary with action and appropriate data based on match confidence
    """
    if not (name_or_email and name_or_email.strip()):
        return {
            "action": "error",
            "message": "Name or email cannot be empty"
        }

    def _brief(candidate: Any) -> dict[str, Any]:
        # Trimmed-down view of a candidate shown to the caller for
        # verification or disambiguation.
        return {
            "name": candidate["name"],
            "email": candidate["email"],
            "confidence": candidate["confidence"]
        }

    try:
        candidates = await search_users(name_or_email)
        if not candidates:
            return {
                "action": "not_found",
                "message": f"No users found matching '{name_or_email}'",
                "suggestion": "Try with a different spelling or use email address"
            }

        best = candidates[0]
        # Only auto-resolve when the best match is near-certain AND nothing
        # else comes close enough to be mistaken for it.
        unambiguous = best["confidence"] >= 0.95 and not any(
            other["confidence"] >= 0.8 for other in candidates[1:]
        )
        if unambiguous:
            return {
                "action": "resolved",
                "user_id": str(best["uuid"]),
                "email": best["email"],
                "name": best["name"],
                "confidence": best["confidence"]
            }

        if len(candidates) > 1:
            # Several plausible users: let the caller pick one.
            return {
                "action": "disambiguate",
                "message": f"Found {len(candidates)} possible matches for '{name_or_email}':",
                "candidates": [_brief(c) for c in candidates[:5]],  # top 5 only
                "instruction": "Ask the user which one is correct, then use confirm_user with their email"
            }

        # A lone candidate that did not clear the confidence bar.
        return {
            "action": "verify",
            "message": "Found one possible match. Please verify:",
            "candidate": _brief(best),
            "instruction": "Use confirm_user with the email to confirm this is the correct user"
        }
    except Exception as e:
        logger.error(f"Error in resolve_user: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }
async def resolve_users_batch_impl(names: list[str]) -> dict[str, Any]:
    """
    Resolve multiple user names or emails to their UUIDs.

    Each entry is resolved independently, one after another, through
    ``resolve_user_impl``. The per-entry result is returned with an extra
    ``"input"`` key holding the original entry.

    Args:
        names: List of names or emails to resolve (max 50)

    Returns:
        For an oversized batch, ``{"action": "error", "message": ...}``.
        Otherwise a dictionary with:

        - ``summary``: counters ``total``, ``resolved``, ``verify_needed``,
          ``disambiguate_needed``, ``not_found`` and ``error``. All six keys
          are always present, including for an empty batch.
        - ``resolved_uuids``: mapping of input entry -> resolved user id.
        - ``results``: the per-entry result dictionaries, in input order.
        - ``all_resolved``: True when every entry resolved (vacuously True
          for an empty batch).
    """
    summary = {
        "total": len(names) if names else 0,
        "resolved": 0,
        "verify_needed": 0,
        "disambiguate_needed": 0,
        "not_found": 0,
        "error": 0
    }

    if not names:
        # Same summary shape as the non-empty path, so callers can read any
        # counter without checking for an empty batch first.
        return {
            "summary": summary,
            "resolved_uuids": {},
            "results": [],
            "all_resolved": True
        }

    # Limit batch size
    if len(names) > 50:
        return {
            "action": "error",
            "message": "Maximum batch size is 50 names"
        }

    # Maps each non-error action to the summary counter it increments;
    # anything else (including a missing action) is counted as an error.
    counter_for_action = {
        "resolved": "resolved",
        "verify": "verify_needed",
        "disambiguate": "disambiguate_needed",
        "not_found": "not_found",
    }

    results = []
    resolved_uuids = {}
    for name in names:
        result = await resolve_user_impl(name)
        result["input"] = name
        results.append(result)

        action = result.get("action", "error")
        summary[counter_for_action.get(action, "error")] += 1
        if action == "resolved":
            resolved_uuids[name] = result["user_id"]

    return {
        "summary": summary,
        "resolved_uuids": resolved_uuids,
        "results": results,
        "all_resolved": summary["resolved"] == summary["total"]
    }
async def confirm_user_impl(email: str) -> dict[str, Any]:
    """
    Look a user up by exact email (case-insensitive) and return their UUID.

    The email is stripped of surrounding whitespace before it is compared
    against ``users_and_managers.user_email``.

    Args:
        email: The exact email address to look up

    Returns:
        Dictionary whose ``action`` is ``"resolved"`` (with user_id, email,
        name and a confidence of 1.0), ``"not_found"``, or ``"error"`` when
        the email is blank or the lookup raised.
    """
    if not (email and email.strip()):
        return {
            "action": "error",
            "message": "Email cannot be empty"
        }

    try:
        pool = await get_pool()
        async with pool.acquire() as conn:
            record = await conn.fetchrow("""
                SELECT DISTINCT user_uuid, user_name, user_email
                FROM users_and_managers
                WHERE LOWER(user_email) = LOWER($1)
            """, email.strip())

            if not record:
                return {
                    "action": "not_found",
                    "message": f"No user found with email '{email}'"
                }

            # An exact email hit is reported with full confidence.
            return {
                "action": "resolved",
                "user_id": str(record["user_uuid"]),
                "email": record["user_email"],
                "name": record["user_name"],
                "confidence": 1.0
            }
    except Exception as e:
        logger.error(f"Error in confirm_user: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }
async def confirm_users_batch_impl(emails: list[str]) -> dict[str, Any]:
    """
    Confirm multiple users by their exact emails and get their UUIDs.

    Blank entries are reported as per-item errors without touching the
    database; a pooled connection is only acquired when at least one
    non-blank email has to be looked up. Lookups are case-insensitive and
    use the stripped email.

    Args:
        emails: List of exact email addresses to look up (max 50)

    Returns:
        For an oversized batch, or when the database access raises,
        ``{"action": "error", "message": ...}``. Otherwise a dictionary with:

        - ``summary``: counters ``total``, ``resolved``, ``not_found`` and
          ``error``, counted per input entry (so duplicates and blank
          entries are each counted once, under the right heading).
        - ``resolved_uuids``: mapping of input email -> user id.
        - ``results``: one result dictionary per input entry, in input order.
        - ``all_resolved``: True when every entry resolved (vacuously True
          for an empty batch).
    """
    if not emails:
        return {
            "summary": {"total": 0, "resolved": 0, "not_found": 0, "error": 0},
            "resolved_uuids": {},
            "results": [],
            "all_resolved": True
        }

    # Limit batch size
    if len(emails) > 50:
        return {
            "action": "error",
            "message": "Maximum batch size is 50 emails"
        }

    lookup_sql = """
        SELECT DISTINCT user_uuid, user_name, user_email
        FROM users_and_managers
        WHERE LOWER(user_email) = LOWER($1)
    """

    # Slot per input entry so results keep input order even though blank
    # entries are settled before the database is consulted.
    results: list[Any] = [None] * len(emails)
    pending: list[tuple[int, str]] = []
    for position, email in enumerate(emails):
        if not email or not email.strip():
            results[position] = {
                "input": email,
                "action": "error",
                "message": "Email cannot be empty"
            }
        else:
            pending.append((position, email))

    resolved_uuids = {}
    try:
        if pending:
            pool = await get_pool()
            async with pool.acquire() as conn:
                for position, email in pending:
                    row = await conn.fetchrow(lookup_sql, email.strip())
                    if row:
                        results[position] = {
                            "input": email,
                            "action": "resolved",
                            "user_id": str(row["user_uuid"]),
                            "email": row["user_email"],
                            "name": row["user_name"]
                        }
                        resolved_uuids[email] = str(row["user_uuid"])
                    else:
                        results[position] = {
                            "input": email,
                            "action": "not_found",
                            "message": f"No user found with email '{email}'"
                        }
    except Exception as e:
        logger.error(f"Error in confirm_users_batch: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }

    # Count by per-entry outcome rather than by len(resolved_uuids): that
    # dict is keyed by email, so duplicates collapse, and blank entries are
    # errors rather than "not found".
    def _count(action: str) -> int:
        return sum(1 for item in results if item["action"] == action)

    summary = {
        "total": len(emails),
        "resolved": _count("resolved"),
        "not_found": _count("not_found"),
        "error": _count("error")
    }
    return {
        "summary": summary,
        "resolved_uuids": resolved_uuids,
        "results": results,
        "all_resolved": summary["resolved"] == summary["total"]
    }
# === Team Management ===
async def get_team_members_impl(manager_identifier: str) -> dict[str, Any]:
    """
    List the team members that report to a manager.

    The stripped identifier is classified before the lookup: anything
    containing ``@`` is an email, a 36-character string with four hyphens is
    a UUID, and everything else is a name.

    Args:
        manager_identifier: Manager's email (preferred), UUID, or name

    Returns:
        Dictionary with team members list or error
    """
    if not (manager_identifier and manager_identifier.strip()):
        return {
            "action": "error",
            "message": "Manager identifier cannot be empty"
        }

    try:
        identifier = manager_identifier.strip()

        # UUID shape: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
        uuid_shaped = len(identifier) == 36 and identifier.count("-") == 4
        if "@" in identifier:
            identifier_type = "email"
        else:
            identifier_type = "uuid" if uuid_shaped else "name"

        team = await get_team_members_by_manager(identifier, identifier_type)
        if team:
            return {
                "action": "resolved",
                "manager_query": manager_identifier,
                "team_count": len(team),
                "members": team
            }

        return {
            "action": "not_found",
            "message": f"No team members found for manager '{manager_identifier}'",
            "suggestion": "Verify the manager email/UUID or check if they have any team members"
        }
    except Exception as e:
        logger.error(f"Error in get_team_members: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }
async def get_manager_of_user_impl(user_identifier: str) -> dict[str, Any]:
    """
    Look up the manager(s) of a single user.

    The stripped identifier is treated as a UUID only when it has no ``@``,
    is 36 characters long and contains four hyphens; every other value is
    looked up as an email.

    Args:
        user_identifier: User's email (preferred) or UUID

    Returns:
        Dictionary with user info and their managers. The ``managers`` and
        ``remote_managers`` keys are present only when the respective list
        is non-empty; ``has_manager`` / ``has_remote_manager`` are always set.
    """
    if not (user_identifier and user_identifier.strip()):
        return {
            "action": "error",
            "message": "User identifier cannot be empty"
        }

    try:
        identifier = user_identifier.strip()

        uuid_shaped = (
            "@" not in identifier
            and len(identifier) == 36
            and identifier.count("-") == 4
        )
        # Anything that is not UUID-shaped (including partial input without
        # an "@") is handed over as an email.
        identifier_type = "uuid" if uuid_shaped else "email"

        lookup = await get_managers_of_user(identifier, identifier_type)
        if not lookup:
            return {
                "action": "not_found",
                "message": f"No user found with identifier '{user_identifier}'",
                "suggestion": "Try using the user's email address or UUID"
            }

        user = lookup["user"]
        direct = lookup["managers"]
        remote = lookup["remote_managers"]

        response = {
            "action": "resolved",
            "user": user,
            "has_manager": len(direct) > 0,
            "has_remote_manager": len(remote) > 0
        }
        # Empty manager lists are left out of the response entirely.
        if direct:
            response["managers"] = direct
        if remote:
            response["remote_managers"] = remote
        return response
    except Exception as e:
        logger.error(f"Error in get_manager_of_user: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }
async def get_user_by_uuid_impl(user_uuid: str) -> dict[str, Any]:
    """
    Get user details by their UUID.

    The stripped input must be a canonical hyphenated UUID (8-4-4-4-12
    hexadecimal digits, either case). Anything else is rejected with an
    explicit format error before the database is queried.

    Args:
        user_uuid: The user's UUID

    Returns:
        Dictionary with ``action`` set to:

        - ``"resolved"``: with ``user`` (uuid, name, email) plus
          ``managers``, ``remote_managers`` and ``clients`` (each defaulting
          to an empty list when the lookup result lacks the key).
        - ``"not_found"``: no user has this UUID.
        - ``"error"``: the input was blank or malformed, or the lookup
          raised.
    """
    if not user_uuid or not user_uuid.strip():
        return {
            "action": "error",
            "message": "User UUID cannot be empty"
        }

    uuid_str = user_uuid.strip()

    # Strict shape check: a length/hyphen count alone would let through
    # non-hex characters and misplaced hyphens, which can never be the
    # format promised in the error message below.
    if not re.fullmatch(
        r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}",
        uuid_str,
    ):
        return {
            "action": "error",
            "message": f"Invalid UUID format: '{user_uuid}'. Expected format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
        }

    try:
        result = await db_get_user_by_uuid(uuid_str)
        if not result:
            return {
                "action": "not_found",
                "message": f"No user found with UUID '{user_uuid}'"
            }

        return {
            "action": "resolved",
            "user": {
                "uuid": result["uuid"],
                "name": result["name"],
                "email": result["email"]
            },
            "managers": result.get("managers", []),
            "remote_managers": result.get("remote_managers", []),
            "clients": result.get("clients", [])
        }
    except Exception as e:
        logger.error(f"Error in get_user_by_uuid: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }
async def resolve_user_in_team_impl(
    name_or_email: str,
    manager_identifier: str
) -> dict[str, Any]:
    """
    Resolve a user name/email, searching only within one manager's team.

    Candidates come from ``search_users_in_team``; the first candidate is
    treated as the best match. The decision rules are: resolve when the best
    match has confidence >= 0.95 and no other candidate reaches 0.8, ask for
    verification when there is a single non-confident candidate, and ask for
    disambiguation (listing at most 5 candidates) otherwise. Every
    non-error, non-empty outcome carries a ``scope`` string naming the
    manager.

    The stripped manager identifier is passed as a UUID only when it has no
    ``@``, is 36 characters long and contains four hyphens; otherwise it is
    passed as an email.

    Args:
        name_or_email: The user's name or email to search for
        manager_identifier: Manager's email (preferred) or UUID

    Returns:
        Dictionary with action and matching users (scoped to team)
    """
    if not (name_or_email and name_or_email.strip()):
        return {
            "action": "error",
            "message": "Name or email cannot be empty"
        }
    if not (manager_identifier and manager_identifier.strip()):
        return {
            "action": "error",
            "message": "Manager identifier cannot be empty"
        }

    def _brief(candidate: Any) -> dict[str, Any]:
        # Trimmed-down view of a candidate shown to the caller for
        # verification or disambiguation.
        return {
            "name": candidate["name"],
            "email": candidate["email"],
            "confidence": candidate["confidence"]
        }

    try:
        manager_id = manager_identifier.strip()

        uuid_shaped = (
            "@" not in manager_id
            and len(manager_id) == 36
            and manager_id.count("-") == 4
        )
        manager_type = "uuid" if uuid_shaped else "email"

        candidates = await search_users_in_team(name_or_email, manager_id, manager_type)
        if not candidates:
            return {
                "action": "not_found",
                "message": f"No team members found matching '{name_or_email}' under manager '{manager_identifier}'",
                "suggestion": "The user might not be in this team, or try a different spelling"
            }

        best = candidates[0]
        # Only auto-resolve when the best match is near-certain AND nothing
        # else comes close enough to be mistaken for it.
        unambiguous = best["confidence"] >= 0.95 and not any(
            other["confidence"] >= 0.8 for other in candidates[1:]
        )
        if unambiguous:
            return {
                "action": "resolved",
                "user_id": str(best["uuid"]),
                "email": best["email"],
                "name": best["name"],
                "confidence": best["confidence"],
                "scope": f"team of {manager_identifier}"
            }

        if len(candidates) != 1:
            # Several plausible team members: let the caller pick one.
            return {
                "action": "disambiguate",
                "message": f"Found {len(candidates)} team members matching '{name_or_email}':",
                "candidates": [_brief(c) for c in candidates[:5]],
                "scope": f"team of {manager_identifier}",
                "instruction": "Ask the user which one is correct, then use confirm_user with their email"
            }

        # A lone candidate that did not clear the confidence bar.
        return {
            "action": "verify",
            "message": "Found one possible team member. Please verify:",
            "candidate": _brief(best),
            "scope": f"team of {manager_identifier}",
            "instruction": "Use confirm_user with the email to confirm this is the correct user"
        }
    except Exception as e:
        logger.error(f"Error in resolve_user_in_team: {e}")
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }