# MCP Personal Assistant Agent
#
#   • modules
"""Google Calendar tools, prompts and resources for the MCP personal assistant.

Everything in this module talks to the ``primary`` calendar through the
Google Calendar v3 API.  Credentials are read from the ``GOOGLE_CLIENT_ID``,
``GOOGLE_CLIENT_SECRET``, ``GOOGLE_REFRESH_TOKEN`` and (optionally)
``GOOGLE_ACCESS_TOKEN`` environment variables.
"""
from typing import List, Dict, Any, Optional, Tuple
import datetime
import inspect
import json
import os
import logging
import re

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Import the MCP server instance from the main file
from mcp_server import mcp, Context

logger = logging.getLogger("mcp-pa-agent.calendar")

_SERVICE_UNAVAILABLE_MSG = (
    "Calendar service is not available. Please check your Google API credentials."
)

# Time zone attached to timed events created by ``create_event`` unless the
# caller passes another one.
_DEFAULT_TIMEZONE = 'America/Los_Angeles'

_DATE_ONLY_PATTERN = re.compile(r'^\d{4}-\d{2}-\d{2}$')
_DATETIME_PATTERN = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2})?$')

# Free gaps shorter than this many minutes are not reported by get_free_time.
_MIN_FREE_SLOT_MINUTES = 15

# A local calendar day can begin up to 14 hours before, and end up to 12 hours
# after, the UTC day with the same date.  get_free_time pads its query window
# by this much on both sides and then filters by local wall-clock time, so the
# whole local day is covered without knowing the calendar's time zone.
_QUERY_PADDING = datetime.timedelta(hours=14)


# Helper functions

async def _notify(ctx, level: str, message: str) -> None:
    """Send ``message`` to the client through ``ctx.<level>`` if ``ctx`` is set.

    The result is awaited only when it is awaitable, so this works whether the
    context's logging methods are coroutines or plain functions.
    NOTE(review): ``Context`` comes from ``mcp_server``; its logging methods
    are presumably coroutines (as ``report_progress`` is) - confirm.
    """
    if not ctx:
        return
    result = getattr(ctx, level)(message)
    if inspect.isawaitable(result):
        await result


async def _fail(ctx, message: str) -> str:
    """Report ``message`` as an error on ``ctx`` and return it to the caller."""
    await _notify(ctx, "error", message)
    return message


