"""
Calendar database queries.
This module contains SQL queries for calendar tables in the silver schema:
- calendar_user_day_facts_s5: Daily aggregates per user
- calendar_user_event_day_slices_s4: User-event-day junction
- calendar_events_s1: Master event details
"""
import logging
from datetime import date, timedelta
from typing import Any, Literal
from src.database import get_pool
logger = logging.getLogger(__name__)
# =============================================================================
# S4 + S1 QUERIES (Meeting-level data)
# =============================================================================
async def get_user_meetings_db(
    user_identifier: str,
    start_date: date,
    end_date: date,
    identifier_type: str = "email",
    sort_by: str = "start_time",
    order: str = "desc",
    limit: int = 20,
    meeting_type: str | None = None,
    is_external: bool | None = None,
    is_recurring: bool | None = None,
    has_agenda: bool | None = None,
    min_duration: int | None = None,
    min_attendees: int | None = None,
    search: str | None = None,
) -> list[dict[str, Any]]:
    """
    Query user's meetings from S4 + S1 with flexible filtering and sorting.

    Every caller-supplied value (including the row limit) is sent as a bind
    parameter. Only the whitelisted sort column and the ASC/DESC keyword are
    interpolated into the SQL text.

    Args:
        user_identifier: User's email or UUID
        start_date: Start of date range (inclusive)
        end_date: End of date range (inclusive)
        identifier_type: "uuid" matches on s4.user_id; any other value is
            treated as an email and matched case-insensitively after trimming
            surrounding whitespace
        sort_by: "start_time", "duration", "attendees", "agenda_quality";
            unknown values fall back to "start_time"
        order: "desc" (case-insensitive) sorts descending; anything else
            sorts ascending
        limit: Max number of results (coerced with int())
        meeting_type: Filter by type (1_1, STANDUP, REVIEW, PLANNING, EXTERNAL, OTHER)
        is_external: Filter by external flag
        is_recurring: Filter by recurring flag
        has_agenda: Filter by agenda presence
        min_duration: Minimum duration in minutes (None or 0 applies no filter)
        min_attendees: Minimum attendee count (None or 0 applies no filter)
        search: Title keyword search (case-insensitive, matched literally)

    Returns:
        List of meetings with details from S4 + S1

    Raises:
        TypeError, ValueError: If limit cannot be converted to an integer
    """
    pool = await get_pool()

    # $1 / $2 are always the date range; further placeholders come from bind().
    conditions = ["s4.slice_day >= $1", "s4.slice_day <= $2"]
    params: list[Any] = [start_date, end_date]

    def bind(value: Any) -> str:
        """Register value as a bind parameter and return its $n placeholder."""
        params.append(value)
        return f"${len(params)}"

    # User filter: the trimming decision follows the same branch as the
    # condition, so every non-uuid identifier is handled as an email.
    if identifier_type == "uuid":
        conditions.append(f"s4.user_id = {bind(user_identifier)}::uuid")
    else:
        conditions.append(f"LOWER(s4.email) = LOWER({bind(user_identifier.strip())})")

    # Optional filters
    if meeting_type:
        conditions.append(f"s4.meeting_type_code = {bind(meeting_type)}")

    boolean_filters = (
        ("s4.is_external", is_external),
        ("s4.is_recurring", is_recurring),
        ("s4.has_agenda", has_agenda),
    )
    for column, flag in boolean_filters:
        # False is a meaningful filter value, so only None means "no filter".
        if flag is not None:
            conditions.append(f"{column} = {bind(flag)}")

    minimum_filters = (
        ("s4.slice_duration_min", min_duration),
        ("s4.attendee_count", min_attendees),
    )
    for column, minimum in minimum_filters:
        if minimum:
            conditions.append(f"{column} >= {bind(minimum)}")

    if search:
        # Escape LIKE metacharacters (backslash is PostgreSQL's default LIKE
        # escape character) so the keyword is matched literally.
        literal = search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        placeholder = bind(literal)
        conditions.append(f"LOWER(s1.title) LIKE '%' || LOWER({placeholder}) || '%'")

    # Build ORDER BY from a whitelist; user input never reaches the SQL text.
    sort_map = {
        "start_time": "s4.slice_start_ts",
        "duration": "s4.slice_duration_min",
        "attendees": "s4.attendee_count",
        "agenda_quality": "s4.agenda_quality_index",
    }
    order_col = sort_map.get(sort_by, "s4.slice_start_ts")
    order_dir = "DESC" if order.lower() == "desc" else "ASC"

    # Bind the limit instead of formatting it into the statement.
    limit_placeholder = bind(int(limit))

    where_clause = " AND ".join(conditions)
    query = f"""
SELECT
s4.event_id,
s4.user_id,
s4.email,
s4.slice_day,
s4.slice_start_ts,
s4.slice_end_ts,
s4.slice_duration_min,
s4.meeting_type_code,
s4.is_recurring,
s4.series_id,
s4.is_external,
s4.is_large_meeting,
s4.has_agenda,
s4.agenda_quality_index,
s1.title,
s1.organizer_email,
s1.attendee_count,
s1.internal_attendee_count,
s1.external_attendee_count,
s1.duration_min as total_duration_min,
s1.attendees_json
FROM silver.calendar_user_event_day_slices_s4 s4
JOIN silver.calendar_events_s1 s1 ON s4.event_id = s1.event_id
WHERE {where_clause}
ORDER BY {order_col} {order_dir}
LIMIT {limit_placeholder}
"""
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return [dict(row) for row in rows]
async def get_meeting_details_db(event_id: str) -> dict[str, Any] | None:
    """
    Fetch the S1 record for a single meeting.

    Args:
        event_id: The event ID to look up in silver.calendar_events_s1

    Returns:
        The selected S1 columns as a dictionary, or None when no row matches
    """
    details_sql = """
SELECT
event_id,
title,
organizer_email,
start_ts,
end_ts,
duration_min,
day,
attendees_json,
attendee_count,
internal_attendee_count,
external_attendee_count,
is_mail_group_present,
meeting_type_code,
is_recurring,
series_id,
instance_key,
is_external,
is_large_meeting,
has_agenda,
agenda_quality_index,
agenda_quality_signals,
tagged_priorities
FROM silver.calendar_events_s1
WHERE event_id = $1
"""
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(details_sql, event_id)
    return dict(record) if record else None
