"""YouTube API service wrapper for yt-fetch."""
import logging
import os
import re
from datetime import timezone
from typing import Any, Dict, List, Optional

from dateutil.parser import parse as parse_date
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class VideoInfo(BaseModel):
    """Metadata and statistics describing a single YouTube video."""

    # Identity and descriptive text.
    video_id: str
    title: str
    description: str

    # Channel that the video belongs to.
    channel_id: str
    channel_title: str

    # Kept as the raw strings received; ``duration`` is the value that
    # ``YouTubeService._duration_to_seconds`` parses as an ISO 8601 duration.
    published_at: str
    duration: str

    # Engagement counters.
    view_count: int
    like_count: int
    comment_count: int

    # Presentation extras; ``tags`` defaults to an empty list when omitted.
    thumbnail_url: str
    tags: List[str] = []
class ChannelInfo(BaseModel):
    """Metadata and aggregate statistics describing a YouTube channel."""

    # Identity and descriptive text.
    channel_id: str
    title: str
    description: str

    # Aggregate counters for the channel.
    subscriber_count: int
    video_count: int
    view_count: int

    # Presentation extras; ``published_at`` is kept as the raw string.
    thumbnail_url: str
    published_at: str
class SearchResult(BaseModel):
    """One page of video search results."""

    # Videos found on this page.
    videos: List[VideoInfo]

    # Total number of matches as reported by the search response.
    total_results: int

    # Token for the following page; ``None`` when the response carries none.
    next_page_token: Optional[str] = None
