# calendar_tools.py (4.57 kB)
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from googleapiclient.errors import HttpError
from ..auth.google_auth import get_service
def list_calendars() -> Dict[str, Any]:
    """List all calendars exposed by the service from ``get_service()``.

    The calendar list is fetched page by page: each response's
    ``nextPageToken`` is fed back as ``pageToken`` until no token is
    returned, so the result is not limited to the first page.

    Returns:
        On success: ``{'success': True, 'calendars': [...], 'count': n}``,
        where every calendar is a dict with the keys ``id``, ``summary``,
        ``description``, ``timeZone`` and ``accessRole``. Fields missing
        from the API entry (other than ``id``) are reported as ``''``.

        On ``HttpError``: ``{'success': False, 'error': <message>,
        'calendars': []}``. Calendars collected from earlier pages are
        discarded in that case.
    """
    try:
        service = get_service()
        calendars = []
        page_token = None
        while True:
            # pageToken=None requests the first page.
            response = service.calendarList().list(
                pageToken=page_token
            ).execute()
            calendars.extend(
                {
                    'id': entry['id'],
                    'summary': entry.get('summary', ''),
                    'description': entry.get('description', ''),
                    'timeZone': entry.get('timeZone', ''),
                    'accessRole': entry.get('accessRole', ''),
                }
                for entry in response.get('items', [])
            )
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        return {
            'success': True,
            'calendars': calendars,
            'count': len(calendars)
        }
    except HttpError as error:
        return {
            'success': False,
            'error': f"An error occurred: {error}",
            'calendars': []
        }
def _rfc3339_utc(moment: datetime) -> str:
    """Format an aware datetime as a UTC timestamp with a trailing 'Z'."""
    return moment.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z')


def _default_window_end(time_min: Any, now: datetime) -> str:
    """Pick the default ``timeMax`` for a query starting at ``time_min``.

    The default is ``now`` + 7 days. Only when ``time_min`` parses as an
    offset-aware timestamp at or beyond that point is the end moved to
    ``time_min`` + 7 days, so the upper bound stays after the lower bound.
    An unparseable or offset-less ``time_min`` leaves the default unchanged.
    """
    window_end = now + timedelta(days=7)
    if isinstance(time_min, str):
        # datetime.fromisoformat only accepts a trailing 'Z' on newer
        # Python versions, so spell the UTC offset out explicitly.
        text = time_min[:-1] + '+00:00' if time_min.endswith(('Z', 'z')) else time_min
        try:
            window_start = datetime.fromisoformat(text)
        except ValueError:
            window_start = None
        if (
            window_start is not None
            and window_start.tzinfo is not None
            and window_start >= window_end
        ):
            window_end = window_start + timedelta(days=7)
    return _rfc3339_utc(window_end)