async def get_user_meeting_extremes_db(
    user_identifier: str,
    start_date: date,
    end_date: date,
    identifier_type: str = "email"
) -> dict[str, Any]:
    """
    Get statistical extremes (longest, shortest, largest meetings) for a user.

    Also reports the busiest and lightest day by total meeting minutes. Days
    without any S4 slice never appear, so the lightest day always has at
    least one meeting.

    Args:
        user_identifier: User's email or UUID
        start_date: Start of date range (inclusive)
        end_date: End of date range (inclusive)
        identifier_type: "uuid" matches on s4.user_id; any other value is
            treated as an email and matched case-insensitively after trimming
            surrounding whitespace

    Returns:
        Dictionary with keys "longest_meeting", "shortest_meeting",
        "largest_meeting", "busiest_day" and "lightest_day"; each value is a
        dictionary of the selected columns, or None when no row matched
    """
    pool = await get_pool()

    # Build user condition; the trimming decision follows the same branch as
    # the condition so every non-uuid identifier is handled as an email.
    if identifier_type == "uuid":
        user_cond = "s4.user_id = $3::uuid"
        user_param = user_identifier
    else:
        user_cond = "LOWER(s4.email) = LOWER($3)"
        user_param = user_identifier.strip()

    def meeting_query(order_by: str) -> str:
        """Return the single-meeting query sorted by the given ORDER BY expression."""
        return f"""
SELECT
s4.event_id,
s4.slice_day as date,
s4.slice_duration_min as duration_min,
s4.attendee_count,
s1.title
FROM silver.calendar_user_event_day_slices_s4 s4
JOIN silver.calendar_events_s1 s1 ON s4.event_id = s1.event_id
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
ORDER BY {order_by}
LIMIT 1
"""

    def day_query(order_by: str) -> str:
        """Return the per-day aggregate query sorted by the given ORDER BY expression."""
        return f"""
SELECT
slice_day as date,
SUM(slice_duration_min) as meeting_minutes,
COUNT(*) as meetings_count
FROM silver.calendar_user_event_day_slices_s4 s4
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
GROUP BY slice_day
ORDER BY {order_by}
LIMIT 1
"""

    # PostgreSQL places NULLs first under DESC, so the descending sorts say
    # NULLS LAST to keep a NULL value from being reported as the maximum.
    # ASC already sorts NULLs last.
    queries = {
        "longest_meeting": meeting_query("s4.slice_duration_min DESC NULLS LAST"),
        "shortest_meeting": meeting_query("s4.slice_duration_min ASC"),
        "largest_meeting": meeting_query("s4.attendee_count DESC NULLS LAST"),
        "busiest_day": day_query("meeting_minutes DESC NULLS LAST"),
        "lightest_day": day_query("meeting_minutes ASC"),
    }

    extremes: dict[str, Any] = {}
    async with pool.acquire() as conn:
        # Run sequentially on the one acquired connection.
        for key, sql in queries.items():
            row = await conn.fetchrow(sql, start_date, end_date, user_param)
            extremes[key] = dict(row) if row else None
    return extremes
