from googleapiclient.discovery import build
from datetime import datetime, timedelta
from src.google_auth import get_credentials
def get_calendar_service():
creds = get_credentials()
return build('calendar', 'v3', credentials=creds)
def list_events(calendar_id='primary', max_results=10, time_min=None):
try:
service = get_calendar_service()
if time_min is None:
time_min = datetime.utcnow().isoformat() + 'Z'
else:
time_min = datetime.fromisoformat(time_min.replace('Z', '+00:00')).isoformat() + 'Z'
events_result = service.events().list(
calendarId=calendar_id,
timeMin=time_min,
maxResults=max_results,
singleEvents=True,
orderBy='startTime'
).execute()
events = events_result.get('items', [])
formatted_events = []
for event in events:
formatted_events.append({
'id': event.get('id', ''),
'summary': event.get('summary', ''),
'description': event.get('description', ''),
'start': event.get('start', {}).get('dateTime') or event.get('start', {}).get('date', ''),
'end': event.get('end', {}).get('dateTime') or event.get('end', {}).get('date', ''),
'location': event.get('location', ''),
'attendees': [attendee.get('email', '') for attendee in event.get('attendees', [])],
'status': event.get('status', '')
})
return {
'success': True,
'events': formatted_events,
'total': len(formatted_events)
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def create_event(calendar_id='primary', event_data=None):
try:
if event_data is None:
event_data = {}
service = get_calendar_service()
event = {
'summary': event_data.get('summary', ''),
'description': event_data.get('description', ''),
'start': {
'dateTime': event_data.get('start', datetime.utcnow().isoformat() + 'Z'),
'timeZone': event_data.get('timeZone', 'America/Los_Angeles')
},
'end': {
'dateTime': event_data.get('end', (datetime.utcnow() + timedelta(hours=1)).isoformat() + 'Z'),
'timeZone': event_data.get('timeZone', 'America/Los_Angeles')
}
}
if event_data.get('location'):
event['location'] = event_data['location']
if event_data.get('attendees'):
event['attendees'] = [{'email': email} for email in event_data['attendees']]
created_event = service.events().insert(
calendarId=calendar_id,
body=event
).execute()
return {
'success': True,
'eventId': created_event.get('id', ''),
'htmlLink': created_event.get('htmlLink', ''),
'event': {
'id': created_event.get('id', ''),
'summary': created_event.get('summary', ''),
'start': created_event.get('start', {}).get('dateTime') or created_event.get('start', {}).get('date', ''),
'end': created_event.get('end', {}).get('dateTime') or created_event.get('end', {}).get('date', '')
}
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def update_event(calendar_id='primary', event_id=None, event_data=None):
try:
if event_data is None:
event_data = {}
if event_id is None:
raise ValueError('event_id is required')
service = get_calendar_service()
event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
if event_data.get('summary'):
event['summary'] = event_data['summary']
if event_data.get('description'):
event['description'] = event_data['description']
if event_data.get('start'):
event['start'] = {
'dateTime': event_data['start'],
'timeZone': event_data.get('timeZone', 'America/Los_Angeles')
}
if event_data.get('end'):
event['end'] = {
'dateTime': event_data['end'],
'timeZone': event_data.get('timeZone', 'America/Los_Angeles')
}
if event_data.get('location'):
event['location'] = event_data['location']
if event_data.get('attendees'):
event['attendees'] = [{'email': email} for email in event_data['attendees']]
updated_event = service.events().update(
calendarId=calendar_id,
eventId=event_id,
body=event
).execute()
return {
'success': True,
'eventId': updated_event.get('id', ''),
'htmlLink': updated_event.get('htmlLink', '')
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def delete_event(calendar_id='primary', event_id=None):
try:
if event_id is None:
raise ValueError('event_id is required')
service = get_calendar_service()
service.events().delete(
calendarId=calendar_id,
eventId=event_id
).execute()
return {
'success': True,
'message': 'Event deleted successfully'
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def get_event(calendar_id='primary', event_id=None):
try:
if event_id is None:
raise ValueError('event_id is required')
service = get_calendar_service()
event = service.events().get(
calendarId=calendar_id,
eventId=event_id
).execute()
return {
'success': True,
'event': {
'id': event.get('id', ''),
'summary': event.get('summary', ''),
'description': event.get('description', ''),
'start': event.get('start', {}).get('dateTime') or event.get('start', {}).get('date', ''),
'end': event.get('end', {}).get('dateTime') or event.get('end', {}).get('date', ''),
'location': event.get('location', ''),
'attendees': [attendee.get('email', '') for attendee in event.get('attendees', [])],
'status': event.get('status', '')
}
}
except Exception as error:
return {
'success': False,
'error': str(error)
}