"""
Legacy operations from original main.py
Preserved for backward compatibility
"""
import imaplib
import sqlite3
import email
from email.header import decode_header
import logging
from typing import Dict, Any, Optional, List, Union
from datetime import datetime
import json
from pathlib import Path
import os
from datetime import datetime
from .connection_manager import ConnectionManager
from .account_manager import AccountManager
from .operations.cached_operations import CachedEmailOperations
logger = logging.getLogger(__name__)
# Module-level singleton: one AccountManager shared by every operation in
# this module (account lookup, listing, connection-manager construction).
account_manager = AccountManager()
def get_connection_manager(account_id: Optional[str] = None) -> ConnectionManager:
    """Resolve *account_id* to a configured account and wrap it.

    Args:
        account_id: Account identifier; None selects the default account.

    Returns:
        A ConnectionManager bound to the resolved account.

    Raises:
        ValueError: if no matching account is configured.
    """
    resolved = account_manager.get_account(account_id)
    if resolved:
        return ConnectionManager(resolved)
    raise ValueError("No email account configured")
def decode_mime_words(s):
    """
    Decode RFC 2047 MIME encoded-words in a header value.

    Charset handling is deliberately forgiving: a missing charset defaults
    to utf-8, and known-bogus names such as 'unknown-8bit' are mapped to
    latin-1 (which can decode any byte sequence), so no header is ever lost
    to a charset problem.
    """
    if not s:
        return ""
    pieces = []
    for fragment, charset in decode_header(s):
        if not isinstance(fragment, bytes):
            # Already a str fragment; append as-is.
            pieces.append(fragment)
            continue
        # Normalize the charset: None -> utf-8, known bogus names -> latin-1.
        if charset is None:
            charset = 'utf-8'
        elif charset.lower() in ('unknown-8bit', 'x-unknown', '8bit', 'unknown'):
            charset = 'latin-1'
        try:
            pieces.append(fragment.decode(charset, errors='ignore'))
        except (LookupError, UnicodeDecodeError):
            # Truly unknown codec name: force latin-1 so decoding cannot fail.
            logger.debug(f"Unknown encoding '{charset}', falling back to latin-1")
            pieces.append(fragment.decode('latin-1', errors='replace'))
    return "".join(pieces)
def safe_parse_email(raw_email: bytes) -> email.message.Message:
    """
    Parse raw email bytes with tolerance for broken encodings.

    Falls back to decoding the bytes via latin-1 (which accepts any byte
    sequence) when the byte parser trips over an unknown charset, so a
    message is never lost to an encoding problem.
    """
    try:
        return email.message_from_bytes(raw_email)
    except (LookupError, UnicodeDecodeError) as e:
        logger.warning(f"Email parsing failed with encoding error: {e}, using latin-1 fallback")
    # Fallback path: force-decode and parse from text.
    try:
        decoded_text = raw_email.decode('latin-1', errors='replace')
        return email.message_from_string(decoded_text)
    except Exception as fallback_error:
        logger.error(f"Failed to parse email even with fallback: {fallback_error}")
        raise
# --- Best-effort helpers: IMAP cleanup and local sync-cache access ---
def _close_mail_safely(mail):
"""Best-effort IMAP cleanup; tolerate AUTH state."""
try:
if mail is None:
return
state = getattr(mail, "state", None)
if state == "SELECTED":
try:
mail.close()
except Exception:
pass
mail.logout()
except Exception as exc:
logger.warning(f"Error closing IMAP connection: {exc}")
def _get_sync_db_path() -> Path:
"""Locate sync cache DB path (best-effort)."""
try:
from .config.paths import EMAIL_SYNC_DB
return Path(EMAIL_SYNC_DB)
except Exception:
return Path("data/email_sync.db")
def _lookup_message_id_from_sync_db(account_id: str, uid: str):
    """Best-effort lookup of Message-ID from local sync DB by account/uid.

    Args:
        account_id: Canonical account identifier used by the sync cache.
        uid: IMAP UID (stored as text in the cache).

    Returns:
        The stored Message-ID string, or None when the DB is missing, the
        row is absent, or any error occurs (lookup is best-effort).
    """
    db_path = _get_sync_db_path()
    if not db_path.exists():
        return None
    try:
        # BUGFIX: the original leaked the connection when execute() raised;
        # close it in a finally block. Also dropped the redundant local
        # `import sqlite3` (the module is already imported at file top).
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.execute(
                "SELECT message_id FROM emails WHERE account_id = ? AND uid = ? ORDER BY id DESC LIMIT 1",
                (account_id, str(uid)),
            )
            row = cursor.fetchone()
        finally:
            conn.close()
        if row and row[0]:
            return row[0]
    except Exception:
        # Best-effort: any failure is treated as "not found".
        pass
    return None
def _mark_sync_cache_read(account_id: str, uid: str):
    """Best-effort: flag the cached copy of an email as read.

    Mirrors an IMAP \\Seen change into the local sync DB so cached listings
    stay consistent. Silently ignores every failure (the cache is advisory).

    Args:
        account_id: Canonical account identifier used by the sync cache.
        uid: IMAP UID (stored as text in the cache).
    """
    db_path = _get_sync_db_path()
    if not db_path.exists():
        return
    try:
        # BUGFIX: the original leaked the connection when execute()/commit()
        # raised; ensure the handle is released in a finally block. The
        # redundant local `import sqlite3` was dropped (imported at file top).
        conn = sqlite3.connect(db_path)
        try:
            conn.execute(
                "UPDATE emails SET is_read = 1, updated_at = CURRENT_TIMESTAMP WHERE account_id = ? AND uid = ?",
                (account_id, str(uid)),
            )
            conn.commit()
        finally:
            conn.close()
    except Exception:
        return
