"""
Data parsing service.
v2.0.0: SQLite databases only; TXT file support has been removed.
Storage layout: output/{type}/{date}.db
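Typical use (sketch; assumes crawled databases already exist under output/):
    svc = ParserService()
    titles, id_to_name, timestamps = svc.read_all_titles_for_date(db_type="news")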
"""
import re
import sqlite3
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import yaml
from ..utils.errors import FileParseError, DataNotFoundError
from .cache_service import get_cache
class ParserService:
"""数据解析服务类"""
    def __init__(self, project_root: Optional[str] = None):
        """
        Initialize the parser service.
        Args:
            project_root: Project root directory; when None, resolved as
                three directory levels above this file.
        """
if project_root is None:
current_file = Path(__file__)
self.project_root = current_file.parent.parent.parent
else:
self.project_root = Path(project_root)
self.cache = get_cache()
@staticmethod
def clean_title(title: str) -> str:
"""清理标题文本"""
title = re.sub(r'\s+', ' ', title)
title = title.strip()
return title
    def get_date_folder_name(self, date: Optional[datetime] = None) -> str:
        """
        Get the date string (ISO format).
        Args:
            date: Date object; defaults to today.
        Returns:
            Date string (YYYY-MM-DD).
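        Example:
            >>> ParserService().get_date_folder_name(datetime(2024, 1, 5))  # doctest: +SKIP
            '2024-01-05'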
"""
if date is None:
date = datetime.now()
return date.strftime("%Y-%m-%d")
    def _get_db_path(self, date: Optional[datetime] = None, db_type: str = "news") -> Optional[Path]:
        """
        Get the database file path.
        Layout: output/{type}/{date}.db
        Args:
            date: Date object; defaults to today.
            db_type: Database type ("news" or "rss").
        Returns:
            Path to the database file, or None if it does not exist.
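        Example (illustrative; assumes a ParserService instance `svc` and a prior crawl):
            >>> svc._get_db_path(datetime(2024, 1, 15), "news")  # doctest: +SKIP
            PosixPath('.../output/news/2024-01-15.db')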
"""
date_str = self.get_date_folder_name(date)
db_path = self.project_root / "output" / db_type / f"{date_str}.db"
if db_path.exists():
return db_path
return None
    def _read_from_sqlite(
        self,
        date: Optional[datetime] = None,
        platform_ids: Optional[List[str]] = None,
        db_type: str = "news"
    ) -> Optional[Tuple[Dict, Dict, Dict]]:
        """
        Read data from a SQLite database.
        Args:
            date: Date object; defaults to today.
            platform_ids: List of platform IDs; None means all platforms.
            db_type: Database type ("news" or "rss").
        Returns:
            (all_titles, id_to_name, all_timestamps) tuple, or None if the
            database does not exist.
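        Shape sketch (illustrative; actual keys depend on the crawled data):
            all_titles     -> {platform_id: {title: {"ranks": [...], "url": ..., ...}}}
            id_to_name     -> {platform_id: display_name}
            all_timestamps -> {"<crawl_time>.db": unix_timestamp}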
"""
db_path = self._get_db_path(date, db_type)
if db_path is None:
return None
all_titles = {}
id_to_name = {}
all_timestamps = {}
        conn = None
        try:
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            if db_type == "news":
                return self._read_news_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
            elif db_type == "rss":
                return self._read_rss_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
            return None
        except Exception as e:
            print(f"Warning: failed to read data from SQLite: {e}")
            return None
        finally:
            if conn is not None:
                conn.close()
def _read_news_from_sqlite(
self,
cursor,
platform_ids: Optional[List[str]],
all_titles: Dict,
id_to_name: Dict,
all_timestamps: Dict
) -> Optional[Tuple[Dict, Dict, Dict]]:
"""从热榜数据库读取数据"""
        # Check that the news_items table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='news_items'
""")
if not cursor.fetchone():
return None
        # Build the query
if platform_ids:
placeholders = ','.join(['?' for _ in platform_ids])
query = f"""
SELECT n.id, n.platform_id, p.name as platform_name, n.title,
n.rank, n.url, n.mobile_url,
n.first_crawl_time, n.last_crawl_time, n.crawl_count
FROM news_items n
LEFT JOIN platforms p ON n.platform_id = p.id
WHERE n.platform_id IN ({placeholders})
"""
cursor.execute(query, platform_ids)
else:
cursor.execute("""
SELECT n.id, n.platform_id, p.name as platform_name, n.title,
n.rank, n.url, n.mobile_url,
n.first_crawl_time, n.last_crawl_time, n.crawl_count
FROM news_items n
LEFT JOIN platforms p ON n.platform_id = p.id
""")
rows = cursor.fetchall()
        # Collect all news_item_id values to look up rank history
news_ids = [row['id'] for row in rows]
rank_history_map = {}
if news_ids:
placeholders = ",".join("?" * len(news_ids))
cursor.execute(f"""
SELECT news_item_id, rank FROM rank_history
WHERE news_item_id IN ({placeholders})
ORDER BY news_item_id, crawl_time
""", news_ids)
for rh_row in cursor.fetchall():
news_id = rh_row['news_item_id']
rank = rh_row['rank']
if news_id not in rank_history_map:
rank_history_map[news_id] = []
rank_history_map[news_id].append(rank)
for row in rows:
news_id = row['id']
platform_id = row['platform_id']
platform_name = row['platform_name'] or platform_id
title = row['title']
if platform_id not in id_to_name:
id_to_name[platform_id] = platform_name
if platform_id not in all_titles:
all_titles[platform_id] = {}
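            # Fall back to the current rank when no history rows exist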
ranks = rank_history_map.get(news_id, [row['rank']])
all_titles[platform_id][title] = {
"ranks": ranks,
"url": row['url'] or "",
"mobileUrl": row['mobile_url'] or "",
"first_time": row['first_crawl_time'] or "",
"last_time": row['last_crawl_time'] or "",
"count": row['crawl_count'] or 1,
}
        # Record crawl times as timestamps, keyed as "<crawl_time>.db"
cursor.execute("""
SELECT crawl_time, created_at FROM crawl_records
ORDER BY crawl_time
""")
for row in cursor.fetchall():
crawl_time = row['crawl_time']
created_at = row['created_at']
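            # Fall back to "now" if created_at is missing or malformed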
try:
ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
except (ValueError, TypeError):
ts = datetime.now().timestamp()
all_timestamps[f"{crawl_time}.db"] = ts
if not all_titles:
return None
return (all_titles, id_to_name, all_timestamps)
def _read_rss_from_sqlite(
self,
cursor,
feed_ids: Optional[List[str]],
all_items: Dict,
id_to_name: Dict,
all_timestamps: Dict
) -> Optional[Tuple[Dict, Dict, Dict]]:
"""从 RSS 数据库读取数据"""
        # Check that the rss_items table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='rss_items'
""")
if not cursor.fetchone():
return None
        # Build the query
if feed_ids:
placeholders = ','.join(['?' for _ in feed_ids])
query = f"""
SELECT i.id, i.feed_id, f.name as feed_name, i.title,
i.url, i.published_at, i.summary, i.author,
i.first_crawl_time, i.last_crawl_time, i.crawl_count
FROM rss_items i
LEFT JOIN rss_feeds f ON i.feed_id = f.id
WHERE i.feed_id IN ({placeholders})
ORDER BY i.published_at DESC
"""
cursor.execute(query, feed_ids)
else:
cursor.execute("""
SELECT i.id, i.feed_id, f.name as feed_name, i.title,
i.url, i.published_at, i.summary, i.author,
i.first_crawl_time, i.last_crawl_time, i.crawl_count
FROM rss_items i
LEFT JOIN rss_feeds f ON i.feed_id = f.id
ORDER BY i.published_at DESC
""")
rows = cursor.fetchall()
for row in rows:
feed_id = row['feed_id']
feed_name = row['feed_name'] or feed_id
title = row['title']
if feed_id not in id_to_name:
id_to_name[feed_id] = feed_name
if feed_id not in all_items:
all_items[feed_id] = {}
all_items[feed_id][title] = {
"url": row['url'] or "",
"published_at": row['published_at'] or "",
"summary": row['summary'] or "",
"author": row['author'] or "",
"first_time": row['first_crawl_time'] or "",
"last_time": row['last_crawl_time'] or "",
"count": row['crawl_count'] or 1,
}
        # Record crawl times as timestamps, keyed as "<crawl_time>.db"
cursor.execute("""
SELECT crawl_time, created_at FROM rss_crawl_records
ORDER BY crawl_time
""")
for row in cursor.fetchall():
crawl_time = row['crawl_time']
created_at = row['created_at']
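            # Fall back to "now" if created_at is missing or malformed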
try:
ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
except (ValueError, TypeError):
ts = datetime.now().timestamp()
all_timestamps[f"{crawl_time}.db"] = ts
if not all_items:
return None
return (all_items, id_to_name, all_timestamps)
    def read_all_titles_for_date(
        self,
        date: Optional[datetime] = None,
        platform_ids: Optional[List[str]] = None,
        db_type: str = "news"
    ) -> Tuple[Dict, Dict, Dict]:
        """
        Read all data for the given date (with caching).
        Args:
            date: Date object; defaults to today.
            platform_ids: List of platform/feed IDs; None means all.
            db_type: Database type ("news" or "rss").
        Returns:
            (all_titles, id_to_name, all_timestamps) tuple.
        Raises:
            DataNotFoundError: No data exists for the date.
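        Example (sketch; assumes a populated output/news/<date>.db):
            >>> svc = ParserService()  # doctest: +SKIP
            >>> titles, names, stamps = svc.read_all_titles_for_date(db_type="news")  # doctest: +SKIP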
"""
date_str = self.get_date_folder_name(date)
platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
cache_key = f"read_all:{db_type}:{date_str}:{platform_key}"
        # 15-minute TTL for both today's and historical data
        ttl = 900
cached = self.cache.get(cache_key, ttl=ttl)
if cached:
return cached
result = self._read_from_sqlite(date, platform_ids, db_type)
if result:
self.cache.set(cache_key, result)
return result
raise DataNotFoundError(
f"未找到 {date_str} 的 {db_type} 数据",
suggestion="请先运行爬虫或检查日期是否正确"
)
    def parse_yaml_config(self, config_path: Optional[str] = None) -> dict:
        """
        Parse a YAML configuration file.
        Args:
            config_path: Path to the config file; defaults to config/config.yaml.
        Returns:
            Configuration dictionary.
        Raises:
            FileParseError: The config file is missing or cannot be parsed.
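        Example (sketch; assumes config/config.yaml exists):
            >>> cfg = ParserService().parse_yaml_config()  # doctest: +SKIP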
"""
if config_path is None:
config_path = self.project_root / "config" / "config.yaml"
else:
config_path = Path(config_path)
if not config_path.exists():
raise FileParseError(str(config_path), "配置文件不存在")
try:
with open(config_path, "r", encoding="utf-8") as f:
config_data = yaml.safe_load(f)
return config_data
except Exception as e:
raise FileParseError(str(config_path), str(e))
    def parse_frequency_words(self, words_file: Optional[str] = None) -> List[Dict]:
        """
        Parse the keyword configuration file.
        Reuses the parsing logic of trendradar.core.frequency, which supports:
        - Comment lines starting with #
        - Blank lines separating word groups
        - [group alias] as the first line of a group, naming the whole group
        - + prefix for required words, ! prefix for filter words, @ for count limits
        - /pattern/ regular-expression syntax
        - => alias display-name syntax
        - A [GLOBAL_FILTER] section for global filter words
        Display-name precedence: group alias > joined line aliases > joined keywords.
        Args:
            words_file: Path to the keyword file; defaults to config/frequency_words.txt.
        Returns:
            List of word groups.
        Raises:
            FileParseError: The file cannot be parsed.
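        Example file snippet (illustrative content only; syntax follows the rules above):
            # AI coverage group with an explicit group alias
            [AI]
            +ChatGPT
            /GPT-?4/
            !advertisement
            @5

            Claude => Claude News

            [GLOBAL_FILTER]
            spam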
"""
from trendradar.core.frequency import load_frequency_words
if words_file is None:
words_file = str(self.project_root / "config" / "frequency_words.txt")
else:
words_file = str(words_file)
try:
            word_groups, _filter_words, _global_filters = load_frequency_words(words_file)
return word_groups
except FileNotFoundError:
return []
except Exception as e:
raise FileParseError(words_file, str(e))
    def get_available_dates(self, db_type: str = "news") -> List[str]:
        """
        Get the list of available dates.
        Args:
            db_type: Database type ("news" or "rss").
        Returns:
            List of date strings (YYYY-MM-DD format, descending).
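        Example (illustrative output):
            >>> ParserService().get_available_dates("news")  # doctest: +SKIP
            ['2024-01-16', '2024-01-15']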
"""
db_dir = self.project_root / "output" / db_type
if not db_dir.exists():
return []
dates = []
for db_file in db_dir.glob("*.db"):
date_match = re.match(r'(\d{4}-\d{2}-\d{2})\.db$', db_file.name)
if date_match:
dates.append(date_match.group(1))
return sorted(dates, reverse=True)
    def get_available_date_range(self, db_type: str = "news") -> Tuple[Optional[datetime], Optional[datetime]]:
        """
        Get the available date range.
        Args:
            db_type: Database type ("news" or "rss").
        Returns:
            (earliest date, latest date) tuple, or (None, None) if there is no data.
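        Example (illustrative output):
            >>> ParserService().get_available_date_range("news")  # doctest: +SKIP
            (datetime.datetime(2024, 1, 15, 0, 0), datetime.datetime(2024, 1, 16, 0, 0))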
"""
dates = self.get_available_dates(db_type)
if not dates:
return (None, None)
earliest = datetime.strptime(dates[-1], "%Y-%m-%d")
latest = datetime.strptime(dates[0], "%Y-%m-%d")
return (earliest, latest)