Skip to main content
Glama
leeguooooo
by leeguooooo
parallel_operations.py15.3 kB
""" Parallel operations for batch email processing with safety checks """ import logging from typing import List, Dict, Any, Optional, Callable from concurrent.futures import ThreadPoolExecutor, as_completed import threading from datetime import datetime from ..connection_manager import ConnectionManager from ..account_manager import AccountManager from ..legacy_operations import _close_mail_safely logger = logging.getLogger(__name__) class ParallelOperations: """Execute batch operations in parallel with account safety""" def __init__(self, max_workers: int = 5, account_manager=None): """ Initialize parallel operations handler Args: max_workers: Maximum number of concurrent operations account_manager: Optional AccountManager instance to reuse (avoids re-reading config) """ self.max_workers = max_workers self._lock = threading.Lock() self.account_manager = account_manager or AccountManager() def execute_batch_operation( self, operation_func: Callable, email_ids: List[str], folder: str = "INBOX", account_id: Optional[str] = None, **kwargs ) -> Dict[str, Any]: """ Execute a batch operation on emails with proper account isolation Args: operation_func: Function to execute (delete, mark_read, etc.) email_ids: List of email IDs folder: Email folder account_id: Specific account ID (required for safety) **kwargs: Additional arguments for operation_func Returns: Combined results with success/failure counts """ if not account_id: return { 'success': False, 'error': 'Account ID is required for batch operations to ensure safety' } # Verify account exists account = self.account_manager.get_account(account_id) if not account: return { 'success': False, 'error': f'Account not found: {account_id}' } # Get connection manager for specific account conn_mgr = ConnectionManager(account) # Execute operation try: result = operation_func( connection_manager=conn_mgr, email_ids=email_ids, folder=folder, **kwargs ) # Add account info to result result['account'] = account['email'] result['account_id'] = account_id return result except Exception as e: logger.error(f"Batch operation failed for {account['email']}: {e}") return { 'success': False, 'error': str(e), 'account': account['email'], 'account_id': account_id } def execute_multi_account_operation( self, operation_func: Callable, accounts: List[Dict[str, Any]], **kwargs ) -> Dict[str, Any]: """ Execute an operation across multiple accounts in parallel Args: operation_func: Function to execute per account accounts: List of account configurations **kwargs: Arguments passed to operation_func Returns: Combined results from all accounts """ all_results = [] failed_accounts = [] success_count = 0 start_time = datetime.now() # Use ThreadPoolExecutor for parallel execution with ThreadPoolExecutor(max_workers=min(self.max_workers, len(accounts))) as executor: # Submit tasks for each account future_to_account = { executor.submit( self._execute_for_account, operation_func, account, **kwargs ): account for account in accounts } # Collect results as they complete for future in as_completed(future_to_account): account = future_to_account[future] try: result = future.result() with self._lock: if result.get('success', False): all_results.append(result) success_count += 1 else: failed_accounts.append({ 'account': account['email'], 'error': result.get('error', 'Unknown error') }) except Exception as e: logger.error(f"Operation failed for {account['email']}: {e}") with self._lock: failed_accounts.append({ 'account': account['email'], 'error': str(e) }) elapsed_time = (datetime.now() - start_time).total_seconds() return { 'success': success_count > 0, 'results': all_results, 'failed_accounts': failed_accounts, 'success_count': success_count, 'total_accounts': len(accounts), 'execution_time': elapsed_time } def _execute_for_account( self, operation_func: Callable, account: Dict[str, Any], **kwargs ) -> Dict[str, Any]: """ Execute operation for a single account Args: operation_func: Function to execute account: Account configuration **kwargs: Additional arguments Returns: Operation result """ try: conn_mgr = ConnectionManager(account) result = operation_func( connection_manager=conn_mgr, account_id=account['id'], **kwargs ) # Ensure account info is in result if 'account' not in result: result['account'] = account['email'] if 'account_id' not in result: result['account_id'] = account['id'] return result except Exception as e: logger.error(f"Account operation failed for {account['email']}: {e}") return { 'success': False, 'error': str(e), 'account': account['email'], 'account_id': account['id'] } # Parallel batch operation implementations class ParallelBatchOperations: """Specific batch operations with parallel support""" @staticmethod def batch_delete_emails( connection_manager: ConnectionManager, email_ids: List[str], folder: str = "INBOX", **kwargs ) -> Dict[str, Any]: """ Delete multiple emails (with account safety) Args: connection_manager: Connection for specific account email_ids: List of email IDs to delete folder: Email folder Returns: Dict with deletion results """ try: mail = connection_manager.connect_imap() try: result, data = mail.select(folder) if result != 'OK': logger.debug(f"Folder '{folder}' not available for account {account_id}: {data}") return {'success': False, 'error': f'Folder {folder} not available'} deleted_count = 0 failed_ids = [] for email_id in email_ids: try: result, data = mail.store(email_id, '+FLAGS', '\\Deleted') if result == 'OK': deleted_count += 1 else: failed_ids.append(email_id) except Exception as e: logger.warning(f"Failed to delete email {email_id}: {e}") failed_ids.append(email_id) # Expunge to permanently delete mail.expunge() result_data = { 'success': True, 'message': f'Deleted {deleted_count}/{len(email_ids)} emails', 'deleted_count': deleted_count, 'total_requested': len(email_ids), 'folder': folder } if failed_ids: result_data['failed_ids'] = failed_ids return result_data finally: _close_mail_safely(mail) except Exception as e: logger.error(f"Batch delete failed: {e}") return { 'success': False, 'error': str(e), 'deleted_count': 0, 'total_requested': len(email_ids) } @staticmethod def batch_mark_emails( connection_manager: ConnectionManager, email_ids: List[str], folder: str = "INBOX", mark_as: str = "read", **kwargs ) -> Dict[str, Any]: """ Mark multiple emails as read/unread Args: connection_manager: Connection for specific account email_ids: List of email IDs folder: Email folder mark_as: "read" or "unread" Returns: Dict with marking results """ try: mail = connection_manager.connect_imap() try: result, data = mail.select(folder) if result != 'OK': logger.debug(f"Folder '{folder}' not available for account {account_id}: {data}") return {'success': False, 'error': f'Folder {folder} not available'} marked_count = 0 failed_ids = [] # Determine flag operation if mark_as == "read": flag_op = '+FLAGS' flag = '\\Seen' else: flag_op = '-FLAGS' flag = '\\Seen' for email_id in email_ids: try: result, data = mail.store(email_id, flag_op, flag) if result == 'OK': marked_count += 1 else: failed_ids.append(email_id) except Exception as e: logger.warning(f"Failed to mark email {email_id}: {e}") failed_ids.append(email_id) result_data = { 'success': True, 'message': f'Marked {marked_count}/{len(email_ids)} emails as {mark_as}', 'marked_count': marked_count, 'total_requested': len(email_ids), 'mark_as': mark_as, 'folder': folder } if failed_ids: result_data['failed_ids'] = failed_ids return result_data finally: _close_mail_safely(mail) except Exception as e: logger.error(f"Batch mark failed: {e}") return { 'success': False, 'error': str(e), 'marked_count': 0, 'total_requested': len(email_ids) } @staticmethod def batch_move_emails( connection_manager: ConnectionManager, email_ids: List[str], source_folder: str = "INBOX", target_folder: str = "Trash", **kwargs ) -> Dict[str, Any]: """ Move multiple emails to another folder Args: connection_manager: Connection for specific account email_ids: List of email IDs source_folder: Source folder target_folder: Target folder Returns: Dict with move results """ try: mail = connection_manager.connect_imap() try: result, data = mail.select(source_folder) if result != 'OK': logger.debug(f"Source folder '{source_folder}' not available for account {account_id}: {data}") return {'success': False, 'error': f'Source folder {source_folder} not available'} moved_count = 0 failed_ids = [] for email_id in email_ids: try: # Copy to target folder result, data = mail.copy(email_id, target_folder) if result == 'OK': # Mark as deleted in source result, data = mail.store(email_id, '+FLAGS', '\\Deleted') if result == 'OK': moved_count += 1 else: failed_ids.append(email_id) else: failed_ids.append(email_id) except Exception as e: logger.warning(f"Failed to move email {email_id}: {e}") failed_ids.append(email_id) # Expunge to remove from source mail.expunge() result_data = { 'success': True, 'message': f'Moved {moved_count}/{len(email_ids)} emails to {target_folder}', 'moved_count': moved_count, 'total_requested': len(email_ids), 'source_folder': source_folder, 'target_folder': target_folder } if failed_ids: result_data['failed_ids'] = failed_ids return result_data finally: _close_mail_safely(mail) except Exception as e: logger.error(f"Batch move failed: {e}") return { 'success': False, 'error': str(e), 'moved_count': 0, 'total_requested': len(email_ids) } # Global instance for reuse parallel_ops = ParallelOperations(max_workers=5) batch_ops = ParallelBatchOperations()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server