Skip to main content
Glama
leeguooooo
by leeguooooo
inbox_organizer.py18.9 kB
#!/usr/bin/env python3 """ Inbox organization assistant. 分析最近邮件,结合 AI 过滤结果,给出垃圾邮件/营销邮件/低优先级通知的整理建议, 并对需要关注的邮件生成中文摘要。 """ import argparse import json import logging import sys from collections import Counter from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # Ensure repository root on sys.path repo_root = Path(__file__).resolve().parents[1] if str(repo_root) not in sys.path: sys.path.insert(0, str(repo_root)) from src.account_manager import AccountManager # noqa: E402 from src.services.email_service import EmailService # noqa: E402 from scripts.ai_email_filter import AIEmailFilter, FilterResult # noqa: E402 from scripts.email_translator import EmailTranslator # noqa: E402 logger = logging.getLogger(__name__) class InboxOrganizer: """High level helper that classifies and summarizes recent emails.""" ACTION_DELETE_SPAM = "delete_spam" ACTION_DELETE_MARKETING = "delete_marketing" ACTION_MARK_AS_READ = "mark_as_read" ACTION_ATTENTION = "needs_attention" ACTION_TO_SUGGESTED = { ACTION_DELETE_SPAM: "delete", ACTION_DELETE_MARKETING: "delete", ACTION_MARK_AS_READ: "mark_read", ACTION_ATTENTION: "review", } SPAM_CATEGORIES = {"spam", "junk"} MARKETING_CATEGORIES = {"marketing", "promotion", "ads", "advertisement"} NEWSLETTER_CATEGORIES = {"newsletter", "newsletters", "digest"} SYSTEM_CATEGORIES = {"system", "notification", "alert"} def __init__( self, limit: int = 15, folder: str = "INBOX", unread_only: bool = False, account_id: Optional[str] = None, ): self.limit = limit self.folder = folder self.unread_only = unread_only self.account_id = account_id self.account_manager = AccountManager() self.email_service = EmailService(self.account_manager) self.filter_service = AIEmailFilter() self.priority_threshold = float( self.filter_service.config.get("priority_threshold", 0.7) ) # Public API --------------------------------------------------------- def organize(self) -> Dict[str, Any]: """Fetch, classify, and summarize recent emails.""" fetch_result = self._fetch_recent_emails() if not fetch_result.get("success"): return fetch_result enriched_emails = fetch_result["emails"] if not enriched_emails: return { "success": True, "processed": 0, "actions": self._empty_actions(), "important_summaries": [], "summary_text": "📭 暂无邮件需要处理", "stats": { "total_emails": 0, "delete_spam": 0, "delete_marketing": 0, "mark_as_read": 0, "needs_attention": 0, }, "generated_at": datetime.utcnow().isoformat() + "Z", } filter_results = self._classify_emails(enriched_emails) actions, important_details = self._categorize_actions( enriched_emails, filter_results ) important_summaries, translator_note = self._summarize_important( important_details ) summary_text = self._compose_summary( len(enriched_emails), actions, important_summaries, translator_note ) stats = { "total_emails": len(enriched_emails), "delete_spam": len(actions[self.ACTION_DELETE_SPAM]), "delete_marketing": len(actions[self.ACTION_DELETE_MARKETING]), "mark_as_read": len(actions[self.ACTION_MARK_AS_READ]), "needs_attention": len(actions[self.ACTION_ATTENTION]), } # Remove internal-only fields before returning for bucket in important_details: bucket["email"].pop("full_body", None) for email in enriched_emails: email.pop("full_body", None) return { "success": True, "processed": len(enriched_emails), "actions": actions, "important_summaries": important_summaries, "summary_text": summary_text, "stats": stats, "duplicates": self._find_duplicates(enriched_emails), "generated_at": datetime.utcnow().isoformat() + "Z", } # Internal helpers --------------------------------------------------- def _fetch_recent_emails(self) -> Dict[str, Any]: """Fetch latest emails and enrich with body preview.""" try: result = self.email_service.list_emails( limit=self.limit, unread_only=self.unread_only, folder=self.folder, account_id=self.account_id, ) except Exception as exc: logger.error("Failed to list emails: %s", exc, exc_info=True) return {"success": False, "error": str(exc)} if result.get("error"): return {"success": False, "error": result["error"]} emails = result.get("emails", []) enriched: List[Dict[str, Any]] = [] for email in emails: detail = self._safe_get_detail(email) body_text = "" if detail and "error" not in detail: body_text = detail.get("body") or detail.get("html_body") or "" preview = self._sanitize_preview(body_text) enriched_email = { **email, "folder": email.get("folder", detail.get("folder", self.folder) if detail else self.folder), "account_id": email.get( "account_id", detail.get("account_id") if detail else email.get("account"), ), "has_attachments": bool( detail and detail.get("attachments") ), "body_preview": preview, "full_body": body_text, } enriched.append(enriched_email) return {"success": True, "emails": enriched} def _safe_get_detail(self, email: Dict[str, Any]) -> Dict[str, Any]: """Retrieve email detail with graceful error handling.""" account_ref = email.get("account_id") or email.get("account") try: return self.email_service.get_email_detail( email_id=email["id"], folder=email.get("folder", self.folder), account_id=account_ref, ) except Exception as exc: logger.warning( "Failed to fetch detail for email %s: %s", email.get("id"), exc, ) return {"error": str(exc)} def _classify_emails( self, emails: List[Dict[str, Any]] ) -> Dict[str, FilterResult]: """Run AI filter and return mapping of email_id -> FilterResult.""" filter_results = self.filter_service.filter_emails(emails) result_map: Dict[str, FilterResult] = {} for item in filter_results: if isinstance(item, FilterResult): result_map[item.email_id] = item elif isinstance(item, dict): # Fallback when filter_emails returns dict (e.g. errors) result_map[item.get("email_id", "")] = FilterResult( email_id=item.get("email_id", ""), is_important=item.get("is_important", False), priority_score=float(item.get("priority_score", 0.5)), reason=item.get("reason", "AI 过滤结果"), category=item.get("category", "general"), suggested_action=item.get("suggested_action", "none"), ) return result_map def _categorize_actions( self, emails: List[Dict[str, Any]], filter_results: Dict[str, FilterResult], ) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: """Determine suggested action for each email.""" actions = self._empty_actions() important_details: List[Dict[str, Any]] = [] for email in emails: filter_result = filter_results.get(email["id"]) if not filter_result: filter_result = FilterResult( email_id=email["id"], is_important=False, priority_score=0.5, reason="无过滤结果,视为一般邮件", category="general", suggested_action="mark_read", ) action_key, custom_reason = self._determine_action( email, filter_result ) reason = filter_result.reason or custom_reason if custom_reason and custom_reason not in reason: reason = f"{filter_result.reason};{custom_reason}".strip(";") action_item = { "id": email.get("id"), "subject": email.get("subject", ""), "from": email.get("from", ""), "date": email.get("date", ""), "folder": email.get("folder", self.folder), "priority_score": round( float(filter_result.priority_score or 0.0), 3 ), "category": (filter_result.category or "general").lower(), "reason": reason, "suggested_action": filter_result.suggested_action or self.ACTION_TO_SUGGESTED.get(action_key, "none"), "account_id": email.get("account_id"), } actions[action_key].append(action_item) if action_key == self.ACTION_ATTENTION: important_details.append( {"email": email.copy(), "classification": filter_result} ) # Sort buckets for readability actions[self.ACTION_DELETE_SPAM].sort( key=lambda item: item["priority_score"] ) actions[self.ACTION_DELETE_MARKETING].sort( key=lambda item: item["priority_score"] ) actions[self.ACTION_MARK_AS_READ].sort( key=lambda item: item["priority_score"] ) actions[self.ACTION_ATTENTION].sort( key=lambda item: item["priority_score"], reverse=True ) return actions, important_details def _determine_action( self, email: Dict[str, Any], filter_result: FilterResult ) -> Tuple[str, Optional[str]]: """Heuristic mapping from classification to suggested action.""" category = (filter_result.category or "general").lower() score = float(filter_result.priority_score or 0.0) subject_lower = (email.get("subject") or "").lower() if category in self.SPAM_CATEGORIES or score <= 0.2: return self.ACTION_DELETE_SPAM, "识别为垃圾邮件" if category in self.MARKETING_CATEGORIES or "unsubscribe" in subject_lower: return self.ACTION_DELETE_MARKETING, "识别为营销邮件" if category in self.NEWSLETTER_CATEGORIES: return self.ACTION_MARK_AS_READ, "例行订阅更新" if category in self.SYSTEM_CATEGORIES and score < 0.6: return self.ACTION_MARK_AS_READ, "系统/安全通知" if not filter_result.is_important and score < self.priority_threshold: return self.ACTION_MARK_AS_READ, "AI 判断重要性较低" return self.ACTION_ATTENTION, None def _summarize_important( self, important_details: List[Dict[str, Any]] ) -> Tuple[List[Dict[str, Any]], Optional[str]]: """Generate Chinese summaries for important emails.""" if not important_details: return [], None translator = EmailTranslator() payload = [] for item in important_details: email = item["email"] payload.append( { "id": email.get("id"), "from": email.get("from", ""), "subject": email.get("subject", ""), "body": email.get("full_body") or email.get("body_preview", ""), } ) translation_note = None summary_map: Dict[str, Dict[str, Any]] = {} try: translation_result = translator.translate_and_summarize(payload) if translation_result.get("success"): summary_map = { entry.get("id"): entry for entry in translation_result.get("emails", []) } else: translation_note = translation_result.get( "summary", translation_result.get("error") ) except Exception as exc: # pragma: no cover - defensive logger.warning("Summaries failed: %s", exc, exc_info=True) translation_note = str(exc) summaries: List[Dict[str, Any]] = [] for item in important_details: email = item["email"] classification = item["classification"] translated = summary_map.get(email.get("id"), {}) subject_display = translated.get("subject_zh") or email.get("subject", "") summary_text = translated.get("summary_zh") or email.get( "body_preview", "" ) summaries.append( { "id": email.get("id"), "subject": subject_display.strip(), "from": email.get("from", ""), "summary": summary_text.strip(), "priority_score": round( float(classification.priority_score or 0.0), 3 ), "category": (classification.category or "general").lower(), "reason": classification.reason, "folder": email.get("folder", self.folder), "account_id": email.get("account_id"), } ) summaries.sort(key=lambda item: item["priority_score"], reverse=True) return summaries, translation_note def _compose_summary( self, total_emails: int, actions: Dict[str, List[Dict[str, Any]]], important_summaries: List[Dict[str, Any]], translation_note: Optional[str], ) -> str: """Build human-readable Chinese summary.""" def join_subjects(items: List[Dict[str, Any]], limit: int = 3) -> str: subjects = [itm["subject"] or "(无主题)" for itm in items[:limit]] return ",".join(subjects) lines: List[str] = [] lines.append( f"已分析「{self.folder}」中最近 {total_emails} 封邮件,整理建议如下:" ) spam_items = actions[self.ACTION_DELETE_SPAM] if spam_items: lines.append( f"1、删除垃圾邮件 {len(spam_items)} 封,例如:{join_subjects(spam_items)}" ) else: lines.append("1、未发现明显垃圾邮件。") marketing_items = actions[self.ACTION_DELETE_MARKETING] if marketing_items: lines.append( f"2、删除营销/过期推广邮件 {len(marketing_items)} 封,例如:{join_subjects(marketing_items)}" ) else: lines.append("2、暂无需要清理的营销邮件。") mark_read_items = actions[self.ACTION_MARK_AS_READ] if mark_read_items: lines.append( f"3、建议直接标记为已读的日常通知 {len(mark_read_items)} 封,例如:{join_subjects(mark_read_items)}" ) else: lines.append("3、未发现需要直接标为已读的通知邮件。") if important_summaries: lines.append("4、以下邮件需要优先关注:") for idx, summary in enumerate(important_summaries, 1): lines.append( f" {idx}. {summary['subject']}({summary['from']})" ) if summary["summary"]: lines.append(f" 摘要:{summary['summary']}") else: lines.append("4、暂无需要特别关注的邮件。") if translation_note: lines.append(f"⚠️ 摘要生成提示:{translation_note}") return "\n".join(lines) def _find_duplicates(self, emails: List[Dict[str, Any]]) -> Dict[str, int]: """Identify duplicate subjects for context.""" subjects = [email.get("subject", "").strip() for email in emails if email.get("subject")] counts = Counter(subjects) duplicates = {subject: count for subject, count in counts.items() if count > 1} return duplicates def _sanitize_preview(self, text: str) -> str: """Condense whitespace and limit preview length.""" if not text: return "" cleaned = " ".join(text.strip().split()) return cleaned[:400] def _empty_actions(self) -> Dict[str, List[Dict[str, Any]]]: """Utility to create empty action buckets.""" return { self.ACTION_DELETE_SPAM: [], self.ACTION_DELETE_MARKETING: [], self.ACTION_MARK_AS_READ: [], self.ACTION_ATTENTION: [], } def main(): parser = argparse.ArgumentParser(description="整理最近邮件并给出操作建议") parser.add_argument("--limit", type=int, default=15, help="分析的邮件数量") parser.add_argument("--folder", default="INBOX", help="要分析的邮箱文件夹") parser.add_argument( "--unread-only", action="store_true", help="仅分析未读邮件" ) parser.add_argument("--account-id", help="指定账号 ID 或邮箱地址") parser.add_argument( "--text", action="store_true", help="以纯文本形式输出摘要" ) parser.add_argument( "--verbose", action="store_true", help="输出调试日志" ) args = parser.parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) organizer = InboxOrganizer( limit=args.limit, folder=args.folder, unread_only=args.unread_only, account_id=args.account_id, ) result = organizer.organize() if args.text: print(result.get("summary_text", "")) else: print(json.dumps(result, indent=2, ensure_ascii=False)) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server