Skip to main content
Glama
leeguooooo
by leeguooooo
parallel_search.py8.96 kB
""" Parallel search operations across multiple email accounts """ import logging from typing import List, Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import threading from datetime import datetime from ..connection_manager import ConnectionManager from ..account_manager import AccountManager from .search_operations import SearchOperations logger = logging.getLogger(__name__) class ParallelSearchOperations: """Search emails across multiple accounts in parallel""" def __init__(self, max_workers: int = 5, account_manager=None): """ Initialize parallel search handler Args: max_workers: Maximum number of concurrent searches account_manager: Optional AccountManager instance to reuse (avoids re-reading config) """ self.max_workers = max_workers self._lock = threading.Lock() self.account_manager = account_manager or AccountManager() def search_all_accounts( self, query: Optional[str] = None, search_in: str = "all", date_from: Optional[str] = None, date_to: Optional[str] = None, folder: str = "INBOX", unread_only: bool = False, has_attachments: Optional[bool] = None, limit_per_account: int = 50, account_ids: Optional[List[str]] = None ) -> Dict[str, Any]: """ Search emails across all accounts in parallel Args: query: Search query text search_in: Where to search (subject/from/body/to/all) date_from: Start date (YYYY-MM-DD) date_to: End date (YYYY-MM-DD) folder: Folder to search in unread_only: Only search unread emails has_attachments: Filter by attachment presence limit_per_account: Max results per account account_ids: Specific account IDs to search (None = all accounts) Returns: Combined search results from all accounts """ # Get accounts to search if account_ids: accounts = [ self.account_manager.get_account(acc_id) for acc_id in account_ids if self.account_manager.get_account(acc_id) ] else: accounts = self.account_manager.list_accounts() if not accounts: return { 'success': False, 'error': 'No accounts available for search', 'emails': [], 'total_found': 0 } all_emails = [] accounts_info = [] total_found = 0 failed_accounts = [] start_time = datetime.now() # Execute searches in parallel with ThreadPoolExecutor(max_workers=min(self.max_workers, len(accounts))) as executor: # Submit search tasks future_to_account = { executor.submit( self._search_single_account, account, query, search_in, date_from, date_to, folder, unread_only, has_attachments, limit_per_account ): account for account in accounts } # Collect results for future in as_completed(future_to_account): account = future_to_account[future] try: result = future.result() if result['success']: with self._lock: # Add account email to each result for email in result['emails']: email['account'] = account['email'] email['account_id'] = account['id'] all_emails.extend(result['emails']) accounts_info.append({ 'account': account['email'], 'found': result.get('displayed', 0), 'total_in_account': result.get('total_found', 0) }) total_found += result.get('total_found', 0) else: failed_accounts.append({ 'account': account['email'], 'error': result.get('error', 'Unknown error') }) except Exception as e: logger.error(f"Search failed for {account['email']}: {e}") failed_accounts.append({ 'account': account['email'], 'error': str(e) }) # Sort combined results by date (newest first) all_emails.sort(key=lambda x: x.get('date', ''), reverse=True) # Apply global limit if needed total_limit = limit_per_account * len(accounts) all_emails = all_emails[:total_limit] elapsed_time = (datetime.now() - start_time).total_seconds() result = { 'success': len(all_emails) > 0 or len(failed_accounts) < len(accounts), 'emails': all_emails, 'total_found': total_found, 'displayed': len(all_emails), 'accounts_searched': len(accounts), 'accounts_info': accounts_info, 'search_time': elapsed_time, 'search_params': { 'query': query, 'search_in': search_in, 'date_from': date_from, 'date_to': date_to, 'folder': folder, 'unread_only': unread_only, 'has_attachments': has_attachments } } if failed_accounts: result['failed_accounts'] = failed_accounts logger.info(f"Parallel search completed in {elapsed_time:.2f}s: " f"found {len(all_emails)} emails from {len(accounts_info)} accounts") return result def _search_single_account( self, account: Dict[str, Any], query: Optional[str], search_in: str, date_from: Optional[str], date_to: Optional[str], folder: str, unread_only: bool, has_attachments: Optional[bool], limit: int ) -> Dict[str, Any]: """ Search in a single account Args: account: Account configuration ... (other args same as search_all_accounts) Returns: Search results for this account """ try: logger.info(f"Searching in {account['email']}...") # Create connection and search operations conn_mgr = ConnectionManager(account) search_ops = SearchOperations(conn_mgr) # Execute search result = search_ops.search_emails( query=query, search_in=search_in, date_from=date_from, date_to=date_to, folder=folder, unread_only=unread_only, has_attachments=has_attachments, limit=limit ) return result except Exception as e: logger.error(f"Search error for {account['email']}: {e}") return { 'success': False, 'error': str(e), 'emails': [] } def search_by_criteria_parallel( self, criteria: Dict[str, Any], account_ids: Optional[List[str]] = None ) -> Dict[str, Any]: """ Search with a criteria dictionary across multiple accounts Args: criteria: Search criteria dict account_ids: Specific accounts to search Returns: Combined search results """ return self.search_all_accounts( query=criteria.get('query'), search_in=criteria.get('search_in', 'all'), date_from=criteria.get('date_from'), date_to=criteria.get('date_to'), folder=criteria.get('folder', 'INBOX'), unread_only=criteria.get('unread_only', False), has_attachments=criteria.get('has_attachments'), limit_per_account=criteria.get('limit', 50), account_ids=account_ids ) # Global instance parallel_search = ParallelSearchOperations(max_workers=5)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server