Skip to main content
Glama
youtube_service.py (12.6 kB)
"""YouTube API service wrapper for yt-fetch."""

import logging
import os
import re
from typing import Any, Dict, List, Optional

from dateutil.parser import parse as parse_date
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi

logger = logging.getLogger(__name__)

# Compiled once at import time. Matches ISO 8601 durations as returned in the
# YouTube API's contentDetails.duration field, e.g. "PT1H2M3S", "PT45S", or
# "P1DT2H" (a days component appears on videos longer than 24 hours).
_ISO8601_DURATION = re.compile(
    r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?"
)


class VideoInfo(BaseModel):
    """Metadata for a single YouTube video."""

    video_id: str
    title: str
    description: str
    channel_id: str
    channel_title: str
    published_at: str  # RFC 3339 timestamp as returned by the API
    duration: str  # ISO 8601 duration, e.g. "PT4M13S"
    view_count: int
    like_count: int
    comment_count: int
    thumbnail_url: str
    tags: List[str] = []


class ChannelInfo(BaseModel):
    """Metadata for a YouTube channel."""

    channel_id: str
    title: str
    description: str
    subscriber_count: int
    video_count: int
    view_count: int
    thumbnail_url: str
    published_at: str


class SearchResult(BaseModel):
    """One page of video search results."""

    videos: List[VideoInfo]
    total_results: int
    next_page_token: Optional[str] = None


