"""
TrendRadar MCP Server - FastMCP 2.0 实现
使用 FastMCP 2.0 提供生产级 MCP 工具服务器。
支持 stdio 和 HTTP 两种传输模式。
"""
import asyncio
import json
from typing import List, Optional, Dict, Union
from fastmcp import FastMCP
from .tools.data_query import DataQueryTools
from .tools.analytics import AnalyticsTools
from .tools.search_tools import SearchTools
from .tools.config_mgmt import ConfigManagementTools
from .tools.system import SystemManagementTools
from .tools.storage_sync import StorageSyncTools
from .tools.article_reader import ArticleReaderTools
from .tools.notification import NotificationTools
from .utils.date_parser import DateParser
from .utils.errors import MCPError
# 创建 FastMCP 2.0 应用
mcp = FastMCP('trendradar-news')
# 全局工具实例(在第一次请求时初始化)
_tools_instances = {}
def _get_tools(project_root: Optional[str] = None):
"""获取或创建工具实例(单例模式)"""
if not _tools_instances:
_tools_instances['data'] = DataQueryTools(project_root)
_tools_instances['analytics'] = AnalyticsTools(project_root)
_tools_instances['search'] = SearchTools(project_root)
_tools_instances['config'] = ConfigManagementTools(project_root)
_tools_instances['system'] = SystemManagementTools(project_root)
_tools_instances['storage'] = StorageSyncTools(project_root)
_tools_instances['article'] = ArticleReaderTools(project_root)
_tools_instances['notification'] = NotificationTools(project_root)
return _tools_instances
# ==================== MCP Resources ====================
@mcp.resource("config://platforms")
async def get_platforms_resource() -> str:
"""
获取支持的平台列表
返回 config.yaml 中配置的所有平台信息,包括 ID 和名称。
"""
tools = _get_tools()
config = await asyncio.to_thread(
tools['config'].get_current_config, section="crawler"
)
return json.dumps({
"platforms": config.get("platforms", []),
"description": "TrendRadar 支持的热榜平台列表"
}, ensure_ascii=False, indent=2)
@mcp.resource("config://rss-feeds")
async def get_rss_feeds_resource() -> str:
"""
获取 RSS 订阅源列表
返回当前配置的所有 RSS 源信息。
"""
tools = _get_tools()
status = await asyncio.to_thread(tools['data'].get_rss_feeds_status)
return json.dumps({
"feeds": status.get("today_feeds", {}),
"description": "TrendRadar 支持的 RSS 订阅源列表"
}, ensure_ascii=False, indent=2)
@mcp.resource("data://available-dates")
async def get_available_dates_resource() -> str:
"""
获取可用的数据日期范围
返回本地存储中可查询的日期列表。
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['storage'].list_available_dates, source="local"
)
return json.dumps({
"dates": result.get("data", {}).get("local", {}).get("dates", []),
"description": "本地存储中可查询的日期列表"
}, ensure_ascii=False, indent=2)
@mcp.resource("config://keywords")
async def get_keywords_resource() -> str:
"""
获取关注词配置
返回 frequency_words.txt 中配置的关注词分组。
"""
tools = _get_tools()
config = await asyncio.to_thread(
tools['config'].get_current_config, section="keywords"
)
return json.dumps({
"word_groups": config.get("word_groups", []),
"total_groups": config.get("total_groups", 0),
"description": "TrendRadar 关注词配置"
}, ensure_ascii=False, indent=2)
# ==================== 日期解析工具(优先调用)====================
@mcp.tool
async def resolve_date_range(
expression: str
) -> str:
"""
【推荐优先调用】将自然语言日期表达式解析为标准日期范围
**为什么需要这个工具?**
用户经常使用"本周"、"最近7天"等自然语言表达日期,但 AI 模型自己计算日期
可能导致不一致的结果。此工具在服务器端使用精确的当前时间计算,确保所有
AI 模型获得一致的日期范围。
**推荐使用流程:**
1. 用户说"分析AI本周的情感倾向"
2. AI 调用 resolve_date_range("本周") → 获取精确日期范围
3. AI 调用 analyze_sentiment(topic="ai", date_range=上一步返回的date_range)
Args:
expression: 自然语言日期表达式,支持:
- 单日: "今天", "昨天", "today", "yesterday"
- 周: "本周", "上周", "this week", "last week"
- 月: "本月", "上月", "this month", "last month"
- 最近N天: "最近7天", "最近30天", "last 7 days", "last 30 days"
- 动态: "最近5天", "last 10 days"(任意天数)
Returns:
JSON格式的日期范围,可直接用于其他工具的 date_range 参数:
{
"success": true,
"expression": "本周",
"date_range": {
"start": "2025-11-18",
"end": "2025-11-26"
},
"current_date": "2025-11-26",
"description": "本周(周一到周日,11-18 至 11-26)"
}
Examples:
用户:"分析AI本周的情感倾向"
AI调用步骤:
1. resolve_date_range("本周")
→ {"date_range": {"start": "2025-11-18", "end": "2025-11-26"}, ...}
2. analyze_sentiment(topic="ai", date_range={"start": "2025-11-18", "end": "2025-11-26"})
用户:"看看最近7天的特斯拉新闻"
AI调用步骤:
1. resolve_date_range("最近7天")
→ {"date_range": {"start": "2025-11-20", "end": "2025-11-26"}, ...}
2. search_news(query="特斯拉", date_range={"start": "2025-11-20", "end": "2025-11-26"})
"""
try:
result = await asyncio.to_thread(DateParser.resolve_date_range_expression, expression)
return json.dumps(result, ensure_ascii=False, indent=2)
except MCPError as e:
return json.dumps({
"success": False,
"error": e.to_dict()
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}, ensure_ascii=False, indent=2)
# ==================== 数据查询工具 ====================
@mcp.tool
async def get_latest_news(
platforms: Optional[List[str]] = None,
limit: int = 50,
include_url: bool = False
) -> str:
"""
获取最新一批爬取的新闻数据,快速了解当前热点
Args:
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
limit: 返回条数限制,默认50,最大1000
include_url: 是否包含URL链接,默认False(节省token)
Returns:
JSON格式的新闻列表
**数据展示建议**
- 默认展示全部返回数据,除非用户明确要求总结
- 用户说"总结"或"挑重点"时才进行筛选
- 用户问"为什么只显示部分"说明需要完整数据
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['data'].get_latest_news,
platforms=platforms, limit=limit, include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_trending_topics(
top_n: int = 10,
mode: str = 'current',
extract_mode: str = 'keywords'
) -> str:
"""
获取热点话题统计
Args:
top_n: 返回TOP N话题,默认10
mode: 时间模式
- "daily": 当日累计数据统计
- "current": 最新一批数据统计(默认)
extract_mode: 提取模式
- "keywords": 统计预设关注词(基于 config/frequency_words.txt,默认)
- "auto_extract": 自动从新闻标题提取高频词(无需预设,自动发现热点)
Returns:
JSON格式的话题频率统计列表
Examples:
- 使用预设关注词: get_trending_topics(mode="current")
- 自动提取热点: get_trending_topics(extract_mode="auto_extract", top_n=20)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['data'].get_trending_topics,
top_n=top_n, mode=mode, extract_mode=extract_mode
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== RSS 数据查询工具 ====================
@mcp.tool
async def get_latest_rss(
feeds: Optional[List[str]] = None,
days: int = 1,
limit: int = 50,
include_summary: bool = False
) -> str:
"""
获取最新的 RSS 订阅数据(支持多日查询)
RSS 数据与热榜新闻分开存储,按时间流展示,适合获取特定来源的最新内容。
Args:
feeds: RSS 源 ID 列表,如 ['hacker-news', '36kr'],不指定则返回所有源
days: 获取最近 N 天的数据,默认 1(仅今天),最大 30 天
limit: 返回条数限制,默认50,最大500
include_summary: 是否包含文章摘要,默认False(节省token)
Returns:
JSON格式的 RSS 条目列表
Examples:
- get_latest_rss()
- get_latest_rss(days=7, feeds=['hacker-news'])
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['data'].get_latest_rss,
feeds=feeds, days=days, limit=limit, include_summary=include_summary
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def search_rss(
keyword: str,
feeds: Optional[List[str]] = None,
days: int = 7,
limit: int = 50,
include_summary: bool = False
) -> str:
"""
搜索 RSS 数据
在 RSS 订阅数据中搜索包含指定关键词的文章。
Args:
keyword: 搜索关键词(必需)
feeds: RSS 源 ID 列表,如 ['hacker-news', '36kr']
- 不指定时:搜索所有 RSS 源
days: 搜索最近 N 天的数据,默认 7 天,最大 30 天
limit: 返回条数限制,默认50
include_summary: 是否包含文章摘要,默认False
Returns:
JSON格式的匹配 RSS 条目列表
Examples:
- search_rss(keyword="AI")
- search_rss(keyword="machine learning", feeds=['hacker-news'], days=14)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['data'].search_rss,
keyword=keyword,
feeds=feeds,
days=days,
limit=limit,
include_summary=include_summary
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_rss_feeds_status() -> str:
"""
获取 RSS 源状态信息
查看当前配置的 RSS 源及其数据统计信息。
Returns:
JSON格式的 RSS 源状态,包含:
- available_dates: 有 RSS 数据的日期列表
- total_dates: 总日期数
- today_feeds: 今日各 RSS 源的数据统计
- {feed_id}: { name, item_count }
- generated_at: 生成时间
Examples:
- get_rss_feeds_status() # 查看所有 RSS 源状态
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['data'].get_rss_feeds_status)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_news_by_date(
date_range: Optional[Union[Dict[str, str], str]] = None,
platforms: Optional[List[str]] = None,
limit: int = 50,
include_url: bool = False
) -> str:
"""
获取指定日期的新闻数据,用于历史数据分析和对比
Args:
date_range: 日期范围,支持多种格式:
- 范围对象: {"start": "2025-01-01", "end": "2025-01-07"}
- 自然语言: "今天", "昨天", "本周", "最近7天"
- 单日字符串: "2025-01-15"
- 默认值: "今天"
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
limit: 返回条数限制,默认50,最大1000
include_url: 是否包含URL链接,默认False(节省token)
Returns:
JSON格式的新闻列表,包含标题、平台、排名等信息
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['data'].get_news_by_date,
date_range=date_range,
platforms=platforms,
limit=limit,
include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 高级数据分析工具 ====================
@mcp.tool
async def analyze_topic_trend(
topic: str,
analysis_type: str = "trend",
date_range: Optional[Union[Dict[str, str], str]] = None,
granularity: str = "day",
spike_threshold: float = 3.0,
time_window: int = 24,
lookahead_hours: int = 6,
confidence_threshold: float = 0.7
) -> str:
"""
统一话题趋势分析工具 - 整合多种趋势分析模式
建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
Args:
topic: 话题关键词(必需)
analysis_type: 分析类型
- "trend": 热度趋势分析(默认)
- "lifecycle": 生命周期分析
- "viral": 异常热度检测
- "predict": 话题预测
date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认最近7天
granularity: 时间粒度,默认"day"
spike_threshold: 热度突增倍数阈值(viral模式),默认3.0
time_window: 检测时间窗口小时数(viral模式),默认24
lookahead_hours: 预测未来小时数(predict模式),默认6
confidence_threshold: 置信度阈值(predict模式),默认0.7
Returns:
JSON格式的趋势分析结果
Examples:
- analyze_topic_trend(topic="AI", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- analyze_topic_trend(topic="特斯拉", analysis_type="lifecycle")
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].analyze_topic_trend_unified,
topic=topic,
analysis_type=analysis_type,
date_range=date_range,
granularity=granularity,
threshold=spike_threshold,
time_window=time_window,
lookahead_hours=lookahead_hours,
confidence_threshold=confidence_threshold
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def analyze_data_insights(
insight_type: str = "platform_compare",
topic: Optional[str] = None,
date_range: Optional[Union[Dict[str, str], str]] = None,
min_frequency: int = 3,
top_n: int = 20
) -> str:
"""
统一数据洞察分析工具 - 整合多种数据分析模式
Args:
insight_type: 洞察类型,可选值:
- "platform_compare": 平台对比分析(对比不同平台对话题的关注度)
- "platform_activity": 平台活跃度统计(统计各平台发布频率和活跃时间)
- "keyword_cooccur": 关键词共现分析(分析关键词同时出现的模式)
topic: 话题关键词(可选,platform_compare模式适用)
date_range: **【对象类型】** 日期范围(可选)
- **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- **示例**: {"start": "2025-01-01", "end": "2025-01-07"}
- **重要**: 必须是对象格式,不能传递整数
min_frequency: 最小共现频次(keyword_cooccur模式),默认3
top_n: 返回TOP N结果(keyword_cooccur模式),默认20
Returns:
JSON格式的数据洞察分析结果
Examples:
- analyze_data_insights(insight_type="platform_compare", topic="人工智能")
- analyze_data_insights(insight_type="platform_activity", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- analyze_data_insights(insight_type="keyword_cooccur", min_frequency=5, top_n=15)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].analyze_data_insights_unified,
insight_type=insight_type,
topic=topic,
date_range=date_range,
min_frequency=min_frequency,
top_n=top_n
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def analyze_sentiment(
topic: Optional[str] = None,
platforms: Optional[List[str]] = None,
date_range: Optional[Union[Dict[str, str], str]] = None,
limit: int = 50,
sort_by_weight: bool = True,
include_url: bool = False
) -> str:
"""
分析新闻的情感倾向和热度趋势
建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
Args:
topic: 话题关键词(可选)
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认今天
limit: 返回新闻数量,默认50,最大100(会对标题去重)
sort_by_weight: 是否按热度权重排序,默认True
include_url: 是否包含URL链接,默认False(节省token)
Returns:
JSON格式的分析结果,包含情感分布、热度趋势和相关新闻
Examples:
- analyze_sentiment(topic="AI", date_range={"start": "2025-01-01", "end": "2025-01-07"})
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].analyze_sentiment,
topic=topic,
platforms=platforms,
date_range=date_range,
limit=limit,
sort_by_weight=sort_by_weight,
include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def find_related_news(
reference_title: str,
date_range: Optional[Union[Dict[str, str], str]] = None,
threshold: float = 0.5,
limit: int = 50,
include_url: bool = False
) -> str:
"""
查找与指定新闻标题相关的其他新闻(支持当天和历史数据)
Args:
reference_title: 参考新闻标题(完整或部分)
date_range: 日期范围(可选)
- 不指定: 只查询今天的数据
- "today", "yesterday", "last_week", "last_month": 预设值
- {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}: 自定义范围
threshold: 相似度阈值,0-1之间,默认0.5(越高匹配越严格)
limit: 返回条数限制,默认50
include_url: 是否包含URL链接,默认False(节省token)
Returns:
JSON格式的相关新闻列表,按相似度排序
Examples:
- find_related_news(reference_title="特斯拉降价")
- find_related_news(reference_title="AI突破", date_range="last_week")
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['search'].find_related_news_unified,
reference_title=reference_title,
date_range=date_range,
threshold=threshold,
limit=limit,
include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def generate_summary_report(
report_type: str = "daily",
date_range: Optional[Union[Dict[str, str], str]] = None
) -> str:
"""
每日/每周摘要生成器 - 自动生成热点摘要报告
Args:
report_type: 报告类型(daily/weekly)
date_range: **【对象类型】** 自定义日期范围(可选)
- **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- **示例**: {"start": "2025-01-01", "end": "2025-01-07"}
- **重要**: 必须是对象格式,不能传递整数
Returns:
JSON格式的摘要报告,包含Markdown格式内容
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].generate_summary_report,
report_type=report_type,
date_range=date_range
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def aggregate_news(
date_range: Optional[Union[Dict[str, str], str]] = None,
platforms: Optional[List[str]] = None,
similarity_threshold: float = 0.7,
limit: int = 50,
include_url: bool = False
) -> str:
"""
跨平台新闻聚合 - 对相似新闻进行去重合并
将不同平台报道的同一事件合并为一条聚合新闻,显示跨平台覆盖情况和综合热度。
Args:
date_range: 日期范围,不指定则查询今天
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
similarity_threshold: 相似度阈值,0.3-1.0,默认0.7(越高越严格)
limit: 返回聚合新闻数量,默认50
include_url: 是否包含URL链接,默认False
Returns:
JSON格式的聚合结果,包含去重统计、聚合新闻列表和平台覆盖统计
Examples:
- aggregate_news()
- aggregate_news(similarity_threshold=0.8)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].aggregate_news,
date_range=date_range,
platforms=platforms,
similarity_threshold=similarity_threshold,
limit=limit,
include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def compare_periods(
period1: Union[Dict[str, str], str],
period2: Union[Dict[str, str], str],
topic: Optional[str] = None,
compare_type: str = "overview",
platforms: Optional[List[str]] = None,
top_n: int = 10
) -> str:
"""
时期对比分析 - 比较两个时间段的新闻数据
对比不同时期的热点话题、平台活跃度、新闻数量等维度。
**使用场景:**
- 对比本周和上周的热点变化
- 分析某个话题在两个时期的热度差异
- 查看各平台活跃度的周期性变化
Args:
period1: 第一个时间段(基准期)
- {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}: 日期范围
- "today", "yesterday", "this_week", "last_week", "this_month", "last_month": 预设值
period2: 第二个时间段(对比期,格式同 period1)
topic: 可选的话题关键词(聚焦特定话题的对比)
compare_type: 对比类型
- "overview": 总体概览(默认)- 新闻数量、关键词变化、TOP新闻
- "topic_shift": 话题变化分析 - 上升话题、下降话题、新出现话题
- "platform_activity": 平台活跃度对比 - 各平台新闻数量变化
platforms: 平台过滤列表,如 ['zhihu', 'weibo']
top_n: 返回 TOP N 结果,默认10
Returns:
JSON格式的对比分析结果,包含:
- periods: 两个时期的日期范围
- compare_type: 对比类型
- overview/topic_shift/platform_comparison: 具体对比结果(根据类型)
Examples:
- compare_periods(period1="last_week", period2="this_week") # 周环比
- compare_periods(period1="last_month", period2="this_month", compare_type="topic_shift")
- compare_periods(
period1={"start": "2025-01-01", "end": "2025-01-07"},
period2={"start": "2025-01-08", "end": "2025-01-14"},
topic="人工智能"
)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['analytics'].compare_periods,
period1=period1,
period2=period2,
topic=topic,
compare_type=compare_type,
platforms=platforms,
top_n=top_n
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 智能检索工具 ====================
@mcp.tool
async def search_news(
query: str,
search_mode: str = "keyword",
date_range: Optional[Union[Dict[str, str], str]] = None,
platforms: Optional[List[str]] = None,
limit: int = 50,
sort_by: str = "relevance",
threshold: float = 0.6,
include_url: bool = False,
include_rss: bool = False,
rss_limit: int = 20
) -> str:
"""
统一搜索接口,支持多种搜索模式,可同时搜索热榜和RSS
建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
Args:
query: 搜索关键词或内容片段
search_mode: 搜索模式
- "keyword": 精确关键词匹配(默认)
- "fuzzy": 模糊内容匹配
- "entity": 实体名称搜索(人物/地点/机构)
date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认今天
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
limit: 热榜返回条数限制,默认50
sort_by: 排序方式 - "relevance"(相关度)/ "weight"(权重)/ "date"(日期)
threshold: 相似度阈值(仅fuzzy模式),0-1,默认0.6
include_url: 是否包含URL链接,默认False
include_rss: 是否同时搜索RSS数据,默认False
rss_limit: RSS返回条数限制,默认20
Returns:
JSON格式的搜索结果,包含热榜新闻列表和可选的RSS结果
Examples:
- search_news(query="AI")
- search_news(query="AI", include_rss=True)
- search_news(query="特斯拉", date_range={"start": "2025-01-01", "end": "2025-01-07"})
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['search'].search_news_unified,
query=query,
search_mode=search_mode,
date_range=date_range,
platforms=platforms,
limit=limit,
sort_by=sort_by,
threshold=threshold,
include_url=include_url,
include_rss=include_rss,
rss_limit=rss_limit
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 配置与系统管理工具 ====================
@mcp.tool
async def get_current_config(
section: str = "all"
) -> str:
"""
获取当前系统配置
Args:
section: 配置节,可选值:
- "all": 所有配置(默认)
- "crawler": 爬虫配置
- "push": 推送配置
- "keywords": 关键词配置
- "weights": 权重配置
Returns:
JSON格式的配置信息
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['config'].get_current_config, section=section)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_system_status() -> str:
"""
获取系统运行状态和健康检查信息
返回系统版本、数据统计、缓存状态等信息
Returns:
JSON格式的系统状态信息
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['system'].get_system_status)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def check_version(
proxy_url: Optional[str] = None
) -> str:
"""
检查版本更新(同时检查 TrendRadar 和 MCP Server)
比较本地版本与 GitHub 远程版本,判断是否需要更新。
Args:
proxy_url: 可选的代理URL,用于访问 GitHub(如 http://127.0.0.1:7890)
Returns:
JSON格式的版本检查结果,包含两个组件的版本对比和是否需要更新
Examples:
- check_version()
- check_version(proxy_url="http://127.0.0.1:7890")
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['system'].check_version, proxy_url=proxy_url)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def trigger_crawl(
platforms: Optional[List[str]] = None,
save_to_local: bool = False,
include_url: bool = False
) -> str:
"""
手动触发一次爬取任务(可选持久化)
Args:
platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
save_to_local: 是否保存到本地 output 目录,默认 False
include_url: 是否包含URL链接,默认False(节省token)
Returns:
JSON格式的任务状态信息,包含成功/失败平台列表和新闻数据
Examples:
- trigger_crawl(platforms=['zhihu'])
- trigger_crawl(save_to_local=True)
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['system'].trigger_crawl,
platforms=platforms, save_to_local=save_to_local, include_url=include_url
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 存储同步工具 ====================
@mcp.tool
async def sync_from_remote(
days: int = 7
) -> str:
"""
从远程存储拉取数据到本地
用于 MCP Server 等场景:爬虫存到远程云存储(如 Cloudflare R2),
MCP Server 拉取到本地进行分析查询。
Args:
days: 拉取最近 N 天的数据,默认 7 天
- 0: 不拉取
- 7: 拉取最近一周的数据
- 30: 拉取最近一个月的数据
Returns:
JSON格式的同步结果,包含:
- success: 是否成功
- synced_files: 成功同步的文件数量
- synced_dates: 成功同步的日期列表
- skipped_dates: 跳过的日期(本地已存在)
- failed_dates: 失败的日期及错误信息
- message: 操作结果描述
Examples:
- sync_from_remote() # 拉取最近7天
- sync_from_remote(days=30) # 拉取最近30天
Note:
需要在 config/config.yaml 中配置远程存储(storage.remote)或设置环境变量:
- S3_ENDPOINT_URL: 服务端点
- S3_BUCKET_NAME: 存储桶名称
- S3_ACCESS_KEY_ID: 访问密钥 ID
- S3_SECRET_ACCESS_KEY: 访问密钥
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['storage'].sync_from_remote, days=days)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_storage_status() -> str:
"""
获取存储配置和状态
查看当前存储后端配置、本地和远程存储的状态信息。
Returns:
JSON格式的存储状态信息,包含本地/远程存储状态和拉取配置
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['storage'].get_storage_status)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def list_available_dates(
source: str = "both"
) -> str:
"""
列出本地/远程可用的日期范围
查看本地和远程存储中有哪些日期的数据可用。
Args:
source: 数据来源
- "local": 仅本地
- "remote": 仅远程
- "both": 同时列出并对比(默认)
Returns:
JSON格式的日期列表,包含各来源的日期信息和对比结果
Examples:
- list_available_dates()
- list_available_dates(source="local")
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['storage'].list_available_dates, source=source)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 文章内容读取工具 ====================
@mcp.tool
async def read_article(
url: str,
timeout: int = 30
) -> str:
"""
读取指定 URL 的文章内容,返回 LLM 友好的 Markdown 格式
通过 Jina AI Reader 将网页转换为干净的 Markdown,自动去除广告、导航栏等噪音内容。
适合用于:阅读新闻正文、获取文章详情、分析文章内容。
**典型使用流程:**
1. 先用 search_news(include_url=True) 搜索新闻获取链接
2. 再用 read_article(url=链接) 读取正文内容
3. AI 对 Markdown 正文进行分析、摘要、翻译等
Args:
url: 文章链接(必需),以 http:// 或 https:// 开头
timeout: 请求超时时间(秒),默认 30,最大 60
Returns:
JSON格式的文章内容,包含完整 Markdown 正文
Examples:
- read_article(url="https://example.com/news/123")
Note:
- 使用 Jina AI Reader 免费服务(100 RPM 限制)
- 每次请求间隔 5 秒(内置速率控制)
- 部分付费墙/登录墙页面可能无法完整获取
"""
tools = _get_tools()
timeout = min(max(timeout, 10), 60)
result = await asyncio.to_thread(
tools['article'].read_article,
url=url, timeout=timeout
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def read_articles_batch(
urls: List[str],
timeout: int = 30
) -> str:
"""
批量读取多篇文章内容(最多 5 篇,间隔 5 秒)
逐篇请求文章内容,每篇之间自动间隔 5 秒以遵守速率限制。
**典型使用流程:**
1. 先用 search_news(include_url=True) 搜索新闻获取多个链接
2. 再用 read_articles_batch(urls=[...]) 批量读取正文
3. AI 对多篇文章进行对比分析、综合报告
Args:
urls: 文章链接列表(必需),最多处理 5 篇
timeout: 每篇的请求超时时间(秒),默认 30
Returns:
JSON格式的批量读取结果,包含每篇的完整内容和状态
Examples:
- read_articles_batch(urls=["https://a.com/1", "https://b.com/2"])
Note:
- 单次最多读取 5 篇,超出部分会被跳过
- 5 篇约需 25-30 秒(每篇间隔 5 秒)
- 单篇失败不影响其他篇的读取
"""
tools = _get_tools()
timeout = min(max(timeout, 10), 60)
result = await asyncio.to_thread(
tools['article'].read_articles_batch,
urls=urls, timeout=timeout
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 通知推送工具 ====================
@mcp.tool
async def get_channel_format_guide(channel: Optional[str] = None) -> str:
"""
获取通知渠道的格式化策略指南
返回各渠道支持的 Markdown 特性、格式限制和最佳格式化提示词。
在调用 send_notification 之前使用此工具,可以了解目标渠道的格式要求,
从而生成最佳排版效果的消息内容。
各渠道格式差异概览:
- 飞书:支持 **粗体**、<font color>彩色文本、[链接](url)、--- 分割线
- 钉钉:支持 ### 标题、**粗体**、> 引用、--- 分割线,不支持颜色
- 企业微信:仅支持 **粗体**、[链接](url)、> 引用,不支持标题和分割线
- Telegram:自动转为 HTML,支持粗体/斜体/删除线/代码/链接/引用块
- ntfy:支持标准 Markdown,不支持颜色
- Bark:iOS 推送,仅支持粗体和链接,内容需精简
- Slack:自动转为 mrkdwn,*粗体*、~删除线~、<url|链接>
- 邮件:自动转为完整 HTML 网页,支持标题/样式/分割线
- 通用 Webhook:标准 Markdown 或自定义模板
Args:
channel: 指定渠道 ID(可选),不指定返回所有渠道策略
可选值: feishu, dingtalk, wework, telegram, email, ntfy, bark, slack, generic_webhook
Returns:
JSON格式的渠道格式化策略,包含支持特性、限制和格式化提示词
Examples:
- get_channel_format_guide() # 获取所有渠道策略
- get_channel_format_guide(channel="feishu") # 获取飞书策略
- get_channel_format_guide(channel="telegram") # 获取 Telegram 策略
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['notification'].get_channel_format_guide,
channel=channel
)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def get_notification_channels() -> str:
"""
获取所有已配置的通知渠道及其状态
检测 config.yaml 和 .env 环境变量中的通知渠道配置。
支持 9 个渠道:飞书、钉钉、企业微信、Telegram、邮件、ntfy、Bark、Slack、通用 Webhook。
Returns:
JSON格式的渠道状态,包含每个渠道是否已配置及配置来源
Examples:
- get_notification_channels()
"""
tools = _get_tools()
result = await asyncio.to_thread(tools['notification'].get_notification_channels)
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.tool
async def send_notification(
message: str,
title: str = "TrendRadar 通知",
channels: Optional[List[str]] = None,
) -> str:
"""
向已配置的通知渠道发送消息
接受 markdown 格式内容,内部自动适配各渠道的格式要求和限制:
- 飞书:Markdown 卡片消息(支持 **粗体**、<font color>彩色文本、[链接](url)、---)
- 钉钉:Markdown(自动降级标题为 ###、剥离 <font> 标签和删除线)
- 企业微信:Markdown(自动剥离 # 标题、---、<font> 标签、删除线)
- Telegram:HTML(自动转换 **→<b>、*→<i>、~~→<s>、>→<blockquote>)
- Email:HTML 邮件(完整网页样式,支持 # 标题、---、粗体斜体)
- ntfy:Markdown(自动剥离 <font> 标签)
- Bark:Markdown(自动简化为粗体+链接,适配 iOS 推送)
- Slack:mrkdwn(自动转换 **→*、~~→~、[text](url)→<url|text>)
- 通用 Webhook:Markdown(支持自定义模板)
提示:发送前可调用 get_channel_format_guide 获取目标渠道的详细格式化策略,
以生成最佳排版效果的消息内容。
Args:
message: markdown 格式的消息内容(必需)
title: 消息标题,默认 "TrendRadar 通知"
channels: 指定发送的渠道列表,不指定则发送到所有已配置渠道
可选值: feishu, dingtalk, wework, telegram, email, ntfy, bark, slack, generic_webhook
Returns:
JSON格式的发送结果,包含每个渠道的发送状态
Examples:
- send_notification(message="**测试消息**\\n这是一条测试通知")
- send_notification(message="紧急通知", title="系统告警", channels=["feishu", "dingtalk"])
"""
tools = _get_tools()
result = await asyncio.to_thread(
tools['notification'].send_notification,
message=message, title=title, channels=channels
)
return json.dumps(result, ensure_ascii=False, indent=2)
# ==================== 启动入口 ====================
def run_server(
project_root: Optional[str] = None,
transport: str = 'stdio',
host: str = '0.0.0.0',
port: int = 3333
):
"""
启动 MCP 服务器
Args:
project_root: 项目根目录路径
transport: 传输模式,'stdio' 或 'http'
host: HTTP模式的监听地址,默认 0.0.0.0
port: HTTP模式的监听端口,默认 3333
"""
# 初始化工具实例
_get_tools(project_root)
# 打印启动信息
print()
print("=" * 60)
print(" TrendRadar MCP Server - FastMCP 2.0")
print("=" * 60)
print(f" 传输模式: {transport.upper()}")
if transport == 'stdio':
print(" 协议: MCP over stdio (标准输入输出)")
print(" 说明: 通过标准输入输出与 MCP 客户端通信")
elif transport == 'http':
print(f" 协议: MCP over HTTP (生产环境)")
print(f" 服务器监听: {host}:{port}")
if project_root:
print(f" 项目目录: {project_root}")
else:
print(" 项目目录: 当前目录")
print()
print(" 已注册的工具:")
print(" === 日期解析工具(推荐优先调用)===")
print(" 0. resolve_date_range - 解析自然语言日期为标准格式")
print()
print(" === 基础数据查询(P0核心)===")
print(" 1. get_latest_news - 获取最新新闻")
print(" 2. get_news_by_date - 按日期查询新闻(支持自然语言)")
print(" 3. get_trending_topics - 获取趋势话题(支持自动提取)")
print()
print(" === RSS 数据查询 ===")
print(" 4. get_latest_rss - 获取最新 RSS 订阅数据")
print(" 5. search_rss - 搜索 RSS 数据")
print(" 6. get_rss_feeds_status - 获取 RSS 源状态")
print()
print(" === 智能检索工具 ===")
print(" 7. search_news - 统一新闻搜索(关键词/模糊/实体)")
print(" 8. find_related_news - 相关新闻查找(支持历史数据)")
print()
print(" === 高级数据分析 ===")
print(" 9. analyze_topic_trend - 统一话题趋势分析(热度/生命周期/爆火/预测)")
print(" 10. analyze_data_insights - 统一数据洞察分析(平台对比/活跃度/关键词共现)")
print(" 11. analyze_sentiment - 情感倾向分析")
print(" 12. aggregate_news - 跨平台新闻聚合去重")
print(" 13. compare_periods - 时期对比分析(周环比/月环比)")
print(" 14. generate_summary_report - 每日/每周摘要生成")
print()
print(" === 配置与系统管理 ===")
print(" 15. get_current_config - 获取当前系统配置")
print(" 16. get_system_status - 获取系统运行状态")
print(" 17. check_version - 检查版本更新(对比本地与远程版本)")
print(" 18. trigger_crawl - 手动触发爬取任务")
print()
print(" === 存储同步工具 ===")
print(" 19. sync_from_remote - 从远程存储拉取数据到本地")
print(" 20. get_storage_status - 获取存储配置和状态")
print(" 21. list_available_dates - 列出本地/远程可用日期")
print()
print(" === 文章内容读取 ===")
print(" 22. read_article - 读取单篇文章内容(Markdown格式)")
print(" 23. read_articles_batch - 批量读取多篇文章(自动限速)")
print()
print(" === 通知推送工具 ===")
print(" 24. get_channel_format_guide - 获取渠道格式化策略指南(提示词)")
print(" 25. get_notification_channels - 获取已配置的通知渠道状态")
print(" 26. send_notification - 向通知渠道发送消息(自动适配格式)")
print("=" * 60)
print()
# 根据传输模式运行服务器
if transport == 'stdio':
mcp.run(transport='stdio')
elif transport == 'http':
# HTTP 模式(生产推荐)
mcp.run(
transport='http',
host=host,
port=port,
path='/mcp' # HTTP 端点路径
)
else:
raise ValueError(f"不支持的传输模式: {transport}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
description='TrendRadar MCP Server - 新闻热点聚合 MCP 工具服务器',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
详细配置教程请查看: README-Cherry-Studio.md
"""
)
parser.add_argument(
'--transport',
choices=['stdio', 'http'],
default='stdio',
help='传输模式:stdio (默认) 或 http (生产环境)'
)
parser.add_argument(
'--host',
default='0.0.0.0',
help='HTTP模式的监听地址,默认 0.0.0.0'
)
parser.add_argument(
'--port',
type=int,
default=3333,
help='HTTP模式的监听端口,默认 3333'
)
parser.add_argument(
'--project-root',
help='项目根目录路径'
)
args = parser.parse_args()
run_server(
project_root=args.project_root,
transport=args.transport,
host=args.host,
port=args.port
)