from pydantic import BaseModel
from typing import Optional, Any
import base64
class GmailMessageHeader(BaseModel):
    """A single name/value header pair attached to a message part."""

    # Header name; get_header() compares it case-insensitively.
    name: str
    # Header value; returned verbatim by get_header() when the name matches.
    value: str
class GmailMessagePartBody(BaseModel):
    """Body section of a single message part.

    Field names are camelCase, presumably mirroring the Gmail API JSON
    payload so responses can be validated without aliases -- confirm
    against the code that constructs these models.
    """

    # Optional reference to separately stored content; absent by default.
    attachmentId: str | None = None
    # Required size of the body (unit presumably bytes -- TODO confirm).
    size: int
    # Optional inline content. get_message_body() treats it as URL-safe
    # base64 text that decodes to UTF-8.
    data: str | None = None
class GmailMessagePart(BaseModel):
    """One node of a message's part tree.

    A part carries its own headers and body and may contain child parts,
    which makes the structure recursive.
    """

    # Identifier of this part (format not visible here).
    partId: str
    # MIME type of this part; not consulted by the helpers in this module.
    mimeType: str
    # File name associated with the part (presumably empty for
    # non-attachment parts -- TODO confirm).
    filename: str
    # Headers of this part; for the root part these are searched by
    # Message.from_gmail_message() for From/Subject/Date.
    headers: list[GmailMessageHeader]
    # Body of this part; its `data` may be unset.
    body: GmailMessagePartBody
    # Optional child parts; get_message_body() walks these depth-first.
    parts: Optional[list["GmailMessagePart"]] = None
class GmailMessage(BaseModel):
    """Raw message record as received, before flattening into ``Message``.

    Field names are camelCase, presumably mirroring the Gmail API message
    resource -- confirm against the code that constructs these models.
    """

    # Message identifier.
    id: str
    # Identifier of the owning thread; copied to Message.thread_id.
    threadId: str
    # Labels attached to the message.
    labelIds: list[str]
    # Short text preview; copied to Message.snippet unchanged.
    snippet: str
    # Kept as strings exactly as received (semantics not visible here).
    historyId: str
    internalDate: str
    # Root of the part tree; its headers and body feed
    # Message.from_gmail_message() and get_message_body().
    payload: GmailMessagePart
    # Size estimate (unit presumably bytes -- TODO confirm).
    sizeEstimate: int
    # Optional fields, absent by default.
    raw: str | None = None
    classificationLabelValues: list[Any] | None = None
class Message(BaseModel):
    """Flattened, caller-friendly view of a ``GmailMessage``.

    All fields are plain strings extracted from the raw message's headers,
    snippet, thread id and decoded body.
    """

    # Value of the "From" header.
    sender: str
    # Value of the "Subject" header.
    subject: str
    # Short preview text copied from the raw message.
    snippet: str
    # Identifier of the thread the message belongs to.
    thread_id: str
    # Value of the "Date" header, kept as the original string.
    date: str
    # Decoded body text; empty string when no body data was found.
    body: str

    @staticmethod
    def from_gmail_message(gmail_message: GmailMessage) -> "Message":
        """Build a ``Message`` from a raw ``GmailMessage``.

        Args:
            gmail_message: The raw message whose root-part headers, snippet,
                thread id and body are extracted.

        Returns:
            A ``Message`` populated from ``gmail_message``.

        Raises:
            ValueError: If the From, Subject or Date header is missing from
                the root part's headers.
        """
        headers = gmail_message.payload.headers
        sender = get_header(headers, "from")
        subject = get_header(headers, "subject")
        date = get_header(headers, "date")

        # Validate the required headers before decoding the body so that an
        # incomplete message fails fast with this descriptive error.
        if sender is None or subject is None or date is None:
            # ValueError (an Exception subclass) keeps existing
            # `except Exception` handlers working while being catchable
            # specifically.
            raise ValueError(
                f"sender, subject or date cannot be empty. input message: {gmail_message}"
            )

        return Message(
            sender=sender,
            subject=subject,
            snippet=gmail_message.snippet,
            thread_id=gmail_message.threadId,
            date=date,
            body=get_message_body(gmail_message),
        )
class GetThreadResponse(BaseModel):
    """Response payload describing the messages of a thread."""

    # Flattened messages included in this response.
    messages: list[Message]
    # Presumably the total number of messages in the thread -- TODO confirm
    # against the code that builds this response.
    thread_length: int
    # Presumably how many of the most recent messages were included in
    # `messages` -- TODO confirm against the code that builds this response.
    most_recent_messages: int
def get_header(headers: list[GmailMessageHeader], key: str) -> str | None:
    """Return the value of the first header whose name matches *key*.

    The comparison is case-insensitive (both sides are lower-cased).

    Args:
        headers: Headers to search, in order.
        key: Header name to look for, in any letter case.

    Returns:
        The value of the first matching header, or ``None`` when no header
        matches (including when *headers* is empty).
    """
    # Normalize the lookup key once instead of once per header.
    wanted = key.lower()
    return next(
        (header.value for header in headers if header.name.lower() == wanted),
        None,
    )
def get_message_body(message: GmailMessage) -> str:
    """Return the decoded text body of *message*.

    The root part's own body data is used when present. Otherwise the part
    tree is searched depth-first and the first part carrying body data wins.
    NOTE: the search does not look at ``mimeType``, so the first part with
    data is returned whatever its content type.

    Args:
        message: The raw message whose payload is inspected.

    Returns:
        The body decoded from URL-safe base64 and then UTF-8, or ``""`` when
        no part carries body data.

    Raises:
        binascii.Error: If the body data is not valid base64.
        UnicodeDecodeError: If the decoded bytes are not valid UTF-8.
    """

    def decode(encoded: str) -> str:
        # base64url producers may omit the trailing "=" padding (RFC 4648
        # section 5), which urlsafe_b64decode rejects; restore it first.
        # Already-padded input has a length divisible by 4 and is unchanged.
        padded = encoded + "=" * (-len(encoded) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8")

    def find_encoded_body(parts: list[GmailMessagePart]) -> str | None:
        # Depth-first search: a part's own data takes precedence over its
        # children, and earlier siblings over later ones.
        for part in parts:
            if part.body.data:
                return part.body.data
            if part.parts:
                found = find_encoded_body(part.parts)
                if found:
                    return found
        return None

    payload = message.payload
    if payload.body.data:
        return decode(payload.body.data)
    if payload.parts:
        encoded_body = find_encoded_body(payload.parts)
        if encoded_body:
            return decode(encoded_body)
    return ""