"""
Calendar insights business logic.
This module contains the core logic for the 3 calendar tools:
1. get_user_calendar_insights - Complete dashboard for one user
2. query_user_meetings - Filter/sort/search meetings
3. get_meeting_details - Full details of a specific meeting
"""
import logging
from datetime import date, timedelta
from typing import Any
from src.database_calendar import (
# S5 queries (daily aggregates)
get_calendar_day_for_user,
get_calendar_range_for_user,
get_user_info_by_identifier,
# S4 + S1 queries (meeting-level)
get_user_meetings_db,
get_meeting_details_db,
get_user_meeting_extremes_db,
get_user_recurring_summary_db,
get_user_meeting_quality_db,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Thresholds for health assessment
# NOTE: the meeting-load thresholds are compared with ">=" in _get_load_status
# and _assess_health, so a value exactly at the threshold already counts.
MEETING_LOAD_HIGH_THRESHOLD = 70 # % - at or above this the load is "heavy" / a concern
MEETING_LOAD_MODERATE_THRESHOLD = 50 # % - at or above this is "moderate"; below it is reported as healthy
FOCUS_LOW_THRESHOLD = 60 # minutes - an average strictly below this is flagged as low focus
def _determine_identifier_type(identifier: str) -> str:
    """Classify a user identifier as ``"email"`` or ``"uuid"``.

    Anything containing ``"@"`` is an email. Otherwise a 36-character string
    with exactly four hyphens is treated as a UUID (a shape check only; the
    characters are not validated as hex). Every other input falls back to
    ``"email"``.
    """
    contains_at_sign = "@" in identifier
    looks_like_uuid = len(identifier) == 36 and identifier.count("-") == 4
    return "uuid" if looks_like_uuid and not contains_at_sign else "email"
def _get_load_status(meeting_load_pct: float | None) -> str:
    """Map a meeting-load percentage onto a status label.

    Returns ``"unknown"`` for ``None``; otherwise the label of the first band
    whose lower bound the value reaches (bounds are inclusive), or
    ``"minimal"`` when it is below every band.
    """
    if meeting_load_pct is None:
        return "unknown"

    # Ordered from the highest lower bound to the lowest; first match wins.
    bands = (
        (MEETING_LOAD_HIGH_THRESHOLD, "heavy"),
        (MEETING_LOAD_MODERATE_THRESHOLD, "moderate"),
        (20, "light"),
    )
    for lower_bound, label in bands:
        if meeting_load_pct >= lower_bound:
            return label
    return "minimal"
def _assess_health(
    avg_meeting_load: float,
    avg_focus_minutes: float,
    high_load_days: int,
    low_focus_days: int,
    total_working_days: int,
    recurring_pct: float,
    agenda_coverage_pct: float,
) -> dict[str, Any]:
    """
    Turn aggregate calendar metrics into a health verdict.

    Each metric is checked in a fixed order (meeting load, high-load days,
    focus time, low-focus days, recurring share, agenda coverage) and may
    contribute a concern, a positive, and/or a suggestion.

    Returns:
        Dict with "status" ("healthy", "warning" or "at_risk"), "concerns"
        (a placeholder entry when there are none), "positives" and
        "suggestions".
    """
    issues: list[str] = []
    strengths: list[str] = []
    advice: list[str] = []

    def affects_half_of_working_days(flagged_days: int) -> bool:
        # True when there are working days and at least half of them are flagged.
        return total_working_days > 0 and flagged_days >= total_working_days * 0.5

    # --- Meeting load -------------------------------------------------------
    if avg_meeting_load >= MEETING_LOAD_HIGH_THRESHOLD:
        issues.append(f"Average meeting load is {avg_meeting_load:.0f}% (above {MEETING_LOAD_HIGH_THRESHOLD}% threshold)")
        advice.append("Review recurring meetings for optimization opportunities")
    elif avg_meeting_load < MEETING_LOAD_MODERATE_THRESHOLD:
        strengths.append(f"Healthy meeting load at {avg_meeting_load:.0f}%")

    if affects_half_of_working_days(high_load_days):
        issues.append(f"High meeting load on {high_load_days} of {total_working_days} working days")

    # --- Focus time ---------------------------------------------------------
    if avg_focus_minutes < FOCUS_LOW_THRESHOLD:
        issues.append(f"Average focus time is only {avg_focus_minutes:.0f} minutes per day")
        advice.append("Consider blocking dedicated focus time in calendar")

    if affects_half_of_working_days(low_focus_days):
        issues.append(f"Low focus time on {low_focus_days} of {total_working_days} working days")
    elif low_focus_days == 0 and total_working_days > 0:
        strengths.append("No days flagged for low focus time")

    # --- Recurring meetings -------------------------------------------------
    if recurring_pct > 60:
        issues.append(f"Recurring meetings account for {recurring_pct:.0f}% of total meeting time")
        advice.append("Audit recurring meetings - some may no longer be needed")

    # --- Agenda quality -----------------------------------------------------
    if agenda_coverage_pct < 50:
        issues.append(f"Only {agenda_coverage_pct:.0f}% of meetings have agendas")
        advice.append("Encourage adding agendas to improve meeting effectiveness")
    elif agenda_coverage_pct >= 70:
        strengths.append(f"Good agenda coverage at {agenda_coverage_pct:.0f}%")

    # --- Overall verdict ----------------------------------------------------
    # "warning" needs both a short concern list and a load below the high mark.
    if not issues:
        verdict = "healthy"
    elif len(issues) <= 2 and avg_meeting_load < MEETING_LOAD_HIGH_THRESHOLD:
        verdict = "warning"
    else:
        verdict = "at_risk"

    return {
        "status": verdict,
        "concerns": issues or ["No significant concerns detected"],
        "positives": strengths,
        "suggestions": advice,
    }
# =============================================================================
# TOOL 1: get_user_calendar_insights
# =============================================================================
async def get_user_calendar_insights_impl(
user_identifier: str,
date_str: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
include_daily: bool = False,
include_meetings: bool = False,
) -> dict[str, Any]:
"""
Get comprehensive calendar insights for a user.
Unified dashboard tool that provides health assessment, metrics,
statistical extremes, recurring analysis, and quality metrics.
Args:
user_identifier: User's email or UUID
date_str: Single date (YYYY-MM-DD) - returns detailed day view
start_date: Range start (if date_str not provided)
end_date: Range end
include_daily: Include daily breakdown array
include_meetings: Include actual meeting list
Returns:
Complete calendar dashboard
"""
if not user_identifier or not user_identifier.strip():
return {
"action": "error",
"message": "User identifier cannot be empty"
}
try:
identifier = user_identifier.strip()
identifier_type = _determine_identifier_type(identifier)
# Get user info
user_info = await get_user_info_by_identifier(identifier, identifier_type)
if not user_info:
return {
"action": "not_found",
"message": f"No user found with identifier '{user_identifier}'"
}
# Determine date range
if date_str:
# Single day mode
try:
query_date = date.fromisoformat(date_str)
except ValueError:
return {
"action": "error",
"message": f"Invalid date format: '{date_str}'. Use YYYY-MM-DD."
}
query_start = query_date
query_end = query_date
is_single_day = True
else:
# Range mode
if not start_date or not end_date:
# Default to last 7 days
query_end = date.today()
query_start = query_end - timedelta(days=6)
else:
try:
query_start = date.fromisoformat(start_date)
query_end = date.fromisoformat(end_date)
except ValueError as e:
return {
"action": "error",
"message": f"Invalid date format. Use YYYY-MM-DD. Error: {e}"
}
if query_start > query_end:
return {
"action": "error",
"message": "start_date must be before or equal to end_date"
}
if (query_end - query_start).days > 90:
return {
"action": "error",
"message": "Date range cannot exceed 90 days"
}
is_single_day = False
# Get S5 data (daily aggregates)
if is_single_day:
s5_data = await get_calendar_day_for_user(identifier, query_start, identifier_type)
s5_list = [s5_data] if s5_data else []
else:
s5_list = await get_calendar_range_for_user(identifier, query_start, query_end, identifier_type)
if not s5_list:
return {
"action": "no_data",
"user": user_info,
"period": {"start": str(query_start), "end": str(query_end)},
"message": "No calendar data available for this period"
}
# Filter to working days for metrics
working_days = [d for d in s5_list if not d.get("is_weekend") and not d.get("is_holiday")]
total_working_days = len(working_days)
if total_working_days == 0:
return {
"action": "no_data",
"user": user_info,
"period": {"start": str(query_start), "end": str(query_end)},
"message": "No working days in this period (weekends/holidays only)"
}
# Calculate S5-based metrics
meeting_loads = [float(d["meeting_load_pct"]) for d in working_days if d.get("meeting_load_pct") is not None]
avg_meeting_load = sum(meeting_loads) / len(meeting_loads) if meeting_loads else 0
focus_list = [d.get("focus_minutes_total", 0) or 0 for d in working_days]
avg_focus_minutes = sum(focus_list) / len(focus_list) if focus_list else 0
total_focus_minutes = sum(focus_list)
high_load_days = sum(1 for ml in meeting_loads if ml >= MEETING_LOAD_HIGH_THRESHOLD)
low_focus_days = sum(1 for d in working_days if d.get("low_focus_flag"))
# Meeting time totals
total_meeting_minutes = sum(d.get("meeting_minutes_union_work", 0) or 0 for d in working_days)
total_meetings = sum(d.get("total_meetings_count", 0) or 0 for d in working_days)
working_minutes_total = sum(d.get("working_minutes", 0) or 0 for d in working_days)
# By type breakdown from S5
by_type = {
"one_on_one": {
"count": sum(d.get("one_on_one_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("one_on_one_minutes", 0) or 0 for d in working_days),
},
"standup": {
"count": sum(d.get("standup_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("standup_minutes", 0) or 0 for d in working_days),
},
"review": {
"count": sum(d.get("review_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("review_minutes", 0) or 0 for d in working_days),
},
"planning": {
"count": sum(d.get("planning_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("planning_minutes", 0) or 0 for d in working_days),
},
"external": {
"count": sum(d.get("external_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("external_minutes", 0) or 0 for d in working_days),
},
"internal": {
"count": sum(d.get("internal_count", 0) or 0 for d in working_days),
"minutes": sum(d.get("internal_minutes", 0) or 0 for d in working_days),
},
}
# Add percentage to by_type
for key in by_type:
minutes = by_type[key]["minutes"]
by_type[key]["pct"] = round(minutes / total_meeting_minutes * 100, 1) if total_meeting_minutes > 0 else 0
# Get S4+S1 data (extremes, recurring, quality)
extremes = await get_user_meeting_extremes_db(identifier, query_start, query_end, identifier_type)
recurring = await get_user_recurring_summary_db(identifier, query_start, query_end, identifier_type)
quality = await get_user_meeting_quality_db(identifier, query_start, query_end, identifier_type)
# Health assessment
health = _assess_health(
avg_meeting_load=avg_meeting_load,
avg_focus_minutes=avg_focus_minutes,
high_load_days=high_load_days,
low_focus_days=low_focus_days,
total_working_days=total_working_days,
recurring_pct=recurring.get("recurring_pct", 0),
agenda_coverage_pct=quality.get("agenda_coverage_pct", 0),
)
# Build response
result = {
"action": "resolved",
"user": user_info,
"period": {
"start": str(query_start),
"end": str(query_end),
"total_days": (query_end - query_start).days + 1,
"working_days": total_working_days,
},
# Health assessment
"health": health,
# Time metrics
"time": {
"total_meeting_hours": round(total_meeting_minutes / 60, 1),
"total_focus_hours": round(total_focus_minutes / 60, 1),
"total_working_hours": round(working_minutes_total / 60, 1),
"meeting_pct_of_work": round(total_meeting_minutes / working_minutes_total * 100, 1) if working_minutes_total > 0 else 0,
},
# Averages
"averages": {
"meeting_hours_per_day": round(total_meeting_minutes / 60 / total_working_days, 1),
"focus_minutes_per_day": round(avg_focus_minutes, 0),
"meeting_load_pct": round(avg_meeting_load, 1),
"meetings_per_day": round(total_meetings / total_working_days, 1),
},
# Counts
"counts": {
"total_meetings": total_meetings,
"high_load_days": high_load_days,
"low_focus_days": low_focus_days,
},
# By type breakdown
"by_type": by_type,
# Recurring analysis
"recurring": recurring,
# External engagement
"external": {
"count": by_type["external"]["count"],
"hours": round(by_type["external"]["minutes"] / 60, 1),
"pct": by_type["external"]["pct"],
},
# Quality metrics
"quality": quality,
# Statistical extremes
"extremes": extremes,
}
# Add daily breakdown if requested
if include_daily:
daily = []
for d in working_days:
load = float(d["meeting_load_pct"]) if d.get("meeting_load_pct") else None
daily.append({
"date": str(d["day"]),
"day_type": "holiday" if d.get("is_holiday") else ("weekend" if d.get("is_weekend") else "workday"),
"meeting_hours": round((d.get("meeting_minutes_union_work", 0) or 0) / 60, 1),
"focus_minutes": d.get("focus_minutes_total", 0) or 0,
"meeting_load_pct": round(load, 1) if load else None,
"meetings_count": d.get("total_meetings_count", 0) or 0,
"status": _get_load_status(load),
"low_focus_flag": d.get("low_focus_flag", False),
})
result["daily"] = daily
# Add meetings if requested
if include_meetings:
meetings = await get_user_meetings_db(
identifier, query_start, query_end, identifier_type,
sort_by="start_time", order="asc", limit=100
)
result["meetings"] = [
{
"event_id": m["event_id"],
"title": m.get("title", ""),
"date": str(m["slice_day"]),
"start_time": m["slice_start_ts"].strftime("%H:%M") if m.get("slice_start_ts") else None,
"duration_min": m.get("slice_duration_min", 0),
"meeting_type": m.get("meeting_type_code", ""),
"attendee_count": m.get("attendee_count", 0),
"is_recurring": m.get("is_recurring", False),
"has_agenda": m.get("has_agenda", False),
}
for m in meetings
]
return result
except Exception as e:
logger.error(f"Error in get_user_calendar_insights: {e}")
return {
"action": "error",
"message": f"An error occurred: {str(e)}"
}
# =============================================================================
# TOOL 2: query_user_meetings
# =============================================================================
async def query_user_meetings_impl(
user_identifier: str,
start_date: str,
end_date: str,
sort_by: str = "start_time",
order: str = "desc",
limit: int = 20,
meeting_type: str | None = None,
is_external: bool | None = None,
is_recurring: bool | None = None,
has_agenda: bool | None = None,
min_duration: int | None = None,
min_attendees: int | None = None,
search: str | None = None,
) -> dict[str, Any]:
"""
Query user's meetings with flexible filtering and sorting.
Args:
user_identifier: User's email or UUID
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
sort_by: "start_time", "duration", "attendees", "agenda_quality"
order: "asc" or "desc"
limit: Max results (default 20, max 100)
meeting_type: Filter by type (1_1, STANDUP, REVIEW, PLANNING, EXTERNAL, OTHER)
is_external: Filter external meetings
is_recurring: Filter recurring meetings
has_agenda: Filter by agenda presence
min_duration: Minimum duration in minutes
min_attendees: Minimum attendee count
search: Title keyword search
Returns:
List of meetings matching criteria
"""
if not user_identifier or not user_identifier.strip():
return {
"action": "error",
"message": "User identifier cannot be empty"
}
try:
identifier = user_identifier.strip()
identifier_type = _determine_identifier_type(identifier)
# Parse dates
try:
query_start = date.fromisoformat(start_date)
query_end = date.fromisoformat(end_date)
except ValueError as e:
return {
"action": "error",
"message": f"Invalid date format. Use YYYY-MM-DD. Error: {e}"
}
if query_start > query_end:
return {
"action": "error",
"message": "start_date must be before or equal to end_date"
}
# Validate parameters
valid_sort = ["start_time", "duration", "attendees", "agenda_quality"]
if sort_by not in valid_sort:
sort_by = "start_time"
if order.lower() not in ["asc", "desc"]:
order = "desc"
limit = min(max(1, limit), 100) # Clamp to 1-100
# Get user info
user_info = await get_user_info_by_identifier(identifier, identifier_type)
if not user_info:
return {
"action": "not_found",
"message": f"No user found with identifier '{user_identifier}'"
}
# Query meetings
meetings = await get_user_meetings_db(
identifier, query_start, query_end, identifier_type,
sort_by=sort_by,
order=order,
limit=limit,
meeting_type=meeting_type,
is_external=is_external,
is_recurring=is_recurring,
has_agenda=has_agenda,
min_duration=min_duration,
min_attendees=min_attendees,
search=search,
)
# Build filter description
filters_applied = []
if meeting_type:
filters_applied.append(f"type={meeting_type}")
if is_external is not None:
filters_applied.append(f"external={is_external}")
if is_recurring is not None:
filters_applied.append(f"recurring={is_recurring}")
if has_agenda is not None:
filters_applied.append(f"has_agenda={has_agenda}")
if min_duration:
filters_applied.append(f"min_duration={min_duration}")
if min_attendees:
filters_applied.append(f"min_attendees={min_attendees}")
if search:
filters_applied.append(f"search='{search}'")
# Format response
formatted_meetings = []
for m in meetings:
formatted_meetings.append({
"event_id": m["event_id"],
"title": m.get("title", ""),
"date": str(m["slice_day"]),
"start_time": m["slice_start_ts"].strftime("%H:%M") if m.get("slice_start_ts") else None,
"end_time": m["slice_end_ts"].strftime("%H:%M") if m.get("slice_end_ts") else None,
"duration_min": m.get("slice_duration_min", 0),
"meeting_type": m.get("meeting_type_code", ""),
"attendee_count": m.get("attendee_count", 0),
"organizer": m.get("organizer_email", ""),
"is_recurring": m.get("is_recurring", False),
"is_external": m.get("is_external", False),
"is_large_meeting": m.get("is_large_meeting", False),
"has_agenda": m.get("has_agenda", False),
"agenda_quality": float(m["agenda_quality_index"]) if m.get("agenda_quality_index") else None,
})
return {
"action": "resolved",
"user": user_info,
"query": {
"period": {"start": str(query_start), "end": str(query_end)},
"sort_by": sort_by,
"order": order,
"limit": limit,
"filters_applied": filters_applied if filters_applied else ["none"],
},
"total_found": len(formatted_meetings),
"meetings": formatted_meetings,
}
except Exception as e:
logger.error(f"Error in query_user_meetings: {e}")
return {
"action": "error",
"message": f"An error occurred: {str(e)}"
}
# =============================================================================
# TOOL 3: get_meeting_details
# =============================================================================
def _parse_attendees(raw: Any) -> Any:
    """Return the attendee payload, decoding it first when it is a JSON string.

    Best-effort: an empty/missing payload, undecodable JSON, or a JSON
    ``null`` all yield an empty list. Non-string payloads are returned as-is.
    """
    if not raw:
        return []
    if not isinstance(raw, str):
        return raw

    import json

    try:
        decoded = json.loads(raw)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; malformed data is not fatal.
        return []
    return decoded if decoded is not None else []


async def get_meeting_details_impl(event_id: str) -> dict[str, Any]:
    """
    Get full details of a specific meeting.

    Args:
        event_id: The event ID (from query_user_meetings results)

    Returns:
        Dict whose "action" is "resolved" (complete meeting details including
        attendees, organizer, agenda quality), "not_found" (no such event) or
        "error" (empty ID or an unexpected failure, see "message").
        "recurring_info" is None for non-recurring meetings.
    """
    if not event_id or not event_id.strip():
        return {
            "action": "error",
            "message": "Event ID cannot be empty"
        }

    try:
        details = await get_meeting_details_db(event_id.strip())
        if not details:
            return {
                "action": "not_found",
                "message": f"No meeting found with event_id '{event_id}'"
            }

        # Parse attendees
        attendees = _parse_attendees(details.get("attendees_json"))

        # Compare against None: an index of 0 is a real score, not "missing".
        quality_index = details.get("agenda_quality_index")
        agenda_quality_index = float(quality_index) if quality_index is not None else None

        recurring_info = None
        if details.get("is_recurring"):
            recurring_info = {
                "series_id": details.get("series_id"),
                "instance_key": details.get("instance_key"),
            }

        # Format response
        return {
            "action": "resolved",
            "event_id": details["event_id"],
            "title": details.get("title", ""),
            "organizer": details.get("organizer_email", ""),
            "time": {
                "start": str(details["start_ts"]) if details.get("start_ts") else None,
                "end": str(details["end_ts"]) if details.get("end_ts") else None,
                "date": str(details["day"]) if details.get("day") else None,
                "duration_min": details.get("duration_min", 0),
            },
            "attendees": {
                "total": details.get("attendee_count", 0),
                "internal": details.get("internal_attendee_count", 0),
                "external": details.get("external_attendee_count", 0),
                "list": attendees,
            },
            "classification": {
                "meeting_type": details.get("meeting_type_code", ""),
                "is_recurring": details.get("is_recurring", False),
                "is_external": details.get("is_external", False),
                "is_large_meeting": details.get("is_large_meeting", False),
                "is_mail_group_present": details.get("is_mail_group_present", False),
            },
            "quality": {
                "has_agenda": details.get("has_agenda", False),
                "agenda_quality_index": agenda_quality_index,
                "agenda_signals": details.get("agenda_quality_signals"),
            },
            "recurring_info": recurring_info,
            "tagged_priorities": details.get("tagged_priorities") or [],
        }
    except Exception as e:
        # Tool boundary: log with traceback and report the failure in-band
        # instead of raising.
        logger.exception("Error in get_meeting_details: %s", e)
        return {
            "action": "error",
            "message": f"An error occurred: {str(e)}"
        }