Skip to main content
Glama

stripe

Official
by stripe
emailer.py9.81 kB
# pyright: strict
import asyncio
import email
import imaplib
import json
import re
import smtplib
from datetime import datetime, timezone
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import parseaddr, parsedate_to_datetime
from typing import Awaitable, Callable, Dict, List, Optional, Tuple, Union


class Email:
    """A single email, either parsed from the mailbox or about to be sent.

    Args:
        from_address: Sender address.
        to_address: Recipient address.
        subject: Subject line.
        body: Message body. ``to_message`` embeds it verbatim in an HTML
            document, so it is treated as HTML when sending.
        id: Identifier the mailbox uses for the message (the IMAP message
            number as a string for fetched emails); empty for new emails.
        date: Timestamp of the message. Defaults to the current local time
            *at construction* when omitted.
        message_id: Value of the RFC 5322 ``Message-ID`` header, including
            the angle brackets, when known.
    """

    def __init__(
        self,
        from_address: str,
        to_address: str,
        subject: str,
        body: str,
        id: str = "",
        date: Optional[datetime] = None,
        message_id: str = "",
    ):
        self.id = id
        self.to_address = to_address
        self.from_address = from_address
        self.subject = subject
        self.body = body
        # A `datetime.now()` default in the signature would be evaluated once
        # at import time and shared by every instance; resolve it per call.
        self.date = date if date is not None else datetime.now()
        self.message_id = message_id

    def to_message(self, reply_id: str, reply_to: str) -> MIMEMultipart:
        """Build a MIME reply carrying this email's body as HTML.

        Args:
            reply_id: Identifier of the message being answered; written to
                both ``In-Reply-To`` and ``References``.
            reply_to: Address written to the ``Reply-To`` header.

        Returns:
            A multipart message with a single ``text/html`` part.
        """
        msg = MIMEMultipart()
        msg["From"] = self.from_address
        msg["To"] = self.to_address
        msg["Subject"] = self.subject
        msg["In-Reply-To"] = reply_id
        msg["References"] = reply_id
        msg["Reply-To"] = reply_to
        msg.attach(MIMEText(f"<html><body>{self.body}</body></html>", "html"))
        return msg

    def to_dict(self) -> Dict[str, str]:
        """Return a JSON-friendly dict with the date formatted as a string."""
        return {
            "id": self.id,
            "to": self.to_address,
            "from": self.from_address,
            "subject": self.subject,
            "body": self.body,
            "date": self.date.strftime("%a, %d %b %Y %H:%M:%S %z"),
        }


