Skip to main content
Glama
leeguooooo
by leeguooooo
email_sync.py27.6 kB
""" 邮件同步核心逻辑 - 支持多邮箱同步到SQLite """ import logging import email import email.utils from typing import Dict, Any, List, Optional, Tuple from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed import threading import time import re from ..account_manager import AccountManager from ..connection_manager import ConnectionManager from ..database.email_sync_db import EmailSyncDatabase from ..connection_pool import get_connection_pool from ..background.sync_config import SyncConfigManager logger = logging.getLogger(__name__) class EmailSyncManager: """邮件同步管理器""" def __init__(self, db_path: str = None, config: Dict[str, Any] = None, account_manager=None): """ 初始化同步管理器 Args: db_path: 数据库路径 config: 同步配置 account_manager: 可选的 AccountManager 实例(避免重新读取配置) """ self.account_manager = account_manager or AccountManager() self.db = EmailSyncDatabase(db_path) self.sync_lock = threading.Lock() self.sync_status = {} self.config = self._normalize_config(config) self.connection_pool = get_connection_pool() # 延迟导入避免循环依赖 self._health_monitor = None @property def health_monitor(self): """延迟加载健康监控器""" if self._health_monitor is None: from ..background.sync_health_monitor import get_health_monitor self._health_monitor = get_health_monitor() return self._health_monitor def _load_config(self) -> Dict[str, Any]: """加载同步配置""" try: manager = SyncConfigManager() config = manager.get_config() except Exception as e: logger.warning(f"Failed to load sync config via SyncConfigManager: {e}") config = {} # 兼容旧配置(无 sync 节点) sync_cfg = config.get('sync', {}) if isinstance(config.get('sync'), dict) else {} first_sync_days = sync_cfg.get('first_sync_days', config.get('first_sync_days', 180)) incremental_sync_days = sync_cfg.get('incremental_sync_days', config.get('incremental_sync_days', 7)) folders_cfg = config.get('folders', {}) exclude_folders = folders_cfg.get('exclude_folders', config.get('exclude_folders', [])) priority_folders = folders_cfg.get('priority_folders', config.get('priority_folders', [])) sync_all = folders_cfg.get('sync_all', folders_cfg.get('sync_all', True)) return { "first_sync_days": first_sync_days or 180, "incremental_sync_days": incremental_sync_days or 7, "folders": { "sync_all": sync_all, "exclude_folders": exclude_folders, "priority_folders": priority_folders } } def _normalize_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]: """规范化同步配置(兼容传入的全量或部分配置)""" if not config: return self._load_config() # 如果传入的就是 SyncConfigManager 的整体配置,则交给 _load_config 逻辑处理 if isinstance(config, dict) and ('sync' in config or 'folders' in config or 'first_sync_days' in config): # 将外部配置与默认加载的配置合并,避免缺失字段 base = self._load_config() # 提取嵌套的 sync 配置 sync_cfg = config.get('sync', {}) if isinstance(config.get('sync'), dict) else {} base['first_sync_days'] = sync_cfg.get('first_sync_days', config.get('first_sync_days', base['first_sync_days'])) base['incremental_sync_days'] = sync_cfg.get('incremental_sync_days', config.get('incremental_sync_days', base['incremental_sync_days'])) folders_cfg = config.get('folders', {}) base_folders = base.get('folders', {}) base_folders['sync_all'] = folders_cfg.get('sync_all', base_folders.get('sync_all', True)) base_folders['exclude_folders'] = folders_cfg.get('exclude_folders', base_folders.get('exclude_folders', [])) base_folders['priority_folders'] = folders_cfg.get('priority_folders', base_folders.get('priority_folders', [])) base['folders'] = base_folders return base # 未知格式,退回默认 return self._load_config() def sync_all_accounts(self, full_sync: bool = False, max_workers: int = 3) -> Dict[str, Any]: """ 同步所有账户邮件 Args: full_sync: 是否执行完全同步(默认增量同步) max_workers: 并发同步的最大线程数 """ accounts = self.account_manager.list_accounts() if not accounts: return {'success': False, 'error': 'No accounts configured'} logger.info(f"Starting sync for {len(accounts)} accounts (full_sync={full_sync})") results = [] start_time = datetime.now() # 并行同步多个账户 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交同步任务 future_to_account = { executor.submit(self.sync_single_account, account['id'], full_sync): account for account in accounts } # 收集结果 for future in as_completed(future_to_account): account = future_to_account[future] try: result = future.result() result['account_email'] = account['email'] results.append(result) except Exception as e: logger.error(f"Sync failed for account {account['email']}: {e}") results.append({ 'success': False, 'account_id': account['id'], 'account_email': account['email'], 'error': str(e) }) # 汇总结果 total_added = sum(r.get('emails_added', 0) for r in results) total_updated = sum(r.get('emails_updated', 0) for r in results) successful_accounts = sum(1 for r in results if r.get('success')) sync_time = (datetime.now() - start_time).total_seconds() # 防止 WAL 无限累积,完成全局同步后尝试收缩 self._checkpoint_wal_safe(truncate=True, threshold_mb=256) return { 'success': successful_accounts > 0, 'accounts_synced': successful_accounts, 'total_accounts': len(accounts), 'emails_added': total_added, 'emails_updated': total_updated, 'sync_time': sync_time, 'results': results } def sync_single_account(self, account_id: str, full_sync: bool = False) -> Dict[str, Any]: """ 同步单个账户邮件 Args: account_id: 账户ID full_sync: 是否完全同步 """ # 验证账户存在 account = self.account_manager.get_account(account_id) if not account: error_msg = f'Account {account_id} not found' logger.error(error_msg) return {'success': False, 'error': error_msg} logger.info(f"Syncing account {account['email']} (full_sync={full_sync})") # 记录同步开始 sync_type = 'full' if full_sync else 'incremental' self.health_monitor.record_sync_start(account_id, account['email'], sync_type) start_time = datetime.now() try: # 更新账户信息到数据库 self.db.add_or_update_account(account_id, account['email'], account['provider']) # 使用连接池获取连接 with self.connection_pool.get_connection(account_id, account) as mail: # 获取所有文件夹 folders = self._get_account_folders(mail, account_id) total_added = 0 total_updated = 0 folder_errors: List[Tuple[str, str]] = [] # 同步每个文件夹 for folder_name in folders: try: added, updated = self._sync_folder( mail, account_id, folder_name, full_sync ) total_added += added total_updated += updated except Exception as e: logger.error(f"Failed to sync folder {folder_name}: {e}") folder_errors.append((folder_name, str(e))) continue # 记录文件夹错误,但不中断整个账户同步 if folder_errors: error_details = ", ".join(f"{name}: {err}" for name, err in folder_errors[:3]) logger.warning(f"Some folders failed to sync ({len(folder_errors)} folders). Sample: {error_details}") # 不抛出异常,让账户同步继续完成 # 更新账户同步状态 self._update_account_sync_status(account_id, 'completed', total_added + total_updated) # 记录成功 duration = (datetime.now() - start_time).total_seconds() self.health_monitor.record_sync_result( account_id=account_id, sync_type=sync_type, status='success', emails_synced=total_added + total_updated, duration_seconds=duration ) return { 'success': True, 'account_id': account_id, 'folders_synced': len(folders), 'emails_added': total_added, 'emails_updated': total_updated } except Exception as e: error_msg = str(e) logger.error(f"Account sync failed for {account_id}: {error_msg}") self._update_account_sync_status(account_id, 'failed', 0) # 记录失败 duration = (datetime.now() - start_time).total_seconds() self.health_monitor.record_sync_result( account_id=account_id, sync_type=sync_type, status='failed', emails_synced=0, error_message=error_msg, duration_seconds=duration ) return { 'success': False, 'account_id': account_id, 'error': error_msg } finally: # 单账户同步时也进行 WAL 收缩,防止 -wal 文件膨胀 self._checkpoint_wal_safe(truncate=False, threshold_mb=256) def _get_account_folders(self, mail, account_id: str) -> List[str]: """获取账户的所有文件夹""" try: result, folders = mail.list() if result != 'OK': raise Exception(f"Failed to list folders: {folders}") folder_names = [] for raw_folder in folders: try: decoded = raw_folder.decode('utf-8') except Exception: decoded = str(raw_folder) # IMAP LIST 响应形如: (* FLAGS) "/" "Folder" match = re.search(r'\"([^"]+)\"$', decoded) if match: folder_name = match.group(1) else: parts = decoded.split(' ') folder_name = parts[-1].strip('"') if parts else None if folder_name: folder_names.append(folder_name) # 配置化过滤与优先级 cfg_folders = self.config.get('folders', {}) exclude_cfg = set(cfg_folders.get('exclude_folders', [])) priority_cfg = cfg_folders.get('priority_folders', []) sync_all = cfg_folders.get('sync_all', True) # 过滤掉一些不需要同步的系统文件夹和不可选文件夹 excluded_folders = { '[Gmail]', # Gmail 伪文件夹,无法 SELECT '[Gmail]/All Mail', '[Gmail]/Important', '[Gmail]/Chats', 'Sent Messages', # QQ邮箱的伪文件夹 'Deleted Messages', # QQ邮箱的伪文件夹 'Drafts', # 某些邮箱的草稿箱可能无法访问 'Junk', # 垃圾邮件箱可能无法访问 'NAS &W5pl9k77UqE-' # 163邮箱的问题文件夹,EXAMINE 命令错误 } | exclude_cfg folder_names = [f for f in folder_names if f not in excluded_folders] # 如果 sync_all 关闭,则只同步 priority 列表中存在的文件夹 if not sync_all and priority_cfg: priority_set = set(priority_cfg) folder_names = [f for f in folder_names if f in priority_set] # 按优先列表排序(其余保持原顺序) if priority_cfg: priority_order = {name: idx for idx, name in enumerate(priority_cfg)} folder_names.sort(key=lambda name: priority_order.get(name, len(priority_order))) logger.info(f"Found {len(folder_names)} folders for account {account_id}") return folder_names except Exception as e: logger.error(f"Failed to get folders for account {account_id}: {e}") return ['INBOX'] # 至少同步收件箱 def _sync_folder(self, mail, account_id: str, folder_name: str, full_sync: bool = False) -> Tuple[int, int]: """ 同步文件夹邮件 Returns: Tuple[added_count, updated_count] """ logger.info(f"Syncing folder {folder_name} for account {account_id}") try: # 选择文件夹 result, data = mail.select(folder_name, readonly=True) if result != 'OK': raise Exception(f"Cannot select folder {folder_name}: {data}") total_messages = int(data[0]) if data[0] else 0 # 获取文件夹ID,如果不存在则创建 folder_id = self.db.add_or_update_folder( account_id, folder_name, folder_name, total_messages ) if not folder_id: raise Exception(f"Failed to create/get folder {folder_name}") # 确定同步范围 if full_sync: # 完全同步:获取所有邮件 search_criteria = 'ALL' logger.info(f"Full sync: processing all {total_messages} messages") else: # 检查是否是首次同步(数据库中没有该账户的邮件) is_first_sync = self._is_first_sync(account_id) last_sync_time = self.db.get_last_sync_time(account_id) if is_first_sync or not last_sync_time: # 首次同步:获取配置的天数范围内的邮件 days_back = self.config.get('first_sync_days', 180) # 默认半年 date_from_dt = datetime.now() - timedelta(days=days_back) date_from = date_from_dt.strftime("%d-%b-%Y") search_criteria = f'SINCE {date_from}' logger.info(f"First sync: processing messages since {date_from} (last {days_back} days)") else: # 增量同步:从上次同步时间往前回退一天,避免边界遗漏 buffer_hours = 24 since_dt = last_sync_time - timedelta(hours=buffer_hours) # 如果用户配置了最大增量窗口,则限制最远时间 days_back = self.config.get('incremental_sync_days', 7) max_since_dt = datetime.now() - timedelta(days=days_back) if since_dt < max_since_dt: since_dt = max_since_dt date_from = since_dt.strftime("%d-%b-%Y") search_criteria = f'SINCE {date_from}' logger.info(f"Incremental sync: processing messages since {date_from} (last sync {last_sync_time})") # 搜索邮件 result, data = mail.uid('search', None, search_criteria) if result != 'OK': logger.warning(f"UID search failed for folder {folder_name}: {data}") return 0, 0 email_ids = data[0].split() if data[0] else [] logger.info(f"Found {len(email_ids)} messages to sync in {folder_name}") if not email_ids: # 更新文件夹同步时间 self.db.add_or_update_folder( account_id, folder_name, folder_name, total_messages, last_sync=datetime.now().isoformat() ) return 0, 0 # 批量获取邮件信息 added, updated = self._fetch_and_store_emails(mail, account_id, folder_id, email_ids) # 更新文件夹同步时间 self.db.add_or_update_folder( account_id, folder_name, folder_name, total_messages, last_sync=datetime.now().isoformat() ) return added, updated except Exception as e: logger.error(f"Folder sync failed for {folder_name}: {e}") # 重新抛出异常,让上层知道同步失败 raise def _fetch_and_store_emails(self, mail, account_id: str, folder_id: int, email_ids: List[bytes]) -> Tuple[int, int]: """ 批量获取并存储邮件 Returns: Tuple[added_count, updated_count] """ added_count = 0 updated_count = 0 batch_size = 50 # 批量处理大小 # 分批处理邮件 for i in range(0, len(email_ids), batch_size): batch = email_ids[i:i + batch_size] logger.info(f"Processing batch {i//batch_size + 1}/{(len(email_ids) + batch_size - 1)//batch_size}") try: batch_added, batch_updated = self._process_email_batch( mail, account_id, folder_id, batch ) added_count += batch_added updated_count += batch_updated # 避免过度频繁的IMAP请求 time.sleep(0.1) except Exception as e: logger.error(f"Failed to process email batch: {e}") continue logger.info(f"Folder sync completed: {added_count} added, {updated_count} updated") return added_count, updated_count def _process_email_batch(self, mail, account_id: str, folder_id: int, email_ids: List[bytes]) -> Tuple[int, int]: """处理邮件批次""" added_count = 0 updated_count = 0 for email_id in email_ids: try: # 获取邮件头部信息 result, data = mail.uid('fetch', email_id, '(RFC822.HEADER FLAGS UID)') if result != 'OK': continue # 解析邮件数据 email_data = self._parse_email_data(data, account_id, folder_id) if not email_data: continue # 存储到数据库 email_db_id, is_new = self.db.add_or_update_email(email_data) if email_db_id: if is_new: added_count += 1 else: updated_count += 1 except Exception as e: logger.warning(f"Failed to process email {email_id}: {e}") continue return added_count, updated_count def _parse_email_data(self, fetch_data: List, account_id: str, folder_id: int) -> Optional[Dict[str, Any]]: """解析IMAP返回的邮件数据""" try: # 提取UID uid = None flags = [] header_data = None for response_part in fetch_data: if isinstance(response_part, tuple): # 解析flags和UID response_str = response_part[0].decode() if 'UID' in response_str: import re uid_match = re.search(r'UID (\d+)', response_str) if uid_match: uid = uid_match.group(1) if 'FLAGS' in response_str: flags_match = re.search(r'FLAGS \((.*?)\)', response_str) if flags_match: flags = flags_match.group(1).split() header_data = response_part[1] if not uid or not header_data: return None # 解析邮件头部 msg = email.message_from_bytes(header_data) # 解析发件人 from_header = msg.get('From', '') sender, sender_email = self._parse_email_address(from_header) # 解析收件人 recipients = [] for field in ['To', 'Cc', 'Bcc']: if msg.get(field): recipients.extend(self._parse_email_addresses(msg.get(field))) # 解析日期 date_sent = None date_header = msg.get('Date') if date_header: try: date_tuple = email.utils.parsedate_tz(date_header) if date_tuple: timestamp = email.utils.mktime_tz(date_tuple) date_sent = datetime.fromtimestamp(timestamp) except: pass # 解析标志 is_read = '\\Seen' in flags is_flagged = '\\Flagged' in flags return { 'account_id': account_id, 'folder_id': folder_id, 'uid': uid, 'message_id': msg.get('Message-ID', ''), 'subject': self._decode_header(msg.get('Subject', '')), 'sender': sender, 'sender_email': sender_email, 'recipients': recipients, 'date_sent': date_sent, 'is_read': is_read, 'is_flagged': is_flagged, 'has_attachments': self._has_attachments(msg), 'size_bytes': len(header_data) } except Exception as e: logger.error(f"Failed to parse email data: {e}") return None def _parse_email_address(self, address_str: str) -> Tuple[str, str]: """解析邮件地址,返回(显示名称, 邮件地址)""" try: parsed = email.utils.parseaddr(address_str) name, addr = parsed name = self._decode_header(name) if name else addr return name, addr except: return address_str, address_str def _parse_email_addresses(self, addresses_str: str) -> List[Dict[str, str]]: """解析多个邮件地址""" try: addresses = email.utils.getaddresses([addresses_str]) return [ {'name': self._decode_header(name) if name else addr, 'email': addr} for name, addr in addresses ] except: return [] def _decode_header(self, header_value: str) -> str: """解码邮件头部""" if not header_value: return "" try: from email.header import decode_header decoded_parts = decode_header(header_value) result = [] for part, encoding in decoded_parts: if isinstance(part, bytes): if encoding: try: result.append(part.decode(encoding)) except: result.append(part.decode('utf-8', errors='ignore')) else: result.append(part.decode('utf-8', errors='ignore')) else: result.append(str(part)) return ' '.join(result) except: return str(header_value) def _has_attachments(self, msg: email.message.Message) -> bool: """检查邮件是否有附件""" try: return msg.is_multipart() and any( part.get_content_disposition() == 'attachment' for part in msg.walk() ) except: return False def _update_account_sync_status(self, account_id: str, status: str, total_emails: int): """更新账户同步状态""" if not self.db.update_account_sync_status(account_id, status, total_emails): logger.error(f"Failed to update account sync status for {account_id}") def _is_first_sync(self, account_id: str) -> bool: """检查是否是账户的首次同步""" count = self.db.get_email_count_for_account(account_id) if count is None: logger.warning("Failed to check first sync status from database, defaulting to first sync") return True return count == 0 def get_sync_status(self) -> Dict[str, Any]: """获取同步状态""" return self.db.get_sync_status() def search_cached_emails(self, query: str, account_id: str = None, limit: int = 50) -> List[Dict[str, Any]]: """搜索缓存的邮件""" return self.db.search_emails(query, account_id, limit=limit) def get_recent_emails(self, account_id: str = None, limit: int = 50) -> List[Dict[str, Any]]: """获取最近邮件""" return self.db.get_recent_emails(account_id, limit) def _checkpoint_wal_safe(self, truncate: bool, threshold_mb: int): """Best-effort WAL checkpoint,避免异常抛出影响业务流程""" try: self.db.checkpoint_wal(truncate=truncate, wal_size_mb_threshold=threshold_mb) except Exception: logger.debug("Skip WAL checkpoint due to error", exc_info=True) def close(self): """关闭同步管理器""" if self.db: self.db.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server