"""
YouTube Shorts 爆款发现 MCP Server
═══════════════════════════════════════════════════════════════════════════════
MCP (Model Context Protocol) 实现原理详解
═══════════════════════════════════════════════════════════════════════════════
## 什么是 MCP?
MCP (Model Context Protocol) 是一个开放协议,用于在 AI 应用(如 Claude Desktop)
和外部工具/数据源之间建立标准化的通信。
## 核心概念
1. **Server(服务器)**: 提供工具和资源的程序(本项目)
2. **Client(客户端)**: 调用工具的 AI 应用(Claude Desktop)
3. **Tool(工具)**: Server 暴露给 Client 的可调用功能
4. **Protocol(协议)**: 基于 JSON-RPC 2.0 的标准通信格式
## 通信流程
┌─────────────┐ ┌─────────────┐
│ Claude │ │ MCP Server │
│ Desktop │ │ (本项目) │
│ (Client) │ │ │
└──────┬──────┘ └──────┬──────┘
│ │
│ 1. 启动时请求工具列表 │
│────────────────────────────────────────>│
│ │
│ 2. 返回可用工具(list_tools) │
│<────────────────────────────────────────│
│ [get_youtube_shorts_trends, │
│ analyze_video_potential, ...] │
│ │
│ 3. 用户发送自然语言请求 │
│ "帮我看看 AI 相关的爆款 Shorts" │
│ │
│ 4. Claude 解析并调用工具 │
│ call_tool( │
│ name="get_youtube_shorts_trends", │
│ arguments={"keyword": "AI"} │
│ ) │
│────────────────────────────────────────>│
│ │
│ 5. Server 执行业务逻辑 │
## 关键组件
### 1. Server 初始化
- 创建 Server 实例
- 定义服务名称
### 2. 工具注册 (@app.list_tools)
- 声明工具名称、描述
- 定义输入参数的 JSON Schema
- Claude 会根据这些信息理解如何调用工具
### 3. 工具调用处理 (@app.call_tool)
- 接收工具名称和参数
- 执行业务逻辑
- 返回 TextContent 格式的结果
### 4. 通信协议 (stdio)
- 使用标准输入/输出(stdin/stdout)进行通信
- Claude Desktop 启动 Server 进程并通过管道通信
- 基于 JSON-RPC 2.0 协议
## 为什么使用 MCP?
1. **标准化**: 一次开发,可被所有支持 MCP 的 AI 应用调用
2. **安全性**: Server 运行在本地,数据不经过第三方
3. **灵活性**: 可以连接任何 API、数据库或本地工具
4. **自然交互**: 用户用自然语言,AI 自动选择和调用工具
═══════════════════════════════════════════════════════════════════════════════
"""
import re
from typing import Optional
from mcp.server import Server
from mcp.types import Tool, TextContent
from .youtube.client import YouTubeClient
from .youtube.analyzer import ViralAnalyzer
from .models.video import VideoData
# ═══════════════════════════════════════════════════════════════════════════
# 步骤 1: 创建 MCP Server 实例
# ═══════════════════════════════════════════════════════════════════════════
app = Server("youtube-shorts-viral-agent")
"""
Server 实例是 MCP 的核心
- 参数是服务器的唯一标识符
- Claude Desktop 会通过这个标识符来识别和管理服务器
"""
# ═══════════════════════════════════════════════════════════════════════════
# 辅助函数
# ═══════════════════════════════════════════════════════════════════════════
def extract_video_id(url: str) -> Optional[str]:
"""从 YouTube URL 中提取视频 ID"""
patterns = [
r'youtube\.com/shorts/([a-zA-Z0-9_-]+)',
r'youtube\.com/watch\?v=([a-zA-Z0-9_-]+)',
r'youtu\.be/([a-zA-Z0-9_-]+)',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
# ═══════════════════════════════════════════════════════════════════════════
# 步骤 2: 注册工具列表
# ═══════════════════════════════════════════════════════════════════════════
@app.list_tools()
async def list_tools() -> list[Tool]:
"""
这个函数会在 Claude Desktop 启动时被调用
返回值:
Tool 对象列表,每个 Tool 包含:
- name: 工具名称(唯一标识符)
- description: 工具描述(帮助 Claude 理解何时使用)
- inputSchema: JSON Schema 格式的参数定义
Claude 会根据这些信息:
1. 理解每个工具的功能
2. 知道需要哪些参数
3. 在用户提问时自动选择合适的工具
"""
return [
# ─────────────────────────────────────────────────────────────────
# 工具 1: 发现热门 Shorts
# ─────────────────────────────────────────────────────────────────
Tool(
name="get_youtube_shorts_trends",
description=(
"发现 YouTube Shorts 上的潜在热门视频。"
"通过分析播放量、互动率和发布时间,识别高质量的短视频。"
),
inputSchema={
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "搜索关键词(留空表示全局趋势)",
"default": ""
},
"hours_ago": {
"type": "integer",
"description": "时间范围(小时),默认 24 小时,最大 30 天",
"default": 24,
"minimum": 1,
"maximum": 720
},
"max_results": {
"type": "integer",
"description": "返回结果数量,默认 10",
"default": 10,
"minimum": 1,
"maximum": 50
},
"min_views": {
"type": "integer",
"description": "最低播放量阈值,默认 100,000",
"default": 100000,
"minimum": 0
}
}
}
),
# ─────────────────────────────────────────────────────────────────
# 工具 2: 分析单个视频
# ─────────────────────────────────────────────────────────────────
Tool(
name="analyze_video_potential",
description=(
"深度分析单个 YouTube Shorts 视频的表现。"
"提供详细的播放量、互动率等核心指标。"
),
inputSchema={
"type": "object",
"properties": {
"video_url": {
"type": "string",
"description": "YouTube Shorts 视频链接"
}
},
"required": ["video_url"]
}
),
# ─────────────────────────────────────────────────────────────────
# 工具 3: 发现热门话题
# ─────────────────────────────────────────────────────────────────
Tool(
name="get_trending_topics",
description=(
"发现当前 YouTube Shorts 上的热门话题和趋势。"
"分析多个分类的热门内容,帮助选题。"
),
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": "内容分类",
"enum": ["tech", "entertainment", "education", "gaming", "all"],
"default": "all"
},
"hours_ago": {
"type": "integer",
"description": "时间范围(小时),最大 30 天",
"default": 24,
"minimum": 1,
"maximum": 720
}
}
}
),
# ─────────────────────────────────────────────────────────────────
# 工具 4: 提炼视频故事梗概
# ─────────────────────────────────────────────────────────────────
Tool(
name="summarize_video_story",
description=(
"提炼 YouTube Shorts 视频的故事梗概和核心内容。"
"基于视频标题和描述,分析视频的主题、情节和创作手法。"
),
inputSchema={
"type": "object",
"properties": {
"video_url": {
"type": "string",
"description": "YouTube Shorts 视频链接"
}
},
"required": ["video_url"]
}
),
# ─────────────────────────────────────────────────────────────────
# 工具 5: 发现细分爆款领域
# ─────────────────────────────────────────────────────────────────
Tool(
name="discover_niche_trends",
description=(
"智能发现某个主题下的细分爆款领域。"
"通过分析大量视频标题,自动识别高 VPH 的关键词组合,"
"例如在 'AI' 主题下发现 'AI Kpop'、'AI 印度故事' 等细分趋势。"
),
inputSchema={
"type": "object",
"properties": {
"main_topic": {
"type": "string",
"description": "主题关键词(如 'AI', 'tutorial', 'funny')"
},
"hours_ago": {
"type": "integer",
"description": "时间范围(小时),默认 24 小时",
"default": 24,
"minimum": 1,
"maximum": 720
},
"min_videos": {
"type": "integer",
"description": "细分领域最少视频数量,默认 3",
"default": 3,
"minimum": 2,
"maximum": 20
},
"top_niches": {
"type": "integer",
"description": "返回前 N 个细分领域,默认 10",
"default": 10,
"minimum": 3,
"maximum": 30
}
},
"required": ["main_topic"]
}
)
]
# ═══════════════════════════════════════════════════════════════════════════
# 步骤 3: 处理工具调用
# ═══════════════════════════════════════════════════════════════════════════
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""
当 Claude 调用工具时,这个函数会被执行
参数:
name: 工具名称(来自 list_tools 中定义的 name)
arguments: 工具参数(JSON 对象,符合 inputSchema)
返回值:
TextContent 列表,包含要返回给 Claude 的文本内容
工作流程:
1. Claude 解析用户的自然语言请求
2. 选择合适的工具和参数
3. 调用这个函数
4. 我们执行业务逻辑(调用 YouTube API、分析数据)
5. 返回格式化的结果
6. Claude 将结果展示给用户
"""
try:
# 根据工具名称路由到对应的处理函数
if name == "get_youtube_shorts_trends":
return await get_youtube_shorts_trends(
keyword=arguments.get("keyword", ""),
hours_ago=arguments.get("hours_ago", 24),
max_results=arguments.get("max_results", 10),
min_views=arguments.get("min_views", 100000)
)
elif name == "analyze_video_potential":
return await analyze_video_potential(
video_url=arguments["video_url"]
)
elif name == "get_trending_topics":
return await get_trending_topics(
category=arguments.get("category", "all"),
hours_ago=arguments.get("hours_ago", 24)
)
elif name == "summarize_video_story":
return await summarize_video_story(
video_url=arguments["video_url"]
)
elif name == "discover_niche_trends":
return await discover_niche_trends(
main_topic=arguments["main_topic"],
hours_ago=arguments.get("hours_ago", 24),
min_videos=arguments.get("min_videos", 3),
top_niches=arguments.get("top_niches", 10)
)
else:
return [TextContent(
type="text",
text=f"未知工具: {name}"
)]
except Exception as e:
# 错误处理:返回友好的错误信息
return [TextContent(
type="text",
text=f"❌ 执行失败: {str(e)}"
)]
# ═══════════════════════════════════════════════════════════════════════════
# 业务逻辑实现
# ═══════════════════════════════════════════════════════════════════════════
async def get_youtube_shorts_trends(
keyword: str = "",
hours_ago: int = 24,
max_results: int = 10,
min_views: int = 100000
) -> list[TextContent]:
"""
工具 1 的实现:获取 YouTube Shorts 热门趋势
这里是真正的业务逻辑:
1. 调用 YouTube API 搜索视频
2. 使用爆款分析算法计算指标
3. 筛选和排序
4. 生成 Markdown 格式的报告
"""
try:
# 初始化客户端和分析器
client = YouTubeClient()
analyzer = ViralAnalyzer()
# 获取视频数据
videos = client.get_trending_shorts(
keyword=keyword,
hours_ago=hours_ago,
max_results=max_results
)
if not videos:
return [TextContent(
type="text",
text=f"未找到符合条件的视频。\n\n搜索参数:\n- 关键词: {keyword or '全局'}\n- 时间范围: 最近 {hours_ago} 小时"
)]
# 分析视频(计算互动率)
videos = analyzer.analyze_videos(videos)
# 筛选和排序
videos = analyzer.filter_by_views(videos, min_views=min_views)
videos = analyzer.rank_videos(videos)
if not videos:
return [TextContent(
type="text",
text=f"未找到满足条件的视频(播放量 >= {min_views:,})。\n\n建议:\n- 降低筛选阈值\n- 扩大时间范围\n- 尝试其他关键词"
)]
# 生成 Markdown 报告
report = f"# 🔥 YouTube Shorts 热门视频报告\n\n"
report += f"**搜索参数**\n"
report += f"- 关键词: {keyword or '全局趋势'}\n"
report += f"- 时间范围: 最近 {hours_ago} 小时\n"
if min_views > 0:
report += f"- 播放量阈值: {min_views:,}\n"
report += f"- 找到视频: {len(videos)} 个\n\n"
report += "---\n\n"
report += VideoData.markdown_header() + "\n"
for video in videos:
report += video.to_markdown_row() + "\n"
# 添加统计摘要
report += f"\n---\n\n## 📊 数据摘要\n\n"
avg_engagement = sum(v.engagement_rate for v in videos) / len(videos)
avg_views = sum(v.views for v in videos) / len(videos)
top_video = videos[0]
report += f"- **平均播放量**: {avg_views:,.0f}\n"
report += f"- **平均互动率**: {avg_engagement:.2f}%\n"
report += f"- **最高播放量**: {top_video.views:,} ([{top_video.title[:30]}...]({top_video.url}))\n"
# 返回 TextContent 对象
return [TextContent(type="text", text=report)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 获取趋势失败: {str(e)}"
)]
async def analyze_video_potential(video_url: str) -> list[TextContent]:
"""工具 2 的实现:分析单个视频的爆款潜力"""
try:
# 提取视频 ID
video_id = extract_video_id(video_url)
if not video_id:
return [TextContent(
type="text",
text=f"❌ 无效的 YouTube 链接: {video_url}\n\n支持的格式:\n- https://www.youtube.com/shorts/VIDEO_ID\n- https://www.youtube.com/watch?v=VIDEO_ID\n- https://youtu.be/VIDEO_ID"
)]
# 获取视频详情
client = YouTubeClient()
videos = client.get_video_details([video_id])
if not videos:
return [TextContent(
type="text",
text=f"❌ 未找到视频: {video_id}"
)]
video = videos[0]
# 获取频道信息
channel_map = client.get_channel_info([video.channel_id])
if video.channel_id in channel_map:
video.channel_subscribers = channel_map[video.channel_id].subscribers
# 分析视频
analyzer = ViralAnalyzer()
video = analyzer.analyze_video(video)
# 生成详细报告
report = analyzer.generate_analysis_report(video)
return [TextContent(type="text", text=report)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 分析失败: {str(e)}"
)]
async def get_trending_topics(
category: str = "all",
hours_ago: int = 24
) -> list[TextContent]:
"""工具 3 的实现:获取热门话题"""
# 定义分类关键词
category_keywords = {
"tech": ["AI", "coding", "tech", "programming", "software"],
"entertainment": ["funny", "comedy", "entertainment", "viral"],
"education": ["tutorial", "learn", "education", "how to"],
"gaming": ["gaming", "gameplay", "game", "esports"],
"all": [""] # 空关键词表示全局搜索
}
keywords = category_keywords.get(category, [""])
try:
client = YouTubeClient()
analyzer = ViralAnalyzer()
all_videos = []
# 对每个关键词搜索
for keyword in keywords:
videos = client.get_trending_shorts(
keyword=keyword,
hours_ago=hours_ago,
max_results=5 # 每个关键词取 5 个
)
all_videos.extend(videos)
if not all_videos:
return [TextContent(
type="text",
text=f"未找到 {category} 分类的热门视频"
)]
# 分析并排序
all_videos = analyzer.analyze_videos(all_videos)
all_videos = analyzer.rank_videos(all_videos)
# 去重(按视频 ID)
seen = set()
unique_videos = []
for v in all_videos:
if v.video_id not in seen:
seen.add(v.video_id)
unique_videos.append(v)
# 取前 10
top_videos = unique_videos[:10]
# 生成报告
report = f"# 🎯 {category.upper()} 分类热门话题\n\n"
report += f"**时间范围**: 最近 {hours_ago} 小时\n\n"
report += "---\n\n"
report += VideoData.markdown_header() + "\n"
for video in top_videos:
report += video.to_markdown_row() + "\n"
return [TextContent(type="text", text=report)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 获取热门话题失败: {str(e)}"
)]
async def summarize_video_story(video_url: str) -> list[TextContent]:
"""
工具 4 的实现:提炼视频故事梗概
这个工具的巧妙之处:
1. 我们只负责获取视频的标题和描述
2. 将这些信息返回给 Claude
3. Claude 会自动分析并提炼故事梗概
4. 用户看到的是 Claude 基于视频信息生成的分析
"""
try:
# 提取视频 ID
video_id = extract_video_id(video_url)
if not video_id:
return [TextContent(
type="text",
text=f"❌ 无效的 YouTube 链接: {video_url}"
)]
# 获取视频详情
client = YouTubeClient()
videos = client.get_video_details([video_id])
if not videos:
return [TextContent(
type="text",
text=f"❌ 未找到视频: {video_id}"
)]
video = videos[0]
# 构建视频信息文本,供 Claude 分析
info = f"""# 视频信息
**标题**: {video.title}
**频道**: {video.channel_name}
**发布时间**: {video.published_at.strftime('%Y-%m-%d %H:%M UTC')}
**播放量**: {video.views:,}
**点赞数**: {video.likes:,}
**评论数**: {video.comments:,}
**视频链接**: {video.url}
---
## 视频描述
{video.description if video.description else '(该视频没有描述)'}
---
**请基于以上信息,分析这个视频的:**
1. 主题和核心内容
2. 可能的故事情节或创作手法
3. 为什么这个视频可能受欢迎
4. 适合什么样的观众群体
"""
return [TextContent(type="text", text=info)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 获取视频信息失败: {str(e)}"
)]
async def discover_niche_trends(
main_topic: str,
hours_ago: int = 24,
min_videos: int = 3,
top_niches: int = 10
) -> list[TextContent]:
"""
工具 5 的实现:发现细分爆款领域
算法流程:
1. 搜索主题关键词,获取大量视频
2. 提取标题中的关键词组合(bigram/trigram)
3. 统计每个细分领域的视频数量和平均 VPH
4. 返回 VPH 最高的细分领域
"""
try:
from collections import defaultdict
from datetime import datetime, timezone
client = YouTubeClient()
analyzer = ViralAnalyzer()
# 获取更多视频用于分析(最多 50 个)
videos = client.get_trending_shorts(
keyword=main_topic,
hours_ago=hours_ago,
max_results=50
)
if not videos:
return [TextContent(
type="text",
text=f"未找到 '{main_topic}' 相关的视频"
)]
# 分析视频(计算 VPH)
videos = analyzer.analyze_videos(videos)
# 提取关键词组合
niche_data = defaultdict(lambda: {
'videos': [],
'total_vph': 0,
'total_views': 0,
'count': 0
})
for video in videos:
# 提取标题中的关键词组合
title_lower = video.title.lower()
words = re.findall(r'\b\w+\b', title_lower)
# 生成 bigram (2个词的组合)
for i in range(len(words) - 1):
bigram = f"{words[i]} {words[i+1]}"
# 必须包含主题词
if main_topic.lower() in bigram:
niche_data[bigram]['videos'].append(video)
niche_data[bigram]['total_vph'] += video.vph
niche_data[bigram]['total_views'] += video.views
niche_data[bigram]['count'] += 1
# 生成 trigram (3个词的组合)
for i in range(len(words) - 2):
trigram = f"{words[i]} {words[i+1]} {words[i+2]}"
if main_topic.lower() in trigram:
niche_data[trigram]['videos'].append(video)
niche_data[trigram]['total_vph'] += video.vph
niche_data[trigram]['total_views'] += video.views
niche_data[trigram]['count'] += 1
# 过滤:至少有 min_videos 个视频
filtered_niches = {
niche: data
for niche, data in niche_data.items()
if data['count'] >= min_videos
}
if not filtered_niches:
return [TextContent(
type="text",
text=f"未找到符合条件的细分领域(至少 {min_videos} 个视频)\n\n建议:\n- 降低 min_videos 参数\n- 扩大时间范围\n- 尝试其他主题关键词"
)]
# 计算平均 VPH 并排序
niche_rankings = []
for niche, data in filtered_niches.items():
avg_vph = data['total_vph'] / data['count']
avg_views = data['total_views'] / data['count']
niche_rankings.append({
'niche': niche,
'avg_vph': avg_vph,
'avg_views': avg_views,
'video_count': data['count'],
'top_video': max(data['videos'], key=lambda v: v.vph)
})
# 按平均 VPH 排序
niche_rankings.sort(key=lambda x: x['avg_vph'], reverse=True)
top_niches_list = niche_rankings[:top_niches]
# 生成报告
report = f"# 🔍 '{main_topic}' 主题细分爆款领域分析\n\n"
report += f"**分析参数**\n"
report += f"- 主题: {main_topic}\n"
report += f"- 时间范围: 最近 {hours_ago} 小时\n"
report += f"- 分析视频数: {len(videos)}\n"
report += f"- 发现细分领域: {len(filtered_niches)} 个\n"
report += f"- 最少视频数阈值: {min_videos}\n\n"
report += "---\n\n"
report += "## 📊 高 VPH 细分领域排行\n\n"
for idx, niche_info in enumerate(top_niches_list, 1):
report += f"### {idx}. **{niche_info['niche'].title()}**\n\n"
report += f"- **平均 VPH**: {niche_info['avg_vph']:,.0f} 次/小时\n"
report += f"- **平均播放量**: {niche_info['avg_views']:,.0f}\n"
report += f"- **视频数量**: {niche_info['video_count']}\n"
top_vid = niche_info['top_video']
report += f"- **代表视频**: [{top_vid.title[:50]}...]({top_vid.url})\n"
report += f" - 播放量: {top_vid.views:,}\n"
report += f" - VPH: {top_vid.vph:,.0f}\n"
report += f" - 互动率: {top_vid.engagement_rate:.2f}%\n\n"
# 添加洞察
report += "---\n\n## 💡 洞察建议\n\n"
if top_niches_list:
top_niche = top_niches_list[0]
report += f"1. **最强细分领域**: '{top_niche['niche'].title()}' (平均 VPH {top_niche['avg_vph']:,.0f})\n"
report += f"2. **内容策略**: 可以围绕这些细分领域创作类似内容\n"
report += f"3. **竞争程度**: 视频数量 {top_niche['video_count']} 个,属于"
if top_niche['video_count'] < 5:
report += "**蓝海市场**(竞争少)\n"
elif top_niche['video_count'] < 10:
report += "**成长市场**(适度竞争)\n"
else:
report += "**红海市场**(竞争激烈)\n"
return [TextContent(type="text", text=report)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发现细分领域失败: {str(e)}"
)]
# ═══════════════════════════════════════════════════════════════════════════
# 步骤 4: 启动 MCP Server
# ═══════════════════════════════════════════════════════════════════════════
def main():
"""
启动 MCP Server
这个函数会:
1. 创建异步事件循环
2. 使用 stdio_server 建立标准输入/输出通信
3. 运行 Server,等待 Claude Desktop 的调用
通信方式:
- Claude Desktop 启动这个 Python 进程
- 通过 stdin/stdout 发送 JSON-RPC 消息
- Server 处理请求并返回响应
- 整个过程对用户透明,用户只需用自然语言交互
"""
import asyncio
from mcp.server.stdio import stdio_server
async def run():
# stdio_server 是一个上下文管理器,提供读写流
async with stdio_server() as (read_stream, write_stream):
# 运行 Server,处理来自 Claude 的请求
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
# 启动异步事件循环
asyncio.run(run())
if __name__ == "__main__":
main()