Skip to main content
Glama
calendar_agent.py13.1 kB
""" Intelligent Calendar Booking Agent Automatically books calendar time from natural language requests """ import re from datetime import datetime, timedelta from typing import Dict, Any, Optional, List import pytz from dateutil import parser import structlog from ..adapters.calendar_adapter import CalendarAdapter from ..utils.llm_manager import LLMManager logger = structlog.get_logger(__name__) class CalendarBookingAgent: """ AI-powered calendar booking agent Understands natural language like: - "Block time tomorrow 1-2 PM for pickup" - "Schedule interview prep from 8-10 PM tonight with 15min reminder" - "Add a meeting next Monday at 2 PM for 1 hour" """ def __init__(self, calendar_adapter: CalendarAdapter, llm_manager: LLMManager): self.calendar = calendar_adapter self.llm = llm_manager self.default_timezone = "America/Los_Angeles" logger.info("Calendar Booking Agent initialized") async def book_time( self, request: str, timezone: Optional[str] = None ) -> Dict[str, Any]: """ Book calendar time from natural language request Args: request: Natural language booking request timezone: Timezone (default: America/Los_Angeles) Returns: Booking result with event details """ tz = timezone or self.default_timezone logger.info(f"Processing booking request: {request}") try: # Step 1: Extract time information using LLM time_info = await self._extract_time_info(request) if not time_info.get("success"): return { "success": False, "error": "Could not understand the time in your request", "suggestion": "Please specify a clear start and end time" } # Step 2: Parse times start_time = self._parse_time(time_info["start_time"], tz) end_time = self._parse_time(time_info["end_time"], tz) if not start_time or not end_time: return { "success": False, "error": "Could not parse the specified times" } # Step 3: Check for conflicts conflicts = await self._check_conflicts(start_time, end_time) if conflicts.get("has_conflicts"): return { "success": False, "error": "Time slot conflicts with existing events", "conflicts": conflicts.get("events"), "suggestion": "Choose a different time or let me find free slots" } # Step 4: Create calendar event title = time_info.get("title", "Blocked Time") description = time_info.get("description") reminders = time_info.get("reminders", []) result = self.calendar.create_event( summary=title, start_time=start_time, end_time=end_time, description=description, reminders=reminders if reminders else None, timezone=tz ) if result.get("success"): logger.info( f"Successfully booked calendar time", title=title, start=start_time, end=end_time ) return { "success": True, "message": f"✅ Booked: {title}", "event_id": result["event_id"], "event_link": result.get("event_link"), "summary": title, "start_time": start_time.strftime("%Y-%m-%d %I:%M %p %Z"), "end_time": end_time.strftime("%Y-%m-%d %I:%M %p %Z"), "reminders": reminders, "timezone": tz } else: return { "success": False, "error": result.get("error", "Failed to create calendar event") } except Exception as e: logger.error(f"Booking failed: {e}") return { "success": False, "error": str(e) } async def _extract_time_info(self, request: str) -> Dict[str, Any]: """ Use LLM to extract time information from natural language Args: request: Natural language request Returns: Extracted time information """ prompt = f""" Extract calendar event information from this request: "{request}" Return a JSON object with: - title: Event title (infer from context, e.g., "pickup", "interview prep", etc.) - start_time: Start time (in natural language or ISO format) - end_time: End time (in natural language or ISO format) - description: Optional description - reminders: List of reminder minutes [15, 60, etc.] if mentioned Current date/time context: {datetime.now().strftime("%Y-%m-%d %H:%M %Z")} Examples: Request: "Block time tomorrow 1-2 PM for pickup" Output: {{"title": "Pickup", "start_time": "tomorrow 1 PM", "end_time": "tomorrow 2 PM", "reminders": []}} Request: "Schedule interview prep from 8-10 PM tonight with 15min reminder" Output: {{"title": "Interview Prep", "start_time": "today 8 PM", "end_time": "today 10 PM", "reminders": [15]}} Request: "Add meeting next Monday at 2 PM for 1 hour" Output: {{"title": "Meeting", "start_time": "next Monday 2 PM", "end_time": "next Monday 3 PM", "reminders": []}} Now extract from: "{request}" Return ONLY the JSON object, nothing else. """ try: response = await self.llm.generate(prompt) # Parse JSON from response import json # Extract JSON from response (handle markdown code blocks) content = response.content.strip() if "```json" in content: content = content.split("```json")[1].split("```")[0].strip() elif "```" in content: content = content.split("```")[1].split("```")[0].strip() time_info = json.loads(content) time_info["success"] = True return time_info except Exception as e: logger.error(f"Failed to extract time info: {e}") return {"success": False} def _parse_time(self, time_str: str, timezone: str) -> Optional[datetime]: """ Parse time string to datetime Args: time_str: Time string (natural language or ISO) timezone: Timezone Returns: Parsed datetime or None """ try: tz = pytz.timezone(timezone) # Handle relative times now = datetime.now(tz) # Replace common words time_str = time_str.lower() if "today" in time_str: time_str = time_str.replace("today", now.strftime("%Y-%m-%d")) if "tomorrow" in time_str: tomorrow = now + timedelta(days=1) time_str = time_str.replace("tomorrow", tomorrow.strftime("%Y-%m-%d")) if "tonight" in time_str: time_str = time_str.replace("tonight", now.strftime("%Y-%m-%d")) # Handle "next Monday" etc if "next" in time_str: weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] for i, day in enumerate(weekdays): if day in time_str: days_ahead = (i - now.weekday() + 7) % 7 if days_ahead == 0: days_ahead = 7 next_day = now + timedelta(days=days_ahead) time_str = time_str.replace(f"next {day}", next_day.strftime("%Y-%m-%d")) # Parse with dateutil parsed_time = parser.parse(time_str) # Make timezone aware if parsed_time.tzinfo is None: parsed_time = tz.localize(parsed_time) return parsed_time except Exception as e: logger.error(f"Failed to parse time '{time_str}': {e}") return None async def _check_conflicts( self, start_time: datetime, end_time: datetime ) -> Dict[str, Any]: """ Check for calendar conflicts Args: start_time: Proposed start time end_time: Proposed end time Returns: Conflict information """ try: # Get events in the same time range result = self.calendar.get_events( time_min=start_time - timedelta(minutes=30), time_max=end_time + timedelta(minutes=30), max_results=50 ) if not result.get("success"): return {"has_conflicts": False} events = result.get("events", []) # Check for overlaps conflicts = [] for event in events: event_start = parser.parse(event["start"]) event_end = parser.parse(event["end"]) # Check if times overlap if (start_time < event_end and end_time > event_start): conflicts.append({ "id": event["id"], "summary": event["summary"], "start": event["start"], "end": event["end"] }) return { "has_conflicts": len(conflicts) > 0, "events": conflicts } except Exception as e: logger.error(f"Failed to check conflicts: {e}") return {"has_conflicts": False} async def find_free_slots( self, date: datetime, duration_minutes: int = 60, between_hours: tuple = (9, 17), # 9 AM to 5 PM max_suggestions: int = 5 ) -> Dict[str, Any]: """ Find free time slots Args: date: Date to search duration_minutes: Required duration between_hours: Time range (start_hour, end_hour) max_suggestions: Maximum suggestions Returns: List of free slots """ try: tz = pytz.timezone(self.default_timezone) # Get events for the day day_start = date.replace(hour=between_hours[0], minute=0, second=0) day_end = date.replace(hour=between_hours[1], minute=0, second=0) result = self.calendar.get_events( time_min=day_start, time_max=day_end, max_results=50 ) if not result.get("success"): return {"success": False, "error": "Could not fetch calendar"} events = result.get("events", []) # Find gaps free_slots = [] current_time = day_start for event in sorted(events, key=lambda e: parser.parse(e["start"])): event_start = parser.parse(event["start"]) event_end = parser.parse(event["end"]) # Check gap before this event if (event_start - current_time).total_seconds() >= duration_minutes * 60: free_slots.append({ "start": current_time, "end": event_start, "duration_minutes": int((event_start - current_time).total_seconds() / 60) }) current_time = max(current_time, event_end) # Check gap after last event if (day_end - current_time).total_seconds() >= duration_minutes * 60: free_slots.append({ "start": current_time, "end": day_end, "duration_minutes": int((day_end - current_time).total_seconds() / 60) }) # Format suggestions suggestions = [] for slot in free_slots[:max_suggestions]: suggestions.append({ "start": slot["start"].strftime("%I:%M %p"), "end": slot["end"].strftime("%I:%M %p"), "duration": slot["duration_minutes"], "suggestion": f"{slot['start'].strftime('%I:%M %p')} - {slot['end'].strftime('%I:%M %p')} ({slot['duration_minutes']} min available)" }) return { "success": True, "date": date.strftime("%Y-%m-%d"), "free_slots": suggestions, "count": len(suggestions) } except Exception as e: logger.error(f"Failed to find free slots: {e}") return { "success": False, "error": str(e) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/mcpwithgoogle'

If you have feedback or need assistance with the MCP directory API, please join our Discord server