"""Briefing aggregator that combines data from multiple sources."""
from datetime import date, datetime, timedelta
from typing import Any
from zoneinfo import ZoneInfo
from daily_briefing.config import settings
from daily_briefing.logging import get_logger
from daily_briefing.models import (
ActionItem,
CalendarEvent,
Conflict,
ConflictType,
DailyBriefing,
FreeSlot,
MeetingSummary,
Priority,
TravelInfo,
)
from daily_briefing.sources.base import DataSource
logger = get_logger(__name__)
class BriefingAggregator:
"""Aggregates data from multiple sources into a unified briefing."""
def __init__(self, sources: list[DataSource]):
"""Initialize the aggregator.
Args:
sources: List of data sources to aggregate from
"""
self.sources = sources
self._tz = ZoneInfo(settings.timezone)
async def generate_briefing(self, briefing_date: date | None = None) -> DailyBriefing:
"""Generate a daily briefing for the specified date.
Args:
briefing_date: Date for the briefing (defaults to today)
Returns:
Complete DailyBriefing object
"""
if briefing_date is None:
briefing_date = date.today()
logger.info("generating_briefing", date=briefing_date.isoformat())
# Collect data from all sources
all_events = await self._collect_events(briefing_date)
recent_meetings = await self._collect_meetings(briefing_date)
action_items = await self._collect_action_items()
travel_info = await self._collect_travel_info(briefing_date)
# Analyze the data
conflicts = self._detect_conflicts(all_events)
free_slots = self._find_free_slots(all_events, briefing_date)
# Calculate stats
meeting_events = [e for e in all_events if not e.is_all_day]
total_meetings = len(meeting_events)
total_meeting_minutes = sum(
(e.end - e.start).total_seconds() / 60 for e in meeting_events
)
# Calculate focus time (free slots >= 30 min)
focus_minutes = sum(
s.duration_minutes for s in free_slots if s.duration_minutes >= 30
)
# Collect travel alerts
travel_alerts = []
for trip in travel_info:
travel_alerts.extend(trip.alerts)
briefing = DailyBriefing(
date=briefing_date,
generated_at=datetime.now(self._tz),
events=all_events,
conflicts=conflicts,
free_slots=free_slots,
recent_meetings=recent_meetings,
action_items=action_items,
upcoming_travel=travel_info,
travel_alerts=travel_alerts,
total_meetings=total_meetings,
total_meeting_hours=total_meeting_minutes / 60,
focus_time_hours=focus_minutes / 60,
)
logger.info(
"briefing_generated",
date=briefing_date.isoformat(),
events=len(all_events),
conflicts=len(conflicts),
action_items=len(action_items),
)
return briefing
async def _collect_events(self, briefing_date: date) -> list[CalendarEvent]:
"""Collect calendar events from all sources.
Args:
briefing_date: Date to get events for
Returns:
List of all calendar events
"""
all_events = []
for source in self.sources:
if not source.enabled:
continue
try:
events = await source.get_events(briefing_date, briefing_date)
all_events.extend(events)
except Exception as e:
logger.warning(
"source_events_error",
source=source.name,
error=str(e),
)
# Sort by start time
return sorted(all_events, key=lambda e: e.start)
async def _collect_meetings(self, briefing_date: date) -> list[MeetingSummary]:
"""Collect meeting summaries from sources like Fireflies.
Args:
briefing_date: Reference date
Returns:
List of recent meeting summaries
"""
# Look back N days for meeting summaries
start_date = briefing_date - timedelta(days=settings.meeting_lookback_days)
end_date = briefing_date - timedelta(days=1) # Yesterday
all_meetings = []
for source in self.sources:
if not source.enabled:
continue
try:
meetings = await source.get_meetings(start_date, end_date)
all_meetings.extend(meetings)
except Exception as e:
logger.warning(
"source_meetings_error",
source=source.name,
error=str(e),
)
# Sort by date (most recent first)
return sorted(all_meetings, key=lambda m: m.date, reverse=True)
async def _collect_action_items(self) -> list[ActionItem]:
"""Collect action items from all sources.
Returns:
List of all action items
"""
all_items = []
for source in self.sources:
if not source.enabled:
continue
try:
items = await source.get_action_items()
all_items.extend(items)
except Exception as e:
logger.warning(
"source_action_items_error",
source=source.name,
error=str(e),
)
return all_items
async def _collect_travel_info(self, briefing_date: date) -> list[TravelInfo]:
"""Collect travel information from all sources.
Args:
briefing_date: Reference date
Returns:
List of upcoming travel
"""
end_date = briefing_date + timedelta(days=settings.travel_lookahead_days)
all_travel = []
for source in self.sources:
if not source.enabled:
continue
try:
travel = await source.get_travel_info(briefing_date, end_date)
all_travel.extend(travel)
except Exception as e:
logger.warning(
"source_travel_error",
source=source.name,
error=str(e),
)
# Sort by start date
return sorted(all_travel, key=lambda t: t.start_date)
def _detect_conflicts(self, events: list[CalendarEvent]) -> list[Conflict]:
"""Detect scheduling conflicts in the events.
Args:
events: List of calendar events
Returns:
List of conflicts
"""
conflicts = []
non_all_day = [e for e in events if not e.is_all_day]
for i, event1 in enumerate(non_all_day):
for event2 in non_all_day[i + 1 :]:
# Check for overlap
if event1.end > event2.start and event1.start < event2.end:
conflicts.append(
Conflict(
conflict_type=ConflictType.OVERLAP,
events=[event1, event2],
description=f"'{event1.title}' overlaps with '{event2.title}'",
severity=Priority.HIGH,
)
)
# Check for back-to-back (less than buffer time)
elif (
event2.start - event1.end
).total_seconds() / 60 < settings.conflict_buffer_minutes:
conflicts.append(
Conflict(
conflict_type=ConflictType.BACK_TO_BACK,
events=[event1, event2],
description=(
f"'{event1.title}' and '{event2.title}' are back-to-back "
f"(less than {settings.conflict_buffer_minutes} min gap)"
),
severity=Priority.LOW,
)
)
return conflicts
def _find_free_slots(
self, events: list[CalendarEvent], briefing_date: date
) -> list[FreeSlot]:
"""Find available time slots in the day.
Args:
events: List of calendar events
briefing_date: Date to find slots for
Returns:
List of free time slots
"""
# Define work hours
work_start = datetime.combine(
briefing_date,
datetime.min.time().replace(hour=settings.work_start_hour),
).replace(tzinfo=self._tz)
work_end = datetime.combine(
briefing_date,
datetime.min.time().replace(hour=settings.work_end_hour),
).replace(tzinfo=self._tz)
# Get non-all-day events sorted by start time
busy_periods = [
(e.start, e.end) for e in events if not e.is_all_day
]
busy_periods.sort(key=lambda p: p[0])
free_slots = []
current_time = work_start
for busy_start, busy_end in busy_periods:
# Skip events outside work hours
if busy_end <= work_start or busy_start >= work_end:
continue
# Clamp to work hours
busy_start = max(busy_start, work_start)
busy_end = min(busy_end, work_end)
# Free slot before this busy period
if current_time < busy_start:
duration = int((busy_start - current_time).total_seconds() / 60)
if duration >= 15: # Minimum 15 min slot
free_slots.append(
FreeSlot(
start=current_time,
end=busy_start,
duration_minutes=duration,
)
)
current_time = max(current_time, busy_end)
# Free slot after last event
if current_time < work_end:
duration = int((work_end - current_time).total_seconds() / 60)
if duration >= 15:
free_slots.append(
FreeSlot(
start=current_time,
end=work_end,
duration_minutes=duration,
)
)
return free_slots
async def get_source_health(self) -> dict[str, Any]:
"""Get health status of all sources.
Returns:
Dictionary of source health statuses
"""
health = {}
for source in self.sources:
try:
status = await source.health_check()
health[source.name] = status
except Exception as e:
health[source.name] = {"status": "error", "error": str(e)}
return health