Skip to main content
Glama
leeguooooo
by leeguooooo
folder_scan.py8.42 kB
""" Scan all folders for unread emails """ import logging from typing import Dict, Any, List, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import threading from ..connection_manager import ConnectionManager from ..account_manager import AccountManager logger = logging.getLogger(__name__) class FolderScanner: """Scan all folders in email accounts for unread messages""" def __init__(self, max_workers: int = 3): self.max_workers = max_workers self._lock = threading.Lock() self.account_manager = AccountManager() def scan_all_folders(self, account_id: Optional[str] = None) -> Dict[str, Any]: """ Scan all folders in specified account(s) for unread emails Args: account_id: Specific account to scan, or None for all accounts Returns: Dict with folder scan results """ if account_id: account = self.account_manager.get_account(account_id) if not account: return {'success': False, 'error': f'Account not found: {account_id}'} accounts = [account] else: accounts = [ self.account_manager.get_account(acc['id']) for acc in self.account_manager.list_accounts() ] all_results = [] with ThreadPoolExecutor(max_workers=min(self.max_workers, len(accounts))) as executor: futures = { executor.submit(self._scan_account_folders, account): account for account in accounts if account } for future in as_completed(futures): account = futures[future] try: result = future.result() all_results.append(result) except Exception as e: logger.error(f"Failed to scan {account['email']}: {e}") all_results.append({ 'account': account['email'], 'success': False, 'error': str(e) }) return { 'success': True, 'accounts_scanned': len(all_results), 'results': all_results } def _scan_account_folders(self, account: Dict[str, Any]) -> Dict[str, Any]: """Scan all folders in a single account""" try: conn_mgr = ConnectionManager(account) mail = conn_mgr.connect_imap() # List all folders result, folders = mail.list() if result != 'OK': raise ValueError("Failed to list folders") folder_results = [] total_unread = 0 for folder_data in folders: try: # Parse folder name parts = folder_data.decode('utf-8').split(' "." ') if len(parts) >= 2: folder_name = parts[-1].strip('"') # Skip certain system folders skip_folders = ['[Gmail]', 'Notes', 'Junk E-mail'] if any(skip in folder_name for skip in skip_folders): continue # Select folder result, data = mail.select(folder_name, readonly=True) if result == 'OK': total_messages = int(data[0].decode()) if data[0] else 0 # Search for unread result, data = mail.search(None, 'UNSEEN') if result == 'OK': unread_ids = data[0].split() if data[0] else [] unread_count = len(unread_ids) if unread_count > 0: folder_results.append({ 'folder': folder_name, 'total': total_messages, 'unread': unread_count }) total_unread += unread_count logger.info(f"{account['email']}: {folder_name} has {unread_count} unread") except Exception as e: logger.warning(f"Failed to scan folder {folder_name}: {e}") continue mail.logout() return { 'account': account['email'], 'provider': account.get('provider'), 'success': True, 'total_unread': total_unread, 'folders_with_unread': folder_results } except Exception as e: logger.error(f"Failed to scan folders for {account['email']}: {e}") return { 'account': account['email'], 'success': False, 'error': str(e) } def fetch_unread_from_all_folders( self, account_id: Optional[str] = None, limit_per_folder: int = 10 ) -> Dict[str, Any]: """ Fetch unread emails from all folders Args: account_id: Specific account or None for all limit_per_folder: Max emails to fetch per folder Returns: Dict with emails from all folders """ # First scan to find folders with unread scan_result = self.scan_all_folders(account_id) if not scan_result['success']: return scan_result all_emails = [] for account_result in scan_result['results']: if not account_result.get('success'): continue account_email = account_result['account'] folders_with_unread = account_result.get('folders_with_unread', []) if not folders_with_unread: continue # Get account data for fetching account = None for acc in self.account_manager.list_accounts(): if acc['email'] == account_email: account = self.account_manager.get_account(acc['id']) break if not account: continue # Fetch from each folder with unread for folder_info in folders_with_unread: folder_name = folder_info['folder'] try: emails = self._fetch_from_folder( account, folder_name, limit_per_folder ) all_emails.extend(emails) except Exception as e: logger.error(f"Failed to fetch from {folder_name}: {e}") # Sort by date all_emails.sort(key=lambda x: x.get('date', ''), reverse=True) return { 'success': True, 'emails': all_emails, 'total_found': len(all_emails), 'scan_summary': scan_result } def _fetch_from_folder( self, account: Dict[str, Any], folder: str, limit: int ) -> List[Dict[str, Any]]: """Fetch unread emails from a specific folder""" from ..legacy_operations import fetch_emails # Use existing fetch function with specific folder result = fetch_emails( limit=limit, unread_only=True, folder=folder, account_id=account['id'] ) if 'error' not in result: emails = result.get('emails', []) # Add folder info to each email for email in emails: email['folder'] = folder email['account'] = account['email'] return emails return [] # Global instance folder_scanner = FolderScanner()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server