# coding=utf-8
"""
数据处理模块
提供数据读取和检测功能:
- read_all_today_titles: 从存储后端读取当天所有标题
- detect_latest_new_titles: 检测最新批次的新增标题
Author: TrendRadar Team
"""
from typing import Dict, List, Tuple, Optional
def read_all_today_titles_from_storage(
storage_manager,
current_platform_ids: Optional[List[str]] = None,
) -> Tuple[Dict, Dict, Dict]:
"""
从存储后端读取当天所有标题(SQLite 数据)
Args:
storage_manager: 存储管理器实例
current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
Returns:
Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
"""
try:
news_data = storage_manager.get_today_all_data()
if not news_data or not news_data.items:
return {}, {}, {}
all_results = {}
final_id_to_name = {}
title_info = {}
for source_id, news_list in news_data.items.items():
# 按平台过滤
if current_platform_ids is not None and source_id not in current_platform_ids:
continue
# 获取来源名称
source_name = news_data.id_to_name.get(source_id, source_id)
final_id_to_name[source_id] = source_name
if source_id not in all_results:
all_results[source_id] = {}
title_info[source_id] = {}
for item in news_list:
title = item.title
ranks = item.ranks or [item.rank]
first_time = item.first_time or item.crawl_time
last_time = item.last_time or item.crawl_time
count = item.count
rank_timeline = item.rank_timeline
all_results[source_id][title] = {
"ranks": ranks,
"url": item.url or "",
"mobileUrl": item.mobile_url or "",
}
title_info[source_id][title] = {
"first_time": first_time,
"last_time": last_time,
"count": count,
"ranks": ranks,
"url": item.url or "",
"mobileUrl": item.mobile_url or "",
"rank_timeline": rank_timeline,
}
return all_results, final_id_to_name, title_info
except Exception as e:
print(f"[存储] 从存储后端读取数据失败: {e}")
return {}, {}, {}
def read_all_today_titles(
storage_manager,
current_platform_ids: Optional[List[str]] = None,
quiet: bool = False,
) -> Tuple[Dict, Dict, Dict]:
"""
读取当天所有标题(从存储后端)
Args:
storage_manager: 存储管理器实例
current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
quiet: 是否静默模式(不打印日志)
Returns:
Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
"""
all_results, final_id_to_name, title_info = read_all_today_titles_from_storage(
storage_manager, current_platform_ids
)
if not quiet:
if all_results:
total_count = sum(len(titles) for titles in all_results.values())
print(f"[存储] 已从存储后端读取 {total_count} 条标题")
else:
print("[存储] 当天暂无数据")
return all_results, final_id_to_name, title_info
def detect_latest_new_titles_from_storage(
storage_manager,
current_platform_ids: Optional[List[str]] = None,
) -> Dict:
"""
从存储后端检测最新批次的新增标题
Args:
storage_manager: 存储管理器实例
current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
Returns:
Dict: 新增标题 {source_id: {title: title_data}}
"""
try:
# 获取最新抓取数据
latest_data = storage_manager.get_latest_crawl_data()
if not latest_data or not latest_data.items:
return {}
# 获取所有历史数据
all_data = storage_manager.get_today_all_data()
if not all_data or not all_data.items:
# 没有历史数据(第一次抓取),不应该有"新增"标题
return {}
# 获取最新批次时间
latest_time = latest_data.crawl_time
# 步骤1:收集最新批次的标题(last_crawl_time = latest_time 的标题)
latest_titles = {}
for source_id, news_list in latest_data.items.items():
if current_platform_ids is not None and source_id not in current_platform_ids:
continue
latest_titles[source_id] = {}
for item in news_list:
latest_titles[source_id][item.title] = {
"ranks": [item.rank],
"url": item.url or "",
"mobileUrl": item.mobile_url or "",
}
# 步骤2:收集历史标题
# 关键逻辑:一个标题只要其 first_crawl_time < latest_time,就是历史标题
# 这样即使同一标题有多条记录(URL 不同),只要任何一条是历史的,该标题就算历史
historical_titles = {}
for source_id, news_list in all_data.items.items():
if current_platform_ids is not None and source_id not in current_platform_ids:
continue
historical_titles[source_id] = set()
for item in news_list:
first_time = item.first_time or item.crawl_time
# 如果该记录的首次出现时间早于最新批次,则该标题是历史标题
if first_time < latest_time:
historical_titles[source_id].add(item.title)
# 检查是否是当天第一次抓取(没有任何历史标题)
# 如果所有平台的历史标题集合都为空,说明只有一个抓取批次
# 在这种情况下,将所有最新批次的标题视为"新增"(用于增量模式的第一次推送)
has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
if not has_historical_data:
# 第一次爬取:返回所有最新标题作为"新增"
return latest_titles
# 步骤3:找出新增标题 = 最新批次标题 - 历史标题
new_titles = {}
for source_id, source_latest_titles in latest_titles.items():
historical_set = historical_titles.get(source_id, set())
source_new_titles = {}
for title, title_data in source_latest_titles.items():
if title not in historical_set:
source_new_titles[title] = title_data
if source_new_titles:
new_titles[source_id] = source_new_titles
return new_titles
except Exception as e:
print(f"[存储] 从存储后端检测新标题失败: {e}")
return {}
def detect_latest_new_titles(
storage_manager,
current_platform_ids: Optional[List[str]] = None,
quiet: bool = False,
) -> Dict:
"""
检测当日最新批次的新增标题(从存储后端)
Args:
storage_manager: 存储管理器实例
current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
quiet: 是否静默模式(不打印日志)
Returns:
Dict: 新增标题 {source_id: {title: title_data}}
"""
new_titles = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)
if new_titles and not quiet:
total_new = sum(len(titles) for titles in new_titles.values())
print(f"[存储] 从存储后端检测到 {total_new} 条新增标题")
return new_titles