class YouTubeService:
    """Service for the YouTube Data API v3 and video transcripts.

    Video and channel metadata is fetched through the ``googleapiclient``
    client built in ``__init__``; transcripts are fetched with
    ``youtube-transcript-api``.

    NOTE(review): the coroutine methods call the API client's ``execute()``
    directly (nothing is awaited), so they presumably block the event loop
    for the duration of each HTTP request — confirm before using this class
    under heavy concurrency.
    """

    # Upper bound applied to ``max_results`` before it is sent to the API.
    _MAX_RESULTS_CAP = 50
    # Resource parts needed to build a complete ``VideoInfo``.
    _VIDEO_PARTS = "snippet,statistics,contentDetails"
    # Descriptions longer than this are truncated in transcript metadata.
    _DESCRIPTION_PREVIEW_CHARS = 500
    # ISO 8601 duration with an optional day component, e.g. ``PT4M13S`` or
    # ``P1DT2H3M4S``. Used with ``match`` (prefix match), so trailing text
    # after a valid duration is ignored.
    _DURATION_RE = re.compile(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?"
    )

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the YouTube service.

        Args:
            api_key: API key to use. When omitted (or empty), the
                ``YOUTUBE_API_KEY`` environment variable is used instead.

        Raises:
            ValueError: If no API key is supplied and the environment
                variable is unset or empty.
        """
        self.api_key = api_key or os.getenv("YOUTUBE_API_KEY")
        if not self.api_key:
            raise ValueError("YOUTUBE_API_KEY environment variable is required")
        self.youtube = build("youtube", "v3", developerKey=self.api_key)

    async def search_videos(
        self,
        query: str,
        max_results: int = 10,
        order: str = "relevance",
        published_after: Optional[str] = None,
        published_before: Optional[str] = None,
        duration: Optional[str] = None,
        video_type: Optional[str] = None,
        region_code: Optional[str] = None,
        page_token: Optional[str] = None,
    ) -> SearchResult:
        """Search for videos and return them with full details.

        Args:
            query: Search terms.
            max_results: Requested page size; capped at 50.
            order: Sort order passed through as the ``order`` parameter.
            published_after: Lower publication bound; any date string
                ``dateutil`` can parse.
            published_before: Upper publication bound, same format.
            duration: Passed through as ``videoDuration`` when set.
            video_type: Passed through as ``videoType`` when set.
            region_code: Passed through as ``regionCode`` when set.
            page_token: ``next_page_token`` from a previous result, to fetch
                the following page.

        Returns:
            The videos of the requested page, the reported total number of
            matches, and the token of the next page (if any).

        Raises:
            HttpError: If the API rejects a request.
        """
        try:
            search_params = {
                "part": "snippet",
                "q": query,
                "type": "video",
                "maxResults": min(max_results, self._MAX_RESULTS_CAP),
                "order": order,
            }
            if published_after:
                search_params["publishedAfter"] = self._format_datetime(
                    published_after
                )
            if published_before:
                search_params["publishedBefore"] = self._format_datetime(
                    published_before
                )
            # Optional pass-through filters: only sent when the caller set them.
            optional_params = {
                "videoDuration": duration,
                "videoType": video_type,
                "regionCode": region_code,
                "pageToken": page_token,
            }
            search_params.update(
                {name: value for name, value in optional_params.items() if value}
            )

            search_response = self.youtube.search().list(**search_params).execute()
            video_ids = [
                item["id"]["videoId"] for item in search_response.get("items", [])
            ]
            # Search results only carry snippets; fetch statistics and
            # duration for the hits in a second request.
            videos = await self._get_video_details_batch(video_ids)
            return SearchResult(
                videos=videos,
                total_results=search_response["pageInfo"]["totalResults"],
                next_page_token=search_response.get("nextPageToken"),
            )
        except HttpError as e:
            logger.error("YouTube API error: %s", e)
            raise
        except Exception as e:
            logger.error("Error searching videos: %s", e)
            raise

    async def get_video_details(self, video_id: str) -> VideoInfo:
        """Get detailed information about a specific video.

        Raises:
            ValueError: If the API returns no video for ``video_id``.
            HttpError: If the API rejects the request.
        """
        try:
            videos = await self._get_video_details_batch([video_id])
            if not videos:
                raise ValueError(f"Video not found: {video_id}")
            return videos[0]
        except Exception as e:
            logger.error("Error getting video details: %s", e)
            raise

    async def get_channel_info(self, channel_id: str) -> ChannelInfo:
        """Get information about a YouTube channel.

        Raises:
            ValueError: If the API returns no channel for ``channel_id``.
            HttpError: If the API rejects the request.
        """
        try:
            response = (
                self.youtube.channels()
                .list(part="snippet,statistics", id=channel_id)
                .execute()
            )
            # ``items`` may be missing entirely when nothing matched, so do
            # not index it directly.
            items = response.get("items")
            if not items:
                raise ValueError(f"Channel not found: {channel_id}")

            channel = items[0]
            snippet = channel["snippet"]
            stats = channel.get("statistics", {})
            return ChannelInfo(
                channel_id=channel_id,
                title=snippet["title"],
                description=snippet.get("description", ""),
                subscriber_count=int(stats.get("subscriberCount", 0)),
                video_count=int(stats.get("videoCount", 0)),
                view_count=int(stats.get("viewCount", 0)),
                thumbnail_url=snippet["thumbnails"]["default"]["url"],
                published_at=snippet["publishedAt"],
            )
        except HttpError as e:
            logger.error("YouTube API error: %s", e)
            raise
        except Exception as e:
            logger.error("Error getting channel info: %s", e)
            raise

    async def filter_videos(
        self,
        videos: List[VideoInfo],
        min_views: Optional[int] = None,
        max_views: Optional[int] = None,
        min_duration: Optional[str] = None,
        max_duration: Optional[str] = None,
        keywords: Optional[List[str]] = None,
        exclude_keywords: Optional[List[str]] = None,
    ) -> List[VideoInfo]:
        """Filter videos based on various criteria.

        Every criterion that is set must hold for a video to be kept.

        Args:
            videos: Videos to filter; the list itself is not modified.
            min_views: Keep videos with at least this many views.
            max_views: Keep videos with at most this many views.
            min_duration: Minimum length as an ISO 8601 duration
                (e.g. ``PT5M``).
            max_duration: Maximum length as an ISO 8601 duration. A value
                that cannot be parsed counts as 0 seconds.
            keywords: Keep videos whose title or description contains at
                least one of these (case-insensitive substring match).
            exclude_keywords: Drop videos whose title or description
                contains any of these (case-insensitive substring match).

        Returns:
            A new list with the matching videos in their original order.
        """
        # Resolve thresholds and lowercase keywords once, not per video.
        min_seconds = (
            self._duration_to_seconds(min_duration) if min_duration else None
        )
        max_seconds = (
            self._duration_to_seconds(max_duration) if max_duration else None
        )
        wanted = [keyword.lower() for keyword in keywords] if keywords else []
        unwanted = (
            [keyword.lower() for keyword in exclude_keywords]
            if exclude_keywords
            else []
        )

        def mentions_any(video: VideoInfo, needles: List[str]) -> bool:
            title = video.title.lower()
            description = video.description.lower()
            return any(
                needle in title or needle in description for needle in needles
            )

        def keep(video: VideoInfo) -> bool:
            if min_views is not None and video.view_count < min_views:
                return False
            if max_views is not None and video.view_count > max_views:
                return False
            if min_seconds is not None or max_seconds is not None:
                seconds = self._duration_to_seconds(video.duration)
                if min_seconds is not None and seconds < min_seconds:
                    return False
                if max_seconds is not None and seconds > max_seconds:
                    return False
            if wanted and not mentions_any(video, wanted):
                return False
            if unwanted and mentions_any(video, unwanted):
                return False
            return True

        return [video for video in videos if keep(video)]

    async def get_transcripts(
        self,
        video_ids: List[str],
        analysis_type: str = "summary",
    ) -> Dict[str, Any]:
        """Extract transcripts from selected videos for Claude to analyze.

        Failures are isolated per video: an error for one ID is recorded in
        that ID's entry and does not abort the remaining IDs.

        Args:
            video_ids: IDs of the videos to fetch transcripts for.
            analysis_type: Echoed back as ``"type"`` in each successful entry.

        Returns:
            A dict keyed by video ID. Successful entries have
            ``success=True`` plus ``transcript``, ``metadata``, ``type`` and
            ``word_count``; failed entries have ``success=False`` and
            ``error``.
        """
        results: Dict[str, Any] = {}
        for video_id in video_ids:
            try:
                results[video_id] = self._fetch_transcript_result(
                    video_id, analysis_type
                )
            except Exception as e:
                # Deliberate best-effort: report the failure for this video
                # and carry on with the rest.
                logger.warning("Failed to get transcript for %s: %s", video_id, e)
                results[video_id] = {
                    "success": False,
                    "error": f"Failed to get transcript: {e}",
                }
        return results

    def _fetch_transcript_result(
        self, video_id: str, analysis_type: str
    ) -> Dict[str, Any]:
        """Build the ``get_transcripts`` result entry for one video."""
        # Get transcript using youtube-transcript-api directly.
        entries = YouTubeTranscriptApi.get_transcript(video_id)
        transcript_text = " ".join(entry["text"] for entry in entries)

        # Get video metadata for context; only the snippet is used.
        response = self.youtube.videos().list(part="snippet", id=video_id).execute()
        items = response.get("items")
        if not items:
            return {
                "success": False,
                "error": f"Video {video_id} not found",
            }

        snippet = items[0]["snippet"]
        description = snippet.get("description", "")
        if len(description) > self._DESCRIPTION_PREVIEW_CHARS:
            description = description[: self._DESCRIPTION_PREVIEW_CHARS] + "..."
        return {
            "success": True,
            "transcript": transcript_text,
            "metadata": {
                "title": snippet["title"],
                "channel": snippet["channelTitle"],
                "description": description,
                "video_url": f"https://youtube.com/watch?v={video_id}",
            },
            "type": analysis_type,
            "word_count": len(transcript_text.split()),
        }

    async def trending_analysis(
        self,
        category_id: Optional[str] = None,
        region_code: str = "US",
        max_results: int = 25,
    ) -> List[VideoInfo]:
        """Get trending videos for analysis.

        Args:
            category_id: Passed through as ``videoCategoryId`` when set.
            region_code: Region whose ``mostPopular`` chart is requested.
            max_results: Requested number of videos; capped at 50.

        Returns:
            The chart's videos in the order the API returned them.

        Raises:
            HttpError: If the API rejects the request.
        """
        try:
            # Request every part needed for VideoInfo up front so no second
            # lookup per chart is necessary.
            params = {
                "part": self._VIDEO_PARTS,
                "chart": "mostPopular",
                "regionCode": region_code,
                "maxResults": min(max_results, self._MAX_RESULTS_CAP),
            }
            if category_id:
                params["videoCategoryId"] = category_id

            response = self.youtube.videos().list(**params).execute()
            return [
                self._parse_video_item(item) for item in response.get("items", [])
            ]
        except HttpError as e:
            logger.error("YouTube API error: %s", e)
            raise
        except Exception as e:
            logger.error("Error getting trending videos: %s", e)
            raise

    async def _get_video_details_batch(self, video_ids: List[str]) -> List[VideoInfo]:
        """Get detailed information for multiple videos in one request.

        Args:
            video_ids: IDs to look up; an empty list short-circuits to ``[]``
                without calling the API.

        Returns:
            One ``VideoInfo`` per video the API returned, arranged in the
            order of ``video_ids``. IDs the API returns nothing for are
            simply absent.

        Raises:
            HttpError: If the API rejects the request.
        """
        if not video_ids:
            return []

        try:
            response = (
                self.youtube.videos()
                .list(part=self._VIDEO_PARTS, id=",".join(video_ids))
                .execute()
            )
            videos = [
                self._parse_video_item(item) for item in response.get("items", [])
            ]

            # The response is not documented to follow the order of the
            # requested IDs; restore it so callers (e.g. search ranking) keep
            # their ordering. The sort is stable and unknown IDs go last.
            position: Dict[str, int] = {}
            for index, video_id in enumerate(video_ids):
                position.setdefault(video_id, index)
            videos.sort(key=lambda video: position.get(video.video_id, len(position)))
            return videos
        except HttpError as e:
            logger.error("YouTube API error: %s", e)
            raise
        except Exception as e:
            logger.error("Error getting video details: %s", e)
            raise

    @staticmethod
    def _parse_video_item(item: Dict[str, Any]) -> VideoInfo:
        """Convert one ``videos.list`` response item into a ``VideoInfo``.

        Missing counters default to 0 and a missing description or tag list
        defaults to empty.
        """
        snippet = item["snippet"]
        stats = item.get("statistics", {})
        return VideoInfo(
            video_id=item["id"],
            title=snippet["title"],
            description=snippet.get("description", ""),
            channel_id=snippet["channelId"],
            channel_title=snippet["channelTitle"],
            published_at=snippet["publishedAt"],
            duration=item["contentDetails"]["duration"],
            view_count=int(stats.get("viewCount", 0)),
            like_count=int(stats.get("likeCount", 0)),
            comment_count=int(stats.get("commentCount", 0)),
            thumbnail_url=snippet["thumbnails"]["default"]["url"],
            tags=snippet.get("tags", []),
        )

    def _format_datetime(self, date_str: str) -> str:
        """Format a date string as a UTC timestamp ending in ``Z``.

        Timezone-aware inputs are converted to UTC; naive inputs are taken to
        already be in UTC. A string that cannot be parsed is returned
        unchanged so the API can judge it.
        """
        try:
            dt = parse_date(date_str)
        except (ValueError, OverflowError, TypeError):
            logger.warning(
                "Could not parse date %r; passing it through unchanged", date_str
            )
            return date_str
        if dt.utcoffset() is not None:
            # Drop the offset after converting, otherwise isoformat() would
            # emit "+HH:MM" and the appended "Z" would make the stamp invalid.
            dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt.isoformat() + "Z"

    def _duration_to_seconds(self, duration: str) -> int:
        """Convert an ISO 8601 duration to seconds.

        Supports days, hours, minutes and seconds (``P#DT#H#M#S``); missing
        components count as 0. Returns 0 when ``duration`` does not start
        with a recognizable duration.
        """
        match = self._DURATION_RE.match(duration)
        if not match:
            return 0
        days, hours, minutes, seconds = (int(part or 0) for part in match.groups())
        return days * 86400 + hours * 3600 + minutes * 60 + seconds