async def get_user_recurring_summary_db(
    user_identifier: str,
    start_date: date,
    end_date: date,
    identifier_type: str = "email",
    limit: int = 5
) -> dict[str, Any]:
    """
    Get recurring meeting analysis for a user.

    Args:
        user_identifier: User's email or UUID
        start_date: Start of date range (inclusive)
        end_date: End of date range (inclusive)
        identifier_type: "uuid" matches on s4.user_id; any other value is
            treated as an email and matched case-insensitively after trimming
            surrounding whitespace
        limit: Number of top recurring series to return (coerced with int())

    Returns:
        Dictionary with:
        - "recurring_count": number of recurring S4 slices in the range
        - "recurring_minutes": summed duration of those slices
        - "recurring_pct": recurring minutes as a percentage of all meeting
          minutes, rounded to one decimal (0 when there are no minutes)
        - "top_series": up to `limit` series (series_id, title, occurrences,
          total_minutes) ordered by total minutes descending

    Raises:
        TypeError, ValueError: If limit cannot be converted to an integer
    """
    pool = await get_pool()

    # Build user condition; the trimming decision follows the same branch as
    # the condition so every non-uuid identifier is handled as an email.
    if identifier_type == "uuid":
        user_cond = "s4.user_id = $3::uuid"
        user_param = user_identifier
    else:
        user_cond = "LOWER(s4.email) = LOWER($3)"
        user_param = user_identifier.strip()

    base_args = (start_date, end_date, user_param)
    # Bound as $4 rather than formatted into the SQL text.
    series_limit = int(limit)

    async with pool.acquire() as conn:
        # Get recurring stats
        recurring_stats = await conn.fetchrow(f"""
SELECT
COUNT(*) as recurring_count,
COALESCE(SUM(slice_duration_min), 0) as recurring_minutes
FROM silver.calendar_user_event_day_slices_s4 s4
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
AND s4.is_recurring = true
""", *base_args)
        # Get total stats for percentage
        total_stats = await conn.fetchrow(f"""
SELECT
COUNT(*) as total_count,
COALESCE(SUM(slice_duration_min), 0) as total_minutes
FROM silver.calendar_user_event_day_slices_s4 s4
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
""", *base_args)
        # Get top recurring series
        top_series = await conn.fetch(f"""
SELECT
s4.series_id,
s1.title,
COUNT(*) as occurrences,
SUM(s4.slice_duration_min) as total_minutes
FROM silver.calendar_user_event_day_slices_s4 s4
JOIN silver.calendar_events_s1 s1 ON s4.event_id = s1.event_id
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
AND s4.is_recurring = true AND s4.series_id IS NOT NULL
GROUP BY s4.series_id, s1.title
ORDER BY total_minutes DESC
LIMIT $4
""", *base_args, series_limit)

    total_minutes = total_stats["total_minutes"] if total_stats else 0
    recurring_minutes = recurring_stats["recurring_minutes"] if recurring_stats else 0
    return {
        "recurring_count": recurring_stats["recurring_count"] if recurring_stats else 0,
        "recurring_minutes": recurring_minutes,
        # Guard against division by zero when the user has no meeting minutes.
        "recurring_pct": round(recurring_minutes / total_minutes * 100, 1) if total_minutes > 0 else 0,
        "top_series": [dict(s) for s in top_series],
    }
