"""
Gmail Tools Module
Provides Gmail API integration for subscription tracking
"""
from googleapiclient.discovery import build
from base64 import urlsafe_b64decode, urlsafe_b64encode
from email.mime.text import MIMEText
from src.google_auth import get_credentials
def get_gmail_service():
    """
    Build a Gmail API client.

    Obtains credentials via get_credentials() and passes them to the
    googleapiclient discovery builder for the 'gmail' API, version 'v1'.
    """
    credentials = get_credentials()
    gmail = build('gmail', 'v1', credentials=credentials)
    return gmail
def search_subscription_emails(query='subscription OR renewal OR billing', max_results=50):
    """
    Look up Gmail messages that match a search query.

    Args:
        query: Gmail search query
        max_results: Maximum number of emails to return

    Returns:
        On success, a dict with 'success' set to True, 'message_ids' (the
        'id' of every entry in the response's 'messages' list), 'total'
        (the response's 'resultSizeEstimate', 0 when absent) and 'count'
        (how many IDs were returned).
        If anything raises, a dict with 'success' set to False and 'error'
        holding the exception text.
    """
    try:
        list_request = get_gmail_service().users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        )
        response = list_request.execute()

        # The 'messages' key is treated as optional; fall back to an empty list.
        found = response.get('messages', [])
        ids = [entry['id'] for entry in found]
        estimated_total = response.get('resultSizeEstimate', 0)

        outcome = {
            'success': True,
            'message_ids': ids,
            'total': estimated_total,
            'count': len(found)
        }
    except Exception as error:
        # Failures are reported to the caller as data rather than raised.
        outcome = {
            'success': False,
            'error': str(error)
        }
    return outcome
def get_email_detail(message_id):
    """
    Fetch one Gmail message (format='full') and summarise it.

    Args:
        message_id: Gmail message ID

    Returns:
        On success, {'success': True, 'data': {...}} where data carries
        'id', 'thread_id', 'subject', 'from', 'to', 'date', 'snippet' and
        'body'. Header fields are matched case-insensitively and default
        to '' when the header is absent; 'body' comes from extract_body().
        If anything raises, {'success': False, 'error': <exception text>}.
    """
    try:
        gmail = get_gmail_service()
        fetch_request = gmail.users().messages().get(
            userId='me',
            id=message_id,
            format='full'
        )
        msg = fetch_request.execute()

        header_list = msg.get('payload', {}).get('headers', [])

        def header_value(wanted):
            # First header whose name matches, ignoring case; '' if none does.
            wanted = wanted.lower()
            return next(
                (entry['value'] for entry in header_list
                 if entry['name'].lower() == wanted),
                ''
            )

        details = {
            'id': msg['id'],
            'thread_id': msg.get('threadId', ''),
            'subject': header_value('subject'),
            'from': header_value('from'),
            'to': header_value('to'),
            'date': header_value('date'),
            'snippet': msg.get('snippet', ''),
            'body': extract_body(msg.get('payload', {}))
        }
        return {'success': True, 'data': details}
    except Exception as error:
        # Failures are reported to the caller as data rather than raised.
        return {'success': False, 'error': str(error)}
def _decode_base64url(data):
    """Decode base64url *data* to text, tolerating missing '=' padding."""
    # urlsafe_b64decode rejects input whose length is not a multiple of 4,
    # so restore any padding that was stripped before decoding.
    missing = -len(data) % 4
    if missing:
        data += ('=' if isinstance(data, str) else b'=') * missing
    return urlsafe_b64decode(data).decode('utf-8', errors='ignore')


def extract_body(payload):
    """
    Extract email body text from a message payload.

    The payload's own body data is used when present. Otherwise the
    payload's parts are scanned in order: the first 'text/plain' or
    'text/html' part that carries data is returned, and any part that
    itself contains nested parts is searched recursively, so a text part
    wrapped inside another multipart container is still found.

    Args:
        payload: Message payload dict (may be empty or None)

    Returns:
        The decoded body as a string (undecodable UTF-8 bytes are dropped),
        or '' when no body text is found.
    """
    if not payload:
        return ''

    body_data = payload.get('body', {}).get('data')
    if body_data:
        return _decode_base64url(body_data)

    for part in payload.get('parts', []):
        if part.get('mimeType') in ('text/plain', 'text/html'):
            body_data = part.get('body', {}).get('data')
            if body_data:
                return _decode_base64url(body_data)
        elif part.get('parts'):
            # Container part: descend instead of giving up at the first level.
            nested_text = extract_body(part)
            if nested_text:
                return nested_text
    return ''
def send_renewal_alert(to, service_name, amount, renewal_date):
    """
    Email a renewal reminder through the Gmail API.

    Args:
        to: Recipient email address
        service_name: Name of the subscription service
        amount: Renewal amount
        renewal_date: Renewal date

    Returns:
        On success, a dict with 'success' set to True plus the 'message_id'
        and 'thread_id' taken from the send response ('' when absent).
        If anything raises, a dict with 'success' set to False and 'error'
        holding the exception text.
    """
    try:
        gmail = get_gmail_service()

        email_subject = f"Renewal Alert: {service_name} renews in 3 days"
        # The literal's lines are flush-left on purpose: any leading
        # whitespace inside a triple-quoted string becomes part of the text.
        email_text = f"""
Hello,
This is a proactive reminder that your {service_name} subscription will renew in 3 days.
Renewal Details:
- Service: {service_name}
- Amount: ${amount}
- Renewal Date: {renewal_date}
This is an automated alert from your Subscription Tracker.
Best regards,
Subscription Tracker MCP
"""

        mime = MIMEText(email_text, 'plain')
        mime['to'] = to
        mime['subject'] = email_subject

        # The serialized message is sent base64url-encoded under the 'raw' key.
        encoded = urlsafe_b64encode(mime.as_bytes()).decode('utf-8')
        send_request = gmail.users().messages().send(
            userId='me',
            body={'raw': encoded}
        )
        response = send_request.execute()

        return {
            'success': True,
            'message_id': response.get('id', ''),
            'thread_id': response.get('threadId', '')
        }
    except Exception as error:
        # Failures are reported to the caller as data rather than raised.
        return {'success': False, 'error': str(error)}