Skip to main content
Glama
leeguooooo
by leeguooooo
email_monitor.py18.6 kB
#!/usr/bin/env python3 """ 邮件监控主控制脚本 - 整合邮件获取、AI过滤、通知发送 这是一个完整的流程脚本,可以被 n8n 或其他自动化工具调用 """ import json import sys import os import logging from typing import Dict, Any, List, Optional from pathlib import Path import subprocess import tempfile # 添加项目根目录到 Python 路径 repo_root = Path(__file__).resolve().parents[1] sys.path.insert(0, str(repo_root)) # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class EmailMonitor: """邮件监控器 - 整合所有组件""" def __init__(self, config_path: Optional[str] = None): """ 初始化邮件监控器 Args: config_path: 配置文件路径 """ self.config_path = config_path or "email_monitor_config.json" self.config = self._load_config() self.scripts_dir = Path(__file__).parent def _load_config(self) -> Dict[str, Any]: """加载配置文件""" default_config = { "email": { "fetch_limit": 20, "unread_only": True, "account_id": None, "folder": "INBOX" }, "ai_filter": { "enabled": True, "config_path": "ai_filter_config.json", "priority_threshold": 0.7 }, "notification": { "enabled": True, "config_path": "notification_config.json", "webhook_names": None # None 表示发送到所有启用的 webhook }, "deduplication": { "enabled": True, "method": "content_hash", # content_hash, email_id, subject_sender "window_hours": 24 }, "logging": { "level": "INFO", "file": "email_monitor.log" } } if os.path.exists(self.config_path): try: with open(self.config_path, 'r', encoding='utf-8') as f: user_config = json.load(f) self._deep_merge(default_config, user_config) except Exception as e: logger.warning(f"Failed to load config from {self.config_path}: {e}") else: # 创建默认配置文件 try: with open(self.config_path, 'w', encoding='utf-8') as f: json.dump(default_config, f, indent=2, ensure_ascii=False) logger.info(f"Created default config at {self.config_path}") except Exception as e: logger.warning(f"Failed to create config file: {e}") return default_config def _deep_merge(self, base: Dict, update: Dict) -> None: """深度合并字典""" for key, value in update.items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): self._deep_merge(base[key], value) else: base[key] = value def _run_script(self, script_name: str, args: List[str]) -> Dict[str, Any]: """运行脚本并返回结果""" script_path = self.scripts_dir / script_name if not script_path.exists(): raise FileNotFoundError(f"Script not found: {script_path}") try: # 构建命令 cmd = [sys.executable, str(script_path)] + args # 运行脚本 result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, # 5分钟超时 cwd=repo_root ) if result.returncode != 0: logger.error(f"Script {script_name} failed with code {result.returncode}") logger.error(f"stderr: {result.stderr}") return { "success": False, "error": f"Script failed: {result.stderr}", "returncode": result.returncode } # 解析输出 try: return json.loads(result.stdout) except json.JSONDecodeError: logger.warning(f"Script {script_name} output is not valid JSON: {result.stdout}") return { "success": True, "output": result.stdout, "raw": True } except subprocess.TimeoutExpired: logger.error(f"Script {script_name} timed out") return {"success": False, "error": "Script timed out"} except Exception as e: logger.error(f"Failed to run script {script_name}: {e}") return {"success": False, "error": str(e)} def fetch_emails(self) -> Dict[str, Any]: """获取邮件列表""" email_config = self.config.get("email", {}) # 构建参数 args = { "limit": email_config.get("fetch_limit", 20), "unread_only": email_config.get("unread_only", True), "folder": email_config.get("folder", "INBOX") } if email_config.get("account_id"): args["account_id"] = email_config["account_id"] # 调用邮件获取脚本 tool_name = "list_unread_emails" if args["unread_only"] else "list_emails" result = self._run_script("call_email_tool.py", [tool_name, json.dumps(args)]) if not result.get("success", False): logger.error(f"Failed to fetch emails: {result.get('error')}") return result emails = result.get("emails", []) logger.info(f"Fetched {len(emails)} emails") return { "success": True, "emails": emails, "count": len(emails) } def filter_emails_with_ai(self, emails: List[Dict[str, Any]]) -> Dict[str, Any]: """使用 AI 过滤邮件""" if not self.config.get("ai_filter", {}).get("enabled", True): logger.info("AI filtering is disabled") return { "success": True, "results": [ { "email_id": email.get("id", ""), "is_important": True, # 不过滤时认为都重要 "priority_score": 0.8, "reason": "AI过滤已禁用", "category": "general", "suggested_action": "none" } for email in emails ] } if not emails: return {"success": True, "results": []} # 准备 AI 过滤参数 ai_config = self.config.get("ai_filter", {}) args = [json.dumps(emails)] if ai_config.get("config_path"): args.append(ai_config["config_path"]) # 调用 AI 过滤脚本 result = self._run_script("ai_email_filter.py", args) if not result.get("success", False): logger.error(f"AI filtering failed: {result.get('error')}") # 回退到简单过滤 return self._simple_filter_fallback(emails) filter_results = result.get("results", []) important_count = sum(1 for r in filter_results if r.get("is_important", False)) logger.info(f"AI filtered {len(emails)} emails, {important_count} marked as important") return result def _simple_filter_fallback(self, emails: List[Dict[str, Any]]) -> Dict[str, Any]: """AI 过滤失败时的简单回退方案""" logger.info("Using simple keyword-based filtering as fallback") important_keywords = ["urgent", "important", "meeting", "deadline", "紧急", "重要", "会议", "截止"] results = [] for email in emails: text_to_check = f"{email.get('subject', '')} {email.get('body_preview', '')}".lower() # 简单的关键词匹配 importance_score = sum(1 for keyword in important_keywords if keyword in text_to_check) priority_score = min(0.9, max(0.3, 0.5 + importance_score * 0.1)) is_important = priority_score >= 0.7 results.append({ "email_id": email.get("id", ""), "is_important": is_important, "priority_score": priority_score, "reason": "简单关键词分析(AI过滤失败回退)", "category": "general", "suggested_action": "none" }) important_count = sum(1 for r in results if r["is_important"]) return { "success": True, "total_emails": len(emails), "important_emails": important_count, "results": results } def send_notifications(self, emails: List[Dict[str, Any]], filter_results: List[Dict[str, Any]]) -> Dict[str, Any]: """发送通知""" if not self.config.get("notification", {}).get("enabled", True): logger.info("Notifications are disabled") return {"success": True, "message": "Notifications disabled"} # 筛选出重要邮件 important_emails = [] email_dict = {email.get("id", ""): email for email in emails} for filter_result in filter_results: if filter_result.get("is_important", False): email_id = filter_result["email_id"] email = email_dict.get(email_id) if email: # 合并邮件数据和过滤结果 notification_data = { "email_id": email_id, "subject": email.get("subject", ""), "sender": email.get("from", ""), "date": email.get("date", ""), "priority_score": filter_result.get("priority_score", 0.8), "reason": filter_result.get("reason", ""), "category": filter_result.get("category", "general"), "account_id": email.get("account_id"), "body_preview": email.get("body_preview", email.get("body", ""))[:200], "suggested_action": filter_result.get("suggested_action", "none") } important_emails.append(notification_data) if not important_emails: logger.info("No important emails to notify") return {"success": True, "message": "No important emails"} # 准备通知参数 notification_config = self.config.get("notification", {}) args = [json.dumps(important_emails)] webhook_names = notification_config.get("webhook_names") if webhook_names: args.append(",".join(webhook_names)) # 发送批量通知 result = self._run_script("notification_service.py", ["batch"] + args) if result.get("success", False): logger.info(f"Sent notifications for {len(important_emails)} important emails") else: logger.error(f"Failed to send notifications: {result.get('error')}") return result def run_monitoring_cycle(self) -> Dict[str, Any]: """运行一次完整的监控周期""" logger.info("Starting email monitoring cycle") try: # 1. 获取邮件 fetch_result = self.fetch_emails() if not fetch_result.get("success", False): return { "success": False, "error": "Failed to fetch emails", "details": fetch_result } emails = fetch_result.get("emails", []) if not emails: logger.info("No emails to process") return { "success": True, "message": "No emails to process", "stats": { "fetched_emails": 0, "important_emails": 0, "notifications_sent": 0 } } # 2. AI 过滤 filter_result = self.filter_emails_with_ai(emails) if not filter_result.get("success", False): return { "success": False, "error": "Failed to filter emails", "details": filter_result } filter_results = filter_result.get("results", []) important_count = sum(1 for r in filter_results if r.get("is_important", False)) # 3. 发送通知 notification_result = self.send_notifications(emails, filter_results) # 4. 统计结果 stats = { "fetched_emails": len(emails), "important_emails": important_count, "notifications_sent": len([r for r in filter_results if r.get("is_important", False)]), "filter_success": filter_result.get("success", False), "notification_success": notification_result.get("success", False) } logger.info(f"Monitoring cycle completed: {stats}") return { "success": True, "message": "Monitoring cycle completed successfully", "stats": stats, "details": { "fetch_result": fetch_result, "filter_result": filter_result, "notification_result": notification_result } } except Exception as e: logger.error(f"Monitoring cycle failed: {e}", exc_info=True) return { "success": False, "error": str(e) } def get_status(self) -> Dict[str, Any]: """获取监控状态""" try: # 检查各个组件的状态 status = { "config_loaded": bool(self.config), "scripts_available": {}, "last_run": None, "configuration": { "email_config": self.config.get("email", {}), "ai_filter_enabled": self.config.get("ai_filter", {}).get("enabled", False), "notification_enabled": self.config.get("notification", {}).get("enabled", False) } } # 检查脚本文件 required_scripts = [ "call_email_tool.py", "ai_email_filter.py", "notification_service.py" ] for script in required_scripts: script_path = self.scripts_dir / script status["scripts_available"][script] = script_path.exists() return { "success": True, "status": status } except Exception as e: return { "success": False, "error": str(e) } def main(): """主函数 - 命令行接口""" if len(sys.argv) < 2: print(json.dumps({ "error": "Usage: python email_monitor.py <command> [args...]", "commands": { "run": "Run one monitoring cycle", "status": "Get monitoring status", "test": "Test individual components", "config": "Show current configuration" }, "examples": { "run": "python email_monitor.py run", "status": "python email_monitor.py status", "test": "python email_monitor.py test fetch|filter|notify" } })) sys.exit(1) command = sys.argv[1] config_path = None # 检查是否指定了配置文件 if "--config" in sys.argv: config_index = sys.argv.index("--config") if config_index + 1 < len(sys.argv): config_path = sys.argv[config_index + 1] try: monitor = EmailMonitor(config_path) if command == "run": result = monitor.run_monitoring_cycle() print(json.dumps(result, indent=2, ensure_ascii=False)) elif command == "status": result = monitor.get_status() print(json.dumps(result, indent=2, ensure_ascii=False)) elif command == "config": print(json.dumps(monitor.config, indent=2, ensure_ascii=False)) elif command == "test": component = sys.argv[2] if len(sys.argv) > 2 else "all" if component in ["fetch", "all"]: print("Testing email fetch...") result = monitor.fetch_emails() print(json.dumps(result, indent=2, ensure_ascii=False)) if component in ["filter", "all"] and len(sys.argv) > 2: print("Testing AI filter...") # 需要先获取邮件才能测试过滤 fetch_result = monitor.fetch_emails() if fetch_result.get("success"): emails = fetch_result.get("emails", [])[:3] # 只测试前3封 filter_result = monitor.filter_emails_with_ai(emails) print(json.dumps(filter_result, indent=2, ensure_ascii=False)) if component in ["notify", "all"]: print("Testing notification...") # 发送测试通知 test_result = monitor._run_script("notification_service.py", ["test"]) print(json.dumps(test_result, indent=2, ensure_ascii=False)) else: print(json.dumps({"error": f"Unknown command: {command}"})) sys.exit(1) except Exception as e: print(json.dumps({"error": str(e)})) sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server