async def get_user_meeting_quality_db(
    user_identifier: str,
    start_date: date,
    end_date: date,
    identifier_type: str = "email"
) -> dict[str, Any]:
    """
    Get meeting quality statistics for a user.

    Args:
        user_identifier: User's email or UUID
        start_date: Start of date range (inclusive)
        end_date: End of date range (inclusive)
        identifier_type: "uuid" matches on s4.user_id; any other value is
            treated as an email and matched case-insensitively after trimming
            surrounding whitespace

    Returns:
        Dictionary with "total_meetings", "with_agenda", "without_agenda",
        "agenda_coverage_pct" (with_agenda / total_meetings as a percentage,
        one decimal), "avg_agenda_quality" (two decimals) and
        "large_meetings". All values are 0 when the user has no S4 slices in
        the range.
    """
    pool = await get_pool()

    # Build user condition; the trimming decision follows the same branch as
    # the condition so every non-uuid identifier is handled as an email.
    if identifier_type == "uuid":
        user_cond = "s4.user_id = $3::uuid"
        user_param = user_identifier
    else:
        user_cond = "LOWER(s4.email) = LOWER($3)"
        user_param = user_identifier.strip()

    async with pool.acquire() as conn:
        stats = await conn.fetchrow(f"""
SELECT
COUNT(*) as total_meetings,
SUM(CASE WHEN s4.has_agenda THEN 1 ELSE 0 END) as with_agenda,
SUM(CASE WHEN NOT s4.has_agenda THEN 1 ELSE 0 END) as without_agenda,
AVG(s4.agenda_quality_index) as avg_agenda_quality,
SUM(CASE WHEN s4.is_large_meeting THEN 1 ELSE 0 END) as large_meetings
FROM silver.calendar_user_event_day_slices_s4 s4
WHERE s4.slice_day >= $1 AND s4.slice_day <= $2 AND {user_cond}
""", start_date, end_date, user_param)

    # With zero matching rows the SUM/AVG aggregates are NULL, so return
    # explicit zeros instead of dividing by a zero count.
    if not stats or stats["total_meetings"] == 0:
        return {
            "total_meetings": 0,
            "with_agenda": 0,
            "without_agenda": 0,
            "agenda_coverage_pct": 0,
            "avg_agenda_quality": 0,
            "large_meetings": 0,
        }

    total_meetings = stats["total_meetings"]
    with_agenda = stats["with_agenda"] or 0
    return {
        "total_meetings": total_meetings,
        "with_agenda": with_agenda,
        "without_agenda": stats["without_agenda"] or 0,
        "agenda_coverage_pct": round(with_agenda / total_meetings * 100, 1),
        # AVG may be NULL when every agenda_quality_index is NULL.
        "avg_agenda_quality": round(float(stats["avg_agenda_quality"] or 0), 2),
        "large_meetings": stats["large_meetings"] or 0,
    }
