Skip to main content
Glama
leeguooooo
by leeguooooo
fast_fetch.py6.31 kB
""" Fast email fetching with minimal overhead """ import logging from typing import Dict, Any, List, Optional from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime import threading from ..account_manager import AccountManager from ..legacy_operations import fetch_emails logger = logging.getLogger(__name__) # Simple cache for recent fetches _fetch_cache = {} _cache_lock = threading.Lock() _cache_ttl = 300 # 5 minutes (increased from 30s) def get_cached_result(key: str) -> Optional[Dict]: """Get cached result if available and not expired""" with _cache_lock: if key in _fetch_cache: timestamp, data = _fetch_cache[key] if (datetime.now() - timestamp).total_seconds() < _cache_ttl: logger.info(f"Cache hit for {key}") return data else: del _fetch_cache[key] return None def set_cached_result(key: str, data: Dict): """Cache a result""" with _cache_lock: _fetch_cache[key] = (datetime.now(), data) # Limit cache size if len(_fetch_cache) > 10: # Remove oldest entries sorted_items = sorted(_fetch_cache.items(), key=lambda x: x[1][0]) for old_key, _ in sorted_items[:5]: del _fetch_cache[old_key] def fast_fetch_single_account(account_id: str, limit: int, unread_only: bool) -> Dict[str, Any]: """Fast fetch for a single account - only check INBOX for speed""" try: # Only check INBOX for maximum speed result = fetch_emails( limit=limit, unread_only=unread_only, folder='INBOX', account_id=account_id ) if 'error' in result: return { 'account_id': account_id, 'emails': [], 'total': 0, 'unread': 0, 'error': result['error'] } return { 'account_id': account_id, 'emails': result.get('emails', []), 'total': result.get('total_emails', 0), 'unread': result.get('unread_count', 0) } except Exception as e: logger.error(f"Fast fetch failed for {account_id}: {e}") return { 'account_id': account_id, 'emails': [], 'total': 0, 'unread': 0, 'error': str(e) } def fast_fetch_all_accounts(limit: int = 50, unread_only: bool = True, account_manager=None) -> Dict[str, Any]: """ Fast parallel fetch from all accounts Args: limit: Maximum number of emails to return unread_only: Only return unread emails account_manager: Optional AccountManager instance to reuse (avoids re-reading config) Returns: Combined results from all accounts """ # Check cache first cache_key = f"fast_fetch_{unread_only}_{limit}" cached = get_cached_result(cache_key) if cached: return cached start_time = datetime.now() if account_manager is None: account_manager = AccountManager() accounts = account_manager.list_accounts() if not accounts: return { 'emails': [], 'total_emails': 0, 'total_unread': 0, 'accounts_count': 0, 'accounts_info': [] } # Calculate limit per account limit_per_account = max(10, limit // len(accounts)) # Fetch from all accounts in parallel all_results = [] failed_accounts = [] with ThreadPoolExecutor(max_workers=min(5, len(accounts))) as executor: future_to_account = { executor.submit( fast_fetch_single_account, acc['id'], limit_per_account, unread_only ): acc for acc in accounts } for future in as_completed(future_to_account, timeout=10): account = future_to_account[future] try: result = future.result() if 'error' in result: failed_accounts.append({ 'account': account['email'], 'error': result['error'] }) else: # Add account email to result result['account_email'] = account['email'] result['provider'] = account['provider'] all_results.append(result) except Exception as e: logger.error(f"Failed to fetch from {account['email']}: {e}") failed_accounts.append({ 'account': account['email'], 'error': str(e) }) # Combine results all_emails = [] accounts_info = [] total_emails = 0 total_unread = 0 for result in all_results: emails = result.get('emails', []) # Add account info to each email for email in emails: email['account'] = result['account_email'] all_emails.extend(emails) accounts_info.append({ 'account': result['account_email'], 'provider': result['provider'], 'fetched': len(emails), 'total': result.get('total', 0), 'unread': result.get('unread', 0) }) total_emails += result.get('total', 0) total_unread += result.get('unread', 0) # Sort by date (newest first) all_emails.sort(key=lambda x: x.get('date', ''), reverse=True) # Apply global limit all_emails = all_emails[:limit] elapsed_time = (datetime.now() - start_time).total_seconds() result = { 'emails': all_emails, 'total_emails': total_emails, 'total_unread': total_unread, 'accounts_count': len(accounts), 'accounts_info': accounts_info, 'failed_accounts': failed_accounts, 'fetch_time': elapsed_time } # Cache the result set_cached_result(cache_key, result) logger.info(f"Fast fetch completed in {elapsed_time:.2f}s - {len(all_emails)} emails from {len(accounts)} accounts") return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server