main.py•3.01 kB
from typing import Any, List, Optional
from datetime import datetime
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from fastmcp import FastMCP
import json
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/calendar']
def get_calendar_service():
"""Get an authorized Google Calendar API service instance."""
creds = None
# The file token.json stores the user's access and refresh tokens
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_info(json.loads(open('token.json').read()), SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.json', 'w') as token:
token.write(creds.to_json())
return build('calendar', 'v3', credentials=creds)
# Initialize FastMCP server
mcp = FastMCP("google_calendar")
@mcp.tool()
async def create_event(summary: str, start_time: str, end_time: str,
description: Optional[str] = None, location: Optional[str] = None,
attendees: Optional[List[str]] = None, timezone: str = 'America/Los_Angeles'):
"""Create a new event in Google Calendar
Args:
summary: Title of the event
start_time: Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
end_time: End time in ISO format (YYYY-MM-DDTHH:MM:SS)
description: Description of the event
location: Location of the event
attendees: List of email addresses to invite
timezone: Timezone for the event
"""
service = get_calendar_service()
event = {
'summary': summary,
'location': location,
'description': description,
'start': {
'dateTime': start_time,
'timeZone': timezone,
},
'end': {
'dateTime': end_time,
'timeZone': timezone,
}
}
if attendees:
event['attendees'] = [{'email': email} for email in attendees]
event = service.events().insert(calendarId='primary', body=event).execute()
return f"Event created: {event.get('htmlLink')}"
if __name__ == "__main__":
# Test the create_event function
import asyncio
print("Creating test event...")
result = asyncio.run(create_event(
summary="Test Event",
start_time="2025-03-27T09:00:00",
end_time="2025-03-27T10:00:00"
))
print(result)
# Or run the MCP server
# mcp.run(transport='stdio')