MCP Personal Assistant Agent

from typing import List, Dict, Any, Optional import datetime import os import logging import re from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.errors import HttpError # Import the MCP server instance from the main file from mcp_server import mcp, Context logger = logging.getLogger("mcp-pa-agent.calendar") # Helper functions async def get_google_calendar_service(): """Get an authenticated Google Calendar service if credentials are available.""" try: # In a real implementation, you would handle OAuth2 credentials properly # For demo purposes, we'll just check if the required env vars exist if not all([ os.getenv("GOOGLE_CLIENT_ID"), os.getenv("GOOGLE_CLIENT_SECRET"), os.getenv("GOOGLE_REFRESH_TOKEN") ]): return None credentials = Credentials( token=os.getenv("GOOGLE_ACCESS_TOKEN"), refresh_token=os.getenv("GOOGLE_REFRESH_TOKEN"), client_id=os.getenv("GOOGLE_CLIENT_ID"), client_secret=os.getenv("GOOGLE_CLIENT_SECRET"), token_uri="https://oauth2.googleapis.com/token", scopes=["https://www.googleapis.com/auth/calendar"] ) return build("calendar", "v3", credentials=credentials) except Exception as e: logger.error(f"Failed to get calendar service: {str(e)}") return None # Prompts @mcp.prompt() def schedule_meeting_prompt(participants: str, duration: str = "30 minutes") -> str: """Create a prompt for scheduling a meeting""" return f"Please help me schedule a {duration} meeting with {participants}. Suggest some appropriate times and draft a meeting invitation." @mcp.prompt() def check_availability_prompt(date: str) -> str: """Create a prompt for checking calendar availability""" return f"Please check my calendar for {date} and let me know when I'm free for meetings." # Resources @mcp.resource("calendar://today") async def today_events_resource() -> str: """Resource providing today's calendar events""" service = await get_google_calendar_service() if not service: return "Calendar service is not available. Please check your Google API credentials." try: # Calculate time bounds for today now = datetime.datetime.utcnow() start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) end_of_day = start_of_day + datetime.timedelta(days=1) # Call the Calendar API events_result = service.events().list( calendarId='primary', timeMin=start_of_day.isoformat() + 'Z', timeMax=end_of_day.isoformat() + 'Z', singleEvents=True, orderBy='startTime' ).execute() events = events_result.get('items', []) if not events: return "No events scheduled for today." # Return JSON representation of events import json return json.dumps(events, indent=2) except Exception as e: logger.error(f"Error fetching today's events: {str(e)}") return f"Error fetching calendar events: {str(e)}" # Tool functions @mcp.tool() async def get_events(days: int = 7, ctx: Context = None) -> str: """Get upcoming calendar events. Args: days: Number of days to look ahead (default 7) """ if ctx: ctx.info(f"Getting calendar events for the next {days} days") if days < 1 or days > 30: error_msg = "Days parameter must be between 1 and 30." if ctx: ctx.error(error_msg) return error_msg service = await get_google_calendar_service() if not service: error_msg = "Calendar service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Calculate time bounds now = datetime.datetime.utcnow() end_date = now + datetime.timedelta(days=days) if ctx: ctx.info(f"Fetching events from {now.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}") # Call the Calendar API events_result = service.events().list( calendarId='primary', timeMin=now.isoformat() + 'Z', timeMax=end_date.isoformat() + 'Z', singleEvents=True, orderBy='startTime' ).execute() events = events_result.get('items', []) if not events: return f"No upcoming events found in the next {days} days." # Format the events nicely formatted_events = [] for i, event in enumerate(events): if ctx: await ctx.report_progress(i, len(events)) start = event['start'].get('dateTime', event['start'].get('date')) end = event['end'].get('dateTime', event['end'].get('date')) # Handle different date formats try: if 'T' in start: # DateTime format start_time = datetime.datetime.fromisoformat(start.replace('Z', '+00:00')) formatted_start = start_time.strftime("%Y-%m-%d %H:%M") else: # Date-only format formatted_start = start if 'T' in end: # DateTime format end_time = datetime.datetime.fromisoformat(end.replace('Z', '+00:00')) formatted_end = end_time.strftime("%Y-%m-%d %H:%M") else: # Date-only format formatted_end = end except Exception: formatted_start = start formatted_end = end formatted_events.append(f""" Event: {event.get('summary', 'Untitled Event')} Time: {formatted_start} to {formatted_end} Location: {event.get('location', 'No location specified')} Description: {event.get('description', 'No description provided')} """) return "\n---\n".join(formatted_events) except HttpError as error: error_msg = f"An error occurred while fetching calendar events: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error fetching calendar events: {str(e)}" if ctx: ctx.error(error_msg) return error_msg @mcp.tool() async def create_event(title: str, start_time: str, end_time: str, description: str = "", location: str = "", ctx: Context = None) -> str: """Create a new calendar event. Args: title: Title of the event start_time: Start time in ISO format (YYYY-MM-DDTHH:MM:SS) or YYYY-MM-DD for all-day events end_time: End time in ISO format (YYYY-MM-DDTHH:MM:SS) or YYYY-MM-DD for all-day events description: Optional description of the event location: Optional location of the event """ if ctx: ctx.info(f"Creating calendar event: {title}") # Validate inputs if not title: error_msg = "Event title cannot be empty." if ctx: ctx.error(error_msg) return error_msg # Validate date formats date_only_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$') datetime_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2})?$') is_all_day = False # Check start time format if not (date_only_pattern.match(start_time) or datetime_pattern.match(start_time)): error_msg = "Invalid start time format. Use YYYY-MM-DD for all-day events or YYYY-MM-DDTHH:MM:SS for specific times." if ctx: ctx.error(error_msg) return error_msg # Check end time format if not (date_only_pattern.match(end_time) or datetime_pattern.match(end_time)): error_msg = "Invalid end time format. Use YYYY-MM-DD for all-day events or YYYY-MM-DDTHH:MM:SS for specific times." if ctx: ctx.error(error_msg) return error_msg # Determine if this is an all-day event if date_only_pattern.match(start_time) and date_only_pattern.match(end_time): is_all_day = True elif date_only_pattern.match(start_time) != date_only_pattern.match(end_time): error_msg = "Both start and end times must be in the same format (either both dates or both date-times)." if ctx: ctx.error(error_msg) return error_msg service = await get_google_calendar_service() if not service: error_msg = "Calendar service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Create event object based on whether it's an all-day event or not event = { 'summary': title, 'location': location, 'description': description, } timezone = 'America/Los_Angeles' # Default timezone, should be configurable if is_all_day: # For all-day events, end date should be the next day if start_time == end_time: # If start and end are the same, make it a one-day event end_date = (datetime.datetime.strptime(end_time, '%Y-%m-%d') + datetime.timedelta(days=1)).strftime('%Y-%m-%d') else: end_date = end_time event['start'] = {'date': start_time} event['end'] = {'date': end_date} else: # For specific time events event['start'] = { 'dateTime': start_time, 'timeZone': timezone, } event['end'] = { 'dateTime': end_time, 'timeZone': timezone, } if ctx: ctx.info(f"Sending event creation request to Google Calendar API") event = service.events().insert(calendarId='primary', body=event).execute() return f"Event created successfully: {event.get('htmlLink')}" except HttpError as error: error_msg = f"Calendar API error: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"An error occurred while creating the event: {str(e)}" if ctx: ctx.error(error_msg) return error_msg @mcp.tool() async def get_free_time(date: str, ctx: Context = None) -> str: """Find free time slots in your calendar for a specific date. Args: date: The date to check in YYYY-MM-DD format """ if ctx: ctx.info(f"Finding free time slots for {date}") # Validate date format date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$') if not date_pattern.match(date): error_msg = f"Invalid date format: {date}. Please use YYYY-MM-DD." if ctx: ctx.error(error_msg) return error_msg service = await get_google_calendar_service() if not service: error_msg = "Calendar service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Parse the requested date requested_date = datetime.datetime.strptime(date, '%Y-%m-%d') start_of_day = requested_date.replace(hour=0, minute=0, second=0, microsecond=0) end_of_day = requested_date.replace(hour=23, minute=59, second=59, microsecond=999999) # Define working hours (9 AM to 5 PM) business_start = requested_date.replace(hour=9, minute=0, second=0, microsecond=0) business_end = requested_date.replace(hour=17, minute=0, second=0, microsecond=0) if ctx: ctx.info(f"Fetching events for {date}") # Get events for that day events_result = service.events().list( calendarId='primary', timeMin=start_of_day.isoformat() + 'Z', timeMax=end_of_day.isoformat() + 'Z', singleEvents=True, orderBy='startTime' ).execute() events = events_result.get('items', []) if not events: return f"You have no events on {date}. The entire day is free!" # Collect busy times busy_times = [] for event in events: if ctx: ctx.debug(f"Processing event: {event.get('summary')}") # Skip declined events if 'attendees' in event: for attendee in event['attendees']: if attendee.get('self', False) and attendee.get('responseStatus') == 'declined': continue start_str = event['start'].get('dateTime') end_str = event['end'].get('dateTime') # Skip all-day events from free time calculation if not start_str or not end_str: continue # Parse event times event_start = datetime.datetime.fromisoformat(start_str.replace('Z', '+00:00')) event_end = datetime.datetime.fromisoformat(end_str.replace('Z', '+00:00')) # Store busy slots busy_times.append((event_start, event_end, event.get('summary', 'Busy'))) # Sort busy times busy_times.sort(key=lambda x: x[0]) # Find free slots within business hours free_slots = [] current_time = business_start for busy_start, busy_end, summary in busy_times: # If there's a gap between current time and busy start, it's a free slot if current_time < busy_start and current_time < business_end: end_time = min(busy_start, business_end) free_slots.append((current_time, end_time)) # Move current time to the end of this busy period current_time = max(current_time, busy_end) # Add any remaining time until end of business hours if current_time < business_end: free_slots.append((current_time, business_end)) # Format and return free slots if not free_slots: return f"You're fully booked on {date}. No free slots available during business hours (9 AM - 5 PM)." formatted_free_times = [] for start, end in free_slots: # Calculate duration in minutes duration = int((end - start).total_seconds() / 60) if duration < 15: # Skip very short gaps continue formatted_free_times.append(f"{start.strftime('%I:%M %p')} - {end.strftime('%I:%M %p')} ({duration} minutes)") if not formatted_free_times: return f"You have some gaps on {date}, but they're all shorter than 15 minutes." return f"Free time slots on {date}:\n\n" + "\n".join(formatted_free_times) except HttpError as error: error_msg = f"An error occurred while fetching calendar information: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error checking free time: {str(e)}" if ctx: ctx.error(error_msg) return error_msg