"""YouTube API 客户端"""
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from ..models.video import VideoData, ChannelData
from ..utils.config import config
class YouTubeClient:
    """Thin client around the YouTube Data API v3 for finding popular Shorts.

    The client wraps three endpoints (``search.list``, ``videos.list`` and
    ``channels.list``) and converts their JSON payloads into the project's
    ``VideoData`` / ``ChannelData`` models.
    """

    # Upper bound on the number of IDs sent in one videos.list / channels.list
    # request; longer inputs are split into several requests.
    _MAX_IDS_PER_REQUEST = 50

    # Thumbnail renditions tried in order of preference.
    _THUMBNAIL_PREFERENCE = ('high', 'medium', 'default')

    def __init__(self, api_key: Optional[str] = None):
        """Create the underlying API service object.

        Args:
            api_key: YouTube API key. When omitted (or empty), the key is read
                from ``config.get_api_key()``.
        """
        self.api_key = api_key or config.get_api_key()
        self.youtube = build('youtube', 'v3', developerKey=self.api_key)

    @classmethod
    def _batches(cls, ids: List[str]) -> List[List[str]]:
        """Split ``ids`` into consecutive chunks of at most 50 entries."""
        size = cls._MAX_IDS_PER_REQUEST
        return [ids[start:start + size] for start in range(0, len(ids), size)]

    @classmethod
    def _best_thumbnail_url(cls, thumbnails: dict) -> str:
        """Return the URL of the preferred available thumbnail, or ``''``."""
        for size in cls._THUMBNAIL_PREFERENCE:
            url = thumbnails.get(size, {}).get('url')
            if url:
                return url
        return ''

    @classmethod
    def _build_video(cls, item: dict) -> VideoData:
        """Convert one ``videos.list`` item into a ``VideoData`` instance."""
        snippet = item['snippet']
        stats = item.get('statistics', {})
        content = item['contentDetails']

        # ``fromisoformat`` on older Pythons rejects a trailing "Z", so the
        # suffix is rewritten as an explicit UTC offset first.
        published_at = datetime.fromisoformat(
            snippet['publishedAt'].replace('Z', '+00:00')
        )

        return VideoData(
            video_id=item['id'],
            title=snippet['title'],
            channel_name=snippet['channelTitle'],
            channel_id=snippet['channelId'],
            # Counters may be absent from the payload; they default to 0.
            views=int(stats.get('viewCount', 0)),
            likes=int(stats.get('likeCount', 0)),
            comments=int(stats.get('commentCount', 0)),
            published_at=published_at,
            duration=content['duration'],
            url=f"https://www.youtube.com/shorts/{item['id']}",
            thumbnail_url=cls._best_thumbnail_url(snippet.get('thumbnails', {})),
            description=snippet.get('description', ''),
        )

    def search_shorts(
        self,
        keyword: str = "",
        hours_ago: int = 24,
        max_results: int = 10
    ) -> List[str]:
        """Search for recently published short videos, ordered by view count.

        Args:
            keyword: Search query. A blank string searches without a query.
            hours_ago: Only videos published within this many hours are
                returned.
            max_results: Maximum number of IDs to return; additionally capped
                by ``config.MAX_SEARCH_RESULTS``. Values <= 0 yield an empty
                list without calling the API.

        Returns:
            List of video IDs.

        Raises:
            Exception: The API request failed. HTTP 403 is reported with a
                quota-exhaustion message; the original ``HttpError`` is
                chained as ``__cause__``.
        """
        if max_results <= 0:
            # Nothing was asked for: skip the search request entirely.
            return []

        # Timezone-aware replacement for the deprecated datetime.utcnow().
        threshold = datetime.now(timezone.utc) - timedelta(hours=hours_ago)

        search_params = {
            'part': 'id,snippet',
            'type': 'video',
            # NOTE(review): per the API docs 'short' means "under four
            # minutes", so results may include videos that are not Shorts.
            'videoDuration': 'short',
            'order': 'viewCount',
            'publishedAfter': threshold.strftime('%Y-%m-%dT%H:%M:%SZ'),
            'maxResults': min(max_results, config.MAX_SEARCH_RESULTS),
            'regionCode': 'US',  # hard-coded; candidate for configuration
        }

        # Only send a query when the caller actually supplied one.
        if keyword.strip():
            search_params['q'] = keyword

        try:
            response = self.youtube.search().list(**search_params).execute()
        except HttpError as e:
            # NOTE(review): every 403 is reported as quota exhaustion; other
            # authorization failures may also use 403 - confirm.
            if e.resp.status == 403:
                raise Exception(
                    "YouTube API 配额已耗尽!\n"
                    f"错误详情: {e.error_details}\n"
                    "解决方案:\n"
                    "1. 等待配额重置(每日太平洋时间午夜)\n"
                    "2. 在 Google Cloud Console 申请配额提升"
                ) from e
            raise Exception(f"YouTube API 错误: {e}") from e

        return [
            item['id']['videoId']
            for item in response.get('items', [])
            if item['id']['kind'] == 'youtube#video'
        ]

    def get_video_details(self, video_ids: List[str]) -> List[VideoData]:
        """Fetch snippet, statistics and content details for the given videos.

        IDs are requested in batches of at most 50, so inputs longer than one
        batch are no longer truncated.

        Args:
            video_ids: Video IDs to look up.

        Returns:
            One ``VideoData`` per item returned by the API (empty list for
            empty input).

        Raises:
            Exception: An API request failed; the original ``HttpError`` is
                chained as ``__cause__``.
        """
        if not video_ids:
            return []

        videos: List[VideoData] = []
        try:
            for batch in self._batches(video_ids):
                response = self.youtube.videos().list(
                    part='snippet,statistics,contentDetails',
                    id=','.join(batch)
                ).execute()
                videos.extend(
                    self._build_video(item)
                    for item in response.get('items', [])
                )
        except HttpError as e:
            raise Exception(f"获取视频详情失败: {e}") from e
        return videos

    def get_channel_info(self, channel_ids: List[str]) -> dict[str, ChannelData]:
        """Fetch channel statistics, keyed by channel ID.

        Duplicate IDs are removed *before* batching, so repeated entries can
        never push distinct channels out of the request.

        Args:
            channel_ids: Channel IDs to look up (duplicates allowed).

        Returns:
            Mapping of channel ID to ``ChannelData``. This lookup is
            best-effort: on an API error a warning is printed and whatever was
            collected so far (possibly nothing) is returned.
        """
        if not channel_ids:
            return {}

        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique_ids = list(dict.fromkeys(channel_ids))

        channel_map = {}
        try:
            for batch in self._batches(unique_ids):
                response = self.youtube.channels().list(
                    part='snippet,statistics',
                    id=','.join(batch)
                ).execute()
                for item in response.get('items', []):
                    stats = item.get('statistics', {})
                    channel_map[item['id']] = ChannelData(
                        channel_id=item['id'],
                        channel_name=item['snippet']['title'],
                        subscribers=int(stats.get('subscriberCount', 0)),
                        total_views=int(stats.get('viewCount', 0)),
                        video_count=int(stats.get('videoCount', 0))
                    )
        except HttpError as e:
            # Channel data is optional; a failure here must not abort the
            # caller's main flow.
            print(f"警告: 获取频道信息失败: {e}")
        return channel_map

    def get_trending_shorts(
        self,
        keyword: str = "",
        hours_ago: int = 24,
        max_results: int = 10,
        include_channel_info: bool = True
    ) -> List[VideoData]:
        """Run the full pipeline: search, fetch details, attach channel data.

        Args:
            keyword: Search query passed to ``search_shorts``.
            hours_ago: Publication window in hours.
            max_results: Maximum number of videos to search for.
            include_channel_info: When true, each video's
                ``channel_subscribers`` is filled in from its channel if the
                channel lookup returned it.

        Returns:
            List of ``VideoData`` (empty when the search finds nothing).
        """
        # 1. Search for candidate video IDs.
        video_ids = self.search_shorts(keyword, hours_ago, max_results)
        if not video_ids:
            return []

        # 2. Resolve the IDs into full video records.
        videos = self.get_video_details(video_ids)

        # 3. Optionally enrich each video with its channel's subscriber count.
        if include_channel_info:
            channel_map = self.get_channel_info([v.channel_id for v in videos])
            for video in videos:
                channel = channel_map.get(video.channel_id)
                if channel is not None:
                    video.channel_subscribers = channel.subscribers

        return videos