async def get_calendar_day_for_user(
    user_identifier: str,
    target_date: date,
    identifier_type: str = "email"
) -> dict[str, Any] | None:
    """
    Fetch one user's S5 daily calendar facts for a single date.

    Args:
        user_identifier: User's email or UUID
        target_date: The day to look up
        identifier_type: "uuid" matches on user_id; any other value matches
            the email case-insensitively after trimming whitespace

    Returns:
        All S5 columns plus user_name and user_email from users_and_managers
        as a dictionary, or None when no row matches
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Pick the statement and lookup value first, then run one fetch.
        if identifier_type == "uuid":
            day_sql = """
SELECT
c.*,
u.user_name,
u.user_email
FROM silver.calendar_user_day_facts_s5 c
LEFT JOIN users_and_managers u ON c.user_id = u.user_uuid
WHERE c.user_id = $1::uuid AND c.day = $2
LIMIT 1
"""
            lookup_value = user_identifier
        else:
            day_sql = """
SELECT
c.*,
u.user_name,
u.user_email
FROM silver.calendar_user_day_facts_s5 c
LEFT JOIN users_and_managers u ON c.user_id = u.user_uuid
WHERE LOWER(c.email) = LOWER($1) AND c.day = $2
LIMIT 1
"""
            lookup_value = user_identifier.strip()
        record = await conn.fetchrow(day_sql, lookup_value, target_date)
    return dict(record) if record else None
async def get_calendar_range_for_user(
    user_identifier: str,
    start_date: date,
    end_date: date,
    identifier_type: str = "email"
) -> list[dict[str, Any]]:
    """
    Fetch one user's S5 daily calendar facts across a date range.

    Args:
        user_identifier: User's email or UUID
        start_date: First day of the range (inclusive)
        end_date: Last day of the range (inclusive)
        identifier_type: "uuid" matches on user_id; any other value matches
            the email case-insensitively after trimming whitespace

    Returns:
        One dictionary per matching row, ordered by day, each holding all S5
        columns plus user_name and user_email from users_and_managers
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Pick the statement and lookup value first, then run one fetch.
        if identifier_type == "uuid":
            range_sql = """
SELECT
c.*,
u.user_name,
u.user_email
FROM silver.calendar_user_day_facts_s5 c
LEFT JOIN users_and_managers u ON c.user_id = u.user_uuid
WHERE c.user_id = $1::uuid
AND c.day >= $2
AND c.day <= $3
ORDER BY c.day
"""
            lookup_value = user_identifier
        else:
            range_sql = """
SELECT
c.*,
u.user_name,
u.user_email
FROM silver.calendar_user_day_facts_s5 c
LEFT JOIN users_and_managers u ON c.user_id = u.user_uuid
WHERE LOWER(c.email) = LOWER($1)
AND c.day >= $2
AND c.day <= $3
ORDER BY c.day
"""
            lookup_value = user_identifier.strip()
        records = await conn.fetch(range_sql, lookup_value, start_date, end_date)
    return list(map(dict, records))
async def get_user_info_by_identifier(
    user_identifier: str,
    identifier_type: str = "email"
) -> dict[str, Any] | None:
    """
    Look up a user's uuid, name and email in users_and_managers.

    Args:
        user_identifier: User's email or UUID
        identifier_type: "uuid" matches on user_uuid; any other value matches
            the email case-insensitively after trimming whitespace

    Returns:
        Dictionary with "uuid" (as a string), "name" and "email", or None
        when no row matches
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Pick the statement and lookup value first, then run one fetch.
        if identifier_type == "uuid":
            lookup_sql = """
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE user_uuid = $1::uuid
LIMIT 1
"""
            lookup_value = user_identifier
        else:
            lookup_sql = """
SELECT DISTINCT user_uuid, user_name, user_email
FROM users_and_managers
WHERE LOWER(user_email) = LOWER($1)
LIMIT 1
"""
            lookup_value = user_identifier.strip()
        record = await conn.fetchrow(lookup_sql, lookup_value)
    if record:
        return {
            "uuid": str(record["user_uuid"]),
            "name": record["user_name"],
            "email": record["user_email"],
        }
    return None