# type: ignore
# doing this to appease type checkers because the gmail client is dynamic and untyped
# we make this 'safe' by using pydantic validation on the public apis
import os
import base64
from email.mime.text import MIMEText
from typing import Any
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build, Resource
from models import (
GmailMessage,
GetThreadResponse,
Message,
)
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.compose",
]
class Gmail:
service: Resource
def __init__(self):
self.service = self._init_gmail_service()
def get_unread_messages(self, max_results: int = 10) -> list[Message]:
response = (
self.service.users()
.messages()
.list(userId="me", q="is:unread", maxResults=max_results)
.execute()
)
return [
Message.from_gmail_message(GmailMessage(**self._get_message(message["id"])))
for message in response.get("messages", [])
]
def _get_message(self, id: int) -> dict[str, Any]:
return (
self.service.users()
.messages()
.get(userId="me", id=id, format="full")
.execute()
)
def get_thread(self, thread_id: str, max_messages: int = 10) -> GetThreadResponse:
thread = (
self.service.users()
.threads()
.get(userId="me", id=thread_id, format="full")
.execute()
)
messages = thread.get("messages", [])
if len(messages) > max_messages:
selected_messages = [
GmailMessage(**message) for message in messages[-max_messages:]
]
else:
selected_messages = [GmailMessage(**message) for message in messages]
return GetThreadResponse(
messages=[
Message.from_gmail_message(message) for message in selected_messages
],
thread_length=len(messages),
most_recent_messages=len(selected_messages),
)
def create_draft(self, thread_id: str, draft_body: str) -> dict[str, Any]:
thread = (
self.service.users()
.threads()
.get(userId="me", id=thread_id, format="full")
.execute()
)
messages = thread.get("messages", [])
if not messages:
raise ValueError(f"Thread {thread_id} has no messages")
last_message = messages[-1]
headers = {h["name"]: h["value"] for h in last_message["payload"]["headers"]}
message = MIMEText(draft_body)
message["To"] = headers.get("From", "")
message["Subject"] = headers.get("Subject", "")
message["In-Reply-To"] = headers.get("Message-ID", "")
message["References"] = headers.get("References", headers.get("Message-ID", ""))
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
draft = {
"message": {
"raw": raw_message,
"threadId": thread_id,
}
}
result = (
self.service.users()
.drafts()
.create(userId="me", body=draft)
.execute()
)
return result
def _init_gmail_service(self) -> Resource:
creds = None
if os.path.exists("token.json"):
creds = Credentials.from_authorized_user_file("token.json", SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
"credentials.json", SCOPES
)
creds = flow.run_local_server(port=0)
with open("token.json", "w") as token:
token.write(creds.to_json())
return build("gmail", "v1", credentials=creds)