Skip to main content
Glama
leeguooooo
by leeguooooo
multi_folder_fetch.py (11.3 kB)
""" Enhanced email fetching from multiple folders """ import logging from typing import List, Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import threading from datetime import datetime from ..connection_manager import ConnectionManager from ..account_manager import AccountManager from ..legacy_operations import fetch_emails logger = logging.getLogger(__name__) # UTF-7 folder name mappings for 163 FOLDER_NAME_MAP = { '&g0l6P3ux-': '草稿箱', '&XfJT0ZAB-': '已发送', '&XfJSIJZk-': '已删除', '&V4NXPpCuTvY-': '垃圾邮件', '&dcVr0mWHTvZZOQ-': '病毒文件夹', '&Xn9USpCuTvY-': '广告邮件', '&i6KWBZCuTvY-': '订阅邮件', '&ZeB1KJCuTvY-': '通知邮件', '&lj+RzE6R-': '计费', '&XA98cw-': '开票' } class MultiFolderFetcher: """Fetch emails from multiple folders across accounts""" def __init__(self, max_workers: int = 5, account_manager=None): """ Initialize multi-folder fetcher Args: max_workers: Maximum number of concurrent operations account_manager: Optional AccountManager instance to reuse (avoids re-reading config) """ self.max_workers = max_workers self._lock = threading.Lock() self.account_manager = account_manager or AccountManager() def fetch_all_unread( self, account_id: Optional[str] = None, limit_per_folder: int = 50, include_folders: Optional[List[str]] = None, exclude_folders: Optional[List[str]] = None ) -> Dict[str, Any]: """ Fetch unread emails from all folders Args: account_id: Specific account or None for all limit_per_folder: Max emails per folder include_folders: Only include these folders (None = all) exclude_folders: Exclude these folders Returns: Combined results from all folders """ # Default exclude list if exclude_folders is None: exclude_folders = ['&g0l6P3ux-', 'Drafts', '草稿箱', 'Notes'] # Get accounts to process if account_id: account = self.account_manager.get_account(account_id) accounts = [account] if account else [] else: accounts = [ self.account_manager.get_account(acc['id']) for acc in self.account_manager.list_accounts() ] all_results = [] 
start_time = datetime.now() with ThreadPoolExecutor(max_workers=min(self.max_workers, len(accounts))) as executor: futures = { executor.submit( self._fetch_account_all_folders, account, limit_per_folder, include_folders, exclude_folders ): account for account in accounts if account } for future in as_completed(futures): account = futures[future] try: result = future.result() all_results.append(result) except Exception as e: logger.error(f"Failed to fetch from {account['email']}: {e}") all_results.append({ 'account': account['email'], 'success': False, 'error': str(e), 'emails': [], 'folders_info': [] }) # Combine all results all_emails = [] total_unread = 0 accounts_info = [] failed_accounts = [] for result in all_results: if result['success']: all_emails.extend(result['emails']) total_unread += result['total_unread'] accounts_info.append({ 'account': result['account'], 'folders_checked': result['folders_checked'], 'total_unread': result['total_unread'], 'folders_info': result['folders_info'] }) else: failed_accounts.append({ 'account': result['account'], 'error': result.get('error', 'Unknown error') }) # Sort by date all_emails.sort(key=lambda x: x.get('date', ''), reverse=True) elapsed_time = (datetime.now() - start_time).total_seconds() result = { 'success': True, 'emails': all_emails, 'total_emails': len(all_emails), 'total_unread': total_unread, 'accounts_info': accounts_info, 'fetch_time': elapsed_time } if failed_accounts: result['failed_accounts'] = failed_accounts logger.info(f"Multi-folder fetch completed in {elapsed_time:.2f}s: " f"{len(all_emails)} emails from {len(accounts_info)} accounts") return result def _fetch_account_all_folders( self, account: Dict[str, Any], limit_per_folder: int, include_folders: Optional[List[str]], exclude_folders: List[str] ) -> Dict[str, Any]: """Fetch unread from all folders in one account""" try: conn_mgr = ConnectionManager(account) mail = conn_mgr.connect_imap() # List all folders result, folders = mail.list() if 
result != 'OK': raise ValueError("Failed to list folders") all_emails = [] folders_info = [] total_unread = 0 folders_checked = 0 for folder_data in folders: try: # Parse folder name folder_str = folder_data.decode('utf-8') # Try different parsing methods folder_name = None # Method 1: Standard format with separator if ' "/" ' in folder_str: parts = folder_str.split(' "/" ') if len(parts) >= 2: folder_name = parts[-1].strip('"') # Method 2: Other separators elif '"." ' in folder_str: parts = folder_str.split('"." ') folder_name = parts[-1].strip('"') elif '" " ' in folder_str: parts = folder_str.split('" " ') folder_name = parts[-1].strip('"') else: # Method 3: Extract quoted string at end import re match = re.search(r'"([^"]+)"$', folder_str) if match: folder_name = match.group(1) if not folder_name: logger.warning(f"Could not parse folder name from: {folder_str}") continue # Check include/exclude if include_folders and folder_name not in include_folders: continue if exclude_folders and folder_name in exclude_folders: continue # Select folder and check for unread result, data = mail.select(folder_name, readonly=True) if result == 'OK': # Quick check for unread result, data = mail.search(None, 'UNSEEN') if result == 'OK' and data[0]: unread_ids = data[0].split() unread_count = len(unread_ids) if unread_count > 0: # Get human-readable folder name display_name = FOLDER_NAME_MAP.get(folder_name, folder_name) logger.info(f"{account['email']}: Found {unread_count} unread in {display_name}") # Fetch emails from this folder folder_emails = self._fetch_from_folder( account, folder_name, min(limit_per_folder, unread_count) ) # Add folder info to emails for email in folder_emails: email['folder'] = folder_name email['folder_display'] = display_name all_emails.extend(folder_emails) folders_info.append({ 'folder': folder_name, 'folder_display': display_name, 'unread': unread_count, 'fetched': len(folder_emails) }) total_unread += unread_count folders_checked += 1 except 
Exception as e: logger.warning(f"Failed to check folder {folder_name}: {e}") continue mail.logout() return { 'account': account['email'], 'success': True, 'emails': all_emails, 'total_unread': total_unread, 'folders_checked': folders_checked, 'folders_info': folders_info } except Exception as e: logger.error(f"Failed to fetch from {account['email']}: {e}") return { 'account': account['email'], 'success': False, 'error': str(e), 'emails': [], 'folders_info': [] } def _fetch_from_folder( self, account: Dict[str, Any], folder: str, limit: int ) -> List[Dict[str, Any]]: """Fetch emails from specific folder""" try: result = fetch_emails( limit=limit, unread_only=True, folder=folder, account_id=account['id'] ) if 'error' not in result: return result.get('emails', []) except Exception as e: logger.error(f"Failed to fetch from {folder}: {e}") return [] # Global instance multi_folder_fetcher = MultiFolderFetcher()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.