def _utc_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def _to_rfc3339_utc(moment: datetime.datetime) -> str:
    """Format an aware datetime as an RFC 3339 UTC timestamp (``...Z``)."""
    return moment.astimezone(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def _parse_event_time(value: Any) -> Optional[Tuple[datetime.datetime, bool]]:
    """Parse a user-supplied ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM[:SS]`` string.

    Returns:
        ``(parsed, is_date_only)`` or ``None`` when the text has the wrong
        shape or names an impossible date/time (e.g. month 13).
    """
    if not isinstance(value, str):
        return None
    if _DATE_ONLY_PATTERN.match(value):
        fmt, date_only = '%Y-%m-%d', True
    elif _DATETIME_PATTERN.match(value):
        fmt = '%Y-%m-%dT%H:%M:%S' if value.count(':') == 2 else '%Y-%m-%dT%H:%M'
        date_only = False
    else:
        return None
    try:
        return datetime.datetime.strptime(value, fmt), date_only
    except ValueError:
        return None


def _format_event_time(value: Any) -> Any:
    """Render an API ``dateTime`` as ``YYYY-MM-DD HH:MM``.

    Date-only values and anything that cannot be parsed are returned unchanged.
    """
    if isinstance(value, str) and 'T' in value:
        try:
            parsed = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
        except ValueError:
            return value
        return parsed.strftime("%Y-%m-%d %H:%M")
    return value


def _declined_by_self(event: Dict[str, Any]) -> bool:
    """Return True if the calendar owner's own attendee entry is 'declined'."""
    return any(
        attendee.get('self', False) and attendee.get('responseStatus') == 'declined'
        for attendee in event.get('attendees', [])
    )


def _event_wall_clock_bounds(
    event: Dict[str, Any],
) -> Optional[Tuple[datetime.datetime, datetime.datetime, bool]]:
    """Return ``(start, end, is_timed)`` for an API event as naive datetimes.

    The UTC offset is dropped, leaving the wall-clock time in whatever zone
    the API reported.  NOTE(review): this relies on events.list returning
    times in the calendar's own time zone when no ``timeZone`` is requested -
    confirm against the API reference.

    Returns ``None`` when either bound is missing or unparsable.
    """
    start_info = event.get('start') or {}
    end_info = event.get('end') or {}
    is_timed = bool(start_info.get('dateTime') and end_info.get('dateTime'))
    start_raw = start_info.get('dateTime') or start_info.get('date')
    end_raw = end_info.get('dateTime') or end_info.get('date')
    if not start_raw or not end_raw:
        return None
    try:
        start = datetime.datetime.fromisoformat(start_raw.replace('Z', '+00:00'))
        end = datetime.datetime.fromisoformat(end_raw.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        return None
    return start.replace(tzinfo=None), end.replace(tzinfo=None), is_timed


def _find_free_slots(
    busy: List[Tuple[datetime.datetime, datetime.datetime]],
    window_start: datetime.datetime,
    window_end: datetime.datetime,
) -> List[Tuple[datetime.datetime, datetime.datetime]]:
    """Return the gaps inside ``[window_start, window_end]`` not covered by ``busy``."""
    free = []
    cursor = window_start
    for busy_start, busy_end in sorted(busy):
        # A gap exists only if the cursor is before both the next busy period
        # and the end of the window.
        if cursor < busy_start and cursor < window_end:
            free.append((cursor, min(busy_start, window_end)))
        # Never move backwards: overlapping/nested events must not reopen time.
        cursor = max(cursor, busy_end)
    if cursor < window_end:
        free.append((cursor, window_end))
    return free


async def get_google_calendar_service():
    """Get an authenticated Google Calendar service if credentials are available.

    Returns:
        A Calendar v3 service object, or ``None`` when a required environment
        variable is missing or building the service fails (the failure is
        logged).
    """
    try:
        # In a real implementation, you would handle OAuth2 credentials properly
        # For demo purposes, we'll just check if the required env vars exist
        required = ("GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET", "GOOGLE_REFRESH_TOKEN")
        if not all(os.getenv(name) for name in required):
            return None

        credentials = Credentials(
            token=os.getenv("GOOGLE_ACCESS_TOKEN"),
            refresh_token=os.getenv("GOOGLE_REFRESH_TOKEN"),
            client_id=os.getenv("GOOGLE_CLIENT_ID"),
            client_secret=os.getenv("GOOGLE_CLIENT_SECRET"),
            token_uri="https://oauth2.googleapis.com/token",
            scopes=["https://www.googleapis.com/auth/calendar"]
        )
        return build("calendar", "v3", credentials=credentials)
    except Exception as e:
        # Best-effort by design: callers treat None as "service unavailable".
        logger.exception("Failed to get calendar service: %s", e)
        return None


# Prompts

@mcp.prompt()
def schedule_meeting_prompt(participants: str, duration: str = "30 minutes") -> str:
    """Create a prompt for scheduling a meeting"""
    return f"Please help me schedule a {duration} meeting with {participants}. Suggest some appropriate times and draft a meeting invitation."


@mcp.prompt()
def check_availability_prompt(date: str) -> str:
    """Create a prompt for checking calendar availability"""
    return f"Please check my calendar for {date} and let me know when I'm free for meetings."


# Resources

@mcp.resource("calendar://today")
async def today_events_resource() -> str:
    """Resource providing today's calendar events

    "Today" is the current UTC day.  Returns the raw event list as indented
    JSON, or a plain-text message when there are no events or on failure.
    """
    service = await get_google_calendar_service()
    if not service:
        return _SERVICE_UNAVAILABLE_MSG

    try:
        # Calculate time bounds for today
        start_of_day = _utc_now().replace(hour=0, minute=0, second=0, microsecond=0)
        end_of_day = start_of_day + datetime.timedelta(days=1)

        # Call the Calendar API
        events_result = service.events().list(
            calendarId='primary',
            timeMin=_to_rfc3339_utc(start_of_day),
            timeMax=_to_rfc3339_utc(end_of_day),
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        events = events_result.get('items', [])
        if not events:
            return "No events scheduled for today."

        # Return JSON representation of events
        return json.dumps(events, indent=2)
    except Exception as e:
        logger.exception("Error fetching today's events: %s", e)
        return f"Error fetching calendar events: {str(e)}"


# Tool functions
#
# NOTE(review): the ``ctx: Context = None`` annotations below are deliberately
# left un-wrapped (not ``Optional[Context]``); the MCP framework presumably
# finds the context parameter by its plain annotation - confirm before
# changing them.

@mcp.tool()
async def get_events(days: int = 7, ctx: Context = None) -> str:
    """Get upcoming calendar events.

    Args:
        days: Number of days to look ahead (default 7)
    """
    await _notify(ctx, "info", f"Getting calendar events for the next {days} days")

    if days < 1 or days > 30:
        return await _fail(ctx, "Days parameter must be between 1 and 30.")

    service = await get_google_calendar_service()
    if not service:
        return await _fail(ctx, _SERVICE_UNAVAILABLE_MSG)

    try:
        # Calculate time bounds
        now = _utc_now()
        end_date = now + datetime.timedelta(days=days)

        await _notify(
            ctx,
            "info",
            f"Fetching events from {now.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}",
        )

        # Call the Calendar API
        events_result = service.events().list(
            calendarId='primary',
            timeMin=_to_rfc3339_utc(now),
            timeMax=_to_rfc3339_utc(end_date),
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        events = events_result.get('items', [])
        if not events:
            return f"No upcoming events found in the next {days} days."

        # Format the events nicely
        formatted_events = []
        for i, event in enumerate(events):
            if ctx:
                await ctx.report_progress(i, len(events))

            # Timed events carry 'dateTime'; all-day events carry 'date'.
            start = event['start'].get('dateTime', event['start'].get('date'))
            end = event['end'].get('dateTime', event['end'].get('date'))
            formatted_start = _format_event_time(start)
            formatted_end = _format_event_time(end)

            formatted_events.append(
                "\n"
                f"Event: {event.get('summary', 'Untitled Event')}\n"
                f"Time: {formatted_start} to {formatted_end}\n"
                f"Location: {event.get('location', 'No location specified')}\n"
                f"Description: {event.get('description', 'No description provided')}\n"
            )

        return "\n---\n".join(formatted_events)
    except HttpError as error:
        return await _fail(ctx, f"An error occurred while fetching calendar events: {error}")
    except Exception as e:
        return await _fail(ctx, f"Unexpected error fetching calendar events: {str(e)}")


@mcp.tool()
async def create_event(title: str, start_time: str, end_time: str, description: str = "",
                       location: str = "", ctx: Context = None,
                       timezone: str = _DEFAULT_TIMEZONE) -> str:
    """Create a new calendar event.

    Args:
        title: Title of the event
        start_time: Start time in ISO format (YYYY-MM-DDTHH:MM:SS) or YYYY-MM-DD for all-day events
        end_time: End time in ISO format (YYYY-MM-DDTHH:MM:SS) or YYYY-MM-DD for all-day events
        description: Optional description of the event
        location: Optional location of the event
        timezone: IANA time zone name applied to timed events (default America/Los_Angeles)
    """
    await _notify(ctx, "info", f"Creating calendar event: {title}")

    # Validate inputs
    if not title or not title.strip():
        return await _fail(ctx, "Event title cannot be empty.")

    # Validate date formats (shape AND calendar validity, e.g. rejects month 13)
    parsed_start = _parse_event_time(start_time)
    if parsed_start is None:
        return await _fail(
            ctx,
            "Invalid start time format. Use YYYY-MM-DD for all-day events or YYYY-MM-DDTHH:MM:SS for specific times.",
        )

    parsed_end = _parse_event_time(end_time)
    if parsed_end is None:
        return await _fail(
            ctx,
            "Invalid end time format. Use YYYY-MM-DD for all-day events or YYYY-MM-DDTHH:MM:SS for specific times.",
        )

    start_dt, start_is_date = parsed_start
    end_dt, end_is_date = parsed_end

    # Determine if this is an all-day event
    if start_is_date != end_is_date:
        return await _fail(
            ctx,
            "Both start and end times must be in the same format (either both dates or both date-times).",
        )
    is_all_day = start_is_date

    if end_dt < start_dt:
        return await _fail(ctx, "End time must not be earlier than start time.")

    service = await get_google_calendar_service()
    if not service:
        return await _fail(ctx, _SERVICE_UNAVAILABLE_MSG)

    try:
        # Create event object based on whether it's an all-day event or not
        event = {
            'summary': title,
            'location': location,
            'description': description,
        }

        if is_all_day:
            # For all-day events, end date should be the next day:
            # if start and end are the same, make it a one-day event.
            if start_time == end_time:
                end_date = (end_dt + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
            else:
                end_date = end_time
            event['start'] = {'date': start_time}
            event['end'] = {'date': end_date}
        else:
            # For specific time events; isoformat() always includes seconds,
            # so minute-precision input is normalised before being sent.
            event['start'] = {
                'dateTime': start_dt.isoformat(),
                'timeZone': timezone,
            }
            event['end'] = {
                'dateTime': end_dt.isoformat(),
                'timeZone': timezone,
            }

        await _notify(ctx, "info", "Sending event creation request to Google Calendar API")

        created = service.events().insert(calendarId='primary', body=event).execute()
        return f"Event created successfully: {created.get('htmlLink')}"
    except HttpError as error:
        return await _fail(ctx, f"Calendar API error: {error}")
    except Exception as e:
        return await _fail(ctx, f"An error occurred while creating the event: {str(e)}")


@mcp.tool()
async def get_free_time(date: str, ctx: Context = None) -> str:
    """Find free time slots in your calendar for a specific date.

    Free time is computed within business hours (9 AM to 5 PM) using the
    wall-clock times reported by the API; all-day events and events you have
    declined do not count as busy.

    Args:
        date: The date to check in YYYY-MM-DD format
    """
    await _notify(ctx, "info", f"Finding free time slots for {date}")

    # Validate date format
    requested = _parse_event_time(date)
    if requested is None or not requested[1]:
        return await _fail(ctx, f"Invalid date format: {date}. Please use YYYY-MM-DD.")

    # Local (wall-clock) bounds of the requested day; end is exclusive.
    start_of_day = requested[0]
    end_of_day = start_of_day + datetime.timedelta(days=1)

    # Define working hours (9 AM to 5 PM)
    business_start = start_of_day.replace(hour=9)
    business_end = start_of_day.replace(hour=17)

    service = await get_google_calendar_service()
    if not service:
        return await _fail(ctx, _SERVICE_UNAVAILABLE_MSG)

    try:
        await _notify(ctx, "info", f"Fetching events for {date}")

        # Query a padded UTC window, then keep only events that overlap the
        # requested day in wall-clock terms (see _QUERY_PADDING).
        utc = datetime.timezone.utc
        events_result = service.events().list(
            calendarId='primary',
            timeMin=_to_rfc3339_utc((start_of_day - _QUERY_PADDING).replace(tzinfo=utc)),
            timeMax=_to_rfc3339_utc((end_of_day + _QUERY_PADDING).replace(tzinfo=utc)),
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        day_events = []
        for event in events_result.get('items', []):
            bounds = _event_wall_clock_bounds(event)
            if bounds is None:
                continue
            event_start, event_end, is_timed = bounds
            if event_start < end_of_day and event_end > start_of_day:
                day_events.append((event, event_start, event_end, is_timed))

        if not day_events:
            return f"You have no events on {date}. The entire day is free!"

        # Collect busy times
        busy_times = []
        for event, event_start, event_end, is_timed in day_events:
            await _notify(ctx, "debug", f"Processing event: {event.get('summary')}")

            # Skip declined events
            if _declined_by_self(event):
                continue

            # Skip all-day events from free time calculation
            if not is_timed:
                continue

            busy_times.append((event_start, event_end))

        # Find free slots within business hours
        free_slots = _find_free_slots(busy_times, business_start, business_end)

        # Format and return free slots
        if not free_slots:
            return f"You're fully booked on {date}. No free slots available during business hours (9 AM - 5 PM)."

        formatted_free_times = []
        for start, end in free_slots:
            # Calculate duration in minutes
            duration = int((end - start).total_seconds() / 60)
            if duration < _MIN_FREE_SLOT_MINUTES:  # Skip very short gaps
                continue
            formatted_free_times.append(
                f"{start.strftime('%I:%M %p')} - {end.strftime('%I:%M %p')} ({duration} minutes)"
            )

        if not formatted_free_times:
            return f"You have some gaps on {date}, but they're all shorter than 15 minutes."

        return f"Free time slots on {date}:\n\n" + "\n".join(formatted_free_times)
    except HttpError as error:
        return await _fail(ctx, f"An error occurred while fetching calendar information: {error}")
    except Exception as e:
        return await _fail(ctx, f"Unexpected error checking free time: {str(e)}")