class Emailer:
    """
    Emailer is an IMAP/SMTP client that can be used to fetch and respond to
    emails. It was mostly vibe-coded so please make improvements!

    TODO: add agent replies to the context

    Args:
        email_address: Account used to log in to both IMAP and SMTP.
        email_password: Password for that account.
        support_address: Address whose unread mail is processed and which is
            used as ``Reply-To``; falls back to ``email_address`` when empty.
        imap_server: IMAP host name.
        imap_port: IMAP (SSL) port.
        smtp_server: SMTP host name.
        smtp_port: SMTP port; the connection is upgraded with STARTTLS.
    """

    def __init__(
        self,
        email_address: str,
        email_password: str,
        support_address: str = "",
        imap_server: str = "imap.gmail.com",
        imap_port: int = 993,
        smtp_server: str = "smtp.gmail.com",
        smtp_port: int = 587,
    ):
        # Email configuration
        self.email_address = email_address
        self.support_address = support_address if support_address else email_address
        self.email_password = email_password
        self.imap_server = imap_server
        self.imap_port = imap_port
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    def _connect_imap(self) -> imaplib.IMAP4_SSL:
        """Open and authenticate the IMAP connection."""
        imap_conn = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
        try:
            imap_conn.login(self.email_address, self.email_password)
        except Exception:
            # Do not leak the socket when authentication fails.
            imap_conn.shutdown()
            raise
        return imap_conn

    def _connect_smtp(self) -> smtplib.SMTP:
        """Open the SMTP connection, upgrade it to TLS and authenticate."""
        smtp_conn = smtplib.SMTP(self.smtp_server, self.smtp_port)
        try:
            smtp_conn.starttls()
            smtp_conn.login(self.email_address, self.email_password)
        except Exception:
            smtp_conn.close()
            raise
        return smtp_conn

    def _connect_to_email(self) -> Tuple[imaplib.IMAP4_SSL, smtplib.SMTP]:
        """Establish connections to email servers.

        Returns:
            The authenticated ``(imap, smtp)`` connection pair.

        Raises:
            imaplib.IMAP4.error, smtplib.SMTPException, OSError: when either
                server cannot be reached or rejects the credentials. No
                connection is left open in that case.
        """
        imap_conn = self._connect_imap()
        try:
            smtp_conn = self._connect_smtp()
        except Exception:
            # The IMAP session is already live; close it before propagating.
            self._close_connections(imap_conn, None)
            raise
        return imap_conn, smtp_conn

    @staticmethod
    def _close_connections(
        imap_conn: imaplib.IMAP4_SSL, smtp_conn: Optional[smtplib.SMTP]
    ) -> None:
        """Close both connections, ignoring errors raised while closing.

        This runs from ``finally`` blocks, so a failure to say goodbye to a
        server must not mask the exception that is already propagating.
        """
        try:
            imap_conn.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
        if smtp_conn is not None:
            try:
                smtp_conn.quit()
            except (smtplib.SMTPException, OSError):
                pass

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _imap_quote(value: str) -> str:
        """Escape *value* for use inside an IMAP quoted string.

        Backslashes and double quotes are backslash-escaped and CR/LF are
        dropped, so header values taken from incoming mail cannot break out
        of the quoted string in a SEARCH command.
        """
        cleaned = value.replace("\r", "").replace("\n", "")
        return cleaned.replace("\\", "\\\\").replace('"', '\\"')

    @staticmethod
    def _parse_date(date_str: str) -> datetime:
        """Parse a ``Date`` header into a timezone-aware datetime.

        Falls back to the current UTC time when the header is empty or
        cannot be parsed. A header with a ``-0000`` zone parses as a naive
        datetime; it is tagged as UTC so that every parsed email carries an
        aware date and the dates in a thread can always be compared.
        """
        if date_str:
            try:
                parsed = parsedate_to_datetime(date_str)
            except (TypeError, ValueError, IndexError, OverflowError):
                # Best effort: a malformed header falls back to "now".
                parsed = None
            if parsed is not None:
                if parsed.tzinfo is None:
                    return parsed.replace(tzinfo=timezone.utc)
                return parsed
        return datetime.now(timezone.utc)

    @staticmethod
    def _decode_payload(part: Message) -> Optional[str]:
        """Decode a message part to text using its declared charset.

        Returns ``None`` when the part has no bytes payload. Undecodable
        bytes are replaced rather than raising, and an unknown charset name
        falls back to UTF-8.
        """
        payload = part.get_payload(decode=True)
        if not isinstance(payload, bytes):
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            return payload.decode("utf-8", errors="replace")

    def _get_body(self, email_message: Message) -> str:
        """Return the plain-text body of *email_message* without quoted replies.

        For multipart messages the first non-attachment ``text/plain`` part
        is used; an empty string is returned when there is none.
        """
        body: str = ""
        if email_message.is_multipart():
            for part in email_message.walk():
                if part.get_content_type() != "text/plain":
                    continue
                if part.get_content_disposition() == "attachment":
                    # A .txt attachment is not the message body.
                    continue
                text = self._decode_payload(part)
                if text is not None:
                    body = text
                    break
        else:
            body = self._decode_payload(email_message) or ""
        return self._strip_replies(body)

    def _strip_replies(self, raw_body: str) -> str:
        """Drop everything from the first ``On ... wrote:`` line onwards."""
        pruned: List[str] = []
        for line in raw_body.split("\n"):
            # Stop if we see a typical reply indicator
            if line.strip().startswith("On ") and " wrote:" in line:
                break
            pruned.append(line)
        return "\n".join(pruned).strip()

    def _parse_email(
        self, imap_conn: imaplib.IMAP4_SSL, email_id: bytes
    ) -> Union[Email, None]:
        """Fetch message *email_id* (without marking it read) and parse it.

        Returns:
            The parsed ``Email``, or ``None`` when the server response does
            not contain a message body.
        """
        _, msg_data = imap_conn.fetch(email_id.decode(), "(BODY.PEEK[])")
        if not msg_data:
            return None
        msg_resp = msg_data[0]
        if not (isinstance(msg_resp, tuple) and len(msg_resp) == 2):
            return None
        raw_message = msg_resp[1]
        if not isinstance(raw_message, bytes):
            return None

        email_message = email.message_from_bytes(raw_message)
        return Email(
            id=email_id.decode(),
            from_address=parseaddr(str(email_message.get("From", "")))[1],
            to_address=parseaddr(str(email_message.get("To", "")))[1],
            subject=str(email_message["subject"] or ""),
            body=self._get_body(email_message),
            date=self._parse_date(str(email_message.get("Date", ""))),
            message_id=str(email_message.get("Message-ID", "")).strip(),
        )

    # ------------------------------------------------------------------
    # Thread reconstruction
    # ------------------------------------------------------------------

    def _get_gmail_thread(
        self, imap_conn: imaplib.IMAP4_SSL, email_id: str
    ) -> List[Email]:
        """Return the thread of *email_id* via Gmail's ``X-GM-THRID`` extension.

        Returns an empty list when the server does not support the extension
        or no thread could be resolved, so the caller can fall back.
        """
        try:
            _, thrid_data = imap_conn.fetch(email_id, "(X-GM-THRID)")
        except imaplib.IMAP4.abort:
            # The connection itself is broken; nothing to fall back to.
            raise
        except imaplib.IMAP4.error:
            # Non-Gmail servers reject the unknown fetch item.
            return []
        if not thrid_data or not thrid_data[0]:
            return []

        data = thrid_data[0]
        text = data.decode(errors="replace") if isinstance(data, bytes) else str(data)
        match = re.search(r"X-GM-THRID\s+(\d+)", text)
        if not match:
            return []

        try:
            _, thread_ids = imap_conn.search(None, f"X-GM-THRID {match.group(1)}")
        except imaplib.IMAP4.abort:
            raise
        except imaplib.IMAP4.error:
            return []
        if not thread_ids or not isinstance(thread_ids[0], bytes):
            return []

        parsed = [self._parse_email(imap_conn, mid) for mid in thread_ids[0].split()]
        thread = [e for e in parsed if e]
        thread.sort(key=lambda e: e.date)
        return thread

    def _get_referenced_emails(
        self, imap_conn: imaplib.IMAP4_SSL, email_id: str
    ) -> List[Email]:
        """Return the emails named in the ``References`` header of *email_id*."""
        _, ref_data = imap_conn.fetch(
            email_id, "(BODY.PEEK[HEADER.FIELDS (REFERENCES)])"
        )
        if not ref_data:
            return []
        first = ref_data[0]
        if not (
            isinstance(first, tuple) and len(first) == 2 and isinstance(first[1], bytes)
        ):
            return []

        found: List[Email] = []
        ref_line = first[1].decode(errors="replace")
        for ref in re.findall(r"<([^>]+)>", ref_line):
            # The reference comes from an incoming email: escape it before
            # embedding it in the SEARCH command.
            quoted = self._imap_quote(f"<{ref}>")
            _, ref_ids = imap_conn.search(None, f'(HEADER Message-ID "{quoted}")')
            if not ref_ids or not isinstance(ref_ids[0], bytes):
                continue
            for ref_id in ref_ids[0].split():
                ref_email = self._parse_email(imap_conn, ref_id)
                if ref_email:
                    found.append(ref_email)
        return found

    def _get_email_thread(
        self, imap_conn: imaplib.IMAP4_SSL, email_id_bytes: bytes
    ) -> List[Email]:
        """Return the conversation containing *email_id_bytes*, oldest first.

        Gmail's thread id is tried first; when it is unavailable the thread
        is rebuilt from the ``References`` header. Returns an empty list when
        the email itself cannot be parsed.
        """
        root = self._parse_email(imap_conn, email_id_bytes)
        if not root:
            return []

        # Try thread via X-GM-THRID (Gmail extension)
        gmail_thread = self._get_gmail_thread(imap_conn, root.id)
        if gmail_thread:
            return gmail_thread

        # Fallback: use REFERENCES header
        thread = [root]
        seen_ids = {root.id}
        for ref_email in self._get_referenced_emails(imap_conn, root.id):
            if ref_email.id not in seen_ids:
                seen_ids.add(ref_email.id)
                thread.append(ref_email)

        # Sort emails in the thread by date (ascending order)
        thread.sort(key=lambda e: e.date)
        return thread

    def _get_unread_emails(self, imap_conn: imaplib.IMAP4_SSL) -> List[List[Email]]:
        """Return one thread per unread INBOX email sent to the support address.

        Threads that could not be parsed at all are skipped, so every
        returned thread holds at least one email.
        """
        imap_conn.select("INBOX")
        recipient = self._imap_quote(self.support_address)
        _, msg_nums = imap_conn.search(None, f'(UNSEEN TO "{recipient}")')
        if not msg_nums or not isinstance(msg_nums[0], bytes):
            return []

        emails: List[List[Email]] = []
        for email_id in msg_nums[0].split():
            thread = self._get_email_thread(imap_conn, email_id)
            if thread:
                emails.append(thread)
        return emails

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def mark_as_read(self, imap_conn: imaplib.IMAP4_SSL, message_id: str):
        """Set the ``\\Seen`` flag on the message numbered *message_id*."""
        imap_conn.store(message_id, "+FLAGS", "\\Seen")

    def get_email_thread(self, email_id: str) -> List[Email]:
        """Return the thread containing INBOX message *email_id*, oldest first.

        Only an IMAP connection is opened (reading a thread never sends
        mail), and it is closed even when fetching fails.
        """
        imap_conn = self._connect_imap()
        try:
            imap_conn.select("INBOX")
            return self._get_email_thread(
                imap_conn=imap_conn, email_id_bytes=email_id.encode()
            )
        finally:
            self._close_connections(imap_conn, None)

    async def process(
        self,
        respond: Callable[[List[Email]], Awaitable[Union[Email, None]]],
        mark_read: bool = True,
    ):
        """Answer every unread thread once.

        Args:
            respond: Coroutine function that receives a thread (oldest email
                first) and returns the reply to send, or ``None`` to leave
                the thread unanswered and unread.
            mark_read: Whether to mark the answered email as read.
        """
        # Connect to email servers
        imap_conn, smtp_conn = self._connect_to_email()
        try:
            # Get unread emails
            print("Fetching unread emails...")
            unread_emails = self._get_unread_emails(imap_conn)
            for email_thread in unread_emails:
                if not email_thread:
                    continue

                # Get the most recent email in the thread
                most_recent = email_thread[-1]

                # Generate the response
                response = await respond(email_thread)

                # If there is no response, skip this email and keep as unread
                # in the inbox
                if response is None:
                    continue

                # Send the response
                print(
                    f"Replying to '{response.to_address}' with:\n  {json.dumps(response.body)}"
                )
                # In-Reply-To/References must carry the Message-ID header of
                # the email being answered; the IMAP message number is only a
                # last resort when the original had no Message-ID.
                reply_id = most_recent.message_id or most_recent.id
                smtp_conn.send_message(
                    response.to_message(reply_id, self.support_address)
                )

                # Mark the original email as read
                if mark_read:
                    self.mark_as_read(imap_conn, most_recent.id)
        finally:
            # Close connections, also when fetching or sending failed
            self._close_connections(imap_conn, smtp_conn)

    async def run(
        self,
        respond: Callable[[List[Email]], Awaitable[Union[Email, None]]],
        mark_read: bool = True,
        delay: int = 60,
    ):
        """Call ``process`` forever, sleeping *delay* seconds between rounds.

        An exception raised by ``process`` propagates and ends the loop.
        """
        while True:
            # Process emails
            await self.process(respond, mark_read)
            # Wait before next check
            print(f"Sleeping for {delay}s...")
            await asyncio.sleep(delay)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stripe/agent-toolkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.