"""
Database connection and query utilities.
This module handles the PostgreSQL connection pool and provides
query functions for user resolution.
"""
import logging
import os
from typing import Any
import asyncpg
logger = logging.getLogger(__name__)
# Global pool reference (initialized via init_pool, closed via close_pool)
_db_pool: asyncpg.Pool | None = None
async def get_pool() -> asyncpg.Pool:
"""Get the database connection pool."""
if _db_pool is None:
raise RuntimeError("Database pool not initialized. Call init_pool() first.")
return _db_pool
async def init_pool() -> None:
"""Initialize the database connection pool."""
global _db_pool
# Build connection config from environment
db_config = {
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
"database": os.getenv("DB_NAME", "postgres"),
"user": os.getenv("DB_USER", "postgres"),
"password": os.getenv("DB_PASSWORD", ""),
}
logger.info(f"Connecting to database at {db_config['host']}:{db_config['port']}")
_db_pool = await asyncpg.create_pool(
**db_config,
min_size=2,
max_size=10,
command_timeout=30,
)
logger.info("Database pool initialized successfully")
# Test connection
async with _db_pool.acquire() as conn:
version = await conn.fetchval("SELECT version()")
logger.info(f"Connected to: {version[:50]}...")
async def close_pool() -> None:
"""Close the database connection pool."""
global _db_pool
if _db_pool:
logger.info("Closing database pool...")
await _db_pool.close()
_db_pool = None
logger.info("Database pool closed")
async def search_users(query: str) -> list[dict[str, Any]]:
"""
Search for users by name or email using fuzzy matching.
Returns matching users with confidence scores.
Args:
query: The search term (name or email)
Returns:
List of matching users with uuid, name, email, confidence, and match_type
"""
pool = await get_pool()
# Normalize the query
normalized_query = query.strip().lower()
async with pool.acquire() as conn:
# Check if pg_trgm extension is available for fuzzy matching
has_trgm = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm')"
)
# print(f'has_trgm: {has_trgm}')
if has_trgm:
# Use trigram similarity for fuzzy matching
# Subquery ensures unique users (same user may appear multiple times due to manager relationships)
results = await conn.fetch("""
SELECT * FROM (
SELECT DISTINCT ON (user_uuid, user_email)
user_uuid,
user_name,
user_email,
GREATEST(
similarity(LOWER(user_name), $1),
similarity(LOWER(user_email), $1)
) as confidence,
CASE
WHEN LOWER(user_email) = $1 THEN 'email_exact'
WHEN LOWER(user_name) = $1 THEN 'name_exact'
WHEN similarity(LOWER(user_email), $1) > similarity(LOWER(user_name), $1)
THEN 'email_fuzzy'
ELSE 'name_fuzzy'
END as match_type
FROM users_and_managers
WHERE
similarity(LOWER(user_name), $1) > 0.2
OR similarity(LOWER(user_email), $1) > 0.2
OR LOWER(user_name) LIKE '%' || $1 || '%'
OR LOWER(user_email) LIKE '%' || $1 || '%'
ORDER BY user_uuid, user_email
) AS unique_users
ORDER BY confidence DESC
LIMIT 10
""", normalized_query)
else:
# Fallback to ILIKE for basic matching
# Subquery ensures unique users (same user may appear multiple times due to manager relationships)
results = await conn.fetch("""
SELECT * FROM (
SELECT DISTINCT ON (user_uuid, user_email)
user_uuid,
user_name,
user_email,
CASE
WHEN LOWER(user_email) = $1 THEN 1.0
WHEN LOWER(user_name) = $1 THEN 1.0
WHEN LOWER(user_name) ILIKE '%' || $1 || '%' THEN 0.6
WHEN LOWER(user_email) ILIKE '%' || $1 || '%' THEN 0.6
ELSE 0.3
END as confidence,
CASE
WHEN LOWER(user_email) = $1 THEN 'email_exact'
WHEN LOWER(user_name) = $1 THEN 'name_exact'
WHEN LOWER(user_email) ILIKE '%' || $1 || '%' THEN 'email_fuzzy'
ELSE 'name_fuzzy'
END as match_type
FROM users_and_managers
WHERE
LOWER(user_name) ILIKE '%' || $1 || '%'
OR LOWER(user_email) ILIKE '%' || $1 || '%'
ORDER BY user_uuid, user_email
) AS unique_users
ORDER BY confidence DESC
LIMIT 10
""", normalized_query)
return [
{
"uuid": str(row["user_uuid"]),
"name": row["user_name"],
"email": row["user_email"],
"confidence": float(row["confidence"]),
"match_type": row["match_type"],
}
for row in results
]
async def get_user_by_email(email: str) -> dict[str, Any] | None:
"""
Get a user by exact email match.
Args:
email: The email address to search for
Returns:
User dict with uuid, name, email or None if not found
"""
pool = await get_pool()
async with pool.acquire() as conn:
# DISTINCT ensures we get one unique user even if they appear multiple times
row = await conn.fetchrow("""
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE LOWER(user_email) = LOWER($1)
LIMIT 1
""", email.strip())
if row:
return {
"uuid": str(row["user_uuid"]),
"name": row["user_name"],
"email": row["user_email"],
}
return None
async def get_user_by_uuid(user_uuid: str) -> dict[str, Any] | None:
"""
Get a user by their UUID.
Args:
user_uuid: The user's UUID
Returns:
User dict with uuid, name, email, managers, and client or None if not found
"""
pool = await get_pool()
async with pool.acquire() as conn:
# Get user basic info
rows = await conn.fetch("""
SELECT
user_uuid, user_name, user_email,
manager_uuid, manager_name, manager_email,
remote_manager_uuid, remote_manager_name, remote_manager_email,
client_uuid, client_name
FROM users_and_managers
WHERE user_uuid = $1::uuid
""", user_uuid)
if not rows:
return None
# Build user info with all managers and clients
first_row = rows[0]
managers = set()
remote_managers = set()
clients = set()
for row in rows:
if row["manager_uuid"]:
managers.add((
str(row["manager_uuid"]),
row["manager_name"],
row["manager_email"]
))
if row["remote_manager_uuid"]:
remote_managers.add((
str(row["remote_manager_uuid"]),
row["remote_manager_name"],
row["remote_manager_email"]
))
if row["client_uuid"]:
clients.add((str(row["client_uuid"]), row["client_name"]))
return {
"uuid": str(first_row["user_uuid"]),
"name": first_row["user_name"],
"email": first_row["user_email"],
"managers": [
{"uuid": m[0], "name": m[1], "email": m[2]}
for m in managers
],
"remote_managers": [
{"uuid": m[0], "name": m[1], "email": m[2]}
for m in remote_managers
],
"clients": [
{"uuid": c[0], "name": c[1]}
for c in clients
]
}
async def get_team_members_by_manager(
manager_identifier: str,
identifier_type: str = "email"
) -> list[dict[str, Any]]:
"""
Get all team members under a manager.
Args:
manager_identifier: Manager's email, uuid, or name
identifier_type: "email", "uuid", or "name"
Returns:
List of users with uuid, name, email
"""
pool = await get_pool()
async with pool.acquire() as conn:
if identifier_type == "uuid":
# Search in both manager and remote_manager columns
rows = await conn.fetch("""
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE manager_uuid = $1::uuid OR remote_manager_uuid = $1::uuid
ORDER BY user_name
""", manager_identifier)
elif identifier_type == "email":
rows = await conn.fetch("""
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE LOWER(manager_email) = LOWER($1)
OR LOWER(remote_manager_email) = LOWER($1)
ORDER BY user_name
""", manager_identifier.strip())
else: # name - fuzzy search
rows = await conn.fetch("""
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE LOWER(manager_name) ILIKE '%' || $1 || '%'
OR LOWER(remote_manager_name) ILIKE '%' || $1 || '%'
ORDER BY user_name
""", manager_identifier.strip().lower())
return [
{
"uuid": str(row["user_uuid"]),
"name": row["user_name"],
"email": row["user_email"]
}
for row in rows
]
async def get_managers_of_user(user_identifier: str, identifier_type: str = "email") -> dict[str, Any] | None:
"""
Get all managers of a specific user.
Args:
user_identifier: User's email or uuid
identifier_type: "email" or "uuid"
Returns:
Dict with user info and their managers (primary and remote)
"""
pool = await get_pool()
async with pool.acquire() as conn:
if identifier_type == "uuid":
rows = await conn.fetch("""
SELECT
user_uuid, user_name, user_email,
manager_uuid, manager_name, manager_email,
remote_manager_uuid, remote_manager_name, remote_manager_email,
client_uuid, client_name
FROM users_and_managers
WHERE user_uuid = $1::uuid
""", user_identifier)
else: # email
rows = await conn.fetch("""
SELECT
user_uuid, user_name, user_email,
manager_uuid, manager_name, manager_email,
remote_manager_uuid, remote_manager_name, remote_manager_email,
client_uuid, client_name
FROM users_and_managers
WHERE LOWER(user_email) = LOWER($1)
""", user_identifier.strip())
if not rows:
return None
first_row = rows[0]
managers = set()
remote_managers = set()
for row in rows:
if row["manager_uuid"]:
managers.add((
str(row["manager_uuid"]),
row["manager_name"],
row["manager_email"]
))
if row["remote_manager_uuid"]:
remote_managers.add((
str(row["remote_manager_uuid"]),
row["remote_manager_name"],
row["remote_manager_email"]
))
return {
"user": {
"uuid": str(first_row["user_uuid"]),
"name": first_row["user_name"],
"email": first_row["user_email"]
},
"managers": [
{"uuid": m[0], "name": m[1], "email": m[2]}
for m in managers
],
"remote_managers": [
{"uuid": m[0], "name": m[1], "email": m[2]}
for m in remote_managers
]
}
async def search_users_in_team(
query: str,
manager_identifier: str,
manager_type: str = "email"
) -> list[dict[str, Any]]:
"""
Search for users by name/email within a specific manager's team only.
Args:
query: The search term (name or email)
manager_identifier: Manager's email or uuid
manager_type: "email" or "uuid"
Returns:
List of matching users with uuid, name, email, confidence
"""
pool = await get_pool()
normalized_query = query.strip().lower()
async with pool.acquire() as conn:
# Check if pg_trgm extension is available
has_trgm = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm')"
)
# Build manager filter
if manager_type == "uuid":
manager_filter = "(manager_uuid = $2::uuid OR remote_manager_uuid = $2::uuid)"
else:
manager_filter = "(LOWER(manager_email) = LOWER($2) OR LOWER(remote_manager_email) = LOWER($2))"
if has_trgm:
results = await conn.fetch(f"""
SELECT * FROM (
SELECT DISTINCT ON (user_uuid, user_email)
user_uuid,
user_name,
user_email,
GREATEST(
similarity(LOWER(user_name), $1),
similarity(LOWER(user_email), $1)
) as confidence,
CASE
WHEN LOWER(user_email) = $1 THEN 'email_exact'
WHEN LOWER(user_name) = $1 THEN 'name_exact'
WHEN similarity(LOWER(user_email), $1) > similarity(LOWER(user_name), $1)
THEN 'email_fuzzy'
ELSE 'name_fuzzy'
END as match_type
FROM users_and_managers
WHERE {manager_filter}
AND (
similarity(LOWER(user_name), $1) > 0.2
OR similarity(LOWER(user_email), $1) > 0.2
OR LOWER(user_name) LIKE '%' || $1 || '%'
OR LOWER(user_email) LIKE '%' || $1 || '%'
)
ORDER BY user_uuid, user_email
) AS unique_users
ORDER BY confidence DESC
LIMIT 10
""", normalized_query, manager_identifier)
else:
results = await conn.fetch(f"""
SELECT * FROM (
SELECT DISTINCT ON (user_uuid, user_email)
user_uuid,
user_name,
user_email,
CASE
WHEN LOWER(user_email) = $1 THEN 1.0
WHEN LOWER(user_name) = $1 THEN 1.0
WHEN LOWER(user_name) ILIKE '%' || $1 || '%' THEN 0.6
WHEN LOWER(user_email) ILIKE '%' || $1 || '%' THEN 0.6
ELSE 0.3
END as confidence,
CASE
WHEN LOWER(user_email) = $1 THEN 'email_exact'
WHEN LOWER(user_name) = $1 THEN 'name_exact'
WHEN LOWER(user_email) ILIKE '%' || $1 || '%' THEN 'email_fuzzy'
ELSE 'name_fuzzy'
END as match_type
FROM users_and_managers
WHERE {manager_filter}
AND (
LOWER(user_name) ILIKE '%' || $1 || '%'
OR LOWER(user_email) ILIKE '%' || $1 || '%'
)
ORDER BY user_uuid, user_email
) AS unique_users
ORDER BY confidence DESC
LIMIT 10
""", normalized_query, manager_identifier)
return [
{
"uuid": str(row["user_uuid"]),
"name": row["user_name"],
"email": row["user_email"],
"confidence": float(row["confidence"]),
"match_type": row["match_type"],
}
for row in results
]