Skip to main content
Glama
leeguooooo
by leeguooooo
cached_operations.py10.1 kB
""" 缓存邮件操作 - 从数据目录的 email_sync.db 读取缓存数据 提供比 IMAP 实时查询快 100-500x 的缓存读取能力 """ import sqlite3 import logging from typing import List, Dict, Any, Optional from datetime import datetime, timedelta import os from ..config.paths import EMAIL_SYNC_DB logger = logging.getLogger(__name__) class CachedEmailOperations: """从同步数据库缓存读取邮件""" def __init__(self, db_path=None): """ 初始化缓存操作 Args: db_path: SQLite 数据库路径(默认使用 data/email_sync.db) """ if db_path is None: db_path = EMAIL_SYNC_DB self.db_path = db_path # Don't cache DB existence - check dynamically # Service may start before sync creates the DB if not os.path.exists(db_path): logger.info(f"Sync database not found yet: {db_path}") logger.info("Will be created automatically by background sync") def is_available(self) -> bool: """检查缓存是否可用 (动态检查,不依赖缓存状态)""" # CRITICAL: Always check filesystem, don't rely on cached state # The DB may be created after service starts return os.path.exists(self.db_path) def list_emails_cached(self, limit: int = 50, unread_only: bool = False, folder: str = 'INBOX', account_id: Optional[str] = None, max_age_minutes: int = 10) -> Optional[Dict[str, Any]]: """ 从缓存读取邮件列表 Args: limit: 邮件数量限制 unread_only: 只显示未读邮件 folder: 文件夹名称 account_id: 账户ID(None 表示所有账户) max_age_minutes: 缓存最大有效期(分钟) Returns: 邮件列表字典,如果缓存不可用或过期返回 None """ if not self.is_available(): logger.debug("Cache not available, falling back to live fetch") return None try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # 返回字典式结果 cursor = conn.cursor() # 先获取 folder_id(需要匹配 account_id) if account_id: cursor.execute(""" SELECT id FROM folders WHERE name = ? AND account_id = ? """, (folder, account_id)) else: cursor.execute(""" SELECT id FROM folders WHERE name = ? """, (folder,)) folder_row = cursor.fetchone() if not folder_row: logger.debug(f"Folder {folder} not found for account_id={account_id} in sync database") conn.close() return None folder_id = folder_row['id'] # 检查缓存新鲜度,使用 folders 表的 last_sync cursor.execute(""" SELECT last_sync FROM folders WHERE id = ? """, (folder_id,)) row = cursor.fetchone() last_sync_str = row['last_sync'] if row else None age_minutes = 0.0 if last_sync_str: try: last_sync = datetime.fromisoformat(last_sync_str) age_minutes = (datetime.now() - last_sync).total_seconds() / 60 if age_minutes > max_age_minutes: logger.debug(f"Cache expired ({age_minutes:.1f} min > {max_age_minutes} min)") conn.close() return None except (ValueError, TypeError) as e: logger.warning(f"Failed to parse last_sync time: {e}") conn.close() return None # 从缓存读取邮件(folder_id 已经在上面获取) query = """ SELECT uid, sender_email, subject, date_sent, is_read, message_id, account_id, size_bytes FROM emails WHERE folder_id = ? """ params = [folder_id] if account_id: query += " AND account_id = ?" params.append(account_id) if unread_only: query += " AND is_read = 0" query += " ORDER BY date_sent DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() # 转换为邮件列表 emails = [] for row in rows: # 格式化日期 date_str = row['date_sent'] if row['date_sent'] else "" emails.append({ 'id': row['uid'], 'uid': row['uid'], # 明确的 UID 字段 'from': row['sender_email'] or '', 'subject': row['subject'] or 'No Subject', 'date': date_str, 'unread': not row['is_read'], 'message_id': row['message_id'] or '', 'account_id': row['account_id'], 'size': row['size_bytes'] or 0 }) # 获取统计信息 if account_id: cursor.execute(""" SELECT COUNT(*) as total FROM emails WHERE account_id = ? AND folder_id = ? """, (account_id, folder_id)) total = cursor.fetchone()['total'] cursor.execute(""" SELECT COUNT(*) as unread FROM emails WHERE account_id = ? AND folder_id = ? AND is_read = 0 """, (account_id, folder_id)) unread_count = cursor.fetchone()['unread'] else: cursor.execute(""" SELECT COUNT(*) as total FROM emails WHERE folder_id = ? """, (folder_id,)) total = cursor.fetchone()['total'] cursor.execute(""" SELECT COUNT(*) as unread FROM emails WHERE folder_id = ? AND is_read = 0 """, (folder_id,)) unread_count = cursor.fetchone()['unread'] conn.close() logger.info(f"Loaded {len(emails)} emails from cache (age: {age_minutes:.1f} min)") return { "emails": emails, "total_in_folder": total, "unread_count": unread_count, "folder": folder, "from_cache": True, "cache_age_minutes": age_minutes, "account_id": account_id } except sqlite3.Error as e: logger.error(f"Database error reading cache: {e}") return None except Exception as e: logger.error(f"Error reading cached emails: {e}") return None def get_sync_status(self, account_id: Optional[str] = None) -> Dict[str, Any]: """ 获取同步状态 Args: account_id: 账户ID(None 表示所有账户) Returns: 同步状态字典 """ if not self.is_available(): return { "available": False, "message": "Sync database not found" } try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() if account_id: cursor.execute(""" SELECT account_id, COUNT(*) as email_count, MAX(last_synced) as last_sync, MIN(last_synced) as oldest_sync FROM emails WHERE account_id = ? GROUP BY account_id """, (account_id,)) else: cursor.execute(""" SELECT account_id, COUNT(*) as email_count, MAX(last_synced) as last_sync, MIN(last_synced) as oldest_sync FROM emails GROUP BY account_id """) accounts = [] for row in cursor.fetchall(): last_sync = datetime.fromisoformat(row['last_sync']) if row['last_sync'] else None age_minutes = (datetime.now() - last_sync).total_seconds() / 60 if last_sync else None accounts.append({ 'account_id': row['account_id'], 'email_count': row['email_count'], 'last_sync': row['last_sync'], 'age_minutes': age_minutes, 'is_fresh': age_minutes < 10 if age_minutes else False }) conn.close() return { "available": True, "accounts": accounts, "total_emails": sum(a['email_count'] for a in accounts) } except Exception as e: logger.error(f"Error getting sync status: {e}") return { "available": False, "error": str(e) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server