calendar_actions.py•50.4 kB
import logging
from datetime import datetime, date, timedelta, time, timezone
from typing import Optional, List, Dict, Any, Tuple
from dateutil import parser # For robust datetime parsing
import json
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2.credentials import Credentials
from .models import (
GoogleCalendarEvent,
EventsResponse,
EventCreateRequest,
EventDateTime,
EventAttendee,
EventUpdateRequest,
CalendarListResponse,
CalendarListEntry
)
# Import analysis functions
try:
from .analysis import project_recurring_events, ProjectedEventOccurrence, analyze_busyness
except ImportError:
logging.error("Could not import from .analysis. Ensure structure is correct.")
# Define dummies for type hinting
def project_recurring_events(*args, **kwargs): return []
def analyze_busyness(*args, **kwargs): return None # Added dummy
class ProjectedEventOccurrence: pass
logger = logging.getLogger(__name__)
# --- Helper Function to Build Service ---
def _get_calendar_service(credentials: Credentials):
"""Builds the Google Calendar API service client."""
try:
service = build('calendar', 'v3', credentials=credentials)
logger.debug("Google Calendar service client created successfully.")
return service
except Exception as e:
logger.error(f"Failed to build Google Calendar service: {e}", exc_info=True)
raise # Re-raise the exception to be handled by the caller
# --- Calendar Action Functions ---
def find_events(
credentials: Credentials,
calendar_id: str = 'primary',
time_min: Optional[datetime] = None,
time_max: Optional[datetime] = None,
query: Optional[str] = None,
max_results: int = 50,
single_events: bool = True, # Expand recurring events into instances
order_by: str = 'startTime', # Order by start time
iCalUID: Optional[str] = None, # Filter by iCalendar UID
sharedExtendedProperty: Optional[str] = None, # Filter by shared extended properties (key=value or key)
privateExtendedProperty: Optional[str] = None, # Filter by private extended properties (key=value or key)
showDeleted: bool = False, # Show deleted events
eventTypes: Optional[List[str]] = None # Filter by event types (e.g., ['default', 'focusTime'])
) -> Optional[EventsResponse]:
"""Finds events in a specified calendar based on various criteria.
Args:
credentials: Valid Google OAuth2 credentials.
calendar_id: Calendar identifier (e.g., 'primary', email address, or calendar ID).
time_min: Start of the time range (inclusive). If None, no lower bound.
time_max: End of the time range (exclusive). If None, no upper bound.
query: Free text search query.
max_results: Maximum number of events to return.
single_events: Whether to expand recurring events into single instances.
order_by: The order of the events returned ('startTime' or 'updated').
iCalUID: Specific iCalendar UID to filter by.
sharedExtendedProperty: Filter by shared extended properties. Format "key=value" or "key".
privateExtendedProperty: Filter by private extended properties. Format "key=value" or "key".
showDeleted: Whether to include deleted events in the results.
eventTypes: List of event types to return (e.g., ['default', 'focusTime', 'outOfOffice']).
Returns:
An EventsResponse object containing the list of events, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
# Format datetime objects to RFC3339 string format required by the API
time_min_str = time_min.isoformat() + 'Z' if time_min and time_min.tzinfo is None else (time_min.isoformat() if time_min else None)
time_max_str = time_max.isoformat() + 'Z' if time_max and time_max.tzinfo is None else (time_max.isoformat() if time_max else None)
# Build the arguments dictionary dynamically to avoid passing None values for optional params
list_kwargs = {
'calendarId': calendar_id,
'timeMin': time_min_str,
'timeMax': time_max_str,
'q': query,
'maxResults': max_results,
'singleEvents': single_events,
'orderBy': order_by,
'showDeleted': showDeleted,
# Conditionally add parameters if they are provided
**(({'iCalUID': iCalUID}) if iCalUID else {}),
**(({'sharedExtendedProperty': sharedExtendedProperty}) if sharedExtendedProperty else {}),
**(({'privateExtendedProperty': privateExtendedProperty}) if privateExtendedProperty else {}),
**(({'eventTypes': eventTypes}) if eventTypes else {}),
}
# Filter out None values from list_kwargs to avoid API errors for empty optional params
list_kwargs = {k: v for k, v in list_kwargs.items() if v is not None}
logger.info(
f"Fetching events from calendar '{calendar_id}' with parameters: {list_kwargs}"
# f"time_min='{time_min_str}', time_max='{time_max_str}', query='{query}', "
# f"max_results={max_results}, single_events={single_events}, order_by='{order_by}', "
# f"iCalUID='{iCalUID}', sharedExtendedProperty='{sharedExtendedProperty}', "
# f"privateExtendedProperty='{privateExtendedProperty}', showDeleted={showDeleted}, "
# f"eventTypes={eventTypes}"
)
try:
events_result = service.events().list(**list_kwargs).execute()
logger.info(f"Found {len(events_result.get('items', []))} events.")
# Parse the result using Pydantic models for validation and structure
events_response = EventsResponse(**events_result)
return events_response
except HttpError as error:
logger.error(f"An API error occurred while finding events: {error}", exc_info=True)
# Add more detailed logging if possible
try:
error_content = error.content.decode('utf-8')
logger.error(f"Google API error details (find_events): {error.resp.status} - {error_content}")
except Exception:
logger.error(f"Google API error details (find_events): {error.resp.status} - Could not decode error content.")
return None
except Exception as e:
logger.error(f"An unexpected error occurred while finding events: {e}", exc_info=True)
return None
def create_event(
credentials: Credentials,
event_data: EventCreateRequest, # Use the Pydantic model for input validation
calendar_id: str = 'primary',
send_notifications: bool = True # Whether to send notifications to attendees
) -> Optional[GoogleCalendarEvent]:
"""Creates a new event in the specified calendar.
Args:
credentials: Valid Google OAuth2 credentials.
event_data: An EventCreateRequest object containing event details.
calendar_id: Calendar identifier.
send_notifications: Whether to send notifications about the creation to attendees.
Returns:
A GoogleCalendarEvent object representing the created event, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
# --- Manually Construct Event Body ---
# Ensure datetime objects are formatted as strings for JSON serialization
event_body: Dict[str, Any] = {}
def format_datetime_field(dt_obj: datetime) -> str:
if dt_obj.tzinfo is None:
# Assume UTC if timezone is naive
return dt_obj.isoformat() + 'Z'
else:
return dt_obj.isoformat()
# Required fields
if not event_data.start or not event_data.end:
logger.error("Event creation failed: Start and End times are required.")
return None # Or raise an error
event_body['start'] = {}
if event_data.start.dateTime:
event_body['start']['dateTime'] = format_datetime_field(event_data.start.dateTime)
if event_data.start.timeZone:
event_body['start']['timeZone'] = event_data.start.timeZone
elif event_data.start.date:
event_body['start']['date'] = str(event_data.start.date) # Ensure date is string
else:
logger.error("Event creation failed: Start time requires either dateTime or date.")
return None
event_body['end'] = {}
if event_data.end.dateTime:
event_body['end']['dateTime'] = format_datetime_field(event_data.end.dateTime)
if event_data.end.timeZone:
event_body['end']['timeZone'] = event_data.end.timeZone
elif event_data.end.date:
event_body['end']['date'] = str(event_data.end.date) # Ensure date is string
else:
logger.error("Event creation failed: End time requires either dateTime or date.")
return None
# Optional fields
if event_data.summary:
event_body['summary'] = event_data.summary
if event_data.description:
event_body['description'] = event_data.description
if event_data.location:
event_body['location'] = event_data.location
if event_data.attendees:
event_body['attendees'] = [{'email': email} for email in event_data.attendees]
if event_data.recurrence:
event_body['recurrence'] = event_data.recurrence
if event_data.reminders:
# Pydantic should handle nested model serialization correctly here if reminders is a model
# If it needs specific formatting, adjust here. Assuming .dict() is okay for reminders.
event_body['reminders'] = event_data.reminders.dict(by_alias=True, exclude_unset=True)
# Add other optional fields from EventCreateRequest if needed (e.g., colorId, transparency, etc.)
# --- End Manual Construction ---
logger.info(f"Creating event in calendar '{calendar_id}': {event_body.get('summary', '[No Summary]')}")
# Use json.dumps for accurate debug logging of what will be serialized
try:
event_body_json = json.dumps(event_body, indent=2) # Pretty print for readability
logger.debug(f"Event body for creation (JSON):\n{event_body_json}")
except TypeError as json_err:
# This should not happen now, but good to have a fallback log
logger.error(f"Could not serialize event_body for debug log: {json_err}")
logger.debug(f"Event body for creation (raw dict): {event_body}")
try:
created_event = service.events().insert(
calendarId=calendar_id,
body=event_body,
sendNotifications=send_notifications
).execute()
logger.info(f"Successfully created event with ID: {created_event.get('id')}")
# Parse the created event using Pydantic model
parsed_event = GoogleCalendarEvent(**created_event)
return parsed_event
except HttpError as error:
# Log the actual error response content for debugging
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass # Keep default message if decoding fails
logger.error(f"Google API error while creating event: {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"An unexpected error occurred while creating event: {e}", exc_info=True)
return None
def quick_add_event(
credentials: Credentials,
text: str,
calendar_id: str = 'primary',
send_notifications: bool = False # Default to False for quick add?
) -> Optional[GoogleCalendarEvent]:
"""Creates an event based on a simple text string using Google's parser.
Args:
credentials: Valid Google OAuth2 credentials.
text: The text description of the event (e.g., "Tennis at 5pm tomorrow").
calendar_id: Calendar identifier.
send_notifications: Whether to send notifications.
Returns:
A GoogleCalendarEvent object representing the created event, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Quick adding event to calendar '{calendar_id}' with text: \"{text}\"")
try:
created_event = service.events().quickAdd(
calendarId=calendar_id,
text=text,
sendNotifications=send_notifications
).execute()
logger.info(f"Successfully quick-added event with ID: {created_event.get('id')}")
# Parse the created event using Pydantic model
parsed_event = GoogleCalendarEvent(**created_event)
return parsed_event
except HttpError as error:
logger.error(f"An API error occurred during quick add: {error}", exc_info=True)
# Add more detailed logging
try:
error_content = error.content.decode('utf-8')
logger.error(f"Google API error details (quick_add): {error.resp.status} - {error_content}")
except Exception:
logger.error(f"Google API error details (quick_add): {error.resp.status} - Could not decode error content.")
return None
except Exception as e:
logger.error(f"An unexpected error occurred during quick add: {e}", exc_info=True)
return None
def update_event(
credentials: Credentials,
event_id: str,
update_data: EventUpdateRequest, # Use Pydantic model for partial update data
calendar_id: str = 'primary',
send_notifications: bool = True # Whether to send notifications
) -> Optional[GoogleCalendarEvent]:
"""Updates an existing event using patch semantics (only specified fields are changed).
Args:
credentials: Valid Google OAuth2 credentials.
event_id: The ID of the event to update.
update_data: An EventUpdateRequest object containing fields to update.
calendar_id: Calendar identifier.
send_notifications: Whether to send update notifications to attendees.
Returns:
A GoogleCalendarEvent object representing the updated event, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
# Manually construct the update body dictionary
update_body: Dict[str, Any] = {}
# Helper function (can be reused or defined locally)
def format_datetime_field(dt_obj: datetime) -> str:
if dt_obj.tzinfo is None:
return dt_obj.isoformat() + 'Z'
else:
return dt_obj.isoformat()
# Populate update_body only with fields present in update_data
if update_data.summary is not None:
update_body['summary'] = update_data.summary
if update_data.description is not None:
update_body['description'] = update_data.description
if update_data.location is not None:
update_body['location'] = update_data.location
# Handle start time - need to format if present
if update_data.start is not None:
start_details = {}
if update_data.start.dateTime:
start_details['dateTime'] = format_datetime_field(update_data.start.dateTime)
if update_data.start.timeZone:
start_details['timeZone'] = update_data.start.timeZone
elif update_data.start.date:
start_details['date'] = str(update_data.start.date)
# Add check if neither date nor dateTime provided in start? Unlikely via model validation.
if start_details: # Only add 'start' if we have valid sub-fields
update_body['start'] = start_details
# Handle end time - need to format if present
if update_data.end is not None:
end_details = {}
if update_data.end.dateTime:
end_details['dateTime'] = format_datetime_field(update_data.end.dateTime)
if update_data.end.timeZone:
end_details['timeZone'] = update_data.end.timeZone
elif update_data.end.date:
end_details['date'] = str(update_data.end.date)
# Add check if neither date nor dateTime provided in end?
if end_details: # Only add 'end' if we have valid sub-fields
update_body['end'] = end_details
# Handle attendees - PATCH replaces the attendee list
if update_data.attendees is not None:
# Convert EventAttendee models back to simple dicts for API
update_body['attendees'] = [
attendee.dict(by_alias=True, exclude_unset=True)
for attendee in update_data.attendees
]
# Add other updatable fields from EventUpdateRequest if needed
if not update_body:
logger.warning(f"Update called for event {event_id} with no fields to update.")
# Optionally, retrieve and return the existing event data?
# For now, return None or raise an error, as no update was performed.
# Let's retrieve the existing event to provide some feedback.
try:
existing_event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
return GoogleCalendarEvent(**existing_event)
except HttpError as e:
logger.error(f"Failed to retrieve event {event_id} after empty update request: {e}")
return None
logger.info(f"Updating event '{event_id}' in calendar '{calendar_id}'.")
logger.debug(f"Update body for patch: {update_body}")
try:
updated_event = service.events().patch(
calendarId=calendar_id,
eventId=event_id,
body=update_body,
sendNotifications=send_notifications
).execute()
logger.info(f"Successfully updated event '{event_id}'.")
# Parse the updated event using Pydantic model
parsed_event = GoogleCalendarEvent(**updated_event)
return parsed_event
except HttpError as error:
# Handle common errors like 404 Not Found
if error.resp.status == 404:
logger.error(f"Event '{event_id}' not found in calendar '{calendar_id}'.")
else:
# Log detailed error content
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error while updating event '{event_id}': {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"An unexpected error occurred while updating event '{event_id}': {e}", exc_info=True)
return None
def delete_event(
credentials: Credentials,
event_id: str,
calendar_id: str = 'primary',
send_notifications: bool = True # Whether to send notifications
) -> bool:
"""Deletes an event.
Args:
credentials: Valid Google OAuth2 credentials.
event_id: The ID of the event to delete.
calendar_id: Calendar identifier.
send_notifications: Whether to send deletion notifications to attendees.
Returns:
True if the event was deleted successfully, False otherwise.
"""
service = _get_calendar_service(credentials)
if not service:
return False
logger.info(f"Attempting to delete event '{event_id}' from calendar '{calendar_id}'.")
try:
service.events().delete(
calendarId=calendar_id,
eventId=event_id,
sendNotifications=send_notifications
).execute()
# Delete returns no content on success (204)
logger.info(f"Successfully deleted event '{event_id}'.")
return True
except HttpError as error:
# Handle common errors like 404 Not Found or 410 Gone
if error.resp.status in [404, 410]:
logger.error(f"Event '{event_id}' not found or already deleted in calendar '{calendar_id}'. Cannot delete.")
else:
# Log detailed error content
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error while deleting event '{event_id}': {error.resp.status} - {error_content}", exc_info=True)
return False
except Exception as e:
logger.error(f"An unexpected error occurred while deleting event '{event_id}': {e}", exc_info=True)
return False
def add_attendee(
credentials: Credentials,
event_id: str,
attendee_emails: List[str], # Simple list of emails to add
calendar_id: str = 'primary',
send_notifications: bool = True
) -> Optional[GoogleCalendarEvent]:
"""Adds one or more attendees to an existing event.
Note: This replaces the entire attendee list in the event.
Args:
credentials: Valid Google OAuth2 credentials.
event_id: The ID of the event to modify.
attendee_emails: A list of email addresses to add/set as attendees.
calendar_id: Calendar identifier.
send_notifications: Whether to send update notifications.
Returns:
The updated GoogleCalendarEvent object, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Attempting to add attendees {attendee_emails} to event '{event_id}' in calendar '{calendar_id}'.")
# 1. Get the existing event
try:
event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
logger.debug(f"Retrieved existing event '{event_id}' for adding attendees.")
except HttpError as error:
if error.resp.status == 404:
logger.error(f"Event '{event_id}' not found in calendar '{calendar_id}'. Cannot add attendees.")
else:
# Log detailed error content when retrieving event
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error retrieving event '{event_id}' for adding attendees: {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"Unexpected error retrieving event '{event_id}': {e}", exc_info=True)
return None
# 2. Modify the attendee list
# Get current attendees, ensuring it's a list
current_attendees = event.get('attendees', [])
if not isinstance(current_attendees, list):
current_attendees = [] # Ensure it's a list if API returns something unexpected
# Create a set of current attendee emails for efficient lookup
current_emails = {attendee.get('email') for attendee in current_attendees if attendee.get('email')}
# Prepare the list of new attendee objects to add
new_attendees_to_add = [
{'email': email} for email in attendee_emails if email not in current_emails
]
if not new_attendees_to_add:
logger.warning(f"All provided attendees {attendee_emails} are already in event '{event_id}'. No update needed.")
# Return the current event data as no changes were made
return GoogleCalendarEvent(**event)
# Combine current and new attendees
updated_attendee_list = current_attendees + new_attendees_to_add
# 3. Prepare the patch body
patch_body = {
'attendees': updated_attendee_list
}
# 4. Patch the event
logger.debug(f"Patching event '{event_id}' with updated attendees: {patch_body}")
try:
updated_event = service.events().patch(
calendarId=calendar_id,
eventId=event_id,
body=patch_body,
sendNotifications=send_notifications
).execute()
logger.info(f"Successfully added attendees to event '{event_id}'.")
# Parse the updated event using Pydantic model
parsed_event = GoogleCalendarEvent(**updated_event)
return parsed_event
except HttpError as error:
# Log detailed error content when patching event
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error occurred while patching event '{event_id}' with new attendees: {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"An unexpected error occurred while patching event '{event_id}' with new attendees: {e}", exc_info=True)
return None
def find_calendars(
credentials: Credentials,
min_access_role: Optional[str] = None # e.g., 'reader', 'writer', 'owner'
) -> Optional[CalendarListResponse]:
"""Lists the calendars on the user's calendar list.
Args:
credentials: Valid Google OAuth2 credentials.
min_access_role: The minimum access role for the user in the returned calendars.
Returns:
A CalendarListResponse object containing the list of calendars, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Fetching calendar list. Min access role: {min_access_role}")
# Paginate? For simplicity, get first page for now.
# Add maxResults if needed.
try:
calendar_list = service.calendarList().list(
minAccessRole=min_access_role
).execute()
logger.info(f"Found {len(calendar_list.get('items', []))} calendars in the list.")
# Parse the result using Pydantic model
parsed_list = CalendarListResponse(**calendar_list)
return parsed_list
except HttpError as error:
logger.error(f"An API error occurred while fetching calendar list: {error}", exc_info=True)
# Add more detailed logging if possible
try:
error_content = error.content.decode('utf-8')
logger.error(f"Google API error details (find_calendars): {error.resp.status} - {error_content}")
except Exception:
logger.error(f"Google API error details (find_calendars): {error.resp.status} - Could not decode error content.")
return None
except Exception as e:
logger.error(f"An unexpected error occurred while fetching calendar list: {e}", exc_info=True)
return None
def create_calendar(
credentials: Credentials,
summary: str # The title of the new calendar
) -> Optional[CalendarListEntry]: # Returns the created calendar (similar to CalendarListEntry)
"""Creates a new secondary calendar.
Args:
credentials: Valid Google OAuth2 credentials.
summary: The title for the new calendar.
Returns:
A CalendarListEntry object representing the created calendar, or None if an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Attempting to create a new calendar with summary: '{summary}'")
calendar_body = {
'summary': summary
# Add other fields like description or timeZone if needed
}
try:
created_calendar = service.calendars().insert(body=calendar_body).execute()
logger.info(f"Successfully created calendar with ID: {created_calendar.get('id')}")
# The response is a Calendar resource, parse it using CalendarListEntry model
# (Structure is identical for relevant fields)
parsed_calendar = CalendarListEntry(**created_calendar)
return parsed_calendar
except HttpError as error:
logger.error(f"An API error occurred while creating calendar '{summary}': {error}", exc_info=True)
# Log detailed error content
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error occurred while creating calendar '{summary}': {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"An unexpected error occurred while creating calendar '{summary}': {e}", exc_info=True)
return None
def check_attendee_status(
credentials: Credentials,
event_id: str,
calendar_id: str = 'primary',
attendee_emails: Optional[List[str]] = None
) -> Optional[Dict[str, str]]:
"""Checks the response status of attendees for a specific event.
Args:
credentials: Valid Google OAuth2 credentials.
event_id: The ID of the event to check.
calendar_id: Calendar identifier.
attendee_emails: Optional list of specific attendee emails to check.
If None, checks status for all attendees.
Returns:
A dictionary mapping attendee emails to their response status
(e.g., 'accepted', 'declined', 'tentative', 'needsAction'),
or None if the event is not found or an error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Checking attendee status for event '{event_id}' in calendar '{calendar_id}'. Target emails: {attendee_emails or 'All'}")
try:
event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
logger.debug(f"Retrieved event '{event_id}' for status check.")
except HttpError as error:
if error.resp.status == 404:
logger.error(f"Event '{event_id}' not found in calendar '{calendar_id}'. Cannot check status.")
else:
# Log detailed error content
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error retrieving event '{event_id}' for status check: {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"Unexpected error retrieving event '{event_id}': {e}", exc_info=True)
return None
attendees = event.get('attendees', [])
if not attendees:
logger.info(f"Event '{event_id}' has no attendees.")
return {}
status_map: Dict[str, str] = {}
target_emails_set = set(attendee_emails) if attendee_emails is not None else None
for attendee in attendees:
email = attendee.get('email')
status = attendee.get('responseStatus')
if not email or not status:
continue # Skip attendees without email or status
# If specific emails were requested, check if this attendee is one of them
if target_emails_set is not None:
if email in target_emails_set:
status_map[email] = status
else:
# Otherwise, include all attendees
status_map[email] = status
logger.info(f"Attendee statuses retrieved for event '{event_id}': {len(status_map)} attendees found.")
return status_map
def find_availability(
credentials: Credentials,
time_min: datetime,
time_max: datetime,
calendar_ids: List[str]
) -> Optional[Dict[str, Dict[str, Any]]]: # Return Dict mapping calendar_id to {'busy': List[Dict], 'errors': List[Dict]} ?
"""Finds free/busy information for a list of calendars.
Args:
credentials: Valid Google OAuth2 credentials.
time_min: Start of the time range (inclusive, timezone-aware recommended).
time_max: End of the time range (exclusive, timezone-aware recommended).
calendar_ids: A list of calendar identifiers (email or ID) to query.
Returns:
A dictionary mapping each calendar ID to its free/busy information.
The value for each calendar ID is a dictionary containing:
- 'busy': A list of busy time intervals [{'start': datetime, 'end': datetime}].
- 'errors': A list of errors encountered for that specific calendar (from API).
Returns None if a major API error occurs.
"""
service = _get_calendar_service(credentials)
if not service:
return None
if not calendar_ids:
logger.warning("find_availability called with empty calendar_ids list.")
return {}
# Ensure time_min and time_max are in RFC3339 format
# Add 'Z' for UTC if timezone is naive, otherwise format appropriately
time_min_str = time_min.isoformat() + ('Z' if time_min.tzinfo is None else '')
time_max_str = time_max.isoformat() + ('Z' if time_max.tzinfo is None else '')
request_body = {
"timeMin": time_min_str,
"timeMax": time_max_str,
"items": [{"id": cal_id} for cal_id in calendar_ids]
# Optional: Add groupExpansionMax, calendarExpansionMax if needed
}
logger.info(f"Querying free/busy information for calendars: {calendar_ids} between {time_min_str} and {time_max_str}")
logger.debug(f"Free/busy request body: {request_body}")
try:
freebusy_result = service.freebusy().query(body=request_body).execute()
logger.debug(f"Free/busy raw response: {freebusy_result}")
# Process the response into a more usable format
processed_results: Dict[str, Dict[str, Any]] = {}
calendars_data = freebusy_result.get('calendars', {})
for cal_id, data in calendars_data.items():
busy_intervals = []
for interval in data.get('busy', []):
try:
# Parse RFC3339 strings back to datetime objects
start_dt = parser.isoparse(interval.get('start'))
end_dt = parser.isoparse(interval.get('end'))
busy_intervals.append({'start': start_dt, 'end': end_dt})
except (TypeError, ValueError) as parse_error:
logger.warning(f"Could not parse busy interval for {cal_id}: {interval}. Error: {parse_error}")
# Optionally add this interval with raw strings or skip it
processed_results[cal_id] = {
'busy': busy_intervals,
'errors': data.get('errors', []) # Keep API errors as is
}
logger.info(f"Successfully retrieved free/busy information for {len(processed_results)} calendars.")
return processed_results
except HttpError as error:
logger.error(f"An API error occurred during free/busy query: {error}", exc_info=True)
# Log detailed error content
error_content = "Unknown error content"
try:
error_content = error.content.decode('utf-8')
except Exception:
pass
logger.error(f"Google API error occurred during free/busy query: {error.resp.status} - {error_content}", exc_info=True)
return None
except Exception as e:
logger.error(f"An unexpected error occurred during free/busy query: {e}", exc_info=True)
return None
def _merge_intervals(intervals: List[Dict[str, datetime]]) -> List[Dict[str, datetime]]:
"""Merges overlapping or adjacent time intervals."""
if not intervals:
return []
# Sort intervals by start time
sorted_intervals = sorted(intervals, key=lambda x: x['start'])
merged = [sorted_intervals[0]]
for current in sorted_intervals[1:]:
last_merged = merged[-1]
# If current interval overlaps or is adjacent to the last merged one
if current['start'] <= last_merged['end']:
# Merge by extending the end time if current ends later
merged[-1]['end'] = max(last_merged['end'], current['end'])
else:
# No overlap, add the current interval as a new one
merged.append(current)
return merged
def _find_first_available_slot(
time_min: datetime,
time_max: datetime,
duration: timedelta,
busy_intervals: List[Dict[str, datetime]],
working_hours_start: Optional[time] = None,
working_hours_end: Optional[time] = None,
) -> Optional[Tuple[datetime, datetime]]:
"""Finds the first available time slot of a given duration within a range, considering busy times and working hours.
Ensures the search starts from the current time if time_min is in the past.
"""
logger.info("--- Entering _find_first_available_slot ---")
logger.debug(f"Initial inputs: time_min={time_min}, time_max={time_max}, duration={duration}")
# --- Ensure Timezones (use UTC for consistency) ---
try:
logger.debug(f"Original time_min tz: {time_min.tzinfo}, time_max tz: {time_max.tzinfo}")
time_min_utc = time_min.astimezone(timezone.utc) if time_min.tzinfo else time_min.replace(tzinfo=timezone.utc)
time_max_utc = time_max.astimezone(timezone.utc) if time_max.tzinfo else time_max.replace(tzinfo=timezone.utc)
now_utc = datetime.now(timezone.utc)
logger.debug(f"Normalized to UTC: time_min={time_min_utc}, time_max={time_max_utc}, now={now_utc}")
except Exception as tz_err:
logger.error(f"Error normalizing timezones to UTC: {tz_err}")
# Fallback, though less ideal
time_min_utc = time_min
time_max_utc = time_max
now_utc = datetime.now()
# Ensure comparison is possible if fallback occurred
if time_min_utc.tzinfo is None and now_utc.tzinfo is not None:
now_utc = now_utc.replace(tzinfo=None)
elif time_min_utc.tzinfo is not None and now_utc.tzinfo is None:
logger.warning("Cannot reliably compare naive now() with timezone-aware time_min in fallback.")
# Consider returning None or raising error here
# Determine effective start time (MUST be in the future)
effective_start = max(time_min_utc, now_utc)
# Log at INFO level for visibility
logger.info(f"Search range: {time_min_utc} to {time_max_utc}. Current time: {now_utc}. Effective start for search: {effective_start}")
# Adjust merged busy intervals to be UTC as well for correct comparison
busy_intervals_utc = []
for interval in busy_intervals:
try:
start_utc = interval['start'].astimezone(timezone.utc) if interval['start'].tzinfo else interval['start'].replace(tzinfo=timezone.utc)
end_utc = interval['end'].astimezone(timezone.utc) if interval['end'].tzinfo else interval['end'].replace(tzinfo=timezone.utc)
busy_intervals_utc.append({'start': start_utc, 'end': end_utc})
except Exception as busy_tz_err:
logger.warning(f"Could not normalize busy interval {interval} to UTC: {busy_tz_err}")
# Skip this interval or handle error appropriately
# Sort busy intervals (important for gap logic)
busy_intervals_utc.sort(key=lambda x: x['start'])
# --- Refactored Slot Finding Logic ---
current_search_time = effective_start
logger.info(f"Search pointer initialized to: {current_search_time}")
# --- Working Hours Check (Placeholder - implement timezone logic if needed) ---
def is_within_working_hours(slot_start: datetime, slot_end: datetime) -> bool:
if not working_hours_start or not working_hours_end:
return True # No working hours constraint
# WARNING: This naive comparison assumes slot times are in local time
# matching working_hours_start/end. Proper implementation requires
# converting slot_start/end back to the relevant local timezone first.
# For now, proceeding with naive check or ignoring if WH not set.
try:
# Attempt comparison assuming compatible types/timezones
return (working_hours_start <= slot_start.time() and
slot_end.time() <= working_hours_end and
slot_start.date() == slot_end.date())
except TypeError as te:
logger.warning(f"Could not compare working hours due to timezone mismatch or type error: {te}. Ignoring working hours constraint for this slot.")
return True # Default to True if comparison fails
# --- End Working Hours Check ---
while current_search_time < time_max_utc:
potential_end_time = current_search_time + duration
# Check if slot goes beyond the overall search window
if potential_end_time > time_max_utc:
logger.info(f"Potential slot end {potential_end_time} exceeds time_max {time_max_utc}. Search finished.")
break # Stop searching
# Check for overlap with any busy interval
overlap_found = False
for busy in busy_intervals_utc:
# Check if the potential slot overlaps with this busy interval
# Overlap definition: (SlotStart < BusyEnd) and (SlotEnd > BusyStart)
if current_search_time < busy['end'] and potential_end_time > busy['start']:
# Overlap found. Move search time to the end of this busy interval.
logger.debug(f"Potential slot {current_search_time} - {potential_end_time} overlaps with busy {busy['start']} - {busy['end']}. Jumping search time.")
current_search_time = busy['end']
overlap_found = True
break # Break inner loop, restart outer while loop check
if overlap_found:
continue # Restart the while loop with the adjusted current_search_time
# If we reach here, the slot [current_search_time, potential_end_time] is free
# Check working hours
# TODO: Implement proper timezone conversion for working hours check if needed
if is_within_working_hours(current_search_time, potential_end_time):
logger.info(f"Found available slot: {current_search_time} - {potential_end_time}")
return current_search_time, potential_end_time
else:
# Slot is free but outside working hours. Move search time forward.
# A simple jump might be to the start of the next working hour block,
# but for now, just advance slightly to check the next possible interval.
# Advancing by duration ensures we check the next non-overlapping slot.
logger.debug(f"Slot {current_search_time} - {potential_end_time} rejected due to working hours. Advancing search time.")
current_search_time += timedelta(minutes=15) # Or duration? Let's try 15 min increment.
# Looped through entire window without finding a suitable slot
logger.info("No suitable available slot found within the time window.")
return None
def find_mutual_availability_and_schedule(
credentials: Credentials,
attendee_calendar_ids: List[str],
time_min: datetime,
time_max: datetime,
duration_minutes: int,
event_details: EventCreateRequest, # Pre-filled event details (summary, desc, etc.) excluding times
organizer_calendar_id: str = 'primary',
working_hours_start: Optional[time] = None, # e.g., time(9, 0)
working_hours_end: Optional[time] = None, # e.g., time(17, 0)
send_notifications: bool = True
) -> Optional[GoogleCalendarEvent]:
"""Finds the first mutually available time slot for attendees and schedules an event.
Args:
credentials: Valid Google OAuth2 credentials.
attendee_calendar_ids: List of calendar IDs (emails) for attendees.
time_min: Start of the search window (timezone-aware recommended).
time_max: End of the search window (timezone-aware recommended).
duration_minutes: Required duration of the event in minutes.
event_details: Pydantic model instance with event details (summary, description,
location, etc.) but WITHOUT start/end times. Attendees can be pre-filled
or will be added from attendee_calendar_ids.
organizer_calendar_id: Calendar ID where the event will be created.
working_hours_start: Optional start time for daily working hours constraint.
working_hours_end: Optional end time for daily working hours constraint.
send_notifications: Whether to send notifications for the created event.
Returns:
The created GoogleCalendarEvent object if a slot is found and scheduling succeeds,
otherwise None.
"""
service = _get_calendar_service(credentials)
if not service:
return None
logger.info(f"Attempting to find mutual availability and schedule for: {attendee_calendar_ids}")
logger.info(f"Search window: {time_min} to {time_max}, Duration: {duration_minutes} mins")
# 1. Find availability for all attendees
availability_data = find_availability(
credentials=credentials,
time_min=time_min,
time_max=time_max,
calendar_ids=attendee_calendar_ids
)
if availability_data is None:
logger.error("Failed to retrieve availability data.")
return None
# 2. Aggregate and merge all busy intervals
all_busy_intervals: List[Dict[str, datetime]] = []
for cal_id, data in availability_data.items():
if data.get('errors'):
logger.warning(f"Encountered errors fetching availability for {cal_id}: {data['errors']}")
# Decide how to handle errors: fail, proceed without this calendar, etc.
# For now, let's log a warning and proceed, potentially scheduling over their busy time.
# A stricter approach would be to return None here.
all_busy_intervals.extend(data.get('busy', []))
merged_busy = _merge_intervals(all_busy_intervals)
logger.debug(f"Merged busy intervals: {merged_busy}")
# 3. Find the first available slot
duration = timedelta(minutes=duration_minutes)
available_slot = _find_first_available_slot(
time_min=time_min,
time_max=time_max,
duration=duration,
busy_intervals=merged_busy,
working_hours_start=working_hours_start,
working_hours_end=working_hours_end
)
if not available_slot:
logger.warning("No mutually available time slot found meeting the criteria.")
return None
slot_start, slot_end = available_slot
logger.info(f"Found available slot: {slot_start} - {slot_end}")
# 4. Prepare full event data
# Create a copy to avoid modifying the original input
final_event_data = event_details.copy(deep=True)
final_event_data.start = EventDateTime(dateTime=slot_start)
final_event_data.end = EventDateTime(dateTime=slot_end)
# Ensure all required attendees are in the event data
existing_attendees = {att.email for att in final_event_data.attendees} if final_event_data.attendees else set()
for email in attendee_calendar_ids:
# Skip adding 'primary' as an attendee email
if email == 'primary':
continue
if email not in existing_attendees:
if final_event_data.attendees is None:
final_event_data.attendees = []
# Assuming EventCreateRequest uses a simple list of emails for input
# If it expects EventAttendee models, adjust accordingly.
# Based on create_event, it expects a list of emails which it converts.
final_event_data.attendees.append(email)
existing_attendees.add(email) # Keep track
logger.debug(f"Final event data for creation: {final_event_data.dict(by_alias=True)}")
# 5. Create the event
created_event = create_event(
credentials=credentials,
event_data=final_event_data,
calendar_id=organizer_calendar_id,
send_notifications=send_notifications
)
if created_event:
logger.info(f"Successfully scheduled event '{created_event.summary}' (ID: {created_event.id}) at {slot_start}")
else:
logger.error("Failed to create the event after finding an available slot.")
return created_event
# --- Analysis Wrappers ---
def get_projected_recurring_events(
credentials: Credentials,
time_min: datetime,
time_max: datetime,
calendar_id: str = 'primary',
event_query: Optional[str] = None
) -> List[ProjectedEventOccurrence]:
"""Wrapper function to find recurring events and project their occurrences.
This calls the core logic in the analysis module.
Args:
credentials: Valid Google OAuth2 credentials.
time_min: Start of the projection window (timezone-aware recommended).
time_max: End of the projection window (timezone-aware recommended).
calendar_id: The calendar to search within.
event_query: Optional text query to filter master recurring events (e.g., "Birthday").
Returns:
A list of ProjectedEventOccurrence objects representing calculated occurrences.
"""
logger.info(f"Action: get_projected_recurring_events called for calendar '{calendar_id}'")
# Directly call the analysis function
return project_recurring_events(
credentials=credentials,
time_min=time_min,
time_max=time_max,
calendar_id=calendar_id,
event_query=event_query
)
def get_busyness_analysis(
credentials: Credentials,
time_min: datetime,
time_max: datetime,
calendar_id: str = 'primary',
) -> Optional[Dict[date, Dict[str, Any]]]:
"""Wrapper function to analyze daily event busyness.
This calls the core logic in the analysis module.
Args:
credentials: Valid Google OAuth2 credentials.
time_min: Start of the analysis window (timezone-aware recommended).
time_max: End of the analysis window (timezone-aware recommended).
calendar_id: The calendar to analyze.
Returns:
A dictionary mapping each date to its busyness stats, or None on error.
"""
logger.info(f"Action: get_busyness_analysis called for calendar '{calendar_id}'")
# Directly call the analysis function
# Add error handling if analyze_busyness itself can raise specific exceptions
try:
return analyze_busyness(
credentials=credentials,
time_min=time_min,
time_max=time_max,
calendar_id=calendar_id,
)
except Exception as e:
# Log the specific error from the analysis function
logger.error(f"Error during busyness analysis execution: {e}", exc_info=True)
return None # Return None to signal error to the server endpoint
# --- Add other action functions below (create_calendar) ---