"""
数据查询工具
实现P0核心的数据查询工具。
"""
from typing import Dict, List, Optional, Union
from ..services.data_service import DataService
from ..utils.validators import (
validate_platforms,
validate_limit,
validate_keyword,
validate_date_range,
validate_top_n,
validate_mode,
validate_date_query,
normalize_date_range
)
from ..utils.errors import MCPError
class DataQueryTools:
    """Query tools that expose news and RSS data as uniform response dicts.

    Every public method validates its arguments, delegates the lookup to
    ``self.data_service`` and returns a dict carrying a boolean ``"success"``
    key. Failures raised inside a method's ``try`` body are not propagated:
    an ``MCPError`` is reported through its ``to_dict()`` payload and any
    other exception is reported with the ``INTERNAL_ERROR`` code.
    """

    # Upper bound (in days) for the look-back window of the RSS queries.
    _MAX_RSS_DAYS = 30
    # Value ``search_rss`` falls back to when ``days`` is out of range.
    _SEARCH_RSS_FALLBACK_DAYS = 7
    # Extraction modes accepted by ``get_trending_topics``.
    _EXTRACT_MODES = ("keywords", "auto_extract")

    def __init__(self, project_root: Optional[str] = None):
        """Create the tool set and its backing ``DataService``.

        Args:
            project_root: Project root directory, forwarded unchanged to
                ``DataService``. ``None`` leaves the choice to the service.
        """
        self.data_service = DataService(project_root)

    # ----------------------------------------
    # Response helpers
    # ----------------------------------------

    @staticmethod
    def _failure(error: Dict) -> Dict:
        """Wrap an error payload in the standard failure envelope."""
        return {
            "success": False,
            "error": error
        }

    @staticmethod
    def _internal_error(exc: Exception) -> Dict:
        """Build the failure envelope for an unexpected exception."""
        return {
            "success": False,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": str(exc)
            }
        }

    @staticmethod
    def _list_response(description: str, items: List, **extra) -> Dict:
        """Build the success envelope shared by the list-returning queries.

        ``total`` and ``returned`` are both the length of ``items``; the
        ``extra`` keyword arguments are appended to the summary in the order
        they are given.
        """
        count = len(items)
        return {
            "success": True,
            "summary": {
                "description": description,
                "total": count,
                "returned": count,
                **extra
            },
            "data": items
        }

    # ----------------------------------------
    # News queries
    # ----------------------------------------

    def get_latest_news(
        self,
        platforms: Optional[List[str]] = None,
        limit: Optional[int] = None,
        include_url: bool = False
    ) -> Dict:
        """Return the most recently crawled batch of news.

        Args:
            platforms: Platform IDs to include, e.g. ``['zhihu', 'weibo']``.
                Checked with ``validate_platforms``.
            limit: Maximum number of items; defaults to 50.
            include_url: Whether items should carry their URL. Off by
                default to keep responses small.

        Returns:
            On success ``{"success": True, "summary": {...}, "data": [...]}``
            where the summary holds ``description``, ``total``, ``returned``
            and ``platforms`` (the validated list, or the placeholder
            "全部平台" when that list is empty/None). On failure
            ``{"success": False, "error": {...}}``.

        Example:
            >>> tools = DataQueryTools()
            >>> result = tools.get_latest_news(platforms=['zhihu'], limit=10)
            >>> print(result['summary']['total'])
        """
        try:
            platforms = validate_platforms(platforms)
            limit = validate_limit(limit, default=50)

            news_list = self.data_service.get_latest_news(
                platforms=platforms,
                limit=limit,
                include_url=include_url
            )

            return self._list_response(
                "最新一批爬取的新闻数据",
                news_list,
                platforms=platforms or "全部平台"
            )
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    def search_news_by_keyword(
        self,
        keyword: str,
        date_range: Optional[Union[Dict, str]] = None,
        platforms: Optional[List[str]] = None,
        limit: Optional[int] = None
    ) -> Dict:
        """Search historical news by keyword.

        Args:
            keyword: Search keyword (required); checked with
                ``validate_keyword``.
            date_range: Date range, e.g.
                ``{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}``. It is passed
                through ``validate_date_range`` and the result is handed to
                the service.
            platforms: Platform filter list.
            limit: Maximum number of items. ``None`` is forwarded to the
                service as-is (no limit applied here); any other value goes
                through ``validate_limit``.

        Returns:
            The dict produced by the data service with ``"success": True``
            merged in (overriding any ``success`` key it already had), or
            ``{"success": False, "error": {...}}`` on failure.

        Example:
            >>> tools = DataQueryTools()
            >>> result = tools.search_news_by_keyword(
            ...     keyword="人工智能",
            ...     date_range={"start": "2025-11-08", "end": "2025-11-17"},
            ...     limit=50
            ... )
            >>> print(result['success'])
        """
        try:
            keyword = validate_keyword(keyword)
            date_range_tuple = validate_date_range(date_range)
            platforms = validate_platforms(platforms)
            # Only validate an explicit limit; None means "no limit" and must
            # reach the service unchanged.
            if limit is not None:
                limit = validate_limit(limit, default=100)

            search_result = self.data_service.search_news_by_keyword(
                keyword=keyword,
                date_range=date_range_tuple,
                platforms=platforms,
                limit=limit
            )

            return {
                **search_result,
                "success": True
            }
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    def get_trending_topics(
        self,
        top_n: Optional[int] = None,
        mode: Optional[str] = None,
        extract_mode: Optional[str] = None
    ) -> Dict:
        """Return frequency statistics for trending topics.

        Args:
            top_n: Number of top topics to return; defaults to 10.
            mode: Time mode, ``"daily"`` (statistics over the current day's
                accumulated data) or ``"current"`` (statistics over the
                latest batch; the default).
            extract_mode: ``"keywords"`` (count the preset watch words from
                config/frequency_words.txt; the default) or
                ``"auto_extract"`` (derive high-frequency words from news
                titles).

        Returns:
            The dict produced by the data service with ``"success": True``
            merged in. An unsupported ``extract_mode`` yields a failure dict
            with code ``INVALID_PARAMETER``; other failures follow the
            standard ``{"success": False, "error": {...}}`` shape.

        Example:
            >>> tools = DataQueryTools()
            >>> # Preset watch words
            >>> result = tools.get_trending_topics(top_n=5, mode="current")
            >>> # Automatically extracted high-frequency words
            >>> result = tools.get_trending_topics(top_n=10, extract_mode="auto_extract")
        """
        try:
            top_n = validate_top_n(top_n, default=10)
            valid_modes = ["daily", "current"]
            mode = validate_mode(mode, valid_modes, default="current")

            if extract_mode is None:
                extract_mode = "keywords"
            elif extract_mode not in self._EXTRACT_MODES:
                return self._failure({
                    "code": "INVALID_PARAMETER",
                    "message": f"不支持的提取模式: {extract_mode}",
                    "suggestion": "支持的模式: keywords, auto_extract"
                })

            trending_result = self.data_service.get_trending_topics(
                top_n=top_n,
                mode=mode,
                extract_mode=extract_mode
            )

            return {
                **trending_result,
                "success": True
            }
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    def get_news_by_date(
        self,
        date_range: Optional[Union[Dict[str, str], str]] = None,
        platforms: Optional[List[str]] = None,
        limit: Optional[int] = None,
        include_url: bool = False
    ) -> Dict:
        """Return the news of a single day; natural-language dates allowed.

        Args:
            date_range: Optional, defaults to "今天" (today). Accepted forms:
                - range object: ``{"start": "2025-01-01", "end": "2025-01-07"}``
                - relative date: 今天, 昨天, 前天, 3天前
                - single-day string: ``2025-10-10``
                Note that for a range object only the ``start`` date is
                queried ("今天" when ``start`` is missing); ``end`` is not
                used by this method.
            platforms: Platform IDs to include, e.g. ``['zhihu', 'weibo']``.
            limit: Maximum number of items; defaults to 50.
            include_url: Whether items should carry their URL. Off by
                default to keep responses small.

        Returns:
            On success ``{"success": True, "summary": {...}, "data": [...]}``
            where the summary holds ``description``, ``total``, ``returned``,
            ``date`` (the resolved day as YYYY-MM-DD), ``date_range`` (the
            normalized input) and ``platforms``. On failure
            ``{"success": False, "error": {...}}``.

        Example:
            >>> tools = DataQueryTools()
            >>> # No date given: today is queried
            >>> result = tools.get_news_by_date(platforms=['zhihu'], limit=20)
            >>> # Explicit date
            >>> result = tools.get_news_by_date(
            ...     date_range="昨天",
            ...     platforms=['zhihu'],
            ...     limit=20
            ... )
            >>> print(result['summary']['total'])
        """
        try:
            if date_range is None:
                date_range = "今天"

            # Normalize first: the value may arrive as a JSON-serialized
            # string instead of a dict.
            date_range = normalize_date_range(date_range)

            # A range object contributes only its start date.
            if isinstance(date_range, dict):
                date_str = date_range.get('start', '今天')
            else:
                date_str = date_range

            target_date = validate_date_query(date_str)
            platforms = validate_platforms(platforms)
            limit = validate_limit(limit, default=50)

            news_list = self.data_service.get_news_by_date(
                target_date=target_date,
                platforms=platforms,
                limit=limit,
                include_url=include_url
            )

            return self._list_response(
                f"按日期查询的新闻({target_date.strftime('%Y-%m-%d')})",
                news_list,
                date=target_date.strftime("%Y-%m-%d"),
                date_range=date_range,
                platforms=platforms or "全部平台"
            )
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    # ========================================
    # RSS queries
    # ========================================

    def get_latest_rss(
        self,
        feeds: Optional[List[str]] = None,
        days: int = 1,
        limit: Optional[int] = None,
        include_summary: bool = False
    ) -> Dict:
        """Return the latest RSS entries, optionally spanning several days.

        Args:
            feeds: RSS feed IDs, e.g. ``['hacker-news', '36kr']``. Forwarded
                to the service unchanged.
            days: Look-back window in days; defaults to 1. Values outside
                1..30 are clamped to the nearest bound so that the window
                sent to the service and the one reported in the summary
                always respect the documented maximum.
            limit: Maximum number of items; defaults to 50.
            include_summary: Whether entries should carry their summary. Off
                by default to keep responses small.

        Returns:
            On success ``{"success": True, "summary": {...}, "data": [...]}``
            where the summary holds ``description``, ``total``, ``returned``,
            ``days`` and ``feeds`` (the given list, or the placeholder
            "全部订阅源" when it is empty/None). On failure
            ``{"success": False, "error": {...}}``.
        """
        try:
            limit = validate_limit(limit, default=50)

            # Enforce the documented 1..30 range; in-range values pass
            # through untouched.
            if days < 1:
                days = 1
            elif days > self._MAX_RSS_DAYS:
                days = self._MAX_RSS_DAYS

            rss_list = self.data_service.get_latest_rss(
                feeds=feeds,
                days=days,
                limit=limit,
                include_summary=include_summary
            )

            if days > 1:
                description = f"最近 {days} 天的 RSS 订阅数据"
            else:
                description = "最新的 RSS 订阅数据"

            return self._list_response(
                description,
                rss_list,
                days=days,
                feeds=feeds or "全部订阅源"
            )
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    def search_rss(
        self,
        keyword: str,
        feeds: Optional[List[str]] = None,
        days: int = 7,
        limit: Optional[int] = None,
        include_summary: bool = False
    ) -> Dict:
        """Search RSS entries by keyword.

        Args:
            keyword: Search keyword; checked with ``validate_keyword``.
            feeds: RSS feed IDs. Forwarded to the service unchanged.
            days: Look-back window in days; defaults to 7. A value outside
                1..30 is replaced by 7.
            limit: Maximum number of items; defaults to 50.
            include_summary: Whether entries should carry their summary.

        Returns:
            On success ``{"success": True, "summary": {...}, "data": [...]}``
            where the summary holds ``description``, ``total``, ``returned``,
            ``keyword``, ``feeds`` and ``days``. On failure
            ``{"success": False, "error": {...}}``.
        """
        try:
            keyword = validate_keyword(keyword)
            limit = validate_limit(limit, default=50)

            # Out-of-range windows silently fall back to the default.
            if days < 1 or days > self._MAX_RSS_DAYS:
                days = self._SEARCH_RSS_FALLBACK_DAYS

            rss_list = self.data_service.search_rss(
                keyword=keyword,
                feeds=feeds,
                days=days,
                limit=limit,
                include_summary=include_summary
            )

            return self._list_response(
                f"RSS 搜索结果(关键词: {keyword})",
                rss_list,
                keyword=keyword,
                feeds=feeds or "全部订阅源",
                days=days
            )
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)

    def get_rss_feeds_status(self) -> Dict:
        """Return the status of the RSS feeds.

        Returns:
            The dict produced by the data service with ``"success": True``
            merged in (overriding any ``success`` key it already had), or
            ``{"success": False, "error": {...}}`` on failure.
        """
        try:
            status = self.data_service.get_rss_feeds_status()
            return {
                **status,
                "success": True
            }
        except MCPError as exc:
            return self._failure(exc.to_dict())
        except Exception as exc:
            return self._internal_error(exc)