"""
后台定时同步调度器
"""
import copy
import json
import logging
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional, Callable

import schedule

from ..operations.email_sync import EmailSyncManager
from .sync_config import SyncConfigManager
# Module logger used for all scheduler diagnostics.
logger = logging.getLogger(__name__)
class SyncScheduler:
    """Periodic background scheduler for email synchronisation.

    Runs an incremental sync every ``sync.interval_minutes``, a full sync every
    ``sync.full_sync_hours`` and, when ``auto_cleanup.enabled`` is set, a
    soft-delete cleanup every ``auto_cleanup.cleanup_interval_hours``.
    Callables registered with ``add_callback`` receive the events
    ``sync_completed``, ``sync_failed``, ``cleanup_completed`` and
    ``cleanup_failed``.

    Jobs are registered on a private ``schedule.Scheduler`` rather than the
    module-global default one, so stopping or reconfiguring this scheduler
    never clears jobs that other code registered with ``schedule``.
    """

    def __init__(self, config_file: str = "data/sync_config.json"):
        """Load the configuration and create the sync manager.

        Args:
            config_file: Path of the JSON sync configuration file.
        """
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.sync_manager = EmailSyncManager(
            self.config.get('storage', {}).get('db_path', 'data/email_sync.db'),
            self.config
        )
        # Private job registry (see class docstring).
        self._job_scheduler = schedule.Scheduler()
        self.scheduler_thread: Optional[threading.Thread] = None
        self.running = False
        self.stop_event = threading.Event()
        # Sync type ("full" / "incremental") -> datetime of its last success.
        self.last_sync_times: Dict[str, datetime] = {}
        # Callables invoked as callback(event_type, data).
        self.sync_callbacks = []

    def _load_config(self) -> Dict[str, Any]:
        """Load the sync configuration and normalise it to the nested layout.

        The file is read through ``SyncConfigManager``; if that raises, it is
        read directly as JSON instead (legacy load). Legacy flat keys such as
        ``sync_interval_minutes`` or ``retry_settings`` are mapped onto the
        nested sections, and missing sections get defaults.

        The normalised result is written back to ``self.config_file``, except
        when the file exists but nothing usable could be read from it. In that
        case the defaults are used in memory only, so a parse error never wipes
        the user's settings.

        Returns:
            The normalised configuration dict.
        """
        try:
            manager = SyncConfigManager(str(self.config_file))
            raw_config = manager.get_config()
        except Exception as e:
            logger.error(f"Failed to load config via SyncConfigManager: {e}, using legacy load")
            raw_config = self._read_config_file()

        if isinstance(raw_config, dict):
            should_save = True
        else:
            # Only create the file when it is missing; never overwrite an
            # existing (possibly hand-edited) file with defaults.
            should_save = not self.config_file.exists()
            raw_config = {}

        # Accept both the nested ``sync`` section and the legacy flat keys.
        sync_cfg = raw_config.get('sync', {}) if isinstance(raw_config.get('sync'), dict) else {}
        normalized = {
            "sync": {
                "enabled": sync_cfg.get('enabled', raw_config.get('enabled', True)),
                "interval_minutes": sync_cfg.get('interval_minutes', raw_config.get('sync_interval_minutes', 15)),
                "full_sync_hours": sync_cfg.get('full_sync_hours', raw_config.get('full_sync_interval_hours', 24)),
                "startup_delay_seconds": sync_cfg.get('startup_delay_seconds', raw_config.get('startup_delay_seconds', 30)),
            },
            "quiet_hours": raw_config.get('quiet_hours', {
                "enabled": False,
                "start": "23:00",
                "end": "06:00"
            }),
            "retry": raw_config.get('retry', raw_config.get('retry_settings', {
                "max_retries": 3,
                "retry_delay_minutes": 5
            })),
            "performance": raw_config.get('performance', {
                "max_concurrent_accounts": 2,
                "batch_size": 50,
                "request_delay_ms": 100
            }),
            "auto_cleanup": raw_config.get('cleanup', raw_config.get('auto_cleanup', {
                "enabled": True,
                "days_to_keep": 90,
                "cleanup_interval_hours": 24
            })),
            "storage": raw_config.get('storage', {
                "db_path": "data/email_sync.db"
            })
        }
        if should_save:
            # Persist the normalised layout so older flat files get migrated.
            self._save_config(normalized)
        return normalized

    def _read_config_file(self) -> Optional[Dict[str, Any]]:
        """Read ``self.config_file`` directly as JSON.

        Returns:
            The parsed dict, or None if the file is missing, unreadable,
            not valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.error(f"Legacy config load failed: {e}")
            return None
        return data if isinstance(data, dict) else None

    def _save_config(self, config: Dict[str, Any]):
        """Persist *config* as indented UTF-8 JSON; errors are logged, not raised.

        Creates the parent directory if needed. Writes to a sibling ``.tmp``
        file and renames it over the target, so a failure mid-write cannot
        leave a truncated config behind.
        """
        tmp_path = self.config_file.with_name(self.config_file.name + '.tmp')
        try:
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
            tmp_path.replace(self.config_file)
        except Exception as e:
            logger.error(f"Failed to save config: {e}")
            # Best-effort removal of a half-written temp file.
            try:
                tmp_path.unlink()
            except OSError:
                pass

    def start_scheduler(self):
        """Start the worker thread and schedule a delayed initial sync.

        Does nothing if the scheduler is already running or ``sync.enabled``
        is false. The initial incremental sync runs
        ``sync.startup_delay_seconds`` (default 30) after start on its own
        daemon thread. It is skipped if the scheduler is stopped first.
        """
        if self.running:
            logger.warning("Scheduler is already running")
            return
        if not self.config.get('sync', {}).get('enabled', True):
            logger.info("Sync scheduler is disabled in config")
            return
        logger.info("Starting email sync scheduler")
        # Use a fresh event per run. Threads left over from a previous run keep
        # the old, already-set event and exit instead of resuming alongside us.
        stop_event = threading.Event()
        self.stop_event = stop_event
        self.running = True

        self._setup_schedules()

        self.scheduler_thread = threading.Thread(
            target=self._scheduler_worker, args=(stop_event,), daemon=True
        )
        self.scheduler_thread.start()
        logger.info(f"Scheduler started with {len(self._job_scheduler.jobs)} jobs")

        startup_delay = self.config.get('sync', {}).get('startup_delay_seconds', 30)
        logger.info(f"Will trigger initial sync in {startup_delay} seconds")

        def delayed_initial_sync():
            """Wait for the startup delay, then run one incremental sync unless stopped."""
            # wait() returns True as soon as stop_scheduler() sets the event.
            if stop_event.wait(startup_delay):
                return
            logger.info("Triggering initial sync on startup")
            try:
                self._scheduled_sync(full_sync=False)
            except Exception as e:
                logger.error(f"Initial sync failed: {e}")

        threading.Thread(target=delayed_initial_sync, daemon=True).start()

    def stop_scheduler(self):
        """Stop the worker thread and drop this scheduler's jobs. No-op if not running."""
        if not self.running:
            return
        logger.info("Stopping email sync scheduler")
        self.running = False
        self.stop_event.set()
        # Only this instance's jobs are cleared; other `schedule` users are unaffected.
        self._job_scheduler.clear()
        if self.scheduler_thread and self.scheduler_thread.is_alive():
            # The worker wakes from its wait immediately, but may still be inside a sync.
            self.scheduler_thread.join(timeout=5)
        logger.info("Scheduler stopped")

    def _setup_schedules(self):
        """Register incremental, full and (if enabled) cleanup jobs from the current config."""
        sync_cfg = self.config.get('sync', {})
        interval_minutes = sync_cfg.get('interval_minutes', 15)
        full_sync_hours = sync_cfg.get('full_sync_hours', 24)
        jobs = self._job_scheduler

        jobs.every(interval_minutes).minutes.do(self._scheduled_sync, full_sync=False)
        logger.info(f"Scheduled incremental sync every {interval_minutes} minutes")

        # A missing, zero or negative value disables the periodic full sync.
        if full_sync_hours and full_sync_hours > 0:
            jobs.every(full_sync_hours).hours.do(self._scheduled_sync, full_sync=True)
            logger.info(f"Scheduled full sync every {full_sync_hours} hours")
        else:
            logger.info("Periodic full sync disabled")

        cleanup_cfg = self.config.get('auto_cleanup', {})
        if cleanup_cfg.get('enabled', False):
            cleanup_hours = cleanup_cfg.get('cleanup_interval_hours', 24)
            jobs.every(cleanup_hours).hours.do(self._cleanup_old_emails)
            logger.info(f"Scheduled cleanup every {cleanup_hours} hours")

    def _scheduler_worker(self, stop_event: Optional[threading.Event] = None):
        """Worker loop: run due jobs about every 30 s until stopped.

        Args:
            stop_event: Event that ends the loop; defaults to ``self.stop_event``.
        """
        if stop_event is None:
            stop_event = self.stop_event
        while self.running and not stop_event.is_set():
            try:
                self._job_scheduler.run_pending()
                wait_seconds = 30  # normal polling interval
            except Exception as e:
                logger.error(f"Scheduler worker error: {e}")
                wait_seconds = 60  # back off for a minute after an error
            # Event.wait instead of time.sleep so stop_scheduler() wakes us at once.
            stop_event.wait(wait_seconds)

    def _scheduled_sync(self, full_sync: bool = False):
        """Run one scheduled sync, honouring quiet hours, and notify callbacks.

        Sync failures are logged and reported as a ``sync_failed`` event
        rather than raised.
        """
        if not self._should_sync_now():
            logger.info("Skipping sync due to quiet hours")
            return
        sync_type = "full" if full_sync else "incremental"
        logger.info(f"Starting scheduled {sync_type} sync")
        try:
            result = self._sync_with_retry(full_sync)
            finished_at = datetime.now()
            self.last_sync_times[sync_type] = finished_at
            self._notify_callbacks('sync_completed', {
                'type': sync_type,
                'result': result,
                'timestamp': finished_at
            })
            logger.info(f"Scheduled {sync_type} sync completed: "
                        f"{result.get('emails_added', 0)} added, "
                        f"{result.get('emails_updated', 0)} updated")
        except Exception as e:
            logger.error(f"Scheduled {sync_type} sync failed: {e}")
            self._notify_callbacks('sync_failed', {
                'type': sync_type,
                'error': str(e),
                'timestamp': datetime.now()
            })

    def _sync_with_retry(self, full_sync: bool) -> Dict[str, Any]:
        """Call ``sync_all_accounts`` with up to ``retry.max_retries`` retries.

        Waits ``retry.retry_delay_minutes`` between attempts. A result whose
        ``success`` key is falsy counts as a failed attempt.

        Returns:
            The first successful result dict from the sync manager.

        Raises:
            Exception: if every attempt fails; the message carries the last error.
        """
        retry_cfg = self.config.get('retry', {})
        max_retries = retry_cfg.get('max_retries', 3)
        retry_delay = retry_cfg.get('retry_delay_minutes', 5)
        max_workers = self.config.get('performance', {}).get('max_concurrent_accounts', 2)
        last_error = None
        for attempt in range(max_retries + 1):
            if attempt > 0:
                logger.info(f"Sync retry attempt {attempt}/{max_retries}")
                time.sleep(retry_delay * 60)  # minutes -> seconds
            try:
                result = self.sync_manager.sync_all_accounts(
                    full_sync=full_sync,
                    max_workers=max_workers
                )
                if result.get('success'):
                    return result
                last_error = result.get('error', 'Unknown sync error')
                logger.warning(f"Sync attempt {attempt + 1} returned failure: {last_error}")
            except Exception as e:
                last_error = str(e)
                logger.warning(f"Sync attempt {attempt + 1} failed: {e}")
        raise Exception(f"Sync failed after {max_retries} retries: {last_error}")

    def _should_sync_now(self) -> bool:
        """Return False while the current local time is inside the quiet-hours window.

        The window is inclusive at both ends and may wrap past midnight
        (e.g. 23:00-06:00). Always True when quiet hours are disabled.

        Raises:
            ValueError: if ``start``/``end`` are malformed ``HH:MM`` strings.
        """
        quiet_hours = self.config.get('quiet_hours', {})
        if not quiet_hours.get('enabled', False):
            return True
        now = datetime.now().time()
        start_time = datetime.strptime(quiet_hours.get('start', '23:00'), '%H:%M').time()
        end_time = datetime.strptime(quiet_hours.get('end', '06:00'), '%H:%M').time()
        if start_time <= end_time:
            # Window within a single day.
            return not (start_time <= now <= end_time)
        # Window wraps past midnight.
        return not (now >= start_time or now <= end_time)

    def _cleanup_old_emails(self):
        """Soft-delete emails older than ``auto_cleanup.days_to_keep`` days.

        Sets ``is_deleted`` on rows of ``emails`` whose ``date_sent`` sorts
        before the cutoff, then emits ``cleanup_completed`` or ``cleanup_failed``.
        """
        cleanup_cfg = self.config.get('auto_cleanup', {})
        if not cleanup_cfg.get('enabled', False):
            return
        days_to_keep = cleanup_cfg.get('days_to_keep', 90)
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        # NOTE(review): the comparison is textual, so this assumes date_sent is
        # stored as ISO-8601 text in the same format - confirm against the schema.
        cutoff_date_str = cutoff_date.isoformat()
        logger.info(f"Starting cleanup of emails older than {days_to_keep} days (before {cutoff_date_str})")
        try:
            conn = self.sync_manager.db.conn
            cursor = conn.execute("""
                UPDATE emails SET is_deleted = TRUE, updated_at = CURRENT_TIMESTAMP
                WHERE date_sent < ? AND is_deleted = FALSE
            """, (cutoff_date_str,))
            deleted_count = cursor.rowcount
            conn.commit()
            logger.info(f"Cleanup completed: {deleted_count} emails marked as deleted")
            self._notify_callbacks('cleanup_completed', {
                'deleted_count': deleted_count,
                'cutoff_date': cutoff_date,
                'timestamp': datetime.now()
            })
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
            self._notify_callbacks('cleanup_failed', {
                'error': str(e),
                'timestamp': datetime.now()
            })

    def force_sync(self, full_sync: bool = False) -> Dict[str, Any]:
        """Run a sync immediately, ignoring quiet hours.

        Returns:
            The sync manager's result, or ``{'success': False, 'error': ...}``
            if every retry failed.
        """
        logger.info(f"Force sync requested (full_sync={full_sync})")
        try:
            result = self._sync_with_retry(full_sync)
            sync_type = "full" if full_sync else "incremental"
            self.last_sync_times[sync_type] = datetime.now()
            return result
        except Exception as e:
            logger.error(f"Force sync failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_sync_status(self) -> Dict[str, Any]:
        """Return a status snapshot for display.

        Includes the running flag, a copy of the config, last sync times as ISO
        strings and the upcoming jobs. The keys returned by
        ``sync_manager.get_sync_status()`` are merged in and override any
        same-named keys.
        """
        status = {
            'scheduler_running': self.running,
            # Copy so callers cannot mutate the live config through the snapshot.
            'config': copy.deepcopy(self.config),
            'last_sync_times': {
                name: ts.isoformat() if isinstance(ts, datetime) else ts
                for name, ts in self.last_sync_times.items()
            },
            'next_jobs': [
                {
                    'job': str(job.job_func),
                    'next_run': job.next_run.isoformat() if job.next_run else None,
                    'interval': str(job.interval)
                }
                for job in self._job_scheduler.jobs
            ],
        }
        status.update(self.sync_manager.get_sync_status())
        return status

    def update_config(self, new_config: Dict[str, Any]) -> bool:
        """Validate, merge and persist a (possibly partial) configuration.

        Accepts either the nested layout (``{"sync": {...}}``) or the legacy
        flat keys ``sync_interval_minutes``, ``full_sync_interval_hours`` and
        ``enabled``. Dict sections are merged key by key; non-dict values
        replace the section. If the scheduler is running, its jobs are rebuilt
        from the new config.

        Returns:
            True on success. False if validation or applying fails. The
            previous config is then restored in memory and, if it had already
            been written, on disk. Restoring the job list is attempted if it had
            already been rebuilt.
        """
        # Deep copy: the merge mutates nested dicts in place, so a shallow copy
        # would share them and the rollback would restore nothing.
        old_config = copy.deepcopy(self.config)
        saved = False
        jobs_rebuilt = False
        try:
            self._validate_config(new_config)
            self._merge_config(new_config)
            self._save_config(self.config)
            saved = True
            if self.running:
                jobs_rebuilt = True
                self._job_scheduler.clear()
                self._setup_schedules()
                logger.info("Scheduler jobs updated with new config")
            logger.info("Configuration updated successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to update config: {e}")
            self.config = old_config
            if saved:
                self._save_config(self.config)
            if jobs_rebuilt:
                try:
                    self._job_scheduler.clear()
                    self._setup_schedules()
                except Exception as restore_error:
                    logger.error(f"Failed to restore scheduler jobs: {restore_error}")
            return False

    def _merge_config(self, new_config: Dict[str, Any]):
        """Merge *new_config* (nested or legacy flat layout) into ``self.config`` in place."""
        if 'sync' in new_config:
            self.config.setdefault('sync', {}).update(new_config['sync'])
        else:
            # Legacy flat keys -> keys of the nested ``sync`` section.
            for flat_key, nested_key in (
                ('sync_interval_minutes', 'interval_minutes'),
                ('full_sync_interval_hours', 'full_sync_hours'),
                ('enabled', 'enabled'),
            ):
                if flat_key in new_config:
                    self.config.setdefault('sync', {})[nested_key] = new_config[flat_key]
        for key in ('quiet_hours', 'retry', 'performance', 'auto_cleanup', 'storage'):
            if key not in new_config:
                continue
            value = new_config[key]
            if isinstance(value, dict):
                self.config.setdefault(key, {}).update(value)
            else:
                self.config[key] = value

    def _validate_config(self, config: Dict[str, Any]):
        """Raise ValueError if *config* holds out-of-range or malformed values.

        Checks the same layout that ``update_config`` merges: the nested
        ``sync`` section when present, otherwise the legacy flat keys.
        Quiet-hours times are checked whenever they are given, even without
        ``enabled`` in the same update, because they may be merged into a
        section that is already enabled.
        """
        if 'sync' in config:
            sync_cfg = config['sync']
            if not isinstance(sync_cfg, dict):
                raise ValueError("sync must be an object")
            interval = sync_cfg.get('interval_minutes')
            full_interval = sync_cfg.get('full_sync_hours')
        else:
            interval = config.get('sync_interval_minutes')
            full_interval = config.get('full_sync_interval_hours')

        self._check_int_range(interval, 1, 1440, "sync.interval_minutes")
        self._check_int_range(full_interval, 1, 168, "sync.full_sync_hours")

        quiet_hours = config.get('quiet_hours', {})
        if not isinstance(quiet_hours, dict):
            raise ValueError("quiet_hours must be an object")
        for time_key in ('start', 'end'):
            if time_key in quiet_hours:
                try:
                    datetime.strptime(quiet_hours[time_key], '%H:%M')
                except (TypeError, ValueError) as exc:
                    raise ValueError(
                        f"Invalid time format for quiet_hours.{time_key}, use HH:MM"
                    ) from exc

    @staticmethod
    def _check_int_range(value: Any, low: int, high: int, name: str):
        """Raise ValueError unless *value* is None or an int (not bool) in [low, high]."""
        if value is None:
            return
        # bool is an int subclass, but True/False are not meaningful intervals.
        if isinstance(value, bool) or not isinstance(value, int) or not low <= value <= high:
            raise ValueError(f"{name} must be between {low} and {high}")

    def add_callback(self, callback: Callable[[str, Dict[str, Any]], None]):
        """Register *callback* to be called as ``callback(event_type, data)``."""
        self.sync_callbacks.append(callback)

    def remove_callback(self, callback: Callable[[str, Dict[str, Any]], None]):
        """Unregister *callback*; does nothing if it was not registered."""
        if callback in self.sync_callbacks:
            self.sync_callbacks.remove(callback)

    def _notify_callbacks(self, event_type: str, data: Dict[str, Any]):
        """Invoke every registered callback; exceptions are logged and swallowed."""
        # Iterate over a snapshot so a callback may unregister itself without
        # causing the next callback to be skipped.
        for callback in list(self.sync_callbacks):
            try:
                callback(event_type, data)
            except Exception as e:
                logger.error(f"Callback error: {e}")

    def close(self):
        """Stop the scheduler and close the sync manager."""
        self.stop_scheduler()
        if self.sync_manager:
            self.sync_manager.close()
# Process-wide SyncScheduler, created lazily by get_scheduler(); None until then.
_scheduler_instance: Optional["SyncScheduler"] = None
def get_scheduler() -> SyncScheduler:
    """Return the process-wide SyncScheduler, building it on the first call.

    The instance uses the default config path and is cached in
    ``_scheduler_instance``; later calls return the same object.
    """
    global _scheduler_instance
    if _scheduler_instance is not None:
        return _scheduler_instance
    _scheduler_instance = SyncScheduler()
    return _scheduler_instance
def start_background_sync():
    """Start the shared scheduler and hand it back to the caller."""
    shared = get_scheduler()
    shared.start_scheduler()
    return shared
def stop_background_sync():
    """Stop the shared scheduler, release its resources and forget it.

    Calls ``close()`` on the instance (which stops the scheduler and closes its
    sync manager), so the manager's resources are not leaked when the instance
    is discarded. The next ``get_scheduler()`` call builds a fresh one.
    Does nothing if no scheduler has been created.
    """
    global _scheduler_instance
    instance = _scheduler_instance
    if instance is None:
        return
    # Forget the instance first so the global is reset even if close() raises.
    _scheduler_instance = None
    instance.close()