MCP iCal Server
by Omar-V2
import sys
from datetime import datetime
from threading import Semaphore
from typing import Any
from EventKit import (
EKAlarm, # type: ignore
EKCalendar, # type: ignore
EKEntityTypeEvent, # type: ignore
EKEvent, # type: ignore
EKEventStore, # type: ignore
EKSpanFutureEvents, # type: ignore
EKSpanThisEvent, # type: ignore
)
from loguru import logger
from .models import (
CreateEventRequest,
Event,
UpdateEventRequest,
)
logger.remove()
logger.add(
sys.stderr,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
level="DEBUG",
)
class CalendarManager:
def __init__(self):
self.event_store = EKEventStore.alloc().init()
# Force a fresh permission check
auth_status = EKEventStore.authorizationStatusForEntityType_(EKEntityTypeEvent)
logger.debug(f"Initial Calendar authorization status: {auth_status}")
# Always request access regardless of current status
if not self._request_access():
logger.error("Calendar access request failed")
raise ValueError(
"Calendar access not granted. Please check System Settings > Privacy & Security > Calendar."
)
logger.info("Calendar access granted successfully")
def list_events(
self,
start_time: datetime,
end_time: datetime,
calendar_name: str | None = None,
) -> list[Event]:
"""List all events within a given date range
Args:
start_time: The start time of the date range
end_time: The end time of the date range
calendar_name: The name of the calendar to filter by
Returns:
list[Event]: A list of events within the date range
"""
# only list events in a particular calendar if specified, otherwise search across all calendars
calendar = self._find_calendar_by_name(calendar_name) if calendar_name else None
if calendar_name and not calendar:
raise NoSuchCalendarException(calendar_name)
calendars = [calendar] if calendar else None
logger.info(
f"Listing events between {start_time} - {end_time}, searching in: {calendar_name if calendar_name else 'all calendars'}"
)
predicate = self.event_store.predicateForEventsWithStartDate_endDate_calendars_(start_time, end_time, calendars)
events = self.event_store.eventsMatchingPredicate_(predicate)
return [Event.from_ekevent(event) for event in events]
def create_event(self, new_event: CreateEventRequest) -> Event:
"""Create a new calendar event
Args:
new_event: The event to create
Returns:
Event | None: The created event with identifier if successful, None if failed
"""
ekevent = EKEvent.eventWithEventStore_(self.event_store)
ekevent.setTitle_(new_event.title)
ekevent.setStartDate_(new_event.start_time)
ekevent.setEndDate_(new_event.end_time)
if new_event.notes:
ekevent.setNotes_(new_event.notes)
if new_event.location:
ekevent.setLocation_(new_event.location)
if new_event.url:
ekevent.setURL_(new_event.url)
if new_event.all_day:
ekevent.setAllDay_(new_event.all_day)
if new_event.alarms_minutes_offsets:
for minutes in new_event.alarms_minutes_offsets:
# actual_minutes = minutes + (9 * 60) if new_event.all_day else minutes
alarm = EKAlarm.alarmWithRelativeOffset_(-60 * minutes)
ekevent.addAlarm_(alarm)
if new_event.recurrence_rule:
ekevent.setRecurrenceRule_(new_event.recurrence_rule.to_ek_recurrence())
if new_event.calendar_name:
calendar = self._find_calendar_by_name(new_event.calendar_name)
if not calendar:
logger.error(
f"Failed to create event: The specified calendar '{new_event.calendar_name}' does not exist."
)
raise NoSuchCalendarException(new_event.calendar_name)
else:
calendar = self.event_store.defaultCalendarForNewEvents()
logger.debug(f"Using default calendar, {calendar}, for new event")
ekevent.setCalendar_(calendar)
try:
success, error = self.event_store.saveEvent_span_error_(ekevent, EKSpanThisEvent, None)
if not success:
logger.error(f"Failed to save event: {error}")
raise Exception(error)
logger.info(f"Successfully created event: {new_event.title}")
return Event.from_ekevent(ekevent)
except Exception as e:
logger.exception(e)
raise
def update_event(self, event_id: str, request: UpdateEventRequest) -> Event:
"""Update an existing event by its identifier
Args:
event_id: The unique identifier of the event to update
request: The update request containing the fields to modify
Returns:
Event | None: The updated event if successful, None if failed
"""
existing_event = self.find_event_by_id(event_id)
if not existing_event:
raise NoSuchEventException(event_id)
existing_ek_event = existing_event._raw_event
if not existing_ek_event:
raise NoSuchEventException(event_id)
if request.title is not None:
existing_ek_event.setTitle_(request.title)
if request.start_time is not None:
existing_ek_event.setStartDate_(request.start_time)
if request.end_time is not None:
existing_ek_event.setEndDate_(request.end_time)
if request.location is not None:
existing_ek_event.setLocation_(request.location)
if request.notes is not None:
existing_ek_event.setNotes_(request.notes)
if request.url is not None:
existing_ek_event.setURL_(request.url)
if request.all_day is not None:
existing_ek_event.setAllDay_(request.all_day)
# Update calendar if specified
if request.calendar_name:
calendar = self._find_calendar_by_name(request.calendar_name)
if calendar:
existing_ek_event.setCalendar_(calendar)
else:
raise NoSuchCalendarException(request.calendar_name)
# Update recurrence rule
if request.recurrence_rule is not None:
existing_ek_event.setRecurrenceRule_(request.recurrence_rule.to_ek_recurrence())
# Update alarms if specified
if request.alarms_minutes_offsets is not None:
alarms = []
for minutes in request.alarms_minutes_offsets:
# For all-day events EK considers start of day as reference point for alarms, so subtract one day
actual_minutes = minutes - 1440 if request.all_day else minutes
alarm = EKAlarm.alarmWithRelativeOffset_(-60 * actual_minutes) # Convert to seconds
alarms.append(alarm)
existing_ek_event.setAlarms_(alarms)
try:
# Use EKSpanFutureEvents to update all future events in the case the event is a recurring one
success, error = self.event_store.saveEvent_span_error_(existing_ek_event, EKSpanFutureEvents, None)
if not success:
logger.error(f"Failed to update event: {error}")
raise Exception(error)
logger.info(f"Successfully updated event: {request.title or existing_event.title}")
return Event.from_ekevent(existing_ek_event)
except Exception as e:
logger.error(f"Failed to update event: {e}")
raise
def delete_event(self, event_id: str) -> bool:
"""Delete an event by its identifier
Args:
event_id: The unique identifier of the event to delete
Returns:
bool: True if deletion was successful, False otherwise
Raises:
NoSuchEventException: If the event with the given ID doesn't exist
Exception: If there was an error deleting the event
"""
existing_event = self.find_event_by_id(event_id)
if not existing_event:
raise NoSuchEventException(event_id)
existing_ek_event = existing_event._raw_event
if not existing_ek_event:
raise NoSuchEventException(event_id)
try:
success, error = self.event_store.removeEvent_span_error_(existing_ek_event, EKSpanFutureEvents, None)
if not success:
logger.error(f"Failed to delete event: {error}")
raise Exception(error)
logger.info(f"Successfully deleted event: {existing_event.title}")
return True
except Exception as e:
logger.error(f"Failed to delete event: {e}")
raise
def find_event_by_id(self, identifier: str) -> Event | None:
"""Find an event by its identifier
Args:
identifier: The unique identifier of the event
Returns:
Event | None: The event if found, None otherwise
"""
ekevent = self.event_store.eventWithIdentifier_(identifier)
if not ekevent:
logger.info(f"No event found with ID: {identifier}")
return None
return Event.from_ekevent(ekevent)
def list_calendar_names(self) -> list[str]:
"""List all available calendar names
Returns:
list[str]: A list of calendar names
"""
calendars = self.event_store.calendars()
return [calendar.title() for calendar in calendars]
def list_calendars(self) -> list[Any]:
"""List all available calendars
Returns:
list[Any]: A list of EK calendar objects
"""
return self.event_store.calendars()
def _request_access(self) -> bool:
"""Request access to interact with the MacOS calendar"""
semaphore = Semaphore(0)
access_granted = False
def completion(granted: bool, error) -> None:
nonlocal access_granted
access_granted = granted
semaphore.release()
self.event_store.requestAccessToEntityType_completion_(0, completion)
semaphore.acquire()
return access_granted
def _find_calendar_by_id(self, calendar_id: str) -> Any | None:
"""Find a calendar by ID. Returns None if not found.
Args:
calendar_id: The ID of the calendar to find
Returns:
Any | None: The calendar if found, None otherwise
"""
for calendar in self.event_store.calendars():
if calendar.uniqueIdentifier() == calendar_id:
return calendar
logger.info(f"Calendar '{calendar_id}' not found")
return None
def _find_calendar_by_name(self, calendar_name: str) -> Any | None:
"""Find a calendar by name. Returns None if not found.
Args:
calendar_name: The name of the calendar to find
Returns:
Any | None: The calendar if found, None otherwise
"""
for calendar in self.event_store.calendars():
if calendar.title() == calendar_name:
return calendar
logger.info(f"Calendar '{calendar_name}' not found")
return None
def _create_calendar(self, calendar_name: str, source_name: str = "iCloud") -> Any | None:
"""Create a new calendar with the specified name.
Args:
calendar_name: The name for the new calendar
source_type: The type of source to use (2=Exchange, 4=MobileMe/iCloud, 5=Subscribed).
If None, uses the first available source.
Returns:
Any | None: The created calendar if successful, None if failed
Raises:
Exception: If there was an error creating the calendar or no matching source found
"""
logger.info(f"Creating new calendar: {calendar_name}")
# Create new calendar for events
new_calendar = EKCalendar.calendarForEntityType_eventStore_(EKEntityTypeEvent, self.event_store)
new_calendar.setTitle_(calendar_name)
# Set calendar source based on source_type
sources = self.event_store.sources()
selected_source = None
for source in sources:
if source.title() == source_name and source.supportsCalendarCreation():
logger.info(f"Using source: {source.title()} (type: {source.sourceType()})")
selected_source = source
break
if not selected_source:
available_sources = [(s.title(), s.sourceType()) for s in sources if source.supportsCalendarCreation()]
error_msg = f"No source found matching title {source_name}. Available sources: {available_sources}"
logger.error(error_msg)
raise ValueError(error_msg)
new_calendar.setSource_(selected_source)
try:
success, error = self.event_store.saveCalendar_commit_error_(new_calendar, True, None)
if not success:
logger.error(f"Failed to create calendar: {error}")
raise Exception(error)
logger.info(f"Successfully created calendar: {calendar_name}")
return new_calendar
except Exception as e:
logger.exception(f"Error creating calendar: {e}")
raise
def _delete_calendar(self, calendar_id: str) -> bool:
"""Delete a calendar by its name with extra verification."""
logger.info(f"Attempting to delete calendar with ID: {calendar_id}")
calendar = self._find_calendar_by_id(calendar_id)
if not calendar:
raise NoSuchCalendarException(calendar_id)
try:
# Try deletion with explicit commit
success, error = self.event_store.removeCalendar_commit_error_(calendar, True, None)
if not success:
logger.error(f"Failed to delete calendar: {error}")
raise Exception(error)
# Verify deletion
remaining_calendars = self.list_calendar_names()
if calendar_id in remaining_calendars:
logger.error(f"Calendar {calendar_id} still exists after deletion!")
raise Exception(f"Calendar {calendar_id} was not properly deleted")
logger.info(f"Successfully deleted calendar: {calendar_id}")
return True
except Exception as e:
logger.exception(f"Error deleting calendar: {e}")
raise
class NoSuchCalendarException(Exception):
def __init__(self, calendar_name: str):
super().__init__(f"Calendar: {calendar_name} does not exist")
class NoSuchEventException(Exception):
def __init__(self, event_id: str):
super().__init__(f"Event with id: {event_id} does not exist")