by leeguooooo
sync_config.py
""" 同步配置管理模块 """ import json import logging from typing import Dict, Any, Optional from pathlib import Path from datetime import datetime logger = logging.getLogger(__name__) class SyncConfigManager: """同步配置管理器""" DEFAULT_CONFIG = { "sync": { "enabled": True, "auto_start": True, # 服务启动时自动开始同步 "interval_minutes": 5, # 增量同步间隔(分钟) "full_sync_hours": 24, # 完全同步间隔(小时) "startup_delay_seconds": 30, # 启动延迟,避免启动时资源冲突 }, "quiet_hours": { "enabled": False, "start": "23:00", # 静默开始时间 "end": "06:00", # 静默结束时间 "timezone": "local" # 时区设置 }, "performance": { "max_concurrent_accounts": 2, # 最大并发账户数 "batch_size": 50, # 邮件批处理大小 "request_delay_ms": 100, # 请求间隔(毫秒) "timeout_seconds": 30, # 单个操作超时时间 "max_memory_mb": 512, # 最大内存使用(MB) }, "retry": { "max_attempts": 3, # 最大重试次数 "delay_minutes": 5, # 重试延迟(分钟) "exponential_backoff": True, # 指数退避 "max_delay_minutes": 60, # 最大延迟时间 }, "storage": { "db_path": "data/email_sync.db", "compress_content": True, # 压缩邮件内容 "vacuum_interval_days": 7, # 数据库清理间隔 "backup_enabled": True, # 自动备份 "backup_keep_days": 30, # 备份保留天数 }, "cleanup": { "enabled": True, "days_to_keep": 90, # 邮件保留天数 "cleanup_interval_hours": 24, # 清理间隔 "soft_delete": True, # 软删除(标记删除而不是物理删除) "compress_old_emails": True, # 压缩旧邮件 }, "folders": { "sync_all": True, # 同步所有文件夹 "exclude_folders": [ # 排除的文件夹 "[Gmail]/All Mail", "[Gmail]/Important", "[Gmail]/Chats", "Junk", "Spam" ], "priority_folders": [ # 优先同步的文件夹 "INBOX", "Sent", "Drafts" ] }, "notifications": { "enabled": False, "new_emails": False, # 新邮件通知 "sync_completion": False, # 同步完成通知 "errors": True, # 错误通知 "webhook_url": None, # Webhook通知URL }, "logging": { "level": "INFO", "file_enabled": True, "file_path": "sync.log", "max_file_size_mb": 10, "backup_count": 5, } } def __init__(self, config_file: str = "data/sync_config.json"): """初始化配置管理器""" self.config_file = Path(config_file) self.config = self._load_config() self._validate_config() def _load_config(self) -> Dict[str, Any]: """加载配置文件""" if self.config_file.exists(): try: with open(self.config_file, 'r', encoding='utf-8') as f: user_config = json.load(f) # 深度合并配置 config = self._deep_merge(self.DEFAULT_CONFIG.copy(), user_config) logger.info(f"Loaded config from {self.config_file}") return config except Exception as e: logger.error(f"Failed to load config from {self.config_file}: {e}") logger.info("Using default configuration") # 创建默认配置文件 self.save_config(self.DEFAULT_CONFIG) return self.DEFAULT_CONFIG.copy() def _deep_merge(self, base: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]: """深度合并字典""" for key, value in update.items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): base[key] = self._deep_merge(base[key], value) else: base[key] = value return base def _validate_config(self): """验证配置""" errors = [] # 验证同步间隔 sync_interval = self.config.get('sync', {}).get('interval_minutes', 15) if not isinstance(sync_interval, int) or sync_interval < 1 or sync_interval > 1440: errors.append("sync.interval_minutes must be between 1 and 1440") # 验证完全同步间隔 full_sync_hours = self.config.get('sync', {}).get('full_sync_hours', 24) if not isinstance(full_sync_hours, int) or full_sync_hours < 1 or full_sync_hours > 168: errors.append("sync.full_sync_hours must be between 1 and 168") # 验证静默时间 quiet_hours = self.config.get('quiet_hours', {}) if quiet_hours.get('enabled'): for time_key in ['start', 'end']: time_str = quiet_hours.get(time_key) if time_str: try: datetime.strptime(time_str, '%H:%M') except ValueError: errors.append(f"quiet_hours.{time_key} must be in HH:MM format") # 
验证性能设置 perf = self.config.get('performance', {}) max_accounts = perf.get('max_concurrent_accounts', 2) if not isinstance(max_accounts, int) or max_accounts < 1 or max_accounts > 10: errors.append("performance.max_concurrent_accounts must be between 1 and 10") batch_size = perf.get('batch_size', 50) if not isinstance(batch_size, int) or batch_size < 1 or batch_size > 1000: errors.append("performance.batch_size must be between 1 and 1000") # 验证重试设置 retry = self.config.get('retry', {}) max_attempts = retry.get('max_attempts', 3) if not isinstance(max_attempts, int) or max_attempts < 1 or max_attempts > 10: errors.append("retry.max_attempts must be between 1 and 10") # 验证清理设置 cleanup = self.config.get('cleanup', {}) if cleanup.get('enabled'): days_to_keep = cleanup.get('days_to_keep', 90) if not isinstance(days_to_keep, int) or days_to_keep < 1 or days_to_keep > 3650: errors.append("cleanup.days_to_keep must be between 1 and 3650") if errors: raise ValueError(f"Configuration validation failed: {'; '.join(errors)}") logger.info("Configuration validation passed") def save_config(self, config: Dict[str, Any] = None): """保存配置到文件""" if config is None: config = self.config try: # 添加元数据 config_with_meta = { "_metadata": { "version": "1.0", "updated_at": datetime.now().isoformat(), "description": "MCP Email Service Sync Configuration" }, **config } with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config_with_meta, f, indent=2, ensure_ascii=False) logger.info(f"Configuration saved to {self.config_file}") except Exception as e: logger.error(f"Failed to save config: {e}") raise def update_config(self, updates: Dict[str, Any]) -> bool: """更新配置""" try: # 备份当前配置 backup_config = self.config.copy() # 深度合并更新 self.config = self._deep_merge(self.config, updates) # 验证新配置 self._validate_config() # 保存配置 self.save_config() logger.info("Configuration updated successfully") return True except Exception as e: # 恢复备份配置 self.config = backup_config logger.error(f"Failed to update config: {e}") return False def get_config(self, path: str = None) -> Any: """获取配置值""" if path is None: return self.config keys = path.split('.') value = self.config for key in keys: if isinstance(value, dict) and key in value: value = value[key] else: return None return value def set_config(self, path: str, value: Any) -> bool: """设置配置值""" keys = path.split('.') config = self.config # 导航到父级 for key in keys[:-1]: if key not in config: config[key] = {} config = config[key] # 设置值 config[keys[-1]] = value try: self._validate_config() self.save_config() return True except Exception as e: logger.error(f"Failed to set config {path}: {e}") return False def reset_to_defaults(self) -> bool: """重置为默认配置""" try: self.config = self.DEFAULT_CONFIG.copy() self.save_config() logger.info("Configuration reset to defaults") return True except Exception as e: logger.error(f"Failed to reset config: {e}") return False def export_config(self, file_path: str) -> bool: """导出配置到指定文件""" try: export_path = Path(file_path) export_data = { "exported_at": datetime.now().isoformat(), "version": "1.0", "config": self.config } with open(export_path, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, ensure_ascii=False) logger.info(f"Configuration exported to {export_path}") return True except Exception as e: logger.error(f"Failed to export config: {e}") return False def import_config(self, file_path: str) -> bool: """从文件导入配置""" try: import_path = Path(file_path) if not import_path.exists(): raise FileNotFoundError(f"Config file not found: 
{import_path}") with open(import_path, 'r', encoding='utf-8') as f: import_data = json.load(f) # 提取配置部分 if 'config' in import_data: imported_config = import_data['config'] else: imported_config = import_data # 更新配置 return self.update_config(imported_config) except Exception as e: logger.error(f"Failed to import config: {e}") return False def get_config_summary(self) -> Dict[str, Any]: """获取配置摘要""" return { "sync_enabled": self.config.get('sync', {}).get('enabled', False), "sync_interval_minutes": self.config.get('sync', {}).get('interval_minutes', 15), "full_sync_hours": self.config.get('sync', {}).get('full_sync_hours', 24), "quiet_hours_enabled": self.config.get('quiet_hours', {}).get('enabled', False), "max_concurrent_accounts": self.config.get('performance', {}).get('max_concurrent_accounts', 2), "cleanup_enabled": self.config.get('cleanup', {}).get('enabled', False), "days_to_keep": self.config.get('cleanup', {}).get('days_to_keep', 90), "db_path": self.config.get('storage', {}).get('db_path', 'data/email_sync.db'), "log_level": self.config.get('logging', {}).get('level', 'INFO'), "config_file": str(self.config_file) } # 全局配置管理器实例 _config_manager_instance = None def get_config_manager() -> SyncConfigManager: """获取全局配置管理器实例""" global _config_manager_instance if _config_manager_instance is None: _config_manager_instance = SyncConfigManager() return _config_manager_instance