class YouTubeService:
    """Service for interacting with the YouTube Data API and transcripts."""

    def __init__(self):
        """Initialize the YouTube API client.

        Raises:
            ValueError: If the YOUTUBE_API_KEY environment variable is unset.
        """
        self.api_key = os.getenv("YOUTUBE_API_KEY")
        if not self.api_key:
            raise ValueError("YOUTUBE_API_KEY environment variable is required")
        self.youtube = build("youtube", "v3", developerKey=self.api_key)

    async def search_videos(
        self,
        query: str,
        max_results: int = 10,
        order: str = "relevance",
        published_after: Optional[str] = None,
        published_before: Optional[str] = None,
        duration: Optional[str] = None,
        video_type: Optional[str] = None,
        region_code: Optional[str] = None,
    ) -> SearchResult:
        """Search for videos using the YouTube API.

        Args:
            query: Free-text search query.
            max_results: Desired result count, capped at the API maximum of 50.
            order: Sort order (e.g. "relevance", "date", "viewCount").
            published_after: Earliest publish date; any parseable date string,
                normalized to RFC 3339 before sending.
            published_before: Latest publish date, same handling.
            duration: API duration filter (e.g. "short", "medium", "long").
            video_type: API video type filter (e.g. "episode", "movie").
            region_code: ISO 3166-1 alpha-2 region code.

        Returns:
            SearchResult with fully-populated video details and paging info.

        Raises:
            HttpError: On a YouTube API failure (logged, then re-raised).
        """
        try:
            search_params = {
                "part": "snippet",
                "q": query,
                "type": "video",
                "maxResults": min(max_results, 50),
                "order": order,
            }
            if published_after:
                search_params["publishedAfter"] = self._format_datetime(
                    published_after
                )
            if published_before:
                search_params["publishedBefore"] = self._format_datetime(
                    published_before
                )
            if duration:
                search_params["videoDuration"] = duration
            if video_type:
                search_params["videoType"] = video_type
            if region_code:
                search_params["regionCode"] = region_code

            search_response = self.youtube.search().list(**search_params).execute()
            # .get() guards against a response with no "items" key (empty page).
            video_ids = [
                item["id"]["videoId"] for item in search_response.get("items", [])
            ]
            videos = await self._get_video_details_batch(video_ids)

            return SearchResult(
                videos=videos,
                total_results=search_response["pageInfo"]["totalResults"],
                next_page_token=search_response.get("nextPageToken"),
            )
        except HttpError as e:
            logger.error(f"YouTube API error: {e}")
            raise
        except Exception as e:
            logger.error(f"Error searching videos: {e}")
            raise

    async def get_video_details(self, video_id: str) -> VideoInfo:
        """Get detailed information about a specific video.

        Raises:
            ValueError: If the video does not exist or is not accessible.
        """
        try:
            videos = await self._get_video_details_batch([video_id])
            if not videos:
                raise ValueError(f"Video not found: {video_id}")
            return videos[0]
        except Exception as e:
            logger.error(f"Error getting video details: {e}")
            raise

    async def get_channel_info(self, channel_id: str) -> ChannelInfo:
        """Get information about a YouTube channel.

        Raises:
            ValueError: If the channel does not exist.
            HttpError: On a YouTube API failure.
        """
        try:
            response = (
                self.youtube.channels()
                .list(part="snippet,statistics", id=channel_id)
                .execute()
            )
            if not response["items"]:
                raise ValueError(f"Channel not found: {channel_id}")

            channel = response["items"][0]
            snippet = channel["snippet"]
            stats = channel["statistics"]

            # Statistics fields may be hidden by the channel owner, hence .get().
            return ChannelInfo(
                channel_id=channel_id,
                title=snippet["title"],
                description=snippet.get("description", ""),
                subscriber_count=int(stats.get("subscriberCount", 0)),
                video_count=int(stats.get("videoCount", 0)),
                view_count=int(stats.get("viewCount", 0)),
                thumbnail_url=snippet["thumbnails"]["default"]["url"],
                published_at=snippet["publishedAt"],
            )
        except HttpError as e:
            logger.error(f"YouTube API error: {e}")
            raise
        except Exception as e:
            logger.error(f"Error getting channel info: {e}")
            raise

    async def filter_videos(
        self,
        videos: List[VideoInfo],
        min_views: Optional[int] = None,
        max_views: Optional[int] = None,
        min_duration: Optional[str] = None,
        max_duration: Optional[str] = None,
        keywords: Optional[List[str]] = None,
        exclude_keywords: Optional[List[str]] = None,
    ) -> List[VideoInfo]:
        """Filter videos based on view count, duration, and keyword criteria.

        Keyword matching is case-insensitive against both title and
        description. Durations are ISO 8601 strings (e.g. "PT5M").
        The input list is not mutated.
        """
        filtered_videos = videos.copy()

        if min_views is not None:
            filtered_videos = [v for v in filtered_videos if v.view_count >= min_views]
        if max_views is not None:
            filtered_videos = [v for v in filtered_videos if v.view_count <= max_views]

        if min_duration:
            min_seconds = self._duration_to_seconds(min_duration)
            filtered_videos = [
                v
                for v in filtered_videos
                if self._duration_to_seconds(v.duration) >= min_seconds
            ]
        if max_duration:
            max_seconds = self._duration_to_seconds(max_duration)
            filtered_videos = [
                v
                for v in filtered_videos
                if self._duration_to_seconds(v.duration) <= max_seconds
            ]

        if keywords:
            filtered_videos = [
                v
                for v in filtered_videos
                if any(
                    keyword.lower() in v.title.lower()
                    or keyword.lower() in v.description.lower()
                    for keyword in keywords
                )
            ]
        if exclude_keywords:
            filtered_videos = [
                v
                for v in filtered_videos
                if not any(
                    keyword.lower() in v.title.lower()
                    or keyword.lower() in v.description.lower()
                    for keyword in exclude_keywords
                )
            ]

        return filtered_videos

    async def get_transcripts(
        self,
        video_ids: List[str],
        analysis_type: str = "summary",
    ) -> Dict[str, Any]:
        """Extract transcripts from selected videos for Claude to analyze.

        Best-effort per video: a failure for one video is recorded in its
        result entry rather than raised, so one bad video does not abort
        the batch.

        Returns:
            Mapping of video_id to a dict with "success" plus either the
            transcript and metadata, or an "error" message.
        """
        results = {}
        for video_id in video_ids:
            try:
                # Get transcript using youtube-transcript-api directly
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
                transcript_text = " ".join(entry["text"] for entry in transcript_list)

                # Get video metadata for context
                video_response = (
                    self.youtube.videos()
                    .list(part="snippet,statistics", id=video_id)
                    .execute()
                )
                if video_response["items"]:
                    video_info = video_response["items"][0]
                    title = video_info["snippet"]["title"]
                    channel = video_info["snippet"]["channelTitle"]
                    # Truncate long descriptions to keep the payload compact.
                    description = (
                        video_info["snippet"]["description"][:500] + "..."
                        if len(video_info["snippet"]["description"]) > 500
                        else video_info["snippet"]["description"]
                    )
                    results[video_id] = {
                        "success": True,
                        "transcript": transcript_text,
                        "metadata": {
                            "title": title,
                            "channel": channel,
                            "description": description,
                            "video_url": f"https://youtube.com/watch?v={video_id}",
                        },
                        "type": analysis_type,
                        "word_count": len(transcript_text.split()),
                    }
                else:
                    results[video_id] = {
                        "success": False,
                        "error": f"Video {video_id} not found",
                    }
            except Exception as e:
                results[video_id] = {
                    "success": False,
                    "error": f"Failed to get transcript: {str(e)}",
                }
        return results

    async def trending_analysis(
        self,
        category_id: Optional[str] = None,
        region_code: str = "US",
        max_results: int = 25,
    ) -> List[VideoInfo]:
        """Get trending ("mostPopular" chart) videos for analysis.

        Args:
            category_id: Optional YouTube video category ID filter.
            region_code: ISO 3166-1 alpha-2 region code.
            max_results: Desired count, capped at the API maximum of 50.
        """
        try:
            params = {
                "part": "snippet",
                "chart": "mostPopular",
                "regionCode": region_code,
                "maxResults": min(max_results, 50),
            }
            if category_id:
                params["videoCategoryId"] = category_id

            response = self.youtube.videos().list(**params).execute()
            video_ids = [item["id"] for item in response.get("items", [])]
            videos = await self._get_video_details_batch(video_ids)
            return videos
        except HttpError as e:
            logger.error(f"YouTube API error: {e}")
            raise
        except Exception as e:
            logger.error(f"Error getting trending videos: {e}")
            raise

    async def _get_video_details_batch(self, video_ids: List[str]) -> List[VideoInfo]:
        """Get detailed information for multiple videos in one API call."""
        if not video_ids:
            return []

        try:
            response = (
                self.youtube.videos()
                .list(part="snippet,statistics,contentDetails", id=",".join(video_ids))
                .execute()
            )

            videos = []
            for item in response["items"]:
                snippet = item["snippet"]
                stats = item["statistics"]
                content_details = item["contentDetails"]

                # Statistics can be hidden per-video (e.g. likes disabled),
                # so every count falls back to 0.
                video = VideoInfo(
                    video_id=item["id"],
                    title=snippet["title"],
                    description=snippet.get("description", ""),
                    channel_id=snippet["channelId"],
                    channel_title=snippet["channelTitle"],
                    published_at=snippet["publishedAt"],
                    duration=content_details["duration"],
                    view_count=int(stats.get("viewCount", 0)),
                    like_count=int(stats.get("likeCount", 0)),
                    comment_count=int(stats.get("commentCount", 0)),
                    thumbnail_url=snippet["thumbnails"]["default"]["url"],
                    tags=snippet.get("tags", []),
                )
                videos.append(video)
            return videos
        except HttpError as e:
            logger.error(f"YouTube API error: {e}")
            raise
        except Exception as e:
            logger.error(f"Error getting video details: {e}")
            raise

    def _format_datetime(self, date_str: str) -> str:
        """Normalize a user-supplied date string to RFC 3339 for the API.

        Returns the input unchanged if it cannot be parsed (the API will
        then reject it with a clear error).
        """
        try:
            dt = parse_date(date_str)
        except Exception:
            return date_str
        if dt.tzinfo is None:
            # Naive datetimes are treated as UTC; the API requires an offset.
            # (Appending "Z" to an aware datetime's isoformat would produce
            # an invalid value like "...+02:00Z", so only do it when naive.)
            return dt.isoformat() + "Z"
        return dt.isoformat()

    def _duration_to_seconds(self, duration: str) -> int:
        """Convert an ISO 8601 duration (e.g. "PT1H2M3S") to total seconds.

        Handles the days component ("P1DT2H") that YouTube emits for videos
        longer than 24 hours. Returns 0 for unparseable input.
        """
        match = _ISO8601_DURATION.match(duration)
        if not match:
            return 0
        days, hours, minutes, seconds = (int(g or 0) for g in match.groups())
        return days * 86400 + hours * 3600 + minutes * 60 + seconds

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/smith-nathanh/yt-fetch'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.