# coding=utf-8
"""
推送记录管理模块
管理推送记录,支持每日只推送一次和时间窗口控制
通过 storage_backend 统一存储,支持本地 SQLite 和远程云存储
"""
from datetime import datetime
from typing import Callable, Optional, Any, Tuple
import pytz
from trendradar.utils.time import DEFAULT_TIMEZONE, TimeWindowChecker
class PushRecordManager:
"""
推送记录管理器
通过 storage_backend 统一管理推送记录:
- 本地环境:使用 LocalStorageBackend,数据存储在本地 SQLite
- GitHub Actions:使用 RemoteStorageBackend,数据存储在云端
这样 once_per_day 功能在 GitHub Actions 上也能正常工作。
"""
def __init__(
self,
storage_backend: Any,
get_time_func: Optional[Callable[[], datetime]] = None,
):
"""
初始化推送记录管理器
Args:
storage_backend: 存储后端实例(LocalStorageBackend 或 RemoteStorageBackend)
get_time_func: 获取当前时间的函数(应使用配置的时区)
"""
self.storage_backend = storage_backend
self.get_time = get_time_func or self._default_get_time
print(f"[推送记录] 使用 {storage_backend.backend_name} 存储后端")
def _default_get_time(self) -> datetime:
"""默认时间获取函数(使用 storage_backend 的时区配置)"""
timezone = getattr(self.storage_backend, 'timezone', DEFAULT_TIMEZONE)
return datetime.now(pytz.timezone(timezone))
def has_pushed_today(self) -> bool:
"""
检查今天是否已经推送过
Returns:
是否已推送
"""
return self.storage_backend.has_pushed_today()
def record_push(self, report_type: str) -> bool:
"""
记录推送
Args:
report_type: 报告类型
Returns:
是否记录成功
"""
return self.storage_backend.record_push(report_type)
def is_in_time_range(self, start_time: str, end_time: str) -> bool:
"""
检查当前时间是否在指定时间范围内
Args:
start_time: 开始时间(格式:HH:MM)
end_time: 结束时间(格式:HH:MM)
Returns:
是否在时间范围内
"""
checker = TimeWindowChecker(
storage_backend=self.storage_backend,
get_time_func=self.get_time,
window_name="推送窗口",
)
return checker.is_in_time_range(start_time, end_time)
def check_push_window(self, window_config: dict) -> Tuple[bool, str]:
"""
检查推送窗口控制
Args:
window_config: 推送窗口配置
Returns:
(should_push, reason) 元组
"""
checker = TimeWindowChecker(
storage_backend=self.storage_backend,
get_time_func=self.get_time,
window_name="推送窗口",
)
return checker.check_window(
window_config=window_config,
check_once_per_day_func=self.has_pushed_today,
)
def check_ai_analysis_window(self, window_config: dict) -> Tuple[bool, str]:
"""
检查 AI 分析窗口控制
Args:
window_config: AI 分析窗口配置
Returns:
(should_analyze, reason) 元组
"""
checker = TimeWindowChecker(
storage_backend=self.storage_backend,
get_time_func=self.get_time,
window_name="AI 分析窗口",
)
return checker.check_window(
window_config=window_config,
check_once_per_day_func=self.storage_backend.has_ai_analyzed_today,
)
def get_push_status(self, window_config: dict) -> dict:
"""
获取推送状态信息
Args:
window_config: 推送窗口配置
Returns:
状态信息字典
"""
checker = TimeWindowChecker(
storage_backend=self.storage_backend,
get_time_func=self.get_time,
window_name="推送窗口",
)
status = checker.get_status(
window_config=window_config,
check_once_per_day_func=self.has_pushed_today,
)
status["window_type"] = "push"
return status
def get_ai_analysis_status(self, window_config: dict) -> dict:
"""
获取 AI 分析状态信息
Args:
window_config: AI 分析窗口配置
Returns:
状态信息字典
"""
checker = TimeWindowChecker(
storage_backend=self.storage_backend,
get_time_func=self.get_time,
window_name="AI 分析窗口",
)
status = checker.get_status(
window_config=window_config,
check_once_per_day_func=self.storage_backend.has_ai_analyzed_today,
)
status["window_type"] = "ai_analysis"
return status
def reset_push_state(self) -> bool:
"""
重置今日推送状态
Returns:
是否重置成功
"""
try:
# 通过存储后端重置推送记录
if hasattr(self.storage_backend, 'reset_push_state'):
return self.storage_backend.reset_push_state()
else:
print("[推送记录] 存储后端不支持重置推送状态")
return False
except Exception as e:
print(f"[推送记录] 重置推送状态失败: {e}")
return False
def reset_ai_analysis_state(self) -> bool:
"""
重置今日 AI 分析状态
Returns:
是否重置成功
"""
try:
if hasattr(self.storage_backend, 'reset_ai_analysis_state'):
return self.storage_backend.reset_ai_analysis_state()
else:
print("[推送记录] 存储后端不支持重置 AI 分析状态")
return False
except Exception as e:
print(f"[推送记录] 重置 AI 分析状态失败: {e}")
return False