Skip to main content
Glama
leeguooooo
by leeguooooo
parallel_fetch.py7.12 kB
""" Parallel email fetching for better performance """ import asyncio import logging from typing import List, Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import threading from datetime import datetime from ..legacy_operations import fetch_emails logger = logging.getLogger(__name__) class ParallelEmailFetcher: """Fetch emails from multiple accounts in parallel""" def __init__(self, max_workers: int = 5): """ Initialize parallel fetcher Args: max_workers: Maximum number of concurrent connections """ self.max_workers = max_workers self._lock = threading.Lock() def fetch_from_account( self, account_info: Dict[str, Any], limit: int, unread_only: bool, folder: str, use_cache: bool = False ) -> Dict[str, Any]: """ Fetch emails from a single account Args: use_cache: Whether to use cached data from email_sync.db Returns: Dict with emails and account info """ try: account_id = account_info['id'] account_email = account_info['email'] logger.info(f"Fetching from {account_email}...") # CRITICAL: Forward use_cache to fetch_emails result = fetch_emails(limit, unread_only, folder, account_id, use_cache) if "error" not in result: # Add account email to each email emails = result['emails'] for email in emails: email['account'] = account_email return { 'success': True, 'account': account_email, 'emails': emails, 'total': result['total_in_folder'], 'unread': result['unread_count'], 'fetched': len(emails) } else: return { 'success': False, 'account': account_email, 'error': result['error'], 'emails': [], 'total': 0, 'unread': 0, 'fetched': 0 } except Exception as e: logger.error(f"Failed to fetch from {account_info.get('email', 'unknown')}: {e}") return { 'success': False, 'account': account_info.get('email', 'unknown'), 'error': str(e), 'emails': [], 'total': 0, 'unread': 0, 'fetched': 0 } def fetch_all_parallel( self, accounts: List[Dict[str, Any]], limit: int = 50, unread_only: bool = False, folder: str = "INBOX", use_cache: bool = False ) -> Dict[str, Any]: """ Fetch emails from all accounts in parallel Args: use_cache: Whether to use cached data from email_sync.db Returns: Combined results from all accounts """ all_emails = [] accounts_info = [] total_emails = 0 total_unread = 0 failed_accounts = [] start_time = datetime.now() # Use ThreadPoolExecutor for parallel fetching with ThreadPoolExecutor(max_workers=min(self.max_workers, len(accounts))) as executor: # Submit all tasks # CRITICAL: Forward use_cache to each account fetch future_to_account = { executor.submit( self.fetch_from_account, account, limit, unread_only, folder, use_cache ): account for account in accounts } # Collect results as they complete for future in as_completed(future_to_account): account = future_to_account[future] try: result = future.result() if result['success']: # Thread-safe operations with self._lock: all_emails.extend(result['emails']) accounts_info.append({ 'account': result['account'], 'total': result['total'], 'unread': result['unread'], 'fetched': result['fetched'] }) total_emails += result['total'] total_unread += result['unread'] else: failed_accounts.append({ 'account': result['account'], 'error': result['error'] }) except Exception as e: logger.error(f"Unexpected error for account {account.get('email', 'unknown')}: {e}") failed_accounts.append({ 'account': account.get('email', 'unknown'), 'error': str(e) }) # Sort all emails by date (newest first) all_emails.sort(key=lambda x: x.get('date', ''), reverse=True) # Limit total results all_emails = all_emails[:limit] elapsed_time = (datetime.now() - start_time).total_seconds() logger.info(f"Parallel fetch completed in {elapsed_time:.2f} seconds") result = { 'emails': all_emails, 'total_emails': total_emails, 'total_unread': total_unread, 'accounts_count': len(accounts), 'accounts_info': accounts_info, 'fetch_time': elapsed_time } if failed_accounts: result['failed_accounts'] = failed_accounts result['success_rate'] = len(accounts_info) / len(accounts) * 100 return result # Global instance for reuse parallel_fetcher = ParallelEmailFetcher(max_workers=5) def fetch_emails_parallel( accounts: List[Dict[str, Any]], limit: int = 50, unread_only: bool = False, folder: str = "INBOX", use_cache: bool = False ) -> Dict[str, Any]: """ Convenience function to fetch emails in parallel Args: accounts: List of account configurations limit: Maximum number of emails to fetch per account unread_only: Only fetch unread emails folder: Folder name to fetch from use_cache: Whether to use cached data from email_sync.db Returns: Combined results from all accounts """ return parallel_fetcher.fetch_all_parallel(accounts, limit, unread_only, folder, use_cache)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server