Skip to main content
Glama

Gmail MCP Agent

lead_nurturer.py19.8 kB
#!/usr/bin/env python3 """ Advanced Lead Nurturing System for Gmail CSV Sender Handles automated follow-ups, response tracking, and lead scoring """ import csv import json import os import time import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass from email.message import EmailMessage import base64 from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.errors import HttpError from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from jinja2 import Template @dataclass class Lead: email: str first_name: str company: str status: str = "new" # new, contacted, responded, interested, not_interested, scheduled last_contact: Optional[datetime] = None response_count: int = 0 follow_up_count: int = 0 lead_score: int = 0 notes: str = "" class LeadNurturer: def __init__(self, credentials_path: str = "credentials.json", token_path: str = "token.json", service: Any = None): self.service = service or self._get_service(credentials_path, token_path) self.leads = self._load_leads() self.templates = self._load_templates() self.config = self._load_config() self.sync_state = self._load_sync_state() def _get_service(self, credentials_path: str, token_path: str): """Initialize Gmail service""" SCOPES = ["https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/gmail.send"] creds = None if os.path.exists(token_path): creds = Credentials.from_authorized_user_file(token_path, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES) creds = flow.run_local_server(port=0) with open(token_path, "w") as token: token.write(creds.to_json()) return build("gmail", "v1", credentials=creds) def _load_leads(self) -> Dict[str, Lead]: """Load leads from CSV and tracking file""" leads = {} # Load from contacts.csv try: with open('contacts.csv', 'r', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: email = row['to'].strip().lower() leads[email] = Lead( email=email, first_name=row['first_name'], company=row['company'] ) except FileNotFoundError: print("contacts.csv not found") # Load tracking data try: with open('lead_tracking.json', 'r') as f: tracking_data = json.load(f) for email, data in tracking_data.items(): if email in leads: leads[email].status = data.get('status', 'new') leads[email].last_contact = datetime.fromisoformat(data['last_contact']) if data.get('last_contact') else None leads[email].response_count = data.get('response_count', 0) leads[email].follow_up_count = data.get('follow_up_count', 0) leads[email].lead_score = data.get('lead_score', 0) leads[email].notes = data.get('notes', '') except FileNotFoundError: print("No existing tracking data found") return leads def _load_config(self) -> Dict[str, Any]: try: with open('nurturing_config.json', 'r') as f: return json.load(f) except Exception: return {} def reload_config(self, new_config: Dict[str, Any]): self.config = new_config or {} def _load_sync_state(self) -> Dict[str, Any]: try: with open('gmail_sync_state.json', 'r') as f: return json.load(f) except Exception: return {"last_checked_iso": None, "processed_message_ids": []} def _save_sync_state(self): try: with open('gmail_sync_state.json', 'w') as f: json.dump(self.sync_state, f, indent=2) except Exception: pass def _save_leads(self): """Save lead tracking data""" tracking_data = {} for email, lead in self.leads.items(): tracking_data[email] = { 'status': lead.status, 'last_contact': lead.last_contact.isoformat() if lead.last_contact else None, 'response_count': lead.response_count, 'follow_up_count': lead.follow_up_count, 'lead_score': lead.lead_score, 'notes': lead.notes } with open('lead_tracking.json', 'w') as f: json.dump(tracking_data, f, indent=2) def _load_templates(self) -> Dict[str, Template]: """Load email templates for different scenarios""" templates = {} # Initial outreach template templates['initial'] = Template(""" Hi {{first_name}}, Did you know many dental practices lose 20–30% of new patient inquiries because follow-ups slip through the cracks? We've built an AI agent that automatically follows up with every lead via SMS/email and books them straight into your calendar. Clients typically see 5–9 extra appointments in the first 30 days. Have time for 10-min demo call this week? Thank you, Brandon Quantra Labs """.strip()) # Follow-up 1 (3 days later) templates['followup_1'] = Template(""" Hi {{first_name}}, Following up on my message about our AI lead follow-up system for dental practices. I know you're busy, but this could be a game-changer for {{company}}. Quick question: What's your biggest challenge with patient follow-ups right now? Best, Brandon Quantra Labs """.strip()) # Follow-up 2 (1 week later) templates['followup_2'] = Template(""" Hi {{first_name}}, I understand you might not be ready to discuss this right now. Just wanted to share that Dr. Sarah Johnson at Smile Care Clinic increased her new patient bookings by 40% in the first month using our system. If you're interested in a quick 5-minute demo, just reply with "demo" and I'll send you a calendar link. No pressure - I'll stop following up after this. Best, Brandon Quantra Labs """.strip()) # Response to interest templates['interested'] = Template(""" Hi {{first_name}}, Great to hear from you! I'd love to show you how our AI system works. Here's a quick calendar link to book a 10-minute demo: [Calendar Link] Looking forward to showing you how this can help {{company}} capture more patients. Best, Brandon Quantra Labs """.strip()) return templates def check_for_responses(self): """Check Gmail for responses to our outreach with pagination and idempotency""" try: # Build search query newer_than_hours = ( self.config.get('automation', {}).get('check_responses_interval_hours', 24) ) query_parts = ["in:inbox"] if newer_than_hours: # Gmail supports newer_than with d,m,y; for hours we fallback to after timestamp after_ts = None if self.sync_state.get('last_checked_iso'): try: dt = datetime.fromisoformat(self.sync_state['last_checked_iso']) after_ts = int(dt.timestamp()) except Exception: after_ts = None if after_ts: query_parts.append(f"after:{after_ts}") else: # default to 1d window if no state query_parts.append("newer_than:1d") query = " ".join(query_parts) processed_ids = set(self.sync_state.get('processed_message_ids', [])) page_token = None while True: results = self.service.users().messages().list( userId='me', q=query, pageToken=page_token, maxResults=100 ).execute() messages = results.get('messages', []) for message in messages: msg_id = message.get('id') if not msg_id or msg_id in processed_ids: continue msg = self.service.users().messages().get(userId='me', id=msg_id, format='full').execute() headers = msg.get('payload', {}).get('headers', []) sender = None subject = None for header in headers: name = header.get('name') if name == 'From': sender = header.get('value') elif name == 'Subject': subject = header.get('value') if sender: sender_email_match = re.search(r'<(.+?)>', sender) if sender_email_match: sender_email = sender_email_match.group(1).lower() else: sender_email = sender.lower() if sender_email in self.leads: self._process_response(sender_email, subject or "", msg) processed_ids.add(msg_id) page_token = results.get('nextPageToken') if not page_token: break # Update last checked time and persist processed ids (bounded) self.sync_state['last_checked_iso'] = datetime.utcnow().isoformat() # Keep only last 500 ids to bound file size latest_ids = list(processed_ids) if len(latest_ids) > 500: latest_ids = latest_ids[-500:] self.sync_state['processed_message_ids'] = latest_ids self._save_sync_state() except HttpError as he: print(f"Gmail API error checking responses: {he}") except Exception as e: print(f"Error checking responses: {e}") def _process_response(self, email: str, subject: str, message: dict): """Process a response from a lead""" lead = self.leads[email] lead.response_count += 1 lead.last_contact = datetime.now() # Analyze response sentiment and keywords body = self._get_message_body(message) response_lower = body.lower() # Update lead score based on response interested_words = self.config.get('response_keywords', {}).get('interested', ['interested','yes','demo','call','meeting','schedule','book']) not_interested_words = self.config.get('response_keywords', {}).get('not_interested', ['not interested','no thanks','stop','unsubscribe','remove']) if any(word in response_lower for word in interested_words): lead.status = 'interested' lead.lead_score += int(self.config.get('lead_scoring', {}).get('response_bonus', 10)) lead.lead_score += int(self.config.get('lead_scoring', {}).get('interest_bonus', 5)) if self.config.get('automation', {}).get('auto_respond_to_interest', True): self._send_automated_response(email, 'interested') elif any(word in response_lower for word in not_interested_words): lead.status = 'not_interested' lead.lead_score -= 5 else: lead.lead_score += 2 # Add notes lead.notes += f"\n{datetime.now().strftime('%Y-%m-%d')}: Response received - {subject}" print(f"Processed response from {lead.first_name} at {lead.company}") def _get_message_body(self, message: dict) -> str: """Extract message body from Gmail message""" try: payload = message.get('payload', {}) # Prefer text/plain; fallback to text/html stripped def decode_part(part): data = part.get('body', {}).get('data') if not data: return "" try: return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore') except Exception: return "" if 'parts' in payload: # Walk parts recursively stack = list(payload.get('parts', [])) html_candidate = "" while stack: part = stack.pop() mime = part.get('mimeType', '') if 'parts' in part: stack.extend(part.get('parts', [])) if mime == 'text/plain': text = decode_part(part) if text: return text elif mime == 'text/html' and not html_candidate: html_candidate = decode_part(part) if html_candidate: # Strip HTML tags rudimentarily return re.sub('<[^<]+?>', '', html_candidate) else: mime = payload.get('mimeType') if mime == 'text/plain': return decode_part(payload) elif mime == 'text/html': html = decode_part(payload) return re.sub('<[^<]+?>', '', html) except Exception as e: print(f"Error extracting message body: {e}") return "" def _send_automated_response(self, email: str, template_type: str): """Send automated response based on template type""" lead = self.leads[email] template = self.templates[template_type] subject = "Re: AI Lead Follow-up System for Dental Practices" body = template.render( first_name=lead.first_name, company=lead.company ) # Send email (using existing send_message function) try: msg = EmailMessage() msg["To"] = email sender_email = self.config.get('sender_email') or self.service.users().getProfile(userId='me').execute().get('emailAddress') sender_name = self.config.get('sender_name') or '' msg["From"] = f"{sender_name} <{sender_email}>" if sender_name else sender_email msg["Subject"] = subject msg.set_content(body) encoded = base64.urlsafe_b64encode(msg.as_bytes()).decode() self.service.users().messages().send( userId="me", body={"raw": encoded} ).execute() print(f"Sent {template_type} response to {lead.first_name}") except Exception as e: print(f"Error sending response: {e}") def run_follow_up_sequence(self): """Run follow-up sequence for leads that need it""" now = datetime.now() for email, lead in self.leads.items(): if not self.config.get('automation', {}).get('auto_send_follow_ups', True): continue if lead.status in ['new', 'contacted'] and lead.last_contact: days_since_contact = (now - lead.last_contact).days cfg_schedule = self.config.get('follow_up_schedule', {}) fu1_days = int(cfg_schedule.get('followup_1_days', 3)) fu2_days = int(cfg_schedule.get('followup_2_days', 7)) max_follow = int(cfg_schedule.get('max_follow_ups', 2)) # Follow-up 1 if days_since_contact >= fu1_days and lead.follow_up_count == 0: self._send_follow_up(email, 'followup_1') lead.follow_up_count = 1 lead.last_contact = now # Follow-up 2 elif days_since_contact >= fu2_days and lead.follow_up_count == 1: self._send_follow_up(email, 'followup_2') lead.follow_up_count = 2 lead.last_contact = now if max_follow <= 2: lead.status = 'not_interested' def _send_follow_up(self, email: str, template_type: str): """Send follow-up email""" lead = self.leads[email] template = self.templates[template_type] subject = "Following up - AI Lead Follow-up System" body = template.render( first_name=lead.first_name, company=lead.company ) try: msg = EmailMessage() msg["To"] = email sender_email = self.config.get('sender_email') or self.service.users().getProfile(userId='me').execute().get('emailAddress') sender_name = self.config.get('sender_name') or '' msg["From"] = f"{sender_name} <{sender_email}>" if sender_name else sender_email msg["Subject"] = subject msg.set_content(body) encoded = base64.urlsafe_b64encode(msg.as_bytes()).decode() self.service.users().messages().send( userId="me", body={"raw": encoded} ).execute() print(f"Sent {template_type} to {lead.first_name} at {lead.company}") except Exception as e: print(f"Error sending follow-up: {e}") def generate_lead_report(self): """Generate a lead nurturing report""" total_leads = len(self.leads) contacted = sum(1 for lead in self.leads.values() if lead.status != 'new') responded = sum(1 for lead in self.leads.values() if lead.response_count > 0) interested = sum(1 for lead in self.leads.values() if lead.status == 'interested') print(f"\n📊 LEAD NURTURING REPORT") print(f"Total Leads: {total_leads}") print(f"Contacted: {contacted}") print(f"Responded: {responded}") print(f"Interested: {interested}") print(f"Response Rate: {(responded/contacted*100):.1f}%" if contacted > 0 else "Response Rate: N/A") print(f"Interest Rate: {(interested/responded*100):.1f}%" if responded > 0 else "Interest Rate: 0%") # Top leads by score top_leads = sorted(self.leads.values(), key=lambda x: x.lead_score, reverse=True)[:5] print(f"\n🏆 TOP LEADS BY SCORE:") for lead in top_leads: print(f" {lead.first_name} at {lead.company} - Score: {lead.lead_score}") def run_nurturing_cycle(self): """Run the complete nurturing cycle""" print("🔄 Starting lead nurturing cycle...") # Check for responses print("📧 Checking for responses...") self.check_for_responses() # Run follow-up sequence print("📤 Running follow-up sequence...") self.run_follow_up_sequence() # Save updated data self._save_leads() # Persist sync state already handled in check_for_responses # Generate report self.generate_lead_report() print("✅ Nurturing cycle complete!") if __name__ == "__main__": import os nurturer = LeadNurturer() nurturer.run_nurturing_cycle()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brandononchain/GMAIL-MCP-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server