# coding=utf-8
"""
RSS 抓取器
负责从配置的 RSS 源抓取数据并转换为标准格式
"""
import time
import random
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Callable
import requests
from .parser import RSSParser, ParsedRSSItem
from trendradar.storage.base import RSSItem, RSSData
from trendradar.utils.time import get_configured_time, is_within_days, DEFAULT_TIMEZONE
@dataclass
class RSSFeedConfig:
"""RSS 源配置"""
id: str # 源 ID
name: str # 显示名称
url: str # RSS URL
max_items: int = 0 # 最大条目数(0=不限制)
enabled: bool = True # 是否启用
max_age_days: Optional[int] = None # 文章最大年龄(天),覆盖全局设置;None=使用全局,0=禁用过滤
class RSSFetcher:
"""RSS 抓取器"""
def __init__(
self,
feeds: List[RSSFeedConfig],
request_interval: int = 2000,
timeout: int = 15,
use_proxy: bool = False,
proxy_url: str = "",
timezone: str = DEFAULT_TIMEZONE,
freshness_enabled: bool = True,
default_max_age_days: int = 3,
):
"""
初始化抓取器
Args:
feeds: RSS 源配置列表
request_interval: 请求间隔(毫秒)
timeout: 请求超时(秒)
use_proxy: 是否使用代理
proxy_url: 代理 URL
timezone: 时区配置(如 'Asia/Shanghai')
freshness_enabled: 是否启用新鲜度过滤
default_max_age_days: 默认最大文章年龄(天)
"""
self.feeds = [f for f in feeds if f.enabled]
self.request_interval = request_interval
self.timeout = timeout
self.use_proxy = use_proxy
self.proxy_url = proxy_url
self.timezone = timezone
self.freshness_enabled = freshness_enabled
self.default_max_age_days = default_max_age_days
self.parser = RSSParser()
self.session = self._create_session()
def _create_session(self) -> requests.Session:
"""创建请求会话"""
session = requests.Session()
session.headers.update({
"User-Agent": "TrendRadar/2.0 RSS Reader (https://github.com/trendradar)",
"Accept": "application/feed+json, application/json, application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
})
if self.use_proxy and self.proxy_url:
session.proxies = {
"http": self.proxy_url,
"https": self.proxy_url,
}
return session
def _filter_by_freshness(
self,
items: List[RSSItem],
feed: RSSFeedConfig,
) -> Tuple[List[RSSItem], int]:
"""
根据新鲜度过滤文章
Args:
items: 待过滤的文章列表
feed: RSS 源配置
Returns:
(过滤后的文章列表, 被过滤的文章数)
"""
# 如果全局禁用,直接返回
if not self.freshness_enabled:
return items, 0
# 确定此 feed 的 max_age_days
max_days = feed.max_age_days
if max_days is None:
max_days = self.default_max_age_days
# 如果设为 0,禁用此 feed 的过滤
if max_days == 0:
return items, 0
# 过滤逻辑:无发布时间的文章保留
filtered = []
for item in items:
if not item.published_at:
# 无发布时间,保留
filtered.append(item)
elif is_within_days(item.published_at, max_days, self.timezone):
# 在指定天数内,保留
filtered.append(item)
# 否则过滤掉
filtered_count = len(items) - len(filtered)
return filtered, filtered_count
def fetch_feed(self, feed: RSSFeedConfig) -> Tuple[List[RSSItem], Optional[str]]:
"""
抓取单个 RSS 源
Args:
feed: RSS 源配置
Returns:
(条目列表, 错误信息) 元组
"""
try:
response = self.session.get(feed.url, timeout=self.timeout)
response.raise_for_status()
parsed_items = self.parser.parse(response.text, feed.url)
# 限制条目数量(0=不限制)
if feed.max_items > 0:
parsed_items = parsed_items[:feed.max_items]
# 转换为 RSSItem(使用配置的时区)
now = get_configured_time(self.timezone)
crawl_time = now.strftime("%H:%M")
items = []
for parsed in parsed_items:
item = RSSItem(
title=parsed.title,
feed_id=feed.id,
feed_name=feed.name,
url=parsed.url,
published_at=parsed.published_at or "",
summary=parsed.summary or "",
author=parsed.author or "",
crawl_time=crawl_time,
first_time=crawl_time,
last_time=crawl_time,
count=1,
)
items.append(item)
# 注意:新鲜度过滤已移至推送阶段(_convert_rss_items_to_list)
# 这样所有文章都会存入数据库,但旧文章不会推送
print(f"[RSS] {feed.name}: 获取 {len(items)} 条")
return items, None
except requests.Timeout:
error = f"请求超时 ({self.timeout}s)"
print(f"[RSS] {feed.name}: {error}")
return [], error
except requests.RequestException as e:
error = f"请求失败: {e}"
print(f"[RSS] {feed.name}: {error}")
return [], error
except ValueError as e:
error = f"解析失败: {e}"
print(f"[RSS] {feed.name}: {error}")
return [], error
except Exception as e:
error = f"未知错误: {e}"
print(f"[RSS] {feed.name}: {error}")
return [], error
def fetch_all(self) -> RSSData:
"""
抓取所有 RSS 源
Returns:
RSSData 对象
"""
all_items: Dict[str, List[RSSItem]] = {}
id_to_name: Dict[str, str] = {}
failed_ids: List[str] = []
# 使用配置的时区
now = get_configured_time(self.timezone)
crawl_time = now.strftime("%H:%M")
crawl_date = now.strftime("%Y-%m-%d")
print(f"[RSS] 开始抓取 {len(self.feeds)} 个 RSS 源...")
for i, feed in enumerate(self.feeds):
# 请求间隔(带随机波动)
if i > 0:
interval = self.request_interval / 1000
jitter = random.uniform(-0.2, 0.2) * interval
time.sleep(interval + jitter)
items, error = self.fetch_feed(feed)
id_to_name[feed.id] = feed.name
if error:
failed_ids.append(feed.id)
else:
all_items[feed.id] = items
total_items = sum(len(items) for items in all_items.values())
print(f"[RSS] 抓取完成: {len(all_items)} 个源成功, {len(failed_ids)} 个失败, 共 {total_items} 条")
return RSSData(
date=crawl_date,
crawl_time=crawl_time,
items=all_items,
id_to_name=id_to_name,
failed_ids=failed_ids,
)
@classmethod
def from_config(cls, config: Dict) -> "RSSFetcher":
"""
从配置字典创建抓取器
Args:
config: 配置字典,格式如下:
{
"enabled": true,
"request_interval": 2000,
"freshness_filter": {
"enabled": true,
"max_age_days": 3
},
"feeds": [
{"id": "hacker-news", "name": "Hacker News", "url": "...", "max_age_days": 1}
]
}
Returns:
RSSFetcher 实例
"""
# 读取新鲜度过滤配置
freshness_config = config.get("freshness_filter", {})
freshness_enabled = freshness_config.get("enabled", True) # 默认启用
default_max_age_days = freshness_config.get("max_age_days", 3) # 默认3天
feeds = []
for feed_config in config.get("feeds", []):
# 读取并验证单个 feed 的 max_age_days(可选)
max_age_days_raw = feed_config.get("max_age_days")
max_age_days = None
if max_age_days_raw is not None:
try:
max_age_days = int(max_age_days_raw)
if max_age_days < 0:
feed_id = feed_config.get("id", "unknown")
print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 为负数,将使用全局默认值")
max_age_days = None
except (ValueError, TypeError):
feed_id = feed_config.get("id", "unknown")
print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 格式错误:{max_age_days_raw}")
max_age_days = None
feed = RSSFeedConfig(
id=feed_config.get("id", ""),
name=feed_config.get("name", ""),
url=feed_config.get("url", ""),
max_items=feed_config.get("max_items", 0), # 0=不限制
enabled=feed_config.get("enabled", True),
max_age_days=max_age_days, # None=使用全局,0=禁用,>0=覆盖
)
if feed.id and feed.url:
feeds.append(feed)
return cls(
feeds=feeds,
request_interval=config.get("request_interval", 2000),
timeout=config.get("timeout", 15),
use_proxy=config.get("use_proxy", False),
proxy_url=config.get("proxy_url", ""),
timezone=config.get("timezone", DEFAULT_TIMEZONE),
freshness_enabled=freshness_enabled,
default_max_age_days=default_max_age_days,
)