Skip to main content
Glama
google_gmail.py (20.9 kB)
"""
Google Gmail Integration Module

Full email management including drafts, send, reply, and labels.

Every public function returns a plain ``dict`` with a boolean ``'success'``
key. On failure the dict carries an ``'error'`` string instead of raising, so
callers never need to handle exceptions from this module.
"""

from base64 import urlsafe_b64decode, urlsafe_b64encode
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from googleapiclient.discovery import build

from src.google_auth import get_credentials


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _get_header(headers, name):
    """Return the value of header *name* (case-insensitive), or '' if absent."""
    wanted = name.lower()
    for header in headers:
        if header.get('name', '').lower() == wanted:
            return header.get('value', '')
    return ''


def _join_addresses(addresses):
    """Normalise a recipient argument (str, iterable of str, or None) to a str."""
    if not addresses:
        return ''
    if isinstance(addresses, str):
        return addresses
    return ', '.join(addresses)


def _build_raw_message(to, subject, body, is_html=False, cc=None, bcc=None,
                       extra_headers=None):
    """Build a MIME message and return it base64url-encoded for the Gmail API.

    ``extra_headers`` is an optional mapping of additional header names to
    values; entries with an empty value are skipped.
    """
    message = MIMEText(body, 'html' if is_html else 'plain')
    message['to'] = _join_addresses(to)
    message['subject'] = subject
    if cc:
        message['cc'] = _join_addresses(cc)
    if bcc:
        message['bcc'] = _join_addresses(bcc)
    for header_name, header_value in (extra_headers or {}).items():
        if header_value:
            message[header_name] = header_value
    return urlsafe_b64encode(message.as_bytes()).decode('utf-8')


def _decode_body_data(data):
    """Decode a base64url body string to text, tolerating missing padding."""
    padded = data + '=' * (-len(data) % 4)
    return urlsafe_b64decode(padded).decode('utf-8', errors='ignore')


def _find_part_text(payload, mime_type):
    """Depth-first search of ``payload['parts']`` for the first *mime_type* body."""
    for part in payload.get('parts') or []:
        data = part.get('body', {}).get('data')
        if data and part.get('mimeType') == mime_type:
            return _decode_body_data(data)
        # Parts can themselves be multipart containers (e.g. a
        # multipart/alternative nested inside multipart/mixed).
        nested = _find_part_text(part, mime_type)
        if nested:
            return nested
    return ''


def _fetch_email_detail(service, message_id):
    """Fetch one message with an existing *service* and return its summary dict."""
    message = service.users().messages().get(
        userId='me',
        id=message_id,
        format='full'
    ).execute()

    payload = message.get('payload', {})
    headers = payload.get('headers', [])

    return {
        'id': message['id'],
        'threadId': message.get('threadId', ''),
        'subject': _get_header(headers, 'subject'),
        'from': _get_header(headers, 'from'),
        'to': _get_header(headers, 'to'),
        'cc': _get_header(headers, 'cc'),
        'date': _get_header(headers, 'date'),
        'snippet': message.get('snippet', ''),
        'body': extract_body(payload),
        'labelIds': message.get('labelIds', [])
    }


def _find_label_id(service, label_name):
    """Return the id of the label named *label_name* (case-insensitive), or None."""
    labels = service.users().labels().list(userId='me').execute()
    wanted = label_name.lower()
    for label in labels.get('labels', []):
        if label['name'].lower() == wanted:
            return label['id']
    return None


# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------

def get_gmail_service():
    """Build a Gmail v1 API client from the project's stored credentials."""
    creds = get_credentials()
    return build('gmail', 'v1', credentials=creds)


# ---------------------------------------------------------------------------
# Reading
# ---------------------------------------------------------------------------

