Skip to main content
Glama

Google-Workspace-MCP-Server

MIT License
15
mcp_server.py (18.7 kB)
# mcp_server.py
"""MCP server exposing Gmail, Google Calendar and Google Drive tools.

Credentials are loaded once from ``TOKEN_PATH`` when the server starts (see
``credential_manager``) and shared with every tool through the lifespan
context. Each tool builds the Google API client it needs, performs a single
operation and converts ``HttpError`` into a plain ``Exception`` whose message
carries the HTTP status and reason.
"""
import base64
import os
import sys
from typing import Dict, Any, AsyncIterator, Optional, List
from contextlib import asynccontextmanager
import io
from email.message import EmailMessage

import google.auth.exceptions
import google.oauth2.credentials
import googleapiclient.discovery
from google.auth.transport.requests import Request
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseUpload
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field

from config import TOKEN_PATH, SCOPES

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Time zone attached to event start/end times that do not carry their own.
DEFAULT_TIME_ZONE = 'Europe/Rome'


class EventDetails(BaseModel):
    summary: str = Field(description="The title or summary of the calendar event.")
    start_time: str = Field(description="The start time of the event in ISO 8601 format (e.g., '2025-07-05T15:00:00').")
    end_time: str = Field(description="The end time of the event in ISO 8601 format (e.g., '2025-07-05T16:00:00').")
    description: Optional[str] = Field(None, description="A detailed description for the event. Can include notes from the source email.")


# EventUpdateDetails exists for partial updates. An update usually changes one
# or two things (just the title, or just the end time); reusing EventDetails
# would force the agent to supply every field. With all fields Optional the
# agent sends only what it wants to change.
class EventUpdateDetails(BaseModel):
    summary: Optional[str] = Field(None, description="The new title for the event.")
    start_time: Optional[str] = Field(None, description="The new start time in ISO 8601 format.")
    end_time: Optional[str] = Field(None, description="The new end time in ISO 8601 format.")
    description: Optional[str] = Field(None, description="The new description for the event.")


class EmailContent(BaseModel):
    to: str = Field(description="The recipient's email address.")
    subject: str = Field(description="The subject line of the email.")
    body: str = Field(description="The plain text body of the email.")


class ListedEvent(BaseModel):
    id: str = Field(description="The unique ID of the event.")
    summary: str = Field(description="The title of the event.")
    start_time: str = Field(description="The start time of the event in ISO 8601 format.")
    end_time: str = Field(description="The end time of the event in ISO 8601 format.")


class ListedDriveFile(BaseModel):
    id: str = Field(description="The unique ID of the file.")
    name: str = Field(description="The name of the file.")
    mime_type: str = Field(description="The MIME type of the file (e.g., 'application/vnd.google-apps.document').")


# --- Lifespan Management for Credentials ---

def _save_credentials(creds: google.oauth2.credentials.Credentials) -> None:
    """Persist *creds* to ``TOKEN_PATH``; a failed write is only reported.

    The in-memory credentials stay usable even if the file cannot be written,
    so a read-only or full disk must not invalidate a successful refresh.
    """
    try:
        with open(TOKEN_PATH, 'w', encoding='utf-8') as token_file:
            token_file.write(creds.to_json())
    except OSError as e:
        print(f"WARNING: Token refreshed but could not be saved to '{TOKEN_PATH}': {e}", file=sys.stderr)
    else:
        print("Token refreshed and saved.", file=sys.stderr)


def _refresh_credentials(creds: google.oauth2.credentials.Credentials) -> bool:
    """Refresh *creds* in place and re-save them; return True on success."""
    try:
        creds.refresh(Request())
    except google.auth.exceptions.GoogleAuthError as e:
        print(f"ERROR: Failed to refresh token: {e}", file=sys.stderr)
        return False
    _save_credentials(creds)
    return True


