"""Calendar source adapter for iCal feeds."""
from datetime import date, datetime, timedelta
from typing import Any
from zoneinfo import ZoneInfo
import httpx
from icalendar import Calendar
from daily_briefing.config import settings
from daily_briefing.logging import get_logger
from daily_briefing.models import CalendarEvent, EventType, TravelInfo
from daily_briefing.sources.base import DataSource
logger = get_logger(__name__)
class CalendarSource(DataSource):
    """Data source for iCal calendar feeds (Google Calendar, TripIt, etc.).

    Events are fetched over HTTP, parsed with ``icalendar`` and normalised to
    timezone-aware datetimes in the timezone named by ``settings.timezone``.

    NOTE: recurrence rules (RRULE) are not expanded here; only the VEVENT
    components literally present in the feed are returned.
    """

    # Ordered keyword table used to classify events of a travel source.
    # The first entry with a matching keyword wins, so order matters.
    # NOTE: matching is by substring, so e.g. "air" also matches "repair".
    _TRAVEL_KEYWORDS: tuple[tuple[EventType, tuple[str, ...]], ...] = (
        (EventType.FLIGHT, ("flight", "air", "plane", "airline")),
        (EventType.HOTEL, ("hotel", "stay", "lodging")),
        (EventType.CAR_RENTAL, ("car", "rental", "vehicle")),
        (EventType.TRAIN, ("train", "rail", "amtrak")),
    )

    # Event types that count as part of a trip in ``get_travel_info``.
    _TRAVEL_EVENT_TYPES: tuple[EventType, ...] = (
        EventType.FLIGHT,
        EventType.HOTEL,
        EventType.CAR_RENTAL,
        EventType.TRAIN,
    )

    def __init__(
        self,
        name: str,
        ical_url: str,
        enabled: bool = True,
        is_travel_source: bool = False,
    ):
        """Initialize the calendar source.

        Args:
            name: Display name for the source
            ical_url: URL to the iCal feed
            enabled: Whether the source is enabled
            is_travel_source: Whether this source contains travel events
        """
        super().__init__(name, enabled)
        self.ical_url = ical_url
        self.is_travel_source = is_travel_source
        self._tz = ZoneInfo(settings.timezone)

    async def get_events(
        self, start_date: date, end_date: date
    ) -> list[CalendarEvent]:
        """Get calendar events from the iCal feed.

        This is a best-effort call: any failure while fetching or parsing the
        feed is logged and an empty list is returned instead of raising.

        Args:
            start_date: Start of date range (inclusive)
            end_date: End of date range (inclusive)

        Returns:
            List of calendar events overlapping the range, sorted by start
        """
        if not self.enabled or not self.ical_url:
            return []

        try:
            logger.info(
                "fetching_ical_events",
                source=self.name,
                start=start_date.isoformat(),
                end=end_date.isoformat(),
            )
            # httpx does not follow redirects unless asked to; without this a
            # redirected feed URL would fail in raise_for_status().
            async with httpx.AsyncClient(
                timeout=30, follow_redirects=True
            ) as client:
                response = await client.get(self.ical_url)
                response.raise_for_status()
                ical_data = response.text
            return self._parse_ical(ical_data, start_date, end_date)
        except Exception as e:
            # Deliberately broad: one broken feed must not break the caller.
            logger.error("ical_fetch_error", source=self.name, error=str(e))
            return []

    def _parse_ical(
        self, ical_data: str, start_date: date, end_date: date
    ) -> list[CalendarEvent]:
        """Parse iCal data into CalendarEvent objects.

        A VEVENT that fails to parse is logged and skipped; the remaining
        events are still returned.

        Args:
            ical_data: Raw iCal data
            start_date: Filter start date (inclusive)
            end_date: Filter end date (inclusive)

        Returns:
            List of calendar events overlapping the range, sorted by start
        """
        events: list[CalendarEvent] = []
        cal = Calendar.from_ical(ical_data)

        # Expand the date range to a [start of first day, end of last day]
        # window in the configured timezone.
        start_dt = datetime.combine(
            start_date, datetime.min.time(), tzinfo=self._tz
        )
        end_dt = datetime.combine(
            end_date, datetime.max.time(), tzinfo=self._tz
        )

        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            try:
                event = self._parse_event(component)
                if event and self._event_in_range(event, start_dt, end_dt):
                    events.append(event)
            except Exception as e:
                logger.warning(
                    "event_parse_error",
                    source=self.name,
                    error=str(e),
                )

        logger.info("parsed_ical_events", source=self.name, count=len(events))
        return sorted(events, key=lambda e: e.start)

    def _to_local(self, value: date | datetime) -> datetime:
        """Convert an iCal date or datetime to an aware local datetime.

        Args:
            value: A plain date, a naive datetime or an aware datetime

        Returns:
            Midnight of the day for a plain date; a naive datetime with the
            configured timezone attached; an aware datetime converted to the
            configured timezone
        """
        # datetime is a subclass of date, so test for datetime first.
        if not isinstance(value, datetime):
            return datetime.combine(
                value, datetime.min.time(), tzinfo=self._tz
            )
        if value.tzinfo is None:
            return value.replace(tzinfo=self._tz)
        return value.astimezone(self._tz)

    def _parse_event(self, component: Any) -> CalendarEvent | None:
        """Parse a single iCal event component.

        Args:
            component: iCal VEVENT component

        Returns:
            CalendarEvent, or None if the component has no DTSTART
        """
        uid = str(component.get("uid", ""))
        summary = str(component.get("summary", "Untitled"))

        dtstart = component.get("dtstart")
        dtend = component.get("dtend")
        if not dtstart:
            return None

        raw_start = dtstart.dt
        # A DTSTART carrying a plain date (no time) marks an all-day event.
        is_all_day = not isinstance(raw_start, datetime)
        start = self._to_local(raw_start)

        if dtend:
            # Coerce through the same helper so a DTEND whose value type
            # differs from DTSTART (date vs. date-time) does not raise.
            end = self._to_local(dtend.dt)
        elif is_all_day:
            end = start + timedelta(days=1)
        else:
            end = start + timedelta(hours=1)

        event_type = self._determine_event_type(summary, component)

        # Empty strings are normalised to None.
        location = str(component.get("location", "")) or None
        description = str(component.get("description", "")) or None

        return CalendarEvent(
            id=uid,
            title=summary,
            start=start,
            end=end,
            source=self.name,
            event_type=event_type,
            location=location,
            description=description,
            is_all_day=is_all_day,
        )

    def _determine_event_type(self, summary: str, component: Any) -> EventType:
        """Determine the type of event based on its content.

        Only travel sources are keyword-classified; every event from any
        other source is reported as a meeting.

        Args:
            summary: Event summary/title
            component: iCal component (currently unused)

        Returns:
            EventType enum value
        """
        if self.is_travel_source:
            summary_lower = summary.lower()
            for event_type, keywords in self._TRAVEL_KEYWORDS:
                if any(kw in summary_lower for kw in keywords):
                    return event_type

        # Default for non-travel sources and unmatched travel events.
        return EventType.MEETING

    def _event_in_range(
        self, event: CalendarEvent, start_dt: datetime, end_dt: datetime
    ) -> bool:
        """Check if event overlaps the date range.

        Args:
            event: Calendar event
            start_dt: Range start
            end_dt: Range end

        Returns:
            True if the event starts before the range ends and ends after
            the range starts
        """
        return event.start < end_dt and event.end > start_dt

    async def get_travel_info(
        self, start_date: date, end_date: date
    ) -> list[TravelInfo]:
        """Get travel information from the calendar.

        Travel events (flights, hotels, car rentals and trains) are grouped
        into trips: an event starts a new trip when it begins more than one
        day after the latest end date seen so far in the current trip.

        Args:
            start_date: Start of date range
            end_date: End of date range

        Returns:
            List of travel info, one entry per trip
        """
        if not self.is_travel_source:
            return []

        events = await self.get_events(start_date, end_date)
        travel_events = [
            e for e in events if e.event_type in self._TRAVEL_EVENT_TYPES
        ]
        if not travel_events:
            return []

        trips: list[TravelInfo] = []
        current_trip_events: list[CalendarEvent] = []
        latest_end: date | None = None

        for event in sorted(travel_events, key=lambda e: e.start):
            # latest_end is only set while the current trip has events.
            if (
                latest_end is not None
                and (event.start.date() - latest_end).days > 1
            ):
                trips.append(self._create_travel_info(current_trip_events))
                current_trip_events = []
                latest_end = None

            current_trip_events.append(event)
            # Track the furthest end seen, not the end of the most recent
            # event: a short event inside a long hotel stay must not move
            # the marker backwards and split the trip.
            event_end = event.end.date()
            latest_end = (
                event_end if latest_end is None else max(latest_end, event_end)
            )

        # Don't forget the last trip
        if current_trip_events:
            trips.append(self._create_travel_info(current_trip_events))

        return trips

    def _create_travel_info(self, events: list[CalendarEvent]) -> TravelInfo:
        """Create TravelInfo from a list of travel events.

        Args:
            events: Non-empty list of travel events belonging to one trip

        Returns:
            TravelInfo object
        """
        sorted_events = sorted(events, key=lambda e: e.start)
        first = sorted_events[0]
        # The trip ends when its latest-ending event ends, which is not
        # necessarily the event that starts last.
        trip_end = max(e.end for e in sorted_events)

        # The destination is the first non-empty location in start order.
        destination = next(
            (e.location for e in sorted_events if e.location), None
        )

        alerts: list[str] = []
        first_flight = next(
            (e for e in sorted_events if e.event_type == EventType.FLIGHT),
            None,
        )
        if first_flight is not None:
            # Use "today" in the configured timezone, matching event times.
            today = datetime.now(self._tz).date()
            days_until = (first_flight.start.date() - today).days
            # Only alert for upcoming flights; past ones would read
            # "Flight in -2 day(s)".
            if 0 <= days_until <= 3:
                alerts.append(
                    f"Flight in {days_until} day(s): {first_flight.title}"
                )

        return TravelInfo(
            id=f"trip-{first.id}",
            # Keep only the part of the title before the first " - ".
            title=first.title.split(" - ")[0],
            start_date=first.start.date(),
            end_date=trip_end.date(),
            destination=destination,
            events=sorted_events,
            alerts=alerts,
        )

    async def health_check(self) -> dict[str, Any]:
        """Check if the calendar source is accessible.

        Returns:
            Health status dictionary with "status" and "source" keys, plus
            "status_code" after a completed request or "error" on failure
        """
        if not self.enabled:
            return {"status": "disabled", "source": self.name}
        if not self.ical_url:
            return {"status": "not_configured", "source": self.name}

        try:
            # Follow redirects so a redirected feed URL is probed at its
            # final location instead of being reported as an error.
            async with httpx.AsyncClient(
                timeout=10, follow_redirects=True
            ) as client:
                response = await client.head(self.ical_url)
            return {
                "status": "healthy" if response.status_code == 200 else "error",
                "source": self.name,
                "status_code": response.status_code,
            }
        except Exception as e:
            return {"status": "error", "source": self.name, "error": str(e)}