Skip to main content
Glama
leeguooooo
by leeguooooo
sync_manager.py (16.3 kB)
"""Synchronization manager for email database."""
import email
import email.utils
import imaplib
import logging
import re
import threading
import time
from typing import Dict, Any, List, Optional, Set
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import decode_header

from ..account_manager import AccountManager
from ..connection_manager import ConnectionManager
from .email_database import EmailDatabase
from ..operations.folder_operations import FolderOperations
from ..operations.email_operations import EmailOperations

logger = logging.getLogger(__name__)

# RFC 3501 requires English month abbreviations in date literals, so the
# SINCE criterion must not be built with the locale-dependent strftime('%b').
_IMAP_MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

# imaplib.ParseFlags extracts only the FLAGS atoms (as a tuple); UID and
# RFC822.SIZE have to be pulled out of the raw FETCH response line directly.
_UID_RE = re.compile(rb'UID (\d+)')
_SIZE_RE = re.compile(rb'RFC822\.SIZE (\d+)')


class SyncManager:
    """Manages email synchronization between IMAP servers and local database."""

    def __init__(self, db: EmailDatabase, account_manager: AccountManager):
        self.db = db
        self.account_manager = account_manager
        # account_id -> thread currently syncing that account; guarded by
        # _sync_lock and used to reject concurrent syncs of one account.
        self._sync_threads: Dict[str, threading.Thread] = {}
        self._stop_event = threading.Event()
        self._sync_lock = threading.Lock()

        # Sync configuration
        self.config = {
            'sync_interval': 300,   # seconds between automatic sync passes
            'initial_days': 30,     # initial sync covers the last N days
            'max_workers': 5,       # max parallel sync workers
            'batch_size': 100,      # emails per IMAP FETCH batch
            'sync_body_days': 7,    # sync full body for last N days
        }

    def start_auto_sync(self):
        """Start automatic synchronization in a daemon background thread."""
        # Clear any previous stop request so auto sync can be restarted
        # after stop_auto_sync() was called.
        self._stop_event.clear()

        def sync_loop():
            while not self._stop_event.is_set():
                try:
                    self.sync_all_accounts()
                except Exception as e:
                    logger.error(f"Auto sync error: {e}")
                # Wait for next sync interval (wakes early if stop is set)
                self._stop_event.wait(self.config['sync_interval'])

        sync_thread = threading.Thread(target=sync_loop, daemon=True)
        sync_thread.start()
        logger.info("Auto sync started")

    def stop_auto_sync(self):
        """Signal the auto-sync loop to stop after its current pass."""
        self._stop_event.set()
        logger.info("Auto sync stopped")

    def sync_all_accounts(self, full_sync: bool = False) -> Dict[str, Any]:
        """Sync all configured accounts in parallel.

        Args:
            full_sync: When True, re-sync each folder from scratch instead
                of continuing from the last synced UID.

        Returns:
            Summary dict with 'success', 'failed', 'total_synced', 'duration'.
        """
        start_time = datetime.now()
        results = {
            'success': [],
            'failed': [],
            'total_synced': 0,
            'duration': 0
        }

        accounts = self.account_manager.list_accounts()

        with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor:
            future_to_account = {
                executor.submit(self.sync_account, acc['id'], full_sync): acc
                for acc in accounts
            }

            for future in as_completed(future_to_account):
                account = future_to_account[future]
                try:
                    result = future.result()
                    results['success'].append({
                        'account': account['email'],
                        'synced': result['emails_synced']
                    })
                    results['total_synced'] += result['emails_synced']
                except Exception as e:
                    logger.error(f"Failed to sync {account['email']}: {e}")
                    results['failed'].append({
                        'account': account['email'],
                        'error': str(e)
                    })

        results['duration'] = (datetime.now() - start_time).total_seconds()
        logger.info(f"Sync completed: {results['total_synced']} emails in {results['duration']:.2f}s")
        return results

    def sync_account(self, account_id: str, full_sync: bool = False) -> Dict[str, Any]:
        """Sync a single account's folders and emails.

        Returns {'status': 'already_running'} if another thread is currently
        syncing this account; otherwise a result dict with counters.

        Raises:
            ValueError: If the account id is unknown.
            Exception: Re-raised after logging if the sync itself fails.
        """
        with self._sync_lock:
            existing = self._sync_threads.get(account_id)
            if existing is not None and existing.is_alive():
                logger.warning(f"Sync already in progress for {account_id}")
                return {'status': 'already_running'}
            # Register ourselves so the guard above actually works; the
            # original code checked _sync_threads but never populated it.
            self._sync_threads[account_id] = threading.current_thread()

        try:
            account = self.account_manager.get_account(account_id)
            if not account:
                raise ValueError(f"Account {account_id} not found")

            # Update account in database
            self.db.upsert_account(account)

            start_time = datetime.now()
            result = {
                'account_id': account_id,
                'emails_synced': 0,
                'folders_synced': 0,
                'errors': []
            }

            try:
                conn_mgr = ConnectionManager(account)

                # Sync folder structure first so folder rows exist
                folder_result = self._sync_folders(account_id, conn_mgr)
                result['folders_synced'] = folder_result['count']

                # Get folders to sync
                folders = self.db.conn.execute(
                    "SELECT * FROM folders WHERE account_id = ?",
                    (account_id,)
                ).fetchall()

                # Sync each folder; one failing folder does not abort the rest
                for folder in folders:
                    try:
                        email_count = self._sync_folder_emails(
                            account_id, dict(folder), conn_mgr, full_sync
                        )
                        result['emails_synced'] += email_count
                    except Exception as e:
                        logger.error(f"Failed to sync folder {folder['folder_name']}: {e}")
                        result['errors'].append(f"Folder {folder['folder_name']}: {str(e)}")

                # Update last sync time
                self.db.conn.execute(
                    "UPDATE accounts SET last_sync_time = ?, sync_status = ? WHERE id = ?",
                    (datetime.now(), 'completed', account_id)
                )
                self.db.conn.commit()

                # Log sync
                self.db.log_sync(
                    account_id=account_id,
                    sync_type='full' if full_sync else 'incremental',
                    start_time=start_time,
                    end_time=datetime.now(),
                    emails_synced=result['emails_synced'],
                    status='success' if not result['errors'] else 'partial'
                )
            except Exception as e:
                logger.error(f"Account sync failed for {account_id}: {e}")
                self.db.conn.execute(
                    "UPDATE accounts SET sync_status = ? WHERE id = ?",
                    ('failed', account_id)
                )
                self.db.conn.commit()

                self.db.log_sync(
                    account_id=account_id,
                    sync_type='full' if full_sync else 'incremental',
                    start_time=start_time,
                    end_time=datetime.now(),
                    status='failed',
                    error_message=str(e)
                )
                raise

            return result
        finally:
            # Deregister so a later sync of this account is not blocked by
            # a stale entry pointing at a still-alive pool worker thread.
            with self._sync_lock:
                if self._sync_threads.get(account_id) is threading.current_thread():
                    del self._sync_threads[account_id]

    def _sync_folders(self, account_id: str, conn_mgr: ConnectionManager) -> Dict[str, Any]:
        """Mirror the server's folder list into the local database."""
        folder_ops = FolderOperations(conn_mgr)
        result = folder_ops.list_folders()

        if not result.get('success'):
            raise Exception(f"Failed to list folders: {result.get('error')}")

        folders = result.get('folders', [])
        for folder in folders:
            self.db.upsert_folder(account_id, folder)

        return {'count': len(folders)}

    def _sync_folder_emails(self, account_id: str, folder: Dict[str, Any],
                            conn_mgr: ConnectionManager, full_sync: bool = False) -> int:
        """Sync new emails in one folder; returns the number synced."""
        folder_id = folder['id']
        folder_name = folder['folder_name']
        last_sync_uid = folder.get('last_sync_uid', 0) if not full_sync else 0

        emails_synced = 0

        try:
            with conn_mgr.get_connection() as conn:
                _, select_data = conn.select(folder_name)
                # SELECT reports the folder's true message total; len(uids)
                # below only counts messages matched by this sync's search.
                try:
                    total_messages = int(select_data[0])
                except (TypeError, ValueError, IndexError):
                    total_messages = 0

                if last_sync_uid > 0:
                    # Incremental sync: UIDs greater than the last synced one
                    search_criteria = f'UID {last_sync_uid + 1}:*'
                else:
                    # Full/initial sync: emails from the last N days.
                    # Date literal built by hand because strftime('%b') is
                    # locale-dependent while IMAP requires English months.
                    since = datetime.now() - timedelta(days=self.config['initial_days'])
                    search_criteria = (
                        f'SINCE {since.day:02d}-{_IMAP_MONTHS[since.month - 1]}-{since.year}'
                    )

                _, uid_data = conn.uid('search', None, search_criteria)
                uids = uid_data[0].split() if uid_data and uid_data[0] else []

                # 'UID n:*' always matches at least the highest-UID message,
                # even when its UID < n — drop anything already synced.
                if last_sync_uid > 0:
                    uids = [u for u in uids if int(u) > last_sync_uid]

                if not uids:
                    return 0

                # Process in batches
                for i in range(0, len(uids), self.config['batch_size']):
                    batch_uids = uids[i:i + self.config['batch_size']]

                    # Fetch email headers and metadata (PEEK leaves \Seen unset)
                    uid_list = b','.join(batch_uids)
                    fetch_data = '(UID FLAGS RFC822.SIZE BODY.PEEK[HEADER])'
                    _, messages = conn.uid('fetch', uid_list, fetch_data)

                    for msg_data in messages:
                        # Non-tuple items are framing tokens like b')'
                        if not isinstance(msg_data, tuple):
                            continue
                        try:
                            email_data = self._parse_email_data(msg_data)
                            if email_data:
                                self.db.upsert_email(email_data, account_id, folder_id)
                                emails_synced += 1

                                # Track high-water UID for the next increment
                                if email_data['uid'] > last_sync_uid:
                                    last_sync_uid = email_data['uid']
                        except Exception as e:
                            logger.error(f"Failed to parse email: {e}")

                # Update folder's sync high-water mark and counts
                self.db.conn.execute(
                    "UPDATE folders SET last_sync_uid = ?, message_count = ?, "
                    "unread_count = (SELECT COUNT(*) FROM emails WHERE folder_id = ? AND is_read = 0) "
                    "WHERE id = ?",
                    (last_sync_uid, total_messages or len(uids), folder_id, folder_id)
                )
                self.db.conn.commit()
        except Exception as e:
            logger.error(f"Failed to sync folder {folder_name}: {e}")
            raise

        return emails_synced

    def _parse_email_data(self, msg_data: tuple) -> Optional[Dict[str, Any]]:
        """Parse one IMAP FETCH response item into an email dictionary.

        Args:
            msg_data: Tuple of (response line, raw header bytes) as produced
                by imaplib for a FETCH of UID/FLAGS/RFC822.SIZE/HEADER.

        Returns:
            Email dict, or None if the item could not be parsed.
        """
        try:
            msg_info = msg_data[0]    # e.g. b'1 (UID 5 FLAGS (\\Seen) RFC822.SIZE 321 ...'
            msg_header = msg_data[1]  # raw RFC 2822 header bytes

            # imaplib.ParseFlags returns a tuple of flag atoms (NOT a dict),
            # so UID and RFC822.SIZE are extracted with regexes.
            uid_match = _UID_RE.search(msg_info)
            size_match = _SIZE_RE.search(msg_info)
            uid = int(uid_match.group(1)) if uid_match else 0
            size = int(size_match.group(1)) if size_match else 0
            flags = imaplib.ParseFlags(msg_info)

            # Parse email headers
            email_message = email.message_from_bytes(msg_header)

            # Extract basic info. parseaddr returns (display_name, addr_spec);
            # the previous code had the two unpacked in the wrong order.
            subject = self._decode_header(email_message.get('Subject', ''))
            from_name, from_addr = email.utils.parseaddr(email_message.get('From', ''))

            # Parse date
            date_str = email_message.get('Date', '')
            try:
                parsed_date = email.utils.parsedate_to_datetime(date_str)
                date = parsed_date.isoformat() if parsed_date else None
            except (TypeError, ValueError):
                date = None

            # Parse recipients; getaddresses accepts the header list directly
            to_addrs = [a[1] for a in email.utils.getaddresses(email_message.get_all('To', []))]
            cc_addrs = [a[1] for a in email.utils.getaddresses(email_message.get_all('Cc', []))]

            return {
                'uid': uid,
                'message_id': email_message.get('Message-ID', ''),
                'subject': subject,
                'from': from_addr,
                'from_name': from_name,
                'to': to_addrs,
                'cc': cc_addrs,
                'date': date,
                'size': size,
                'unread': b'\\Seen' not in flags,
                'flagged': b'\\Flagged' in flags,
                'headers': dict(email_message.items())
            }
        except Exception as e:
            logger.error(f"Failed to parse email data: {e}")
            return None

    def _decode_header(self, header: str) -> str:
        """Decode a MIME-encoded header into a plain unicode string."""
        if not header:
            return ''

        decoded_parts = []
        for part, encoding in decode_header(header):
            if isinstance(part, bytes):
                try:
                    decoded_parts.append(part.decode(encoding or 'utf-8'))
                except (LookupError, UnicodeDecodeError):
                    # Unknown or wrong charset: fall back to lossy UTF-8
                    decoded_parts.append(part.decode('utf-8', errors='replace'))
            else:
                decoded_parts.append(str(part))

        return ' '.join(decoded_parts)

    def sync_email_body(self, email_id: int) -> bool:
        """Fetch and store the full body of one email on demand.

        Returns True on success, False if the email is unknown or the
        fetch/store failed (errors are logged, not raised).
        """
        try:
            # Get email info
            cursor = self.db.conn.execute("""
                SELECT e.*, a.*, f.folder_name
                FROM emails e
                JOIN accounts a ON e.account_id = a.id
                JOIN folders f ON e.folder_id = f.id
                WHERE e.id = ?
            """, (email_id,))

            row = cursor.fetchone()
            if not row:
                return False

            # Named email_row (not 'email') so it does not shadow the
            # stdlib email module used below.
            email_row = dict(row)
            account = self.account_manager.get_account(email_row['account_id'])
            conn_mgr = ConnectionManager(account)

            # Fetch full email
            with conn_mgr.get_connection() as conn:
                conn.select(email_row['folder_name'])

                # Fetch body without setting the \Seen flag
                _, msg_data = conn.uid('fetch', str(email_row['uid']), '(BODY.PEEK[])')

                if msg_data and msg_data[0]:
                    email_message = email.message_from_bytes(msg_data[0][1])

                    # Extract first text/plain and text/html parts
                    body_text = ''
                    body_html = ''

                    for part in email_message.walk():
                        content_type = part.get_content_type()
                        payload = part.get_payload(decode=True)
                        # Multipart containers return None here — skip them
                        if payload is None:
                            continue
                        if content_type == 'text/plain' and not body_text:
                            body_text = payload.decode('utf-8', errors='replace')
                        elif content_type == 'text/html' and not body_html:
                            body_html = payload.decode('utf-8', errors='replace')

                    # Update database atomically
                    with self.db.transaction() as db_conn:
                        db_conn.execute("""
                            INSERT OR REPLACE INTO email_contents (email_id, body_text, body_html)
                            VALUES (?, ?, ?)
                        """, (email_id, body_text, body_html))

                        db_conn.execute(
                            "UPDATE emails SET sync_status = 'full' WHERE id = ?",
                            (email_id,)
                        )

                    return True
        except Exception as e:
            logger.error(f"Failed to sync email body: {e}")
        return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server