# Google Calendar MCP Server
#
# by amornpan
# Verified
import asyncio
import json
import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Optional
from zoneinfo import ZoneInfo

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Set up logging
log_formatter = logging.Formatter(
    '[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)

# File handler: logs go to "<parent of this file's directory>/logs/calendar_service.log"
log_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
os.makedirs(log_dir, exist_ok=True)  # Create logs directory if it doesn't exist
log_file = os.path.join(log_dir, 'calendar_service.log')
file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
file_handler.setFormatter(log_formatter)

# Set up logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)
logger.addHandler(file_handler)


class CalendarService:
    """Async helper around the Google Calendar v3 API for the 'primary' calendar.

    Blocking API calls are executed in the event loop's default executor.
    Most operations return a human-readable status string instead of raising.
    """

    SCOPES = ['https://www.googleapis.com/auth/calendar']
    # Try to use Bangkok timezone, but fallback to others if not available
    TIMEZONE = 'Asia/Bangkok'

    @classmethod
    def get_available_timezone(cls):
        """Get an available timezone closest to Bangkok, with fallbacks.

        Returns:
            The first timezone name from the preference list that ``ZoneInfo``
            can load.

        Note:
            Logs an error and terminates the process with ``sys.exit(1)`` when
            none of the candidates can be loaded.
        """
        timezones_to_try = [
            'Asia/Bangkok',    # First choice (UTC+7)
            'Asia/Jakarta',    # Second choice (UTC+7)
            'Asia/Singapore',  # Third choice (UTC+8)
            'Asia/Kolkata',    # Fourth choice (UTC+5:30)
            'UTC',             # Last resort
        ]

        for tz in timezones_to_try:
            try:
                ZoneInfo(tz)
            except Exception as e:
                logger.warning("Timezone %s not available: %s", tz, e)
            else:
                logger.info("Using timezone: %s", tz)
                return tz

        logger.error(
            "No suitable timezone found. "
            "Please install tzdata package with: pip install tzdata"
        )
        sys.exit(1)

    def __init__(self, credentials_path: str, token_path: str):
        """Store file paths and pick the timezone; no network access happens here.

        Parameters:
        - credentials_path: Path to the OAuth client credentials file
          (stored only; not read by any method of this class)
        - token_path: Path to the authorized-user token JSON file
        """
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.creds = None
        self.service = None
        # Use the first available timezone
        self.TIMEZONE = self.get_available_timezone()
        self.tz = ZoneInfo(self.TIMEZONE)
        # Maps "<formatted start time> <summary>" -> event ID (filled by list_events)
        self.events_cache = {}

    @staticmethod
    async def _run_blocking(func):
        """Run a blocking callable in the default executor and return its result."""
        return await asyncio.get_running_loop().run_in_executor(None, func)

    @staticmethod
    def _parse_iso_datetime(value: str) -> datetime:
        """Parse an ISO 8601 datetime string, accepting a trailing 'Z' for UTC.

        ``datetime.fromisoformat`` rejects the 'Z' designator before Python 3.11,
        so it is rewritten to the equivalent '+00:00' offset first.

        Raises:
            ValueError: if the string is not a valid ISO 8601 datetime.
        """
        if value.endswith('Z'):
            value = value[:-1] + '+00:00'
        return datetime.fromisoformat(value)

    def _refresh_and_save_token(self):
        """Refresh ``self.creds`` (blocking) and write them back to ``token_path``."""
        logger.info("Token expired, refreshing...")
        self.creds.refresh(Request())
        # Save refreshed credentials
        with open(self.token_path, 'w', encoding='utf-8') as token:
            token.write(self.creds.to_json())
        logger.info("Token refreshed and saved")

    async def authenticate(self):
        """Authenticate with Google Calendar API.

        Loads credentials from ``token_path``, refreshes them when they are
        expired and a refresh token exists, and builds the API client.

        Returns:
        - True on success

        Raises:
        - Any exception from loading/refreshing credentials or building the
          client is logged and re-raised.
        """
        try:
            self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES)

            # Check if credentials are expired and refresh if needed
            if self.creds and self.creds.expired and self.creds.refresh_token:
                # The refresh performs a network request; keep it off the event loop
                await self._run_blocking(self._refresh_and_save_token)

            self.service = build('calendar', 'v3', credentials=self.creds)
            logger.info("Authentication successful")
            return True
        except Exception as e:
            logger.error("Authentication error: %s", e)
            raise

    async def list_events(self, max_results: int = 1000):
        """List calendar events and cache their IDs.

        Fetches events from two years in the past to one year in the future,
        following result pages until ``max_results`` events were collected or
        no further page exists.

        Parameters:
        - max_results: Maximum total number of events to return

        Returns:
        - One line per event ("<start time> <summary>"), "No events found.",
          or an error message when the API call fails with HttpError
        """
        try:
            if not self.service:
                await self.authenticate()

            logger.info("Fetching calendar events...")
            now = datetime.now(self.tz)
            time_min = (now - timedelta(days=730)).isoformat()
            time_max = (now + timedelta(days=365)).isoformat()

            events = []
            page_token = None
            while True:
                # Bind loop state as lambda defaults so each request uses the
                # values current at the time the request is created.
                page = await self._run_blocking(
                    lambda token=page_token, size=max_results - len(events): (
                        self.service.events().list(
                            calendarId='primary',
                            timeMin=time_min,
                            timeMax=time_max,
                            maxResults=size,
                            singleEvents=True,
                            orderBy='startTime',
                            timeZone=self.TIMEZONE,
                            pageToken=token,
                        ).execute()
                    )
                )
                events.extend(page.get('items', []))
                page_token = page.get('nextPageToken')
                if not page_token or len(events) >= max_results:
                    break

            logger.info("Found %d events", len(events))

            # Reset the cache before the empty check so IDs from an earlier
            # listing cannot survive once the calendar no longer has events.
            self.events_cache = {}

            if not events:
                return "No events found."

            lines = []
            for event in events:
                start_time = self._format_event_time(event)
                summary = event.get('summary', 'No title')
                cache_key = f"{start_time} {summary}"
                lines.append(f"{cache_key}\n")
                self.events_cache[cache_key] = event['id']

            return "".join(lines)

        except HttpError as error:
            error_msg = f"An error occurred: {str(error)}"
            logger.error(error_msg)
            return error_msg

    def _format_event_time(self, event: dict) -> str:
        """Format event time consistently with timezone.

        Timed events are rendered as ``YYYY-MM-DDTHH:MM:SS+ZZZZ``; a start
        without timezone information is assumed to be in ``self.tz``.
        All-day events (``date`` only) are returned unchanged.
        """
        start = event['start'].get('dateTime', event['start'].get('date'))

        if 'T' in start:
            # This is a datetime
            dt = self._parse_iso_datetime(start)
            if dt.tzinfo is None:
                # Add timezone if not present
                dt = dt.replace(tzinfo=self.tz)
            return dt.strftime('%Y-%m-%dT%H:%M:%S%z')

        # This is a date
        return start

    async def delete_single_event(self, event_time: str, event_summary: str):
        """
        Delete a specific event by its time and summary

        Parameters:
        - event_time: Event time in format matching list output
        - event_summary: Event title to match

        Returns:
        - Status message
        """
        try:
            if not self.service:
                await self.authenticate()

            # First update the cache
            await self.list_events()

            # Get event ID from cache
            cache_key = f"{event_time} {event_summary}"
            event_id = self.events_cache.get(cache_key)

            if not event_id:
                return f"Event not found: {cache_key}"

            # Delete the event
            await self._run_blocking(
                lambda: self.service.events().delete(
                    calendarId='primary',
                    eventId=event_id
                ).execute()
            )

            logger.info("Deleted event: %s - %s", event_id, cache_key)
            return f"Successfully deleted event: {cache_key}"

        except HttpError as error:
            error_msg = f"An error occurred: {str(error)}"
            logger.error(error_msg)
            return error_msg
        except Exception as error:
            error_msg = f"Unexpected error: {str(error)}"
            logger.error(error_msg)
            return error_msg

    async def create_event(self, summary: str, start_time: str,
                           end_time: Optional[str] = None,
                           description: Optional[str] = None):
        """
        Create a new calendar event

        Parameters:
        - summary: Event title
        - start_time: Start time in YYYY-MM-DDTHH:MM:SS format or date in YYYY-MM-DD format
        - end_time: End time (optional). If not provided, a timed event will be
          1 hour long and an all-day event will cover the start date only
        - description: Event description (optional)

        Returns:
        - Status message
        """
        try:
            if not self.service:
                await self.authenticate()

            # Parse start time; a 'T' means a time component is included
            try:
                if 'T' in start_time:
                    start_dt = self._parse_iso_datetime(start_time)
                    is_datetime = True
                else:
                    start_dt = datetime.strptime(start_time, '%Y-%m-%d')
                    is_datetime = False
            except ValueError:
                return (
                    f"Invalid start time format: {start_time}. "
                    "Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD"
                )

            # Handle end time
            if end_time:
                try:
                    if 'T' in end_time:
                        end_dt = self._parse_iso_datetime(end_time)
                    else:
                        end_dt = datetime.strptime(end_time, '%Y-%m-%d')
                        if is_datetime:
                            # If start has time but end doesn't, make end time 23:59:59
                            end_dt = end_dt.replace(hour=23, minute=59, second=59)
                except ValueError:
                    return (
                        f"Invalid end time format: {end_time}. "
                        "Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD"
                    )
            elif is_datetime:
                # Default to 1 hour duration for datetime events
                end_dt = start_dt + timedelta(hours=1)
            else:
                # The API's all-day end date is exclusive, so a one-day event
                # ends on the following day.
                end_dt = start_dt + timedelta(days=1)

            # Create event body
            if is_datetime:
                start_field = {'dateTime': start_dt.isoformat(), 'timeZone': self.TIMEZONE}
                end_field = {'dateTime': end_dt.isoformat(), 'timeZone': self.TIMEZONE}
            else:
                # All-day events take a plain YYYY-MM-DD 'date' and no timeZone
                start_date = start_dt.date()
                end_date = end_dt.date()
                if end_date == start_date:
                    # Same-day end would describe an empty range (end is exclusive)
                    end_date = start_date + timedelta(days=1)
                start_field = {'date': start_date.isoformat()}
                end_field = {'date': end_date.isoformat()}

            event_body = {
                'summary': summary,
                'start': start_field,
                'end': end_field,
            }

            # Add optional description if provided
            if description:
                event_body['description'] = description

            # Create the event
            created_event = await self._run_blocking(
                lambda: self.service.events().insert(
                    calendarId='primary',
                    body=event_body
                ).execute()
            )

            logger.info("Created event: %s - %s", created_event['id'], summary)
            return f"Successfully created event: {summary}"

        except HttpError as error:
            error_msg = f"An error occurred: {str(error)}"
            logger.error(error_msg)
            return error_msg
        except Exception as error:
            error_msg = f"Unexpected error: {str(error)}"
            logger.error(error_msg)
            return error_msg

    async def delete_duplicate_events(self, target_date: str, event_summary: str):
        """
        Delete duplicate events on a specific date

        Events are duplicates when they share the given summary and the same
        formatted start time; the first one found is kept.

        Parameters:
        - target_date: Target date in YYYY-MM-DD format
        - event_summary: Event title to match

        Returns:
        - Status message
        """
        try:
            if not self.service:
                await self.authenticate()

            # Parse target date
            try:
                target_dt = datetime.strptime(target_date, '%Y-%m-%d')
            except ValueError:
                return f"Invalid date format: {target_date}. Use YYYY-MM-DD"

            # Time range for the target date: local midnight to the next local
            # midnight (the timeMax bound is exclusive, so this covers the full day).
            start_dt = target_dt.replace(hour=0, minute=0, second=0, tzinfo=self.tz)
            end_dt = start_dt + timedelta(days=1)

            # Get events for the target date
            events_result = await self._run_blocking(
                lambda: self.service.events().list(
                    calendarId='primary',
                    timeMin=start_dt.isoformat(),
                    timeMax=end_dt.isoformat(),
                    singleEvents=True,
                    orderBy='startTime',
                    timeZone=self.TIMEZONE
                ).execute()
            )

            events = events_result.get('items', [])

            # Find duplicate events
            duplicate_ids = []
            seen_times = set()

            for event in events:
                if event.get('summary') != event_summary:
                    continue
                start_time = self._format_event_time(event)
                if start_time in seen_times:
                    duplicate_ids.append(event['id'])
                else:
                    seen_times.add(start_time)

            if not duplicate_ids:
                return f"No duplicate events found for '{event_summary}' on {target_date}"

            # Delete duplicate events; a failure on one does not stop the others
            deleted_count = 0
            for event_id in duplicate_ids:
                try:
                    await self._run_blocking(
                        # Bind the ID as a default to avoid late-binding surprises
                        lambda eid=event_id: self.service.events().delete(
                            calendarId='primary',
                            eventId=eid
                        ).execute()
                    )
                    deleted_count += 1
                except HttpError as error:
                    logger.error("Error deleting event %s: %s", event_id, error)

            logger.info(
                "Deleted %d duplicate events for '%s' on %s",
                deleted_count, event_summary, target_date
            )
            return (
                f"Successfully deleted {deleted_count} duplicate events "
                f"for '{event_summary}' on {target_date}"
            )

        except HttpError as error:
            error_msg = f"An error occurred: {str(error)}"
            logger.error(error_msg)
            return error_msg
        except Exception as error:
            error_msg = f"Unexpected error: {str(error)}"
            logger.error(error_msg)
            return error_msg

    async def renew_token(self):
        """Renew the authentication token.

        Returns:
        - "Token renewed successfully" after a refresh,
          "Token is still valid" when no refresh is needed,
          or an error message when the refresh is impossible or fails
        """
        try:
            if not self.creds:
                self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES)

            if self.creds and self.creds.expired:
                if not self.creds.refresh_token:
                    # An expired token without a refresh token is not "valid"
                    error_msg = (
                        "Error renewing token: token expired and no refresh "
                        "token is available"
                    )
                    logger.error(error_msg)
                    return error_msg

                await self._run_blocking(self._refresh_and_save_token)
                return "Token renewed successfully"

            return "Token is still valid"

        except Exception as e:
            error_msg = f"Error renewing token: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def close(self):
        """Clean up resources"""
        if self.service:
            logger.info("Closing calendar service")
            self.service.close()
            # Drop the closed client so later calls re-authenticate instead of reusing it
            self.service = None