def list_emails(max_results=10, query=''):
    """List emails matching *query*, with full details for each message.

    Returns ``{'success': True, 'messages': [...], 'total': <estimate>}``.
    Messages whose details cannot be fetched are skipped.
    """
    try:
        service = get_gmail_service()
        results = service.users().messages().list(
            userId='me',
            maxResults=max_results,
            q=query
        ).execute()

        message_details = []
        for message in results.get('messages', []):
            # Reuse one service for every fetch instead of rebuilding the
            # client per message. A single failing message must not abort
            # the whole listing, so it is skipped (best-effort).
            try:
                message_details.append(
                    _fetch_email_detail(service, message['id'])
                )
            except Exception:
                continue

        return {
            'success': True,
            'messages': message_details,
            'total': results.get('resultSizeEstimate', 0)
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def get_email_detail(message_id):
    """Get detailed information about a specific email.

    Returns ``{'success': True, 'data': {...}}`` where ``data`` holds id,
    threadId, subject, from, to, cc, date, snippet, body and labelIds.
    """
    try:
        service = get_gmail_service()
        return {
            'success': True,
            'data': _fetch_email_detail(service, message_id)
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def extract_body(payload):
    """Extract the text body from a Gmail message payload.

    A body attached directly to *payload* is returned as-is. Otherwise the
    part tree is searched recursively, preferring ``text/plain`` and falling
    back to ``text/html``. Returns '' when no text body is found.
    """
    if not payload:
        return ''

    body_data = payload.get('body', {}).get('data')
    if body_data:
        return _decode_body_data(body_data)

    return (
        _find_part_text(payload, 'text/plain')
        or _find_part_text(payload, 'text/html')
    )


def search_emails(query, max_results=50):
    """Search emails in Gmail and return only the matching message ids."""
    try:
        service = get_gmail_service()
        results = service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        ).execute()

        messages = results.get('messages', [])

        return {
            'success': True,
            'messageIds': [msg['id'] for msg in messages],
            'total': results.get('resultSizeEstimate', 0)
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Sending
# ---------------------------------------------------------------------------

def send_email(to, subject, body, is_html=False, cc=None, bcc=None):
    """Send an email via Gmail.

    ``to``, ``cc`` and ``bcc`` each accept a single address string or an
    iterable of address strings.
    """
    try:
        service = get_gmail_service()
        raw_message = _build_raw_message(to, subject, body, is_html, cc, bcc)

        send_message = service.users().messages().send(
            userId='me',
            body={'raw': raw_message}
        ).execute()

        return {
            'success': True,
            'messageId': send_message.get('id', ''),
            'threadId': send_message.get('threadId', ''),
            'message': 'Email sent successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def reply_to_email(message_id, body, is_html=False):
    """Reply to the sender of an existing email, within the same thread."""
    try:
        service = get_gmail_service()

        # Get original message to extract thread info
        original = service.users().messages().get(
            userId='me',
            id=message_id,
            format='full'
        ).execute()

        headers = original.get('payload', {}).get('headers', [])

        original_from = _get_header(headers, 'from')
        original_subject = _get_header(headers, 'subject')
        thread_id = original.get('threadId', '')

        # Add Re: prefix if not already present
        if original_subject.lower().startswith('re:'):
            reply_subject = original_subject
        else:
            reply_subject = f'Re: {original_subject}'

        # Threading headers must carry the RFC 2822 Message-ID of the
        # original mail, not the Gmail API message id; otherwise the
        # recipient's client cannot attach the reply to the conversation.
        rfc_message_id = _get_header(headers, 'message-id')
        prior_references = _get_header(headers, 'references')
        references = f'{prior_references} {rfc_message_id}'.strip()

        raw_message = _build_raw_message(
            original_from,
            reply_subject,
            body,
            is_html,
            extra_headers={
                'In-Reply-To': rfc_message_id,
                'References': references,
            },
        )

        send_body = {'raw': raw_message}
        if thread_id:
            # Only pass threadId when we actually have one.
            send_body['threadId'] = thread_id

        reply = service.users().messages().send(
            userId='me',
            body=send_body
        ).execute()

        return {
            'success': True,
            'messageId': reply.get('id', ''),
            'threadId': reply.get('threadId', ''),
            'message': 'Reply sent successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def forward_email(message_id, to, additional_message=''):
    """Forward an email's text body to another recipient as a new message."""
    try:
        service = get_gmail_service()

        # Get original message
        original = service.users().messages().get(
            userId='me',
            id=message_id,
            format='full'
        ).execute()

        payload = original.get('payload', {})
        headers = payload.get('headers', [])

        original_from = _get_header(headers, 'from')
        original_subject = _get_header(headers, 'subject')
        original_body = extract_body(payload)

        # Build forward subject
        if original_subject.lower().startswith('fwd:'):
            forward_subject = original_subject
        else:
            forward_subject = f'Fwd: {original_subject}'

        # Build forward body
        forward_body = f"""{additional_message}

---------- Forwarded message ---------
From: {original_from}
Subject: {original_subject}

{original_body}
"""

        raw_message = _build_raw_message(to, forward_subject, forward_body)

        forwarded = service.users().messages().send(
            userId='me',
            body={'raw': raw_message}
        ).execute()

        return {
            'success': True,
            'messageId': forwarded.get('id', ''),
            'threadId': forwarded.get('threadId', ''),
            'message': 'Email forwarded successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Drafts
# ---------------------------------------------------------------------------

def create_draft(to, subject, body, is_html=False, cc=None, bcc=None):
    """Create a draft email.

    ``to``, ``cc`` and ``bcc`` each accept a single address string or an
    iterable of address strings. ``to`` may be empty so the user can fill
    it in later.
    """
    try:
        service = get_gmail_service()
        raw_message = _build_raw_message(to, subject, body, is_html, cc, bcc)

        draft = service.users().drafts().create(
            userId='me',
            body={'message': {'raw': raw_message}}
        ).execute()

        return {
            'success': True,
            'draftId': draft.get('id', ''),
            'messageId': draft.get('message', {}).get('id', ''),
            'message': 'Draft created successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def list_drafts(max_results=20):
    """List draft emails with their recipient, subject and snippet."""
    try:
        service = get_gmail_service()
        results = service.users().drafts().list(
            userId='me',
            maxResults=max_results
        ).execute()

        draft_details = []
        for draft in results.get('drafts', []):
            # drafts.list only returns ids, so each draft is fetched
            # individually to read its headers.
            draft_info = service.users().drafts().get(
                userId='me',
                id=draft['id']
            ).execute()

            message = draft_info.get('message', {})
            headers = message.get('payload', {}).get('headers', [])

            draft_details.append({
                'draftId': draft['id'],
                'messageId': message.get('id', ''),
                'to': _get_header(headers, 'to'),
                'subject': _get_header(headers, 'subject'),
                'snippet': message.get('snippet', '')
            })

        return {
            'success': True,
            'drafts': draft_details,
            'total': len(draft_details)
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def get_draft(draft_id):
    """Get a specific draft by ID, including its recipients and body."""
    try:
        service = get_gmail_service()
        draft = service.users().drafts().get(
            userId='me',
            id=draft_id
        ).execute()

        message = draft.get('message', {})
        payload = message.get('payload', {})
        headers = payload.get('headers', [])

        return {
            'success': True,
            'draft': {
                'draftId': draft['id'],
                'messageId': message.get('id', ''),
                'to': _get_header(headers, 'to'),
                'cc': _get_header(headers, 'cc'),
                'bcc': _get_header(headers, 'bcc'),
                'subject': _get_header(headers, 'subject'),
                'body': extract_body(payload),
                'snippet': message.get('snippet', '')
            }
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def update_draft(draft_id, to=None, subject=None, body=None, is_html=False,
                 cc=None, bcc=None):
    """Update an existing draft, keeping any field that is not supplied.

    Empty or ``None`` arguments fall back to the draft's current value for
    to, subject, body, cc and bcc. The draft content is replaced wholesale,
    so ``is_html`` must describe the final body.
    """
    try:
        service = get_gmail_service()

        # Get existing draft to preserve values not being updated
        existing = get_draft(draft_id)
        if not existing['success']:
            return existing

        existing_draft = existing['draft']

        # Use new values or fall back to existing. cc/bcc are included:
        # drafts.update replaces the whole message, so omitting them would
        # silently strip those recipients from the draft.
        final_to = to or existing_draft.get('to', '')
        final_subject = subject or existing_draft.get('subject', '')
        final_body = body or existing_draft.get('body', '')
        final_cc = cc or existing_draft.get('cc', '')
        final_bcc = bcc or existing_draft.get('bcc', '')

        raw_message = _build_raw_message(
            final_to, final_subject, final_body, is_html, final_cc, final_bcc
        )

        updated_draft = service.users().drafts().update(
            userId='me',
            id=draft_id,
            body={'message': {'raw': raw_message}}
        ).execute()

        return {
            'success': True,
            'draftId': updated_draft.get('id', ''),
            'message': 'Draft updated successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def send_draft(draft_id):
    """Send an existing draft."""
    try:
        service = get_gmail_service()
        sent_message = service.users().drafts().send(
            userId='me',
            body={'id': draft_id}
        ).execute()

        return {
            'success': True,
            'messageId': sent_message.get('id', ''),
            'threadId': sent_message.get('threadId', ''),
            'message': 'Draft sent successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def delete_draft(draft_id):
    """Delete a draft."""
    try:
        service = get_gmail_service()
        service.users().drafts().delete(
            userId='me',
            id=draft_id
        ).execute()

        return {
            'success': True,
            'message': 'Draft deleted successfully'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Labels and message state
# ---------------------------------------------------------------------------

def add_label(message_id, label_name):
    """Add a label to a message, creating the label if it does not exist."""
    try:
        service = get_gmail_service()

        # First, get or create the label
        label_id = _find_label_id(service, label_name)
        if not label_id:
            new_label = service.users().labels().create(
                userId='me',
                body={'name': label_name}
            ).execute()
            label_id = new_label['id']

        # Add label to message
        service.users().messages().modify(
            userId='me',
            id=message_id,
            body={'addLabelIds': [label_id]}
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'labelAdded': label_name
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def remove_label(message_id, label_name):
    """Remove a label from a message; fails if the label does not exist."""
    try:
        service = get_gmail_service()

        label_id = _find_label_id(service, label_name)
        if not label_id:
            return {
                'success': False,
                'error': f'Label "{label_name}" not found'
            }

        # Remove label from message
        service.users().messages().modify(
            userId='me',
            id=message_id,
            body={'removeLabelIds': [label_id]}
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'labelRemoved': label_name
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def list_labels():
    """List all labels in Gmail as ``{'id', 'name', 'type'}`` dicts."""
    try:
        service = get_gmail_service()
        labels = service.users().labels().list(userId='me').execute()

        return {
            'success': True,
            'labels': [
                {
                    'id': label['id'],
                    'name': label['name'],
                    'type': label.get('type', 'user')
                }
                for label in labels.get('labels', [])
            ]
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def mark_as_read(message_id):
    """Mark a message as read by removing its UNREAD label."""
    try:
        service = get_gmail_service()
        service.users().messages().modify(
            userId='me',
            id=message_id,
            body={'removeLabelIds': ['UNREAD']}
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'message': 'Marked as read'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def mark_as_unread(message_id):
    """Mark a message as unread by adding the UNREAD label."""
    try:
        service = get_gmail_service()
        service.users().messages().modify(
            userId='me',
            id=message_id,
            body={'addLabelIds': ['UNREAD']}
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'message': 'Marked as unread'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def trash_email(message_id):
    """Move an email to trash."""
    try:
        service = get_gmail_service()
        service.users().messages().trash(
            userId='me',
            id=message_id
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'message': 'Email moved to trash'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def archive_email(message_id):
    """Archive an email (remove it from the inbox)."""
    try:
        service = get_gmail_service()
        service.users().messages().modify(
            userId='me',
            id=message_id,
            body={'removeLabelIds': ['INBOX']}
        ).execute()

        return {
            'success': True,
            'messageId': message_id,
            'message': 'Email archived'
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Helper functions for job search follow-ups
# ---------------------------------------------------------------------------

def create_followup_draft(company_name, contact_name, position,
                          days_since_application=7):
    """Create a follow-up email draft for a job application.

    The draft is created without a recipient so the user can review it and
    fill in the address before sending.
    """
    try:
        subject = f"Following Up - {position} Application at {company_name}"

        # Avoid "1 days ago" when the application was submitted yesterday.
        day_word = 'day' if days_since_application == 1 else 'days'

        body = f"""Dear {contact_name},

I hope this email finds you well. I wanted to follow up on my application for the {position} position at {company_name}, which I submitted {days_since_application} {day_word} ago.

I remain very interested in this opportunity and would welcome the chance to discuss how my skills and experience align with your team's needs.

Please let me know if you need any additional information from me. I look forward to hearing from you.

Best regards"""

        # Create as draft so user can review before sending
        return create_draft(
            to='',  # User will fill in
            subject=subject,
            body=body,
            is_html=False
        )
    except Exception as error:
        return {'success': False, 'error': str(error)}


def create_thank_you_draft(interviewer_name, company_name, position,
                           interview_topics=None):
    """Create a thank-you email draft after an interview.

    ``interview_topics`` may be a single string or an iterable of strings;
    when given, a sentence mentioning the topics is added to the body. The
    draft is created without a recipient so the user can fill it in.
    """
    try:
        subject = f"Thank You - {position} Interview at {company_name}"

        topics_text = ""
        if interview_topics:
            # A bare string must not be joined character by character.
            if isinstance(interview_topics, str):
                topics = interview_topics
            else:
                topics = ', '.join(interview_topics)
            topics_text = f"\n\nI particularly enjoyed our discussion about {topics}."

        body = f"""Dear {interviewer_name},

Thank you for taking the time to meet with me today to discuss the {position} position at {company_name}. I truly enjoyed learning more about the role and your team.{topics_text}

Our conversation reinforced my enthusiasm for this opportunity, and I am confident that my skills and experience would allow me to make meaningful contributions to your team.

Please don't hesitate to reach out if you need any additional information. I look forward to hearing from you about the next steps.

Best regards"""

        return create_draft(
            to='',  # User will fill in
            subject=subject,
            body=body,
            is_html=False
        )
    except Exception as error:
        return {'success': False, 'error': str(error)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.