"""
安全地解析邮件,支持编码容错
当遇到未知编码(如 'unknown-8bit')时,使用 latin-1 作为回退编码。
latin-1 可以解码任何字节序列,确保邮件不会因编码问题而丢失。
Args:
raw_email: 原始邮件字节数据
Returns:
解析后的邮件对象
"""
try:
# 首先尝试正常解析
return email.message_from_bytes(raw_email)
except (LookupError, UnicodeDecodeError) as e:
# 编码错误时使用 latin-1 强制解码
logger.warning(f"Email parsing failed with encoding error: {e}, using latin-1 fallback")
try:
# latin-1 可以解码任何字节序列,不会抛出异常
text = raw_email.decode('latin-1', errors='replace')
return email.message_from_string(text)
except Exception as fallback_error:
# 如果仍然失败,记录错误并抛出
logger.error(f"Failed to parse email even with fallback: {fallback_error}")
raise
def _normalize_folder_name(folder: Optional[Union[str, bytes]]) -> Union[str, bytes]:
    """
    Prepare folder/mailbox name for IMAP commands.
    Ensures spaces and special characters are properly quoted and
    non-ASCII names are IMAP UTF-7 encoded.

    NOTE(review): both imaplib internals used below are fragile —
    `imaplib.IMAP4._encode_utf7` is not a documented stdlib attribute and
    `IMAP4._quote` is an instance method invoked unbound with a single
    argument, so on typical Python versions both calls are expected to raise
    and land in their `except` fallbacks. The effective behavior is:
    non-ASCII names pass through unencoded, and quoting only happens via
    the whitespace fallback at the bottom. Verify against the targeted
    Python/imaplib before relying on UTF-7 encoding here.
    """
    if not folder:
        # None/empty input -> default mailbox.
        return "INBOX"
    if isinstance(folder, bytes):
        # Caller already encoded the name; pass through untouched.
        return folder
    # Strip leading/trailing whitespace but keep original if it matters
    folder_str = folder.strip() or "INBOX"
    # Encode to IMAP UTF-7 when non-ASCII characters are present
    try:
        folder_str.encode('ascii')
        encoded = folder_str
    except UnicodeEncodeError:
        try:
            encoded_bytes = imaplib.IMAP4._encode_utf7(folder_str)
            encoded = encoded_bytes.decode('ascii') if isinstance(encoded_bytes, bytes) else encoded_bytes
        except Exception:
            # Best effort: keep the raw (non-ASCII) name.
            encoded = folder_str
    try:
        return imaplib.IMAP4._quote(encoded)
    except Exception:
        # Fallback: add double quotes if there are spaces or special chars
        if any(ch.isspace() for ch in encoded) and not (encoded.startswith('"') and encoded.endswith('"')):
            return f'"{encoded}"'
        return encoded