@asynccontextmanager
async def credential_manager(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """
    Manages loading and refreshing Google API credentials on server startup.

    Yields a dict with a single ``"creds"`` entry. The value is ``None`` when
    the token file is missing or unreadable, or when an expired token could not
    be refreshed; tools then fail with a descriptive error instead of the
    server refusing to start.
    """
    if not os.path.exists(TOKEN_PATH):
        print(f"ERROR: Token file '{TOKEN_PATH}' not found.", file=sys.stderr)
        print("Please run 'python get_credentials.py' first to authorize the application.", file=sys.stderr)
        # Yield an empty context and let tool calls fail gracefully
        yield {"creds": None}
        return

    print(f"Loading credentials from {TOKEN_PATH}", file=sys.stderr)
    try:
        creds = google.oauth2.credentials.Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
    except (OSError, ValueError) as e:
        # A corrupt or unreadable token file is treated like a missing one.
        print(f"ERROR: Could not load credentials from '{TOKEN_PATH}': {e}", file=sys.stderr)
        creds = None

    if creds and creds.expired and creds.refresh_token:
        print("Credentials expired. Refreshing...", file=sys.stderr)
        if not _refresh_credentials(creds):
            creds = None  # Mark credentials as invalid

    try:
        # Make credentials available to all tool handlers via context
        yield {"creds": creds}
    finally:
        print("Server shutting down.", file=sys.stderr)


# Initialize the server with the lifespan manager
server = FastMCP(
    "GsuiteMCPServer",
    title="Gsuite MCP Server",
    lifespan=credential_manager
)


def get_creds_from_context(ctx: Context) -> google.oauth2.credentials.Credentials:
    """Helper to get credentials from the context and handle errors.

    The access token loaded at startup expires while the server keeps running,
    so credentials that are no longer valid are refreshed here (when a refresh
    token is available) before being rejected.

    Raises:
        Exception: If no credentials were loaded or they cannot be made valid.
    """
    creds = ctx.request_context.lifespan_context.get("creds")
    if creds and not creds.valid and creds.refresh_token:
        _refresh_credentials(creds)
    if not creds or not creds.valid:
        raise Exception(
            "Google API credentials are not available or invalid. "
            "Please run 'python get_credentials.py' to authenticate."
        )
    return creds


def _build_service(name: str, version: str, creds: google.oauth2.credentials.Credentials) -> Any:
    """Build the Google API client for service *name* at *version*."""
    return googleapiclient.discovery.build(name, version, credentials=creds)


def _api_error(api_name: str, error: HttpError) -> Exception:
    """Return the exception tools raise for a failed *api_name* API call."""
    return Exception(f"API {api_name} error (HTTP {error.status_code}): {error.reason}")


def _decode_body_data(data: str) -> Optional[str]:
    """Decode a base64url body string to text, or return None if malformed."""
    # Restore any '=' padding that was stripped; urlsafe_b64decode requires it.
    padded = data + '=' * (-len(data) % 4)
    try:
        raw = base64.urlsafe_b64decode(padded)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None
    # Bodies are not guaranteed to be UTF-8; never fail on a stray byte.
    return raw.decode('utf-8', errors='replace')


def get_email_body(payload: Dict[str, Any]) -> Optional[str]:
    """Recursively finds the 'text/plain' part of an email.

    Returns the decoded text of the first non-empty 'text/plain' part found
    depth-first, or None when there is none.
    """
    parts = payload.get('parts')
    if parts:
        for part in parts:
            body = get_email_body(part)
            if body:
                return body
        return None
    if payload.get('mimeType') == 'text/plain':
        data = (payload.get('body') or {}).get('data')
        if data is not None:
            return _decode_body_data(data)
    return None


# --- GMAIL TOOLS ---

@server.tool()
def read_latest_gmail_email(ctx: Context) -> Dict[str, str]:
    """Reads the most recent email from the user's Gmail inbox."""
    creds = get_creds_from_context(ctx)
    try:
        gmail_service = _build_service('gmail', 'v1', creds)
        # Restrict to the INBOX label; without it the listing covers all mail.
        messages_list = gmail_service.users().messages().list(
            userId='me', labelIds=['INBOX'], maxResults=1
        ).execute()
        if not messages_list.get('messages'):
            raise Exception("No emails found.")

        msg_id = messages_list['messages'][0]['id']
        message = gmail_service.users().messages().get(userId='me', id=msg_id, format='full').execute()
        email_body = get_email_body(message['payload'])
        if not email_body:
            email_body = "Could not find the body in the last email."
        return {'snippet': message.get('snippet', ''), 'body': email_body}
    except HttpError as e:
        raise _api_error("Gmail", e) from e


@server.tool()
def read_email_by_subject(subject: str, ctx: Context) -> List[Dict[str, str]]:
    """
    Searches for emails by subject and returns the body and snippet of the most recent matches.

    Args:
        subject: The subject line to search for.
    """
    creds = get_creds_from_context(ctx)
    # The subject is embedded in a quoted search phrase; a literal double quote
    # would end the phrase early and let the rest act as search operators.
    phrase = subject.replace('"', ' ')
    try:
        gmail_service = _build_service('gmail', 'v1', creds)
        # Search for messages with the given subject, get the most recent 5
        results = gmail_service.users().messages().list(
            userId='me', q=f'subject:"{phrase}"', maxResults=5
        ).execute()
        messages = results.get('messages', [])
        if not messages:
            return [{"status": f"No emails found with subject: '{subject}'"}]

        emails = []
        for msg_info in messages:
            msg = gmail_service.users().messages().get(userId='me', id=msg_info['id'], format='full').execute()
            body = get_email_body(msg['payload']) or "Could not extract plain text body."
            emails.append({'id': msg['id'], 'snippet': msg.get('snippet', ''), 'body': body})
        return emails
    except HttpError as e:
        raise _api_error("Gmail", e) from e


@server.tool()
def send_email(email_content: EmailContent, ctx: Context) -> Dict[str, str]:
    """
    Sends an email from the user's Gmail account.

    Args:
        email_content: A structured object containing the recipient, subject, and body.
    """
    creds = get_creds_from_context(ctx)
    message = EmailMessage()
    message.set_content(email_content.body)
    message['To'] = email_content.to
    message['Subject'] = email_content.subject
    # The API expects the whole RFC 822 message base64url-encoded under 'raw'.
    create_message = {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
    try:
        gmail_service = _build_service('gmail', 'v1', creds)
        send_message = gmail_service.users().messages().send(userId="me", body=create_message).execute()
        return {"status": "Email sent successfully", "messageId": send_message['id']}
    except HttpError as e:
        raise _api_error("Gmail", e) from e


# --- CALENDAR TOOLS ---

@server.tool()
def list_calendar_events(ctx: Context, start_time: str, end_time: str, query: Optional[str] = None) -> List[ListedEvent]:
    """
    Lists calendar events within a specified time range, optionally filtering by a search query.

    Args:
        start_time: The start of the time range in ISO 8601 format (e.g., '2025-07-05T00:00:00Z').
        end_time: The end of the time range in ISO 8601 format (e.g., '2025-07-06T00:00:00Z').
        query: An optional text query to filter events by (e.g., 'meeting').
    """
    creds = get_creds_from_context(ctx)
    try:
        calendar_service = _build_service('calendar', 'v3', creds)
        events_result = calendar_service.events().list(
            calendarId='primary',
            timeMin=start_time,
            timeMax=end_time,
            q=query,
            maxResults=20,  # Limit results to a reasonable number
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        listed_events = []
        for event in events_result.get('items', []):
            # Handle all-day events which have 'date' instead of 'dateTime'
            start = event['start'].get('dateTime', event['start'].get('date'))
            end = event['end'].get('dateTime', event['end'].get('date'))
            listed_events.append(
                ListedEvent(
                    id=event['id'],
                    summary=event.get('summary', 'No Title'),
                    start_time=start,
                    end_time=end
                )
            )
        return listed_events
    except HttpError as e:
        raise _api_error("Calendar", e) from e


@server.tool()
def create_calendar_event(event_details: EventDetails, ctx: Context) -> Dict[str, Any]:
    """
    Creates a Google Calendar event from structured event details.

    Args:
        event_details: A structured object containing the summary, start time, end time, and description.
    """
    creds = get_creds_from_context(ctx)
    event_body = {
        'summary': event_details.summary,
        'description': event_details.description or 'Created from an email automation.',
        'start': {'dateTime': event_details.start_time, 'timeZone': DEFAULT_TIME_ZONE},
        'end': {'dateTime': event_details.end_time, 'timeZone': DEFAULT_TIME_ZONE},
    }
    try:
        calendar_service = _build_service('calendar', 'v3', creds)
        return calendar_service.events().insert(calendarId='primary', body=event_body).execute()
    except HttpError as e:
        raise _api_error("Calendar", e) from e


@server.tool()
def delete_calendar_event(event_id: str, ctx: Context) -> Dict[str, str]:
    """
    Deletes a calendar event by its ID. To get an event ID, first list or search for events.

    Args:
        event_id: The unique ID of the event to delete.
    """
    creds = get_creds_from_context(ctx)
    try:
        calendar_service = _build_service('calendar', 'v3', creds)
        calendar_service.events().delete(calendarId='primary', eventId=event_id).execute()
        return {"status": "Event deleted successfully"}
    except HttpError as e:
        raise _api_error("Calendar", e) from e


@server.tool()
def update_calendar_event(event_id: str, update_details: EventUpdateDetails, ctx: Context) -> Dict[str, Any]:
    """
    Updates an existing calendar event by its ID. Only provided fields will be updated.

    Args:
        event_id: The ID of the event to update.
        update_details: A structured object with the fields to update.
    """
    creds = get_creds_from_context(ctx)
    # Fields left out or sent as null are "not provided" and stay untouched.
    changes = update_details.model_dump(exclude_none=True)
    try:
        calendar_service = _build_service('calendar', 'v3', creds)
        # First, get the existing event to ensure it exists and to merge updates
        event = calendar_service.events().get(calendarId='primary', eventId=event_id).execute()

        for field, key in (('start_time', 'start'), ('end_time', 'end')):
            if field not in changes:
                continue
            slot = event.setdefault(key, {})
            slot['dateTime'] = changes[field]
            # An all-day event stores 'date'; it must not be sent together
            # with 'dateTime'.
            slot.pop('date', None)
            # Same zone as create_calendar_event when the event has none.
            slot.setdefault('timeZone', DEFAULT_TIME_ZONE)
        for field in ('summary', 'description'):
            if field in changes:
                event[field] = changes[field]

        return calendar_service.events().update(calendarId='primary', eventId=event['id'], body=event).execute()
    except HttpError as e:
        raise _api_error("Calendar", e) from e


# --- GOOGLE DRIVE TOOLS ---

@server.tool()
def list_drive_files(query: str, ctx: Context) -> List[ListedDriveFile]:
    """
    Searches for files in Google Drive using a query string.

    Args:
        query: The search query. Examples: "name contains 'report'", "mimeType='application/vnd.google-apps.spreadsheet'".
               See Google Drive API docs for full query syntax.
    """
    creds = get_creds_from_context(ctx)
    try:
        drive_service = _build_service('drive', 'v3', creds)
        # Query syntax guide: https://developers.google.com/workspace/drive/api/guides/search-files
        results = drive_service.files().list(
            q=query,
            pageSize=20,  # Limit results
            fields="nextPageToken, files(id, name, mimeType)"
        ).execute()
        return [
            ListedDriveFile(id=item['id'], name=item['name'], mime_type=item['mimeType'])
            for item in results.get('files', [])
        ]
    except HttpError as e:
        raise _api_error("Drive", e) from e


@server.tool()
def create_drive_document(ctx: Context, title: str, content: Optional[str] = "") -> Dict[str, str]:
    """
    Creates a new Google Document in the user's Drive with the given title and content.

    Args:
        title: The title of the new document.
        content: The initial text content for the document.
    """
    creds = get_creds_from_context(ctx)
    file_metadata = {
        'name': title,
        'mimeType': 'application/vnd.google-apps.document'
    }
    # Plain text is uploaded; the target mimeType above requests a Google Doc.
    media = MediaIoBaseUpload(io.BytesIO((content or "").encode()), mimetype='text/plain', resumable=True)
    try:
        drive_service = _build_service('drive', 'v3', creds)
        created = drive_service.files().create(
            body=file_metadata, media_body=media, fields='id,name,webViewLink'
        ).execute()
        return {"status": "Document created", "id": created['id'], "name": created['name'], "link": created['webViewLink']}
    except HttpError as e:
        raise _api_error("Drive", e) from e


@server.tool()
def update_drive_document(file_id: str, content: str, ctx: Context) -> Dict[str, str]:
    """
    Overwrites the content of an existing Google Document.

    Args:
        file_id: The ID of the document to update.
        content: The new text content to write to the document.
    """
    creds = get_creds_from_context(ctx)
    media = MediaIoBaseUpload(io.BytesIO(content.encode()), mimetype='text/plain', resumable=True)
    try:
        drive_service = _build_service('drive', 'v3', creds)
        updated_file = drive_service.files().update(fileId=file_id, media_body=media, fields='id,name').execute()
        return {"status": "Document updated", "id": updated_file['id'], "name": updated_file['name']}
    except HttpError as e:
        raise _api_error("Drive", e) from e


# Dangerous. Use with caution.
@server.tool()
def delete_drive_file(file_id: str, ctx: Context) -> Dict[str, str]:
    """
    Permanently deletes a file from Google Drive. This action cannot be undone.

    Args:
        file_id: The ID of the file to delete.
    """
    creds = get_creds_from_context(ctx)
    try:
        drive_service = _build_service('drive', 'v3', creds)
        drive_service.files().delete(fileId=file_id).execute()
        return {"status": f"File with ID '{file_id}' deleted successfully."}
    except HttpError as e:
        raise _api_error("Drive", e) from e


@server.tool()
def move_drive_file_to_bin(file_id: str, ctx: Context) -> Dict[str, str]:
    """
    Moves a file to the Google Drive bin (trash). The file can be restored from the bin later.

    Args:
        file_id: The ID of the file to move to the bin.
    """
    creds = get_creds_from_context(ctx)
    try:
        drive_service = _build_service('drive', 'v3', creds)
        # To move a file to the bin, we update its metadata to set 'trashed' to True.
        drive_service.files().update(fileId=file_id, body={'trashed': True}).execute()
        return {"status": f"File with ID '{file_id}' moved to bin successfully."}
    except HttpError as e:
        raise _api_error("Drive", e) from e


if __name__ == "__main__":
    server.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/giuseppe-coco/Google-Workspace-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.