def get_events(
    calendar_id: str = 'primary',
    time_min: Optional[str] = None,
    time_max: Optional[str] = None,
    max_results: int = 10
) -> Dict[str, Any]:
    """Fetch events from a calendar within a time window, ordered by start.

    Recurring events are expanded into single instances
    (``singleEvents=True``) and the result is ordered by start time.

    Args:
        calendar_id: Calendar to query; defaults to ``'primary'``.
        time_min: Lower bound passed as ``timeMin``. Defaults to the
            current UTC time.
        time_max: Upper bound passed as ``timeMax``. Defaults to seven days
            from now; if ``time_min`` lies at or beyond that point, the
            default becomes seven days after ``time_min`` instead.
        max_results: Value passed as ``maxResults``.

    Returns:
        On success: ``{'success': True, 'events': [...], 'count': n}``.
        Each event is a dict with ``id``, ``summary`` (``'No Title'`` when
        absent), ``description``, ``start``, ``end``, ``location`` and
        ``attendees`` (a list of e-mail strings). ``start``/``end`` hold
        the event's ``dateTime`` when present, otherwise its ``date``.

        On ``HttpError``: ``{'success': False, 'error': <message>,
        'events': []}``.
    """
    try:
        service = get_service()
        # Sample the clock once so both default bounds share one "now".
        now = datetime.now(timezone.utc)
        if time_min is None:
            time_min = _rfc3339_utc(now)
        if time_max is None:
            time_max = _default_window_end(time_min, now)
        events_result = service.events().list(
            calendarId=calendar_id,
            timeMin=time_min,
            timeMax=time_max,
            maxResults=max_results,
            singleEvents=True,
            orderBy='startTime'
        ).execute()
        formatted_events = []
        for event in events_result.get('items', []):
            start = event['start']
            end = event['end']
            formatted_events.append({
                'id': event['id'],
                'summary': event.get('summary', 'No Title'),
                'description': event.get('description', ''),
                # Fall back to 'date' when no 'dateTime' is present.
                'start': start.get('dateTime', start.get('date')),
                'end': end.get('dateTime', end.get('date')),
                'location': event.get('location', ''),
                'attendees': [
                    att.get('email', '') for att in event.get('attendees', [])
                ]
            })
        return {
            'success': True,
            'events': formatted_events,
            'count': len(formatted_events)
        }
    except HttpError as error:
        return {
            'success': False,
            'error': f"An error occurred: {error}",
            'events': []
        }
def create_event(
    summary: str,
    start_time: str,
    end_time: str,
    description: str = '',
    location: str = '',
    calendar_id: str = 'primary',
    attendees: Optional[List[str]] = None,
    time_zone: str = 'UTC'
) -> Dict[str, Any]:
    """Create a timed event on a calendar.

    Args:
        summary: Event title.
        start_time: Value sent as ``start.dateTime``.
        end_time: Value sent as ``end.dateTime``.
        description: Event description; sent even when empty.
        location: Event location; sent even when empty.
        calendar_id: Target calendar; defaults to ``'primary'``.
        attendees: E-mail addresses to add as attendees. ``None`` or an
            empty list omits the ``attendees`` field from the request.
        time_zone: Value sent as ``timeZone`` for both start and end.
            Defaults to ``'UTC'``, the value that was previously
            hard-coded.

    Returns:
        On success: ``{'success': True, 'event_id': <id>,
        'event_link': <htmlLink or ''>, 'message': <text>}``.

        On ``HttpError``: ``{'success': False, 'error': <message>,
        'event_id': None}``.
    """
    try:
        service = get_service()
        event: Dict[str, Any] = {
            'summary': summary,
            'description': description,
            'location': location,
            'start': {
                'dateTime': start_time,
                'timeZone': time_zone,
            },
            'end': {
                'dateTime': end_time,
                'timeZone': time_zone,
            },
        }
        if attendees:
            event['attendees'] = [{'email': email} for email in attendees]
        created_event = service.events().insert(
            calendarId=calendar_id,
            body=event
        ).execute()
        return {
            'success': True,
            'event_id': created_event['id'],
            'event_link': created_event.get('htmlLink', ''),
            'message': f"Event '{summary}' created successfully"
        }
    except HttpError as error:
        return {
            'success': False,
            'error': f"An error occurred: {error}",
            'event_id': None
        }
def delete_event(event_id: str, calendar_id: str = 'primary') -> Dict[str, Any]:
    """Delete a single event from a calendar.

    Args:
        event_id: Identifier of the event to remove.
        calendar_id: Calendar holding the event; defaults to ``'primary'``.

    Returns:
        ``{'success': True, 'message': <text>}`` when the delete request
        completes, or ``{'success': False, 'error': <message>}`` when it
        raises ``HttpError``.
    """
    try:
        delete_request = get_service().events().delete(
            calendarId=calendar_id,
            eventId=event_id,
        )
        delete_request.execute()
    except HttpError as error:
        failure = {'success': False, 'error': f"An error occurred: {error}"}
        return failure

    outcome = {
        'success': True,
        'message': f"Event {event_id} deleted successfully",
    }
    return outcome