def get_mailbox_status(folder="INBOX", account_id=None):
    """Get mailbox status.

    Args:
        folder: Mailbox name to inspect (default INBOX).
        account_id: Account to query; None uses the default account.

    Returns:
        dict with total_emails, unread_emails, folder and account on
        success, or {"error": ...} on failure.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        try:
            # Select the folder
            selected_folder = _normalize_folder_name(folder)
            result, data = mail.select(selected_folder)
            if result != 'OK':
                raise Exception(f"Cannot select folder {folder}")
            # Get total emails
            result, data = mail.search(None, 'ALL')
            total = len(data[0].split()) if data[0] else 0
            # Get unread emails
            result, data = mail.search(None, 'UNSEEN')
            unread = len(data[0].split()) if data[0] else 0
        finally:
            # BUGFIX: the original only logged out on the success path,
            # leaking the IMAP connection whenever select/search raised.
            try:
                mail.logout()
            except Exception:
                pass
        return {
            "total_emails": total,
            "unread_emails": unread,
            "folder": folder,
            "account": conn_mgr.email
        }
    except Exception as e:
        logger.error(f"Error getting mailbox status: {e}")
        return {"error": str(e)}
def get_unread_count(account_id=None):
    """Return the number of unread INBOX messages for the account.

    Raises on failure (unlike get_mailbox_status, which returns an
    error dict) so callers can distinguish "zero unread" from "broken".
    """
    try:
        status = get_mailbox_status("INBOX", account_id)
        if "error" not in status:
            return status["unread_emails"]
        raise Exception(status["error"])
    except Exception as e:
        logger.error(f"Error getting unread count: {e}")
        raise
def check_connection():
    """Test IMAP/SMTP connectivity for every configured account.

    Returns a per-account result list when accounts are configured, or a
    single connection-test result when falling back to environment
    variables. Failures are reported inside the returned dict, not raised.
    """
    try:
        accounts = account_manager.list_accounts()
        if not accounts:
            # Fallback to env vars
            return get_connection_manager().test_connection()
        results = []
        for account_info in accounts:
            try:
                mgr = get_connection_manager(account_info['id'])
                results.append(mgr.test_connection())
            except Exception as e:
                # Record the failure for this account and keep going.
                results.append({
                    'email': account_info['email'],
                    'provider': account_info['provider'],
                    'imap': {'success': False, 'error': str(e)},
                    'smtp': {'success': False, 'error': 'Not tested'}
                })
        return {
            'success': True,
            'accounts': results,
            'total_accounts': len(accounts)
        }
    except Exception as e:
        logger.error(f"Connection check failed: {e}")
        return {'error': str(e)}
def fetch_emails(limit=50, unread_only=False, folder="INBOX", account_id=None, use_cache=False):
    """
    Fetch emails from account
    Args:
        limit: Maximum number of emails to fetch
        unread_only: Only fetch unread emails
        folder: Folder name (default: INBOX)
        account_id: Specific account ID (None = all accounts)
        use_cache: Try to use cached data from email_sync.db (default: False)

    Returns:
        dict with an 'emails' list plus folder/account counters on success,
        or a dict containing an 'error' key on failure. Note: when the cache
        path is taken and misses/expires, an error dict is returned rather
        than falling through to a live fetch.
    """
    try:
        resolved_account = None
        if account_id:
            resolved_account = account_manager.get_account(account_id)
            if not resolved_account:
                return {"error": f"No email account configured for {account_id}"}
            # Use internal account ID for cache/DB lookups
            account_id = resolved_account.get('id', account_id)
        # If no specific account, fetch from all accounts
        # CRITICAL: Must check this BEFORE trying cache, because cache only works for single account
        if account_id is None and account_manager.list_accounts():
            if use_cache:
                try:
                    cache = CachedEmailOperations()
                    if cache.is_available():
                        accounts = account_manager.list_accounts()
                        combined_emails = []
                        accounts_info = []
                        total_emails = 0
                        total_unread = 0
                        # Aggregate the per-account cached listings.
                        for acc in accounts:
                            acc_id = acc['id']
                            acc_email = acc.get('email')
                            cached = cache.list_emails_cached(
                                limit=limit,
                                unread_only=unread_only,
                                folder=folder,
                                account_id=acc_id,
                                max_age_minutes=60
                            )
                            if cached:
                                emails = cached.get('emails', [])
                                # Tag each cached row with its source account.
                                for em in emails:
                                    em['account'] = acc_email
                                    em['account_id'] = acc_id
                                    em['source'] = 'cache_sync_db'
                                combined_emails.extend(emails)
                                accounts_info.append({
                                    'account': acc_email,
                                    'total': cached.get('total_in_folder', 0),
                                    'unread': cached.get('unread_count', 0),
                                    'fetched': len(emails)
                                })
                                total_emails += cached.get('total_in_folder', 0)
                                total_unread += cached.get('unread_count', 0)
                        if combined_emails:
                            # Newest first, truncated to the requested limit.
                            combined_emails.sort(key=lambda x: x.get('date', ''), reverse=True)
                            combined_emails = combined_emails[:limit]
                            return {
                                'emails': combined_emails,
                                'total_emails': total_emails,
                                'total_unread': total_unread,
                                'accounts_count': len(accounts),
                                'accounts_info': accounts_info,
                                'from_cache': True
                            }
                        else:
                            return {'error': 'Cache empty for all accounts', 'success': False}
                    # Only reached when the cache backend is unavailable
                    # (both branches above return when it is available).
                    logger.info("Cache unavailable or empty for multi-account, falling back to live fetch")
                except Exception as exc:
                    logger.warning(f"Cache read failed for multi-account: {exc}, falling back to live fetch")
            return fetch_emails_multi_account(limit, unread_only, folder, use_cache)
        # Try cache first if requested (only for single account)
        if use_cache and account_id is not None:
            try:
                from .operations.cached_operations import CachedEmailOperations
                cache = CachedEmailOperations()
                if cache.is_available():
                    logger.info(f"Using cached email data for account {account_id}")
                    result = cache.list_emails_cached(
                        limit=limit,
                        unread_only=unread_only,
                        folder=folder,
                        account_id=account_id
                    )
                    # Cache returns None if expired/missing, or dict (even if emails=[]) if valid
                    if result is not None:
                        logger.info(f"Cache hit: {len(result.get('emails', []))} emails (age: {result.get('cache_age_minutes', 0):.1f} min)")
                        return result
                    else:
                        return {'error': 'Cache miss or expired', 'success': False}
            except Exception as e:
                logger.warning(f"Cache fetch failed, falling back to live IMAP: {e}")
        # Single account fetch
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        try:
            # Select folder and check status
            selected_folder = _normalize_folder_name(folder)
            result, data = mail.select(selected_folder)
            if result != 'OK':
                # Log as debug instead of error to reduce noise
                logger.debug(f"Folder '{folder}' not available: {data}")
                return {
                    'emails': [],
                    'total_in_folder': 0,
                    'unread_count': 0,
                    'error': f"Folder '{folder}' not available"
                }
            # CRITICAL: Use UID search instead of sequence numbers
            # UIDs are stable even when messages are added/deleted
            if unread_only:
                result, data = mail.uid('search', None, 'UNSEEN')
            else:
                result, data = mail.uid('search', None, 'ALL')
            email_uids = data[0].split()
            total_in_folder = len(email_uids)
            # Get unread count
            result, data = mail.uid('search', None, 'UNSEEN')
            unread_count = len(data[0].split()) if data[0] else 0
            # Limit emails (UIDs): keep the newest `limit` UIDs, newest first.
            email_uids = email_uids[-limit:]
            email_uids.reverse()
            emails = []
            for email_uid in email_uids:
                try:
                    # CRITICAL: Use UID FETCH to get stable identifiers
                    result, data = mail.uid('fetch', email_uid, '(RFC822)')
                    raw_email = data[0][1]
                    msg = safe_parse_email(raw_email)
                    # Parse email data
                    from_addr = decode_mime_words(msg.get("From", ""))
                    subject = decode_mime_words(msg.get("Subject", "No Subject"))
                    date_str = msg.get("Date", "")
                    # Parse date; keep the raw header on failure.
                    try:
                        date_tuple = email.utils.parsedate_to_datetime(date_str)
                        date_formatted = date_tuple.strftime("%Y-%m-%d %H:%M:%S")
                    except:
                        date_formatted = date_str
                    # Check if unread (use UID)
                    result, data = mail.uid('fetch', email_uid, '(FLAGS)')
                    # UID fetch returns: [(b'123 (UID 456 FLAGS (\\Seen))', b'')]
                    # We need to extract the FLAGS from the first element
                    flags_str = ""
                    if data and data[0]:
                        # data[0] is a tuple: (b'...', b'')
                        # We want the first element of that tuple
                        flags_response = data[0]
                        if isinstance(flags_response, tuple) and flags_response[0]:
                            flags_str = flags_response[0].decode('utf-8') if isinstance(flags_response[0], bytes) else str(flags_response[0])
                        elif isinstance(flags_response, bytes):
                            flags_str = flags_response.decode('utf-8')
                    is_unread = '\\Seen' not in flags_str
                    # CRITICAL: Return UID as the primary ID (stable identifier)
                    uid_str = email_uid.decode() if isinstance(email_uid, bytes) else str(email_uid)
                    email_info = {
                        "id": uid_str,  # UID - stable even when messages are added/deleted
                        "uid": uid_str,  # Explicit UID field for clarity
                        "from": from_addr,
                        "subject": subject,
                        "date": date_formatted,
                        "unread": is_unread,
                        "account": conn_mgr.email,
                        "account_id": conn_mgr.account_id  # Canonical account ID for routing
                    }
                    emails.append(email_info)
                except Exception as e:
                    # Per-message failures are skipped, not fatal for the batch.
                    logger.warning(f"Failed to fetch email UID {email_uid}: {e}")
            return {
                "emails": emails,
                "total_in_folder": total_in_folder,
                "unread_count": unread_count,
                "folder": folder,
                "account": conn_mgr.email
            }
        finally:
            try:
                mail.logout()
            except Exception as e:
                logger.warning(f"Error closing IMAP connection: {e}")
    except Exception as e:
        logger.error(f"Error fetching emails: {e}")
        return {"error": str(e)}
def fetch_emails_multi_account(limit=50, unread_only=False, folder="INBOX", use_cache=False):
    """Fetch emails from multiple accounts (now using parallel fetching)"""
    accounts = account_manager.list_accounts()
    if not accounts:
        # Nothing configured: return an empty, well-formed summary.
        return {
            'emails': [],
            'total_emails': 0,
            'total_unread': 0,
            'accounts_count': 0,
            'accounts_info': []
        }
    # Prefer the parallel implementation for performance.
    try:
        from .operations.parallel_fetch import fetch_emails_parallel
        logger.info(f"Using parallel fetch for {len(accounts)} accounts")
        # Note: parallel_fetch may not support use_cache yet, will fallback if needed
        return fetch_emails_parallel(accounts, limit, unread_only, folder, use_cache)
    except (ImportError, TypeError) as e:
        # TypeError can occur if parallel_fetch doesn't accept use_cache parameter
        reason = (
            "Parallel fetch doesn't support use_cache, falling back to sequential"
            if isinstance(e, TypeError)
            else "Parallel fetch not available, falling back to sequential"
        )
        logger.warning(reason)
        return fetch_emails_multi_account_sequential(limit, unread_only, folder, use_cache)
    except Exception as e:
        logger.error(f"Parallel fetch failed: {e}, falling back to sequential")
        return fetch_emails_multi_account_sequential(limit, unread_only, folder, use_cache)
def fetch_emails_multi_account_sequential(limit=50, unread_only=False, folder="INBOX", use_cache=False):
    """Original sequential fetch (fallback).

    Fetches from each configured account one at a time via fetch_emails,
    tags each message with its account address, then merges all results,
    sorts by date (newest first) and truncates to *limit*.
    """
    all_emails = []
    accounts_info = []
    total_emails = 0
    total_unread = 0
    accounts = account_manager.list_accounts()
    for account_info in accounts:
        try:
            account_id = account_info['id']
            # CRITICAL: Forward use_cache to per-account fetch
            result = fetch_emails(limit, unread_only, folder, account_id, use_cache)
            if "error" not in result:
                emails = result['emails']
                # BUGFIX: loop variable renamed from `email`, which shadowed
                # the stdlib `email` module imported at the top of the file.
                for message in emails:
                    message['account'] = account_info['email']
                all_emails.extend(emails)
                acc_info = {
                    'account': account_info['email'],
                    'total': result['total_in_folder'],
                    'unread': result['unread_count'],
                    'fetched': len(emails)
                }
                accounts_info.append(acc_info)
                total_emails += result['total_in_folder']
                total_unread += result['unread_count']
        except Exception as e:
            # One broken account must not abort the whole sweep.
            logger.warning(f"Failed to fetch from {account_info['email']}: {e}")
    # Sort by date (newest first). Use .get for consistency with the
    # multi-account cache path, which tolerates missing dates.
    all_emails.sort(key=lambda x: x.get('date', ''), reverse=True)
    # Limit total results
    all_emails = all_emails[:limit]
    return {
        'emails': all_emails,
        'total_emails': total_emails,
        'total_unread': total_unread,
        'accounts_count': len(accounts),
        'accounts_info': accounts_info
    }
def get_email_detail(email_id, folder="INBOX", account_id=None):
    """Get detailed email content.

    Fetches the full RFC822 message (UID first, sequence-number fallback),
    decodes headers and bodies, and lists attachment metadata.

    Args:
        email_id: UID (preferred) or sequence number; int/str/bytes accepted.
        folder: Folder to select; Gmail accounts additionally retry
            [Gmail]/All Mail when the requested folder cannot be selected.
        account_id: Account to use; None selects the default account.

    Returns:
        dict with headers, text/html bodies, attachments and flags on
        success, or {"error": ...} on failure.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Prefer the requested folder; for Gmail fall back to [Gmail]/All Mail
        folders_to_try = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid adding a duplicate candidate
                folders_to_try.append("[Gmail]/All Mail")
        last_error = None
        selected_folder = None
        for candidate in folders_to_try:
            selected_folder = _normalize_folder_name(candidate)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{candidate}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        # Ensure email_id is a string
        if isinstance(email_id, int):
            email_id = str(email_id)
        # CRITICAL: Try UID first (stable), fallback to sequence number
        result, data = mail.uid('fetch', email_id, '(RFC822)')
        # Check if UID fetch failed or returned empty
        if result != 'OK' or not data or data[0] is None:
            logger.warning(f"UID fetch failed for {email_id}, trying sequence number")
            result, data = mail.fetch(email_id, '(RFC822)')
        # Validate data
        if result != 'OK':
            raise Exception(f"Failed to fetch email {email_id}: {result}")
        if not data or data[0] is None:
            raise Exception(f"Email {email_id} not found or has been deleted")
        raw_email = data[0][1]
        msg = safe_parse_email(raw_email)
        # Extract headers
        from_addr = decode_mime_words(msg.get("From", ""))
        to_addr = decode_mime_words(msg.get("To", ""))
        cc_addr = decode_mime_words(msg.get("Cc", ""))
        subject = decode_mime_words(msg.get("Subject", "No Subject"))
        date_str = msg.get("Date", "")
        message_id = msg.get("Message-ID", "")
        # Parse date; keep the raw header string on failure.
        try:
            date_tuple = email.utils.parsedate_to_datetime(date_str)
            date_formatted = date_tuple.strftime("%Y-%m-%d %H:%M:%S")
        except:
            date_formatted = date_str
        # Extract body: first text/plain and first text/html parts win.
        body = ""
        html_body = ""
        attachments = []
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition", ""))
                if "attachment" in content_disposition:
                    filename = part.get_filename()
                    if filename:
                        decoded_filename = decode_mime_words(filename)
                        # NOTE(review): get_payload(decode=True) can return None
                        # for some malformed parts; len(None) would raise and the
                        # whole call returns an error dict — confirm this is intended.
                        attachments.append({
                            "filename": decoded_filename,
                            "size": len(part.get_payload(decode=True)),
                            "content_type": content_type
                        })
                elif content_type == "text/plain" and not body:
                    body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
                elif content_type == "text/html" and not html_body:
                    html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
        else:
            body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
        # Check if unread (try UID first, then sequence)
        result, data = mail.uid('fetch', email_id, '(FLAGS)')
        if result != 'OK' or not data or data[0] is None:
            result, data = mail.fetch(email_id, '(FLAGS)')
        # Parse FLAGS from response (handle tuple structure)
        flags_str = ""
        if data and data[0]:
            flags_response = data[0]
            if isinstance(flags_response, tuple) and flags_response[0]:
                flags_str = flags_response[0].decode('utf-8') if isinstance(flags_response[0], bytes) else str(flags_response[0])
            elif isinstance(flags_response, bytes):
                flags_str = flags_response.decode('utf-8')
        is_unread = '\\Seen' not in flags_str
        return {
            "id": email_id.decode() if isinstance(email_id, bytes) else email_id,
            "from": from_addr,
            "to": to_addr,
            "cc": cc_addr,
            "subject": subject,
            "date": date_formatted,
            "body": body or html_body,
            "html_body": html_body,
            "has_html": bool(html_body),
            "attachments": attachments,
            "attachment_count": len(attachments),
            "unread": is_unread,
            "message_id": message_id,
            "folder": folder,
            "account": conn_mgr.email,
            "account_id": conn_mgr.account_id  # Canonical account ID for consistent routing
        }
    except Exception as e:
        logger.error(f"Error getting email detail: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                mail.logout()
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in get_email_detail: {e}")
def mark_email_read(email_id, folder="INBOX", account_id=None):
    """Mark email as read.

    Sets the \\Seen flag via UID STORE (sequence-number fallback). For
    Gmail accounts it additionally retries folder selection against
    [Gmail]/All Mail and, when the UID does not match, re-resolves the
    message by Message-ID looked up from the local sync DB. On success
    the local sync cache is updated best-effort as well.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Prefer the requested folder; for Gmail fall back to [Gmail]/All Mail
        folders_to_try = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid adding a duplicate candidate
                folders_to_try.append("[Gmail]/All Mail")
        last_error = None
        selected_folder = None
        for candidate in folders_to_try:
            selected_folder = _normalize_folder_name(candidate)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{candidate}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        # Ensure email_id is a string
        if isinstance(email_id, int):
            email_id = str(email_id)
        flags_to_add = r'(\Seen)'
        success = False
        # Pre-fetch the Message-ID (Gmail only) for the fallback search below.
        msg_id = _lookup_message_id_from_sync_db(conn_mgr.account_id, email_id) if conn_mgr.provider == "gmail" else None
        # First attempt: set \Seen directly via UID in the selected folder
        result, data = mail.uid('store', email_id, '+FLAGS', flags_to_add)
        if result == 'OK' and data and data[0]:
            success = True
        else:
            logger.warning(f"UID store failed or empty response for {email_id}, trying sequence number")
            result, data = mail.store(email_id, '+FLAGS', flags_to_add)
            success = result == 'OK' and data and data[0]
        # Gmail special case: the UID can differ across labels, so re-search
        # the currently selected folder by Message-ID
        if not success and conn_mgr.provider == "gmail" and msg_id:
            try:
                search_result, search_data = mail.uid('search', None, f'HEADER Message-ID "{msg_id}"')
                if search_result == 'OK' and search_data and search_data[0]:
                    uid_list = search_data[0].split()
                    for alt_uid in uid_list:
                        alt_uid_decoded = alt_uid.decode() if isinstance(alt_uid, bytes) else alt_uid
                        result, data = mail.uid('store', alt_uid_decoded, '+FLAGS', flags_to_add)
                        if result == 'OK' and data and data[0]:
                            logger.info(f"Marked via Message-ID search UID={alt_uid_decoded} msgid={msg_id}")
                            success = True
                            break
            except Exception as search_exc:
                logger.warning(f"Gmail Message-ID search failed: {search_exc}")
        if not success:
            raise Exception(f"Failed to mark email {email_id} as read")
        # Best-effort update local sync cache
        _mark_sync_cache_read(conn_mgr.account_id, email_id)
        return {"success": True, "message": "Email marked as read", "account": conn_mgr.email}
    except Exception as e:
        logger.error(f"Error marking email as read: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in mark_email_read: {e}")
def delete_email(email_id, folder="INBOX", account_id=None):
    """Permanently delete a single email.

    Flags the message \\Deleted (UID first, sequence-number fallback) and
    expunges the folder. Gmail accounts retry folder selection against
    [Gmail]/All Mail when the requested folder cannot be selected.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Candidate folders: the requested one first, Gmail's All Mail as backup.
        candidates = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            if "All Mail" not in str(_normalize_folder_name(folder)):
                candidates.append("[Gmail]/All Mail")
        selected_folder = None
        last_error = None
        for name in candidates:
            selected_folder = _normalize_folder_name(name)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{name}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        # IMAP commands want string ids.
        email_id = str(email_id) if isinstance(email_id, int) else email_id
        deleted_flag = r'(\Deleted)'
        # Prefer the stable UID form; fall back to sequence numbers.
        result, data = mail.uid('store', email_id, '+FLAGS', deleted_flag)
        if result != 'OK':
            logger.warning(f"UID store failed for {email_id}, trying sequence number")
            result, data = mail.store(email_id, '+FLAGS', deleted_flag)
        if result != 'OK':
            raise Exception(f"Failed to delete email {email_id}")
        mail.expunge()
        return {"success": True, "message": "Email deleted", "account": conn_mgr.email}
    except Exception as e:
        logger.error(f"Error deleting email: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in delete_email: {e}")
def move_email_to_trash(email_id, folder="INBOX", trash_folder="Trash", account_id=None):
    """Move email to trash folder.

    Copies the message into *trash_folder*, flags the original \\Deleted
    and expunges. If the COPY fails (e.g. the trash folder does not exist)
    the message is deleted directly instead.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Prefer the requested folder; for Gmail fall back to [Gmail]/All Mail
        folders_to_try = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid adding a duplicate candidate
                folders_to_try.append("[Gmail]/All Mail")
        last_error = None
        selected_folder = None
        for candidate in folders_to_try:
            selected_folder = _normalize_folder_name(candidate)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{candidate}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        target_folder = _normalize_folder_name(trash_folder)
        # Ensure email_id is a string
        if isinstance(email_id, int):
            email_id = str(email_id)
        # Try UID copy first, fallback to sequence number
        result, data = mail.uid('copy', email_id, target_folder)
        if result != 'OK':
            logger.warning(f"UID copy failed for {email_id}, trying sequence number")
            result, data = mail.copy(email_id, target_folder)
        if result == 'OK':
            # Mark as deleted in original folder (use UID)
            deleted_flag = r'(\Deleted)'
            result, data = mail.uid('store', email_id, '+FLAGS', deleted_flag)
            if result != 'OK':
                mail.store(email_id, '+FLAGS', deleted_flag)
            mail.expunge()
            return {"success": True, "message": f"Email moved to {trash_folder}", "account": conn_mgr.email}
        else:
            # If trash folder doesn't exist, just delete
            # Use RFC-compliant flag format (same as successful COPY path)
            deleted_flag = r'(\Deleted)'
            result, data = mail.uid('store', email_id, '+FLAGS', deleted_flag)
            if result != 'OK':
                logger.warning(f"UID store failed for {email_id}, trying sequence number")
                result, data = mail.store(email_id, '+FLAGS', deleted_flag)
            # Check if deletion actually succeeded
            if result != 'OK':
                raise Exception(f"Failed to delete email {email_id} (trash folder not found, direct delete also failed)")
            mail.expunge()
            return {"success": True, "message": "Email deleted (no trash folder found)", "account": conn_mgr.email}
    except Exception as e:
        logger.error(f"Error moving email to trash: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in move_email_to_trash: {e}")
def batch_move_to_trash(email_ids, folder="INBOX", trash_folder="Trash", account_id=None):
    """Move multiple emails to trash.

    First verifies via LIST that *trash_folder* exists; if it does not,
    the emails are flagged \\Deleted and expunged directly. Per-email
    failures are collected into failed_ids instead of aborting the batch.
    """
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Prefer the requested folder; for Gmail fall back to [Gmail]/All Mail
        folders_to_try = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid adding a duplicate candidate
                folders_to_try.append("[Gmail]/All Mail")
        last_error = None
        selected_folder = None
        for candidate in folders_to_try:
            selected_folder = _normalize_folder_name(candidate)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{candidate}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        target_folder = _normalize_folder_name(trash_folder)
        # Check if trash folder exists using normalized name
        result, folders = mail.list()
        trash_exists = False
        if result == 'OK' and folders:
            # Compare normalized folder name against IMAP LIST response
            # IMAP returns: (b'(\\HasNoChildren) "/" "Trash"', ...)
            for folder_response in folders:
                try:
                    if isinstance(folder_response, bytes):
                        folder_str = folder_response.decode('utf-8', errors='ignore')
                        # Extract folder name from IMAP LIST response
                        # Format: (flags) "delimiter" "folder_name"
                        if '\"' in folder_str:
                            parts = folder_str.split('\"')
                            if len(parts) >= 4:
                                folder_name = parts[3]  # Fourth part is the folder name
                                # Normalize for comparison
                                normalized_response = _normalize_folder_name(folder_name)
                                if normalized_response == target_folder or folder_name == trash_folder:
                                    trash_exists = True
                                    break
                except:
                    continue
        moved_count = 0
        failed_ids = []
        if not trash_exists:
            # Just delete if no trash folder
            for email_id in email_ids:
                try:
                    # Ensure email_id is a string
                    if isinstance(email_id, int):
                        email_id = str(email_id)
                    # Try UID first
                    deleted_flag = r'(\Deleted)'
                    result, data = mail.uid('store', email_id, '+FLAGS', deleted_flag)
                    if result != 'OK':
                        result, data = mail.store(email_id, '+FLAGS', deleted_flag)
                    if result == 'OK':
                        moved_count += 1
                    else:
                        failed_ids.append(email_id)
                except:
                    failed_ids.append(email_id)
            mail.expunge()
            return {
                "success": True,
                "message": f"Deleted {moved_count}/{len(email_ids)} emails (no trash folder)",
                "failed_ids": failed_ids,
                "account": conn_mgr.email
            }
        # Move to trash: copy, then flag the original \Deleted.
        for email_id in email_ids:
            try:
                # Ensure email_id is a string
                if isinstance(email_id, int):
                    email_id = str(email_id)
                # Try UID copy first
                result, data = mail.uid('copy', email_id, target_folder)
                if result != 'OK':
                    result, data = mail.copy(email_id, target_folder)
                if result == 'OK':
                    deleted_flag = r'(\Deleted)'
                    # Mark as deleted (try UID first)
                    store_result, _ = mail.uid('store', email_id, '+FLAGS', deleted_flag)
                    if store_result != 'OK':
                        store_result, _ = mail.store(email_id, '+FLAGS', deleted_flag)
                    if store_result == 'OK':
                        moved_count += 1
                    else:
                        failed_ids.append(email_id)
                else:
                    failed_ids.append(email_id)
            except Exception as e:
                logger.warning(f"Failed to move email {email_id}: {e}")
                failed_ids.append(email_id)
        mail.expunge()
        result_data = {
            "success": True,
            "message": f"Moved {moved_count}/{len(email_ids)} emails to {trash_folder}",
            "account": conn_mgr.email
        }
        if failed_ids:
            result_data["failed_ids"] = failed_ids
        return result_data
    except Exception as e:
        logger.error(f"Error batch moving to trash: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in batch_move_to_trash: {e}")
def batch_delete_emails(email_ids, folder="INBOX", account_id=None, shared_connection=True):
    """
    Permanently delete multiple emails.

    Args:
        email_ids: List of email IDs to delete.
        folder: Folder name (default: INBOX).
        account_id: Account ID.
        shared_connection: True (default) uses one optimized IMAP session;
            False delegates each delete to delete_email for maximum
            reliability (one connection per email).

    The shared-connection mode reuses a single IMAP session but still
    expunges after each delete — more reliable than batching all STOREs
    before one expunge (especially for QQ mail), and far faster than
    opening a fresh connection per email.
    """
    if not email_ids:
        return {"success": True, "message": "No emails to delete", "deleted_count": 0}
    if shared_connection:
        # Fast path: one connection, expunge-per-delete.
        return _batch_delete_emails_shared_connection(email_ids, folder, account_id)
    # Safe fallback: one delete_email call (and connection) per message.
    deleted_count = 0
    failed_ids = []
    account_email = None
    for email_id in email_ids:
        try:
            outcome = delete_email(email_id, folder=folder, account_id=account_id)
        except Exception as e:
            logger.warning(f"Failed to delete email {email_id}: {e}")
            failed_ids.append(email_id)
            continue
        if 'error' in outcome:
            logger.warning(f"Failed to delete email {email_id}: {outcome['error']}")
            failed_ids.append(email_id)
        else:
            deleted_count += 1
            # Remember the account from the first successful delete.
            if account_email is None and 'account' in outcome:
                account_email = outcome['account']
    # Success only when every requested delete went through.
    all_succeeded = not failed_ids
    if all_succeeded:
        message = f"Successfully deleted all {deleted_count} email(s)"
    elif deleted_count == 0:
        message = f"Failed to delete all {len(email_ids)} email(s)"
    else:
        message = f"Partially deleted: {deleted_count}/{len(email_ids)} email(s) succeeded"
    result_data = {
        "success": all_succeeded,
        "deleted_count": deleted_count,
        "total_count": len(email_ids),
        "message": message,
    }
    if account_email:
        result_data["account"] = account_email
    if failed_ids:
        result_data["failed_ids"] = failed_ids
        result_data["failed_count"] = len(failed_ids)
    return result_data
def _batch_delete_emails_shared_connection(email_ids, folder="INBOX", account_id=None):
    """
    Delete a batch of emails over a single reused IMAP connection.

    One session is opened for the whole batch; each message is flagged
    \\Deleted and expunged individually (the per-message expunge keeps
    providers such as QQ mail reliable) while avoiding the overhead of a
    connect/logout cycle per email.

    Args:
        email_ids: Iterable of message IDs; ints are coerced to str. UID
            STORE is attempted first, sequence-number STORE as a fallback.
        folder: Mailbox to operate on (default "INBOX"). Gmail accounts
            fall back to "[Gmail]/All Mail" if the folder cannot be selected.
        account_id: Optional account identifier; None selects the default.

    Returns:
        dict with success flag, counts, message and account address, plus
        "failed_ids"/"failed_count" when anything failed; on connection or
        folder errors, {"error": str}.
    """
    try:
        manager = get_connection_manager(account_id)
        mail = manager.connect_imap()
        # Prefer the requested folder; Gmail accounts additionally try
        # [Gmail]/All Mail when the requested folder cannot be selected.
        candidates = [folder or "INBOX"]
        if manager.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid a duplicate entry
                candidates.append("[Gmail]/All Mail")
        select_error = None
        active_folder = None
        for candidate in candidates:
            name = _normalize_folder_name(candidate)
            result, data = mail.select(name)
            if result == 'OK':
                active_folder = name
                break
            select_error = f"Cannot select folder '{candidate}': {data}"
        if active_folder is None:
            raise ValueError(select_error or f"Cannot select folder '{folder}'")

        removed = 0
        failures = []
        # RFC 3501 flag-list syntax for the deletion flag.
        flag = r'(\Deleted)'
        for email_id in email_ids:
            try:
                email_id = str(email_id) if isinstance(email_id, int) else email_id
                # Prefer UID STORE; fall back to sequence-number STORE.
                result, data = mail.uid('store', email_id, '+FLAGS', flag)
                if result != 'OK':
                    logger.warning(f"UID store failed for {email_id}, trying sequence number")
                    result, data = mail.store(email_id, '+FLAGS', flag)
                if result != 'OK':
                    logger.warning(f"Failed to delete email {email_id}: store command failed with {result}")
                    failures.append(email_id)
                    continue
                # Expunge immediately after each successful STORE — this is
                # the key to QQ mail compatibility.
                mail.expunge()
                removed += 1
            except Exception as e:
                logger.warning(f"Failed to delete email {email_id}: {e}")
                failures.append(email_id)

        # Success only when every message in the batch was removed.
        outcome = {
            "success": not failures,
            "deleted_count": removed,
            "total_count": len(email_ids),
            "account": manager.email
        }
        if not failures:
            outcome["message"] = f"Successfully deleted all {removed} email(s)"
        elif removed:
            outcome["message"] = f"Partially deleted: {removed}/{len(email_ids)} email(s) succeeded"
        else:
            outcome["message"] = f"Failed to delete all {len(email_ids)} email(s)"
        if failures:
            outcome["failed_ids"] = failures
            outcome["failed_count"] = len(failures)
        return outcome
    except Exception as e:
        logger.error(f"Error batch deleting emails: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in batch_delete_emails: {e}")
def batch_mark_read(email_ids, folder="INBOX", account_id=None):
    """
    Mark multiple emails as read (\\Seen) over a single IMAP connection.

    Args:
        email_ids: Iterable of message IDs; ints are coerced to str. UID
            STORE is attempted first, sequence-number STORE as a fallback.
        folder: Mailbox to operate on (default "INBOX"). Gmail accounts
            fall back to "[Gmail]/All Mail" if the folder cannot be selected.
        account_id: Optional account identifier; None selects the default.

    Returns:
        dict with success flag, message, marked_count/total_count, the
        account address, and "failed_ids"/"failed_count" when anything
        failed; on connection or folder errors, {"error": str}.
    """
    # Early return for an empty batch: no point opening a connection
    # (mirrors the empty-input guard in the batch delete path).
    if not email_ids:
        return {"success": True, "message": "No emails to mark as read", "marked_count": 0}
    try:
        conn_mgr = get_connection_manager(account_id)
        mail = conn_mgr.connect_imap()
        # Prefer the requested folder; Gmail accounts additionally try
        # [Gmail]/All Mail when the requested folder cannot be selected.
        folders_to_try = [folder or "INBOX"]
        if conn_mgr.provider == "gmail":
            normalized = _normalize_folder_name(folder)
            if "All Mail" not in str(normalized):  # avoid a duplicate entry
                folders_to_try.append("[Gmail]/All Mail")
        last_error = None
        selected_folder = None
        for candidate in folders_to_try:
            selected_folder = _normalize_folder_name(candidate)
            result, data = mail.select(selected_folder)
            if result == 'OK':
                break
            last_error = f"Cannot select folder '{candidate}': {data}"
            selected_folder = None
        if not selected_folder:
            raise ValueError(last_error or f"Cannot select folder '{folder}'")
        marked_count = 0
        failed_ids = []
        for email_id in email_ids:
            try:
                # Ensure email_id is a string
                if isinstance(email_id, int):
                    email_id = str(email_id)
                # Try UID STORE first; fall back to sequence-number STORE.
                seen_flag = r'(\Seen)'
                result, data = mail.uid('store', email_id, '+FLAGS', seen_flag)
                if result != 'OK':
                    result, data = mail.store(email_id, '+FLAGS', seen_flag)
                if result == 'OK':
                    marked_count += 1
                else:
                    logger.warning(f"Failed to mark email {email_id} as read: store command failed with {result}")
                    failed_ids.append(email_id)
            except Exception as e:
                logger.warning(f"Failed to mark email {email_id} as read: {e}")
                failed_ids.append(email_id)
        # Success only when every message was marked; previously this
        # reported True even if every STORE failed. Counts mirror the
        # result shape of the batch delete operations.
        result_data = {
            "success": len(failed_ids) == 0,
            "message": f"Marked {marked_count}/{len(email_ids)} emails as read",
            "marked_count": marked_count,
            "total_count": len(email_ids),
            "account": conn_mgr.email
        }
        if failed_ids:
            result_data["failed_ids"] = failed_ids
            result_data["failed_count"] = len(failed_ids)
        return result_data
    except Exception as e:
        logger.error(f"Error batch marking emails as read: {e}")
        return {"error": str(e)}
    finally:
        try:
            if 'mail' in locals():
                _close_mail_safely(mail)
        except Exception as e:
            logger.warning(f"Error closing IMAP connection in batch_mark_read: {e}")