get_youtube_shorts_trends
Analyze YouTube Shorts to identify trending videos by tracking views, engagement, and publication time within specified timeframes.
Instructions
发现 YouTube Shorts 上的潜在热门视频。通过分析播放量、互动率和发布时间,识别高质量的短视频。
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| keyword | No | 搜索关键词(留空表示全局趋势) | |
| hours_ago | No | 时间范围(小时),默认 24 小时,最大 30 天 | |
| max_results | No | 返回结果数量,默认 10 | |
| min_views | No | 最低播放量阈值,默认 100,000 |
Implementation Reference
- src/server.py:375-455 (handler)The main handler function that implements get_youtube_shorts_trends. It orchestrates fetching trending videos from YouTube, analyzing them with ViralAnalyzer, filtering by minimum views, ranking, and generating a Markdown report with statistics.async def get_youtube_shorts_trends( keyword: str = "", hours_ago: int = 24, max_results: int = 10, min_views: int = 100000 ) -> list[TextContent]: """ 工具 1 的实现:获取 YouTube Shorts 热门趋势 这里是真正的业务逻辑: 1. 调用 YouTube API 搜索视频 2. 使用爆款分析算法计算指标 3. 筛选和排序 4. 生成 Markdown 格式的报告 """ try: # 初始化客户端和分析器 client = YouTubeClient() analyzer = ViralAnalyzer() # 获取视频数据 videos = client.get_trending_shorts( keyword=keyword, hours_ago=hours_ago, max_results=max_results ) if not videos: return [TextContent( type="text", text=f"未找到符合条件的视频。\n\n搜索参数:\n- 关键词: {keyword or '全局'}\n- 时间范围: 最近 {hours_ago} 小时" )] # 分析视频(计算互动率) videos = analyzer.analyze_videos(videos) # 筛选和排序 videos = analyzer.filter_by_views(videos, min_views=min_views) videos = analyzer.rank_videos(videos) if not videos: return [TextContent( type="text", text=f"未找到满足条件的视频(播放量 >= {min_views:,})。\n\n建议:\n- 降低筛选阈值\n- 扩大时间范围\n- 尝试其他关键词" )] # 生成 Markdown 报告 report = f"# 🔥 YouTube Shorts 热门视频报告\n\n" report += f"**搜索参数**\n" report += f"- 关键词: {keyword or '全局趋势'}\n" report += f"- 时间范围: 最近 {hours_ago} 小时\n" if min_views > 0: report += f"- 播放量阈值: {min_views:,}\n" report += f"- 找到视频: {len(videos)} 个\n\n" report += "---\n\n" report += VideoData.markdown_header() + "\n" for video in videos: report += video.to_markdown_row() + "\n" # 添加统计摘要 report += f"\n---\n\n## 📊 数据摘要\n\n" avg_engagement = sum(v.engagement_rate for v in videos) / len(videos) avg_views = sum(v.views for v in videos) / len(videos) top_video = videos[0] report += f"- **平均播放量**: {avg_views:,.0f}\n" report += f"- **平均互动率**: {avg_engagement:.2f}%\n" report += f"- **最高播放量**: {top_video.views:,} ([{top_video.title[:30]}...]({top_video.url}))\n" # 返回 TextContent 对象 return [TextContent(type="text", text=report)] except Exception as e: return [TextContent( type="text", text=f"❌ 获取趋势失败: {str(e)}" )]
- src/server.py:144-180 (schema)The Tool schema definition that declares the tool name, description, and inputSchema with JSON Schema validation for parameters: keyword (string), hours_ago (integer, 1-720), max_results (integer, 1-50), and min_views (integer, minimum 0).Tool( name="get_youtube_shorts_trends", description=( "发现 YouTube Shorts 上的潜在热门视频。" "通过分析播放量、互动率和发布时间,识别高质量的短视频。" ), inputSchema={ "type": "object", "properties": { "keyword": { "type": "string", "description": "搜索关键词(留空表示全局趋势)", "default": "" }, "hours_ago": { "type": "integer", "description": "时间范围(小时),默认 24 小时,最大 30 天", "default": 24, "minimum": 1, "maximum": 720 }, "max_results": { "type": "integer", "description": "返回结果数量,默认 10", "default": 10, "minimum": 1, "maximum": 50 }, "min_views": { "type": "integer", "description": "最低播放量阈值,默认 100,000", "default": 100000, "minimum": 0 } } } ),
- src/server.py:302-368 (registration)The call_tool registration handler decorated with @app.call_tool() that routes tool invocation requests. Lines 325-331 show the routing logic for get_youtube_shorts_trends, extracting arguments and calling the handler function.@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """ 当 Claude 调用工具时,这个函数会被执行 参数: name: 工具名称(来自 list_tools 中定义的 name) arguments: 工具参数(JSON 对象,符合 inputSchema) 返回值: TextContent 列表,包含要返回给 Claude 的文本内容 工作流程: 1. Claude 解析用户的自然语言请求 2. 选择合适的工具和参数 3. 调用这个函数 4. 我们执行业务逻辑(调用 YouTube API、分析数据) 5. 返回格式化的结果 6. Claude 将结果展示给用户 """ try: # 根据工具名称路由到对应的处理函数 if name == "get_youtube_shorts_trends": return await get_youtube_shorts_trends( keyword=arguments.get("keyword", ""), hours_ago=arguments.get("hours_ago", 24), max_results=arguments.get("max_results", 10), min_views=arguments.get("min_views", 100000) ) elif name == "analyze_video_potential": return await analyze_video_potential( video_url=arguments["video_url"] ) elif name == "get_trending_topics": return await get_trending_topics( category=arguments.get("category", "all"), hours_ago=arguments.get("hours_ago", 24) ) elif name == "summarize_video_story": return await summarize_video_story( video_url=arguments["video_url"] ) elif name == "discover_niche_trends": return await discover_niche_trends( main_topic=arguments["main_topic"], hours_ago=arguments.get("hours_ago", 24), min_videos=arguments.get("min_videos", 3), top_niches=arguments.get("top_niches", 10) ) else: return [TextContent( type="text", text=f"未知工具: {name}" )] except Exception as e: # 错误处理:返回友好的错误信息 return [TextContent( type="text", text=f"❌ 执行失败: {str(e)}" )]
- src/youtube/client.py:182-220 (helper)Helper method YouTubeClient.get_trending_shorts() that orchestrates the complete YouTube API workflow: searching for shorts by keyword, fetching video details, and optionally retrieving channel information. This is called by the main handler.def get_trending_shorts( self, keyword: str = "", hours_ago: int = 24, max_results: int = 10, include_channel_info: bool = True ) -> List[VideoData]: """ 获取热门 Shorts(完整流程) Args: keyword: 搜索关键词 hours_ago: 时间范围 max_results: 最大结果数 include_channel_info: 是否包含频道信息 Returns: VideoData 列表 """ # 1. 搜索视频 video_ids = self.search_shorts(keyword, hours_ago, max_results) if not video_ids: return [] # 2. 获取视频详情 videos = self.get_video_details(video_ids) # 3. 获取频道信息(可选) if include_channel_info: channel_ids = [v.channel_id for v in videos] channel_map = self.get_channel_info(channel_ids) # 填充频道订阅数 for video in videos: if video.channel_id in channel_map: video.channel_subscribers = channel_map[video.channel_id].subscribers return videos
- src/youtube/analyzer.py:58-102 (helper)Helper methods in ViralAnalyzer used by the handler: analyze_videos() for calculating engagement rates, filter_by_views() for applying minimum view thresholds, and rank_videos() for sorting by view count (descending).@classmethod def analyze_videos(cls, videos: List[VideoData]) -> List[VideoData]: """ 批量分析视频 Args: videos: 视频列表 Returns: 分析后的视频列表 """ return [cls.analyze_video(video) for video in videos] @staticmethod def rank_videos(videos: List[VideoData]) -> List[VideoData]: """ 按播放量排序视频 Args: videos: 视频列表 Returns: 排序后的视频列表(降序) """ return sorted(videos, key=lambda v: v.views, reverse=True) @staticmethod def filter_by_views( videos: List[VideoData], min_views: int = 0 ) -> List[VideoData]: """ 按播放量阈值筛选视频 Args: videos: 视频列表 min_views: 最低播放量阈值 (默认 0,不筛选) Returns: 筛选后的视频列表 """ if min_views == 0: return videos return [v for v in videos if v.views >= min_views]