ytdlp_service.py
""" YouTube integration using yt-dlp. This module provides enhanced YouTube integration with improved video quality filtering, metadata extraction, and support for playlists. """ import yt_dlp import asyncio import uuid import re from typing import List, Dict, Any, Optional from datetime import datetime from infrastructure.logging import logger from infrastructure.cache import cache from infrastructure.config import config from api.models import Resource from services.youtube.youtube_service import YouTubeService class YtDlpService(YouTubeService): """ YouTube integration using yt-dlp. Implements the YouTubeService interface with enhanced features: - Quality filtering for videos - Improved metadata extraction - Support for playlists - Relevance scoring for better results """ # Quality filter constants - further relaxed for better coverage MIN_VIEWS = 200 # Reduced from 500 to 200 minimum views MIN_DURATION_SECONDS = 20 # Reduced from 30 to 20 seconds minimum duration MAX_DURATION_SECONDS = 7200 # Increased from 5400 to 7200 seconds (2 hours) maximum duration MAX_AGE_DAYS = 3650 # Increased from 2555 to 3650 days (10 years) maximum age # Relevance scoring weights TITLE_MATCH_WEIGHT = 3.0 DESCRIPTION_MATCH_WEIGHT = 1.5 VIEWS_WEIGHT = 1.0 RECENCY_WEIGHT = 1.0 DURATION_WEIGHT = 1.0 LIKE_RATIO_WEIGHT = 2.0 # List of search term templates for subtopics - expanded for better coverage SUBTOPIC_SEARCH_TERMS = [ "{topic} tutorial", "{topic} guide", "{topic} explained", "{topic} how to", "{topic} examples", "{topic} course", "{topic} for beginners", "{topic} introduction", "{topic} overview", "{topic} fundamentals", "{topic} basics", "{topic} learn", "{topic} lecture", "{topic} class", "{topic} lesson", "{topic} concepts", "{topic} principles", "{topic} techniques", "{topic} methods", "{topic} tips", "{topic} best practices", "{topic} in depth", "{topic} masterclass", "{topic} crash course" ] # Language prefix mapping LANGUAGE_PREFIXES = { "pt": "português ", "en": "english ", "es": "español ", "fr": "français ", "de": "deutsch ", "it": "italiano ", "ru": "русский ", "ja": "日本語 ", "zh": "中文 " } # Prefixes to remove from subtopics for better search results PREFIXES_TO_REMOVE = [ "Introduction to", "Getting Started with", "Understanding", "Basics of", "Advanced", "Mastering", "Practical", "Exploring", "Deep Dive into", "Essential", "Fundamentals of", "Working with", "Building with", "Developing with", "Professional", "Modern", "Effective", "Efficient", "Introdução a", "Introdução ao", "Conceitos de", "Fundamentos de", "Avançado", "Prático", "Explorando", "Essencial", "Trabalhando com", "Desenvolvendo com", "Profissional", "Moderno", "Eficiente" ] def __init__(self, cache_ttl: int = 86400): """ Initialize the YouTube service. 
        Args:
            cache_ttl: Cache TTL in seconds (default: 1 day)
        """
        self.cache_ttl = cache_ttl
        self.logger = logger.get_logger("youtube.ytdlp")

        # Get configuration
        youtube_config = config.get_section("YOUTUBE")
        self.max_results_default = youtube_config.get("max_results", 5)
        self.timeout = youtube_config.get("timeout", 12)  # Increased from 8 to 12 seconds for better results

        # Configure yt-dlp common options with improved settings for better performance
        self.common_ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'ignoreerrors': True,
            'skip_download': True,
            'extract_flat': 'in_playlist',
            'format': 'best',
            'socket_timeout': 10,  # Increased from 8 to 10 seconds for more reliable connections
            'retries': 5,  # Increased from 3 to 5 retries for better reliability
            'fragment_retries': 3,  # Increased from 2 to 3 retries
            'skip_unavailable_fragments': True,
            'no_color': True,
            'geo_bypass': True,  # Try to bypass geo-restrictions
            'geo_bypass_country': 'US',  # Use US as default country for geo-bypass
            'sleep_interval': 0.5,  # Reduced from 1 to 0.5 seconds for faster processing
            'max_sleep_interval': 3,  # Reduced from 5 to 3 seconds for faster processing
            'extractor_retries': 3,  # Added extractor retries
            'skip_playlist_after_errors': 3,  # Skip playlist after 3 errors
            'postprocessor_args': {
                'ffmpeg': ['-threads', '4']  # Use 4 threads for ffmpeg processing
            },
        }

        # Create a cache for video details to avoid redundant lookups
        self._video_details_cache = {}

        self.logger.info("Initialized YtDlpService with optimized settings")

    async def search_videos(self, query: str, max_results: int = None, language: str = "en") -> List[Dict[str, Any]]:
        """
        Search for YouTube videos with enhanced quality filtering and relevance scoring.

        Args:
            query: Search query
            max_results: Maximum number of results to return
            language: Language code (e.g., 'en', 'pt')

        Returns:
            List of dictionaries with video information, sorted by relevance
        """
        if max_results is None:
            max_results = self.max_results_default

        # Check cache first
        cache_key = f"youtube:search:{query}_{max_results}_{language}"
        cached_result = cache.get(cache_key)
        if cached_result:
            self.logger.debug(f"Using cached YouTube search results for '{query}'")
            return cached_result

        # Configure yt-dlp options
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'ignoreerrors': True,
            'extract_flat': 'in_playlist',
            'skip_download': True,
            'format': 'best',
        }

        # Add language prefix to query
        lang_prefix = self.LANGUAGE_PREFIXES.get(language, "")

        # Request more results than needed to allow for filtering
        search_query = f"ytsearch{max_results*4}:{lang_prefix}{query}"

        try:
            # Run search asynchronously
            loop = asyncio.get_event_loop()
            results = await loop.run_in_executor(
                None,
                lambda: self._extract_info_with_ytdlp(search_query, ydl_opts)
            )

            # Process results
            videos = []
            for entry in results:
                # Check if it's a valid video
                if entry.get('_type') == 'url' and 'youtube' in entry.get('url', ''):
                    # Extract duration in minutes
                    duration_seconds = entry.get('duration')
                    duration_minutes = int(duration_seconds / 60) if duration_seconds else None

                    # Get thumbnail
                    thumbnail = self._get_best_thumbnail(entry)

                    # Create video info
                    video = {
                        'id': entry.get('id', uuid.uuid4().hex[:8]),
                        'title': entry.get('title', ''),
                        'url': entry.get('url', ''),
                        'description': entry.get('description', '') or f"Channel: {entry.get('uploader', '')}",
                        'duration': duration_minutes,
                        'duration_seconds': duration_seconds,
                        'thumbnail': thumbnail,
                        'channel': entry.get('uploader', ''),
                        'publishedAt': entry.get('upload_date', ''),
                        'viewCount': entry.get('view_count', 0),
                        'likeCount': entry.get('like_count', 0),
                        'tags': entry.get('tags', [])
                    }

                    # Get detailed information for better filtering and scoring
                    if video['id']:
                        detailed_info = await self.get_video_details(video['id'])
                        if detailed_info:
                            # Update with more detailed information
                            video.update({
                                'viewCount': detailed_info.get('viewCount', video['viewCount']),
                                'likeCount': detailed_info.get('likeCount', video['likeCount']),
                                'tags': detailed_info.get('tags', video['tags']),
                                'description': detailed_info.get('description', video['description'])
                            })

                    # Calculate relevance score
                    video['relevance_score'] = self._score_video(video, query)

                    # Apply quality filters
                    if self._filter_video_by_quality(video):
                        videos.append(video)

            # Sort videos by relevance score (descending)
            videos.sort(key=lambda v: v.get('relevance_score', 0), reverse=True)

            # Limit to max_results
            videos = videos[:max_results]

            # Remove scoring information before caching
            for video in videos:
                if 'relevance_score' in video:
                    del video['relevance_score']

            # Cache the results
            if videos:
                cache.setex(cache_key, self.cache_ttl, videos)
                self.logger.debug(f"Cached YouTube search results for '{query}' ({len(videos)} videos)")
            else:
                self.logger.warning(f"No YouTube videos found for '{query}'")

            return videos
        except Exception as e:
            self.logger.error(f"Error searching YouTube for '{query}': {str(e)}")
            return []

    async def get_video_details(self, video_id: str) -> Optional[Dict[str, Any]]:
        """
        Get details for a specific video.

        Args:
            video_id: YouTube video ID

        Returns:
            Dictionary with video details or None if not found
        """
        # Check cache first
        cache_key = f"youtube:video:{video_id}"
        cached_result = cache.get(cache_key)
        if cached_result:
            self.logger.debug(f"Using cached YouTube video details for '{video_id}'")
            return cached_result

        # Configure yt-dlp options
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'ignoreerrors': True,
            'skip_download': True,
            'format': 'best',
        }

        try:
            # Run extraction asynchronously
            loop = asyncio.get_event_loop()
            video_url = f"https://www.youtube.com/watch?v={video_id}"
            result = await loop.run_in_executor(
                None,
                lambda: self._extract_video_info(video_url, ydl_opts)
            )

            if not result:
                self.logger.warning(f"No details found for YouTube video '{video_id}'")
                return None

            # Process result
            duration_seconds = result.get('duration')
            duration_minutes = int(duration_seconds / 60) if duration_seconds else None

            # Get thumbnail
            thumbnail = self._get_best_thumbnail(result)

            # Create video info
            video = {
                'id': result.get('id', video_id),
                'title': result.get('title', ''),
                'url': video_url,
                'description': result.get('description', ''),
                'duration': duration_minutes,
                'thumbnail': thumbnail,
                'channel': result.get('uploader', ''),
                'publishedAt': result.get('upload_date', ''),
                'viewCount': result.get('view_count'),
                'likeCount': result.get('like_count'),
                'tags': result.get('tags', [])
            }

            # Cache the result
            cache.setex(cache_key, self.cache_ttl, video)
            self.logger.debug(f"Cached YouTube video details for '{video_id}'")

            return video
        except Exception as e:
            self.logger.error(f"Error getting YouTube video details for '{video_id}': {str(e)}")
            return None

    async def search_playlists(self, query: str, max_results: int = 3, language: str = "en") -> List[Dict[str, Any]]:
        """
        Search for YouTube playlists related to a query.
        Args:
            query: Search query
            max_results: Maximum number of playlists to return
            language: Language code (e.g., 'en', 'pt')

        Returns:
            List of dictionaries with playlist information
        """
        # Check cache first
        cache_key = f"youtube:playlists:{query}_{max_results}_{language}"
        cached_result = cache.get(cache_key)
        if cached_result:
            self.logger.debug(f"Using cached YouTube playlist results for '{query}'")
            return cached_result

        # Configure yt-dlp options - use minimal options for faster processing
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'ignoreerrors': True,
            'extract_flat': True,
            'skip_download': True,
            'format': 'best',
            'socket_timeout': 3,  # Short timeout
            'retries': 1,  # Minimal retries
        }

        # Add language prefix to query
        lang_prefix = self.LANGUAGE_PREFIXES.get(language, "")

        # Request fewer playlists to speed up processing
        search_query = f"ytsearchplaylist{max_results}:{lang_prefix}{query}"

        try:
            # Run search asynchronously with a timeout
            loop = asyncio.get_event_loop()
            extract_task = loop.run_in_executor(
                None,
                lambda: self._extract_info_with_ytdlp(search_query, ydl_opts)
            )

            # Set a timeout for the extraction
            try:
                results = await asyncio.wait_for(extract_task, timeout=4)  # 4 second timeout
            except asyncio.TimeoutError:
                self.logger.warning(f"Timeout searching playlists for '{query}'")
                return []

            # Process results - simplified processing
            playlists = []
            for entry in results:
                # Check if it's a valid playlist
                if entry and entry.get('_type') == 'url' and 'youtube.com/playlist' in entry.get('url', ''):
                    # Extract playlist ID from URL
                    url = entry.get('url', '')
                    playlist_id = None
                    if 'list=' in url:
                        playlist_id = url.split('list=')[1].split('&')[0]

                    if not playlist_id:
                        continue

                    # Create playlist info - minimal information for speed
                    playlist = {
                        'id': playlist_id,
                        'title': entry.get('title', ''),
                        'url': url,
                        'description': entry.get('description', '') or f"Playlist by: {entry.get('uploader', '')}",
                        'channel': entry.get('uploader', ''),
                        'thumbnail': entry.get('thumbnail', '')
                    }

                    playlists.append(playlist)

                    # Stop after finding the first valid playlist to save time
                    if len(playlists) >= max_results:
                        break

            # Cache the results
            if playlists:
                cache.setex(cache_key, self.cache_ttl, playlists)
                self.logger.debug(f"Cached YouTube playlist results for '{query}' ({len(playlists)} playlists)")
            else:
                self.logger.warning(f"No YouTube playlists found for '{query}'")

            return playlists
        except Exception as e:
            self.logger.error(f"Error searching YouTube playlists for '{query}': {str(e)}")
            return []

    async def get_playlist_videos(self, playlist_id: str, max_videos: int = 5, language: str = "en") -> List[Dict[str, Any]]:
        """
        Get videos from a YouTube playlist.
        Args:
            playlist_id: YouTube playlist ID
            max_videos: Maximum number of videos to return
            language: Language code (e.g., 'en', 'pt')

        Returns:
            List of dictionaries with video information
        """
        # Check cache first
        cache_key = f"youtube:playlist_videos:{playlist_id}_{max_videos}_{language}"
        cached_result = cache.get(cache_key)
        if cached_result:
            self.logger.debug(f"Using cached YouTube playlist videos for '{playlist_id}'")
            return cached_result

        # Configure yt-dlp options - use minimal options for faster processing
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'ignoreerrors': True,
            'extract_flat': True,
            'skip_download': True,
            'format': 'best',
            'socket_timeout': 3,  # Short timeout
            'retries': 1,  # Minimal retries
            'playlistend': max_videos,  # Limit the number of videos to extract
        }

        playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}"

        try:
            # Run extraction asynchronously with a timeout
            loop = asyncio.get_event_loop()
            extract_task = loop.run_in_executor(
                None,
                lambda: self._extract_info_with_ytdlp(playlist_url, ydl_opts)
            )

            # Set a timeout for the extraction
            try:
                result = await asyncio.wait_for(extract_task, timeout=4)  # 4 second timeout
            except asyncio.TimeoutError:
                self.logger.warning(f"Timeout getting videos from playlist '{playlist_id}'")
                return []

            # Process results - simplified processing.
            # Note: _extract_info_with_ytdlp returns the flattened entry list rather
            # than the full playlist dict, so handle both shapes defensively.
            videos = []
            if isinstance(result, dict):
                playlist_title = result.get('title', '')
                entries = (result.get('entries') or [])[:max_videos]
            else:
                playlist_title = ''
                entries = (result or [])[:max_videos]

            # Process entries
            for entry in entries:
                # Skip non-video entries
                if not entry or entry.get('_type') != 'url' or 'youtube' not in entry.get('url', ''):
                    continue

                video_id = entry.get('id')
                if not video_id:
                    continue

                # Create a minimal video object from the entry data
                # This avoids the need for additional API calls
                video = {
                    'id': video_id,
                    'title': entry.get('title', ''),
                    'url': entry.get('url', ''),
                    'description': entry.get('description', '') or f"Video from playlist: {playlist_title}",
                    'duration': entry.get('duration'),
                    'thumbnail': entry.get('thumbnail'),
                    'playlistId': playlist_id,
                    'playlistTitle': playlist_title,
                    'isFromPlaylist': True
                }

                videos.append(video)
                if len(videos) >= max_videos:
                    break

            # Cache the results
            if videos:
                cache.setex(cache_key, self.cache_ttl, videos)
                self.logger.debug(f"Cached YouTube playlist videos for '{playlist_id}' ({len(videos)} videos)")
            else:
                self.logger.warning(f"No videos found in YouTube playlist '{playlist_id}'")

            return videos
        except Exception as e:
            self.logger.error(f"Error getting videos from YouTube playlist '{playlist_id}': {str(e)}")
            return []

    async def search_videos_for_topic(self, topic: str, subtopic: str = None, max_results: int = 3, language: str = "en") -> List[Resource]:
        """
        Search for videos related to a topic and convert to Resource objects.

        Enhanced implementation with multiple search strategies and fallbacks.
        Optimized for better performance and reliability.
        Args:
            topic: Main topic
            subtopic: Optional subtopic for more specific results
            max_results: Maximum number of results to return
            language: Language code (e.g., 'en', 'pt')

        Returns:
            List of Resource objects
        """
        # Check cache first for this specific topic/subtopic combination
        cache_key = f"youtube:topic:{topic}_{subtopic}_{max_results}_{language}"
        cached_result = cache.get(cache_key, resource_type='resource_list')
        if cached_result:
            # If we have a small number of cached results, consider refreshing in the background
            if len(cached_result) < max_results:
                self.logger.info(f"Found only {len(cached_result)} cached results for '{topic}', will refresh in background")
                # Schedule a background refresh but still return cached results
                asyncio.create_task(self._refresh_topic_cache(topic, subtopic, max_results, language, cache_key))
            else:
                self.logger.info(f"Using cached YouTube topic results for '{topic}' ({len(cached_result)} videos)")
            return cached_result

        # Generate multiple search queries for better coverage
        search_queries = self._generate_search_queries(topic, subtopic, language)

        # Track all videos found across all queries
        all_videos = []
        existing_ids = set()

        # Limit the number of queries to try based on topic complexity
        max_queries_to_try = min(5, len(search_queries))
        if ' ' not in topic:  # Simple topics need fewer queries
            max_queries_to_try = min(3, max_queries_to_try)

        # Use a longer timeout for the first query, shorter for subsequent ones
        first_query_timeout = 15  # 15 seconds for first query
        subsequent_query_timeout = 8  # 8 seconds for subsequent queries

        # Try each query until we have enough results or run out of queries
        for i, query_info in enumerate(search_queries[:max_queries_to_try]):
            query = query_info["query"]
            query_type = query_info["type"]

            self.logger.info(f"Trying {query_type} query: '{query}'")

            # Skip if we already have enough videos
            if len(all_videos) >= max_results * 1.5:  # Get 50% more than needed for better filtering
                break

            # Set timeout based on query position
            query_timeout = first_query_timeout if i == 0 else subsequent_query_timeout

            # Try this query with a timeout
            try:
                # Adjust max_results based on how many we still need
                needed_results = max(1, max_results - len(all_videos))

                # For first query, request more results
                if i == 0:
                    request_count = needed_results * 3  # Request 3x for first query
                else:
                    request_count = needed_results * 2  # Request 2x for subsequent queries

                videos_task = asyncio.create_task(self.search_videos(query, request_count, language))
                videos = await asyncio.wait_for(videos_task, timeout=query_timeout)

                # Add new videos to our collection, avoiding duplicates
                for video in videos:
                    video_id = video.get('id')
                    if video_id and video_id not in existing_ids:
                        all_videos.append(video)
                        existing_ids.add(video_id)

                self.logger.info(f"Found {len(videos)} videos for query '{query}', total now: {len(all_videos)}")

                # If we got good results from the first query, we can be more selective with subsequent ones
                if i == 0 and len(videos) >= max_results:
                    max_queries_to_try = min(3, max_queries_to_try)  # Reduce max queries if first query was successful
            except asyncio.TimeoutError:
                self.logger.warning(f"Timeout searching videos for '{query}'")
                continue
            except Exception as e:
                self.logger.error(f"Error searching videos for '{query}': {str(e)}")
                continue

        # If we still don't have enough videos, try searching for playlists
        if len(all_videos) < max_results:
            self.logger.info(f"Only found {len(all_videos)} videos, trying playlists")

            # Use the main topic for playlist search
            playlist_query = topic
            if subtopic:
                # Include subtopic for more specific playlists
                playlist_query = f"{topic} {subtopic}"

            try:
                # Search for playlists with a timeout
                playlists_task = asyncio.create_task(self.search_playlists(playlist_query, 2, language))
                playlists = await asyncio.wait_for(playlists_task, timeout=6)  # Increased timeout

                # Try each playlist until we have enough videos
                for playlist in playlists:
                    # Skip if we already have enough videos
                    if len(all_videos) >= max_results:
                        break

                    playlist_id = playlist.get('id')
                    if not playlist_id:
                        continue

                    self.logger.info(f"Trying playlist: {playlist.get('title', 'Unknown')}")

                    # Get videos from this playlist
                    try:
                        needed_videos = max(1, max_results - len(all_videos))
                        playlist_videos_task = asyncio.create_task(
                            self.get_playlist_videos(playlist_id, max_videos=needed_videos * 2, language=language)
                        )
                        playlist_videos = await asyncio.wait_for(playlist_videos_task, timeout=6)  # Increased timeout

                        # Add playlist videos to results, avoiding duplicates
                        for video in playlist_videos:
                            video_id = video.get('id')
                            if video_id and video_id not in existing_ids:
                                all_videos.append(video)
                                existing_ids.add(video_id)

                        self.logger.info(f"Added {len(playlist_videos)} videos from playlist, total now: {len(all_videos)}")
                    except (asyncio.TimeoutError, Exception) as e:
                        self.logger.warning(f"Error or timeout getting playlist videos: {str(e)}")
                        continue
            except (asyncio.TimeoutError, Exception) as e:
                self.logger.warning(f"Error or timeout searching playlists: {str(e)}")

        # Sort videos by relevance to the topic/subtopic
        if subtopic:
            relevance_query = f"{topic} {subtopic}"
        else:
            relevance_query = topic

        for video in all_videos:
            video['relevance_score'] = self._score_video(video, relevance_query)

        all_videos.sort(key=lambda v: v.get('relevance_score', 0), reverse=True)

        # Limit to max_results
        final_videos = all_videos[:max_results]

        # Convert to Resource objects
        resources = self._convert_videos_to_resources(final_videos, subtopic=subtopic, is_subtopic=bool(subtopic))

        # Cache the results
        if resources:
            # Use appropriate TTL based on result count
            if len(resources) < max_results // 2:
                # Fewer results than half requested - cache for 12 hours
                cache_ttl = min(43200, self.cache_ttl)
                self.logger.info(f"Caching {len(resources)} resources for '{topic}' with shorter TTL (12 hours)")
            else:
                # Good number of results - cache for normal TTL
                cache_ttl = self.cache_ttl
                self.logger.info(f"Caching {len(resources)} resources for '{topic}' with normal TTL")

            cache.setex(cache_key, cache_ttl, resources)
        else:
            self.logger.warning(f"No YouTube videos found for '{topic}'")

        return resources

    def _generate_search_queries(self, topic: str, subtopic: str = None, language: str = "en") -> List[Dict[str, str]]:
        """
        Generate multiple search queries for a topic/subtopic combination.
        Args:
            topic: Main topic
            subtopic: Optional subtopic
            language: Language code

        Returns:
            List of query dictionaries with 'query' and 'type' keys
        """
        queries = []
        lang_prefix = self.LANGUAGE_PREFIXES.get(language, "")

        # If we have a subtopic, create specific queries
        if subtopic:
            # Clean subtopic for better search results
            clean_subtopic = self._clean_subtopic(subtopic)

            # Add direct subtopic query
            queries.append({
                "query": f"{lang_prefix}{clean_subtopic} {topic}",
                "type": "direct_subtopic"
            })

            # Add formatted subtopic queries
            for template in self.SUBTOPIC_SEARCH_TERMS[:8]:  # Use first 8 templates
                search_term = template.format(topic=clean_subtopic)
                queries.append({
                    "query": f"{lang_prefix}{search_term} {topic}",
                    "type": "formatted_subtopic"
                })

            # Add topic-only query as fallback
            queries.append({
                "query": f"{lang_prefix}{topic}",
                "type": "topic_only"
            })
        else:
            # For main topic, start with direct query
            queries.append({
                "query": f"{lang_prefix}{topic}",
                "type": "direct_topic"
            })

            # Add some formatted topic queries
            for template in self.SUBTOPIC_SEARCH_TERMS[:6]:  # Use first 6 templates
                search_term = template.format(topic=topic)
                queries.append({
                    "query": f"{lang_prefix}{search_term}",
                    "type": "formatted_topic"
                })

        # Add simplified query if topic has multiple words
        if ' ' in topic:
            main_word = topic.split()[0]
            queries.append({
                "query": f"{lang_prefix}{main_word}",
                "type": "simplified_topic"
            })

            if subtopic and ' ' in subtopic:
                main_subtopic_word = subtopic.split()[0]
                queries.append({
                    "query": f"{lang_prefix}{main_subtopic_word} {main_word}",
                    "type": "simplified_subtopic"
                })

        return queries

    async def _refresh_topic_cache(self, topic: str, subtopic: str, max_results: int, language: str, cache_key: str) -> None:
        """
        Refresh the cache for a topic in the background.

        Args:
            topic: Main topic
            subtopic: Optional subtopic
            max_results: Maximum number of results to return
            language: Language code
            cache_key: Cache key to update
        """
        try:
            self.logger.debug(f"Background refreshing cache for topic '{topic}'")

            # Get the current cache value to compare later
            current_cached = cache.get(cache_key, resource_type='resource_list')
            current_count = len(current_cached) if current_cached else 0

            # Generate multiple search queries
            search_queries = self._generate_search_queries(topic, subtopic, language)

            # Track all videos
            all_videos = []
            existing_ids = set()

            # Try each query
            for query_info in search_queries[:3]:  # Limit to first 3 queries for background refresh
                query = query_info["query"]
                try:
                    videos = await self.search_videos(query, max_results * 2, language)

                    # Add new videos, avoiding duplicates
                    for video in videos:
                        video_id = video.get('id')
                        if video_id and video_id not in existing_ids:
                            all_videos.append(video)
                            existing_ids.add(video_id)
                except Exception:
                    continue

                if len(all_videos) >= max_results * 1.5:
                    break

            # Sort by relevance
            relevance_query = f"{topic} {subtopic}" if subtopic else topic
            for video in all_videos:
                video['relevance_score'] = self._score_video(video, relevance_query)

            all_videos.sort(key=lambda v: v.get('relevance_score', 0), reverse=True)

            # Limit to max_results
            final_videos = all_videos[:max_results]

            # Convert to Resource objects
            resources = self._convert_videos_to_resources(final_videos, subtopic=subtopic, is_subtopic=bool(subtopic))

            # Only update cache if we found more results than before
            if len(resources) > current_count:
                self.logger.info(f"Background refresh found {len(resources)} resources (was {current_count})")
                cache.setex(cache_key, self.cache_ttl, resources)
            else:
                self.logger.info(f"Background refresh didn't improve results ({len(resources)} ≤ {current_count})")
        except Exception as e:
            self.logger.error(f"Error in background topic cache refresh: {str(e)}")

    def _convert_videos_to_resources(self, videos: List[Dict[str, Any]], subtopic: str = None, is_subtopic: bool = False) -> List[Resource]:
        """
        Convert video dictionaries to Resource objects.

        Args:
            videos: List of video dictionaries
            subtopic: Optional subtopic
            is_subtopic: Whether this is a subtopic search

        Returns:
            List of Resource objects
        """
        resources = []
        for video in videos:
            # Determine difficulty based on duration and other factors
            difficulty = "intermediate"
            duration_minutes = video.get('duration', 0)
            if duration_minutes:
                if duration_minutes < 5:
                    difficulty = "beginner"
                elif duration_minutes > 30:
                    difficulty = "advanced"

            resource = Resource(
                id=f"youtube_{video.get('id')}",
                title=video.get('title', ''),
                url=video.get('url', ''),
                type="video",
                description=video.get('description', ''),
                duration=video.get('duration'),
                readTime=None,
                difficulty=difficulty,
                thumbnail=video.get('thumbnail')
            )

            # Add subtopic information if applicable
            if is_subtopic and subtopic:
                resource.title = f"{resource.title} - Relevante para: {subtopic}"

            # Add playlist information if applicable
            if video.get('isFromPlaylist'):
                resource.description = f"{resource.description}\n\nParte da playlist: {video.get('playlistTitle', '')}"
                resource.metadata = {
                    "playlistId": video.get('playlistId'),
                    "playlistTitle": video.get('playlistTitle')
                }

            resources.append(resource)

        return resources

    def _extract_info_with_ytdlp(self, search_query: str, ydl_opts: dict) -> List[dict]:
        """
        Extract information from videos using yt-dlp.

        Args:
            search_query: Search query
            ydl_opts: yt-dlp options

        Returns:
            List of video information
        """
        # Merge with common options
        merged_opts = {**self.common_ydl_opts, **ydl_opts}

        # Add a timeout to the entire operation
        try:
            with yt_dlp.YoutubeDL(merged_opts) as ydl:
                # Set a timeout for the extraction process
                ydl.params['socket_timeout'] = 5  # 5 seconds timeout

                # Extract info with timeout
                result = ydl.extract_info(search_query, download=False)

                if result and 'entries' in result:
                    # Filter out None entries that might cause issues
                    entries = [entry for entry in result['entries'] if entry is not None]
                    return entries
                return []
        except yt_dlp.utils.DownloadError as e:
            # More specific error handling for common YouTube issues
            if "This video is not available" in str(e):
                self.logger.warning(f"Video not available: {str(e)}")
            elif "Video unavailable" in str(e):
                self.logger.warning(f"Video unavailable: {str(e)}")
            else:
                self.logger.error(f"Error extracting info with yt-dlp: {str(e)}")
            return []
        except Exception as e:
            self.logger.error(f"Error extracting info with yt-dlp: {str(e)}")
            return []

    def _extract_video_info(self, video_url: str, ydl_opts: dict) -> Optional[dict]:
        """
        Extract information for a specific video using yt-dlp.
        Args:
            video_url: Video URL
            ydl_opts: yt-dlp options

        Returns:
            Video information or None if not found
        """
        # Check in-memory cache first
        video_id = video_url.split('v=')[-1].split('&')[0] if 'v=' in video_url else None
        if video_id and video_id in self._video_details_cache:
            return self._video_details_cache[video_id]

        # Merge with common options
        merged_opts = {**self.common_ydl_opts, **ydl_opts}

        # Use a more minimal set of options for single video extraction
        merged_opts['extract_flat'] = False  # We want full details for a single video
        merged_opts['writesubtitles'] = False
        merged_opts['writeautomaticsub'] = False
        merged_opts['allsubtitles'] = False
        merged_opts['playlist_items'] = '1'  # Only extract the first item if it's a playlist

        try:
            with yt_dlp.YoutubeDL(merged_opts) as ydl:
                # Set a shorter timeout for single video extraction
                ydl.params['socket_timeout'] = 3  # 3 seconds timeout

                # Extract info with timeout
                result = ydl.extract_info(video_url, download=False, process=False)

                # Cache the result in memory
                if result and video_id:
                    self._video_details_cache[video_id] = result

                return result
        except yt_dlp.utils.DownloadError as e:
            # More specific error handling for common YouTube issues
            if "This video is not available" in str(e):
                self.logger.warning(f"Video not available: {str(e)}")
            elif "Video unavailable" in str(e):
                self.logger.warning(f"Video unavailable: {str(e)}")
            else:
                self.logger.error(f"Error extracting video info with yt-dlp: {str(e)}")
            return None
        except Exception as e:
            self.logger.error(f"Error extracting video info with yt-dlp: {str(e)}")
            return None

    def _get_best_thumbnail(self, video_info: Dict[str, Any]) -> Optional[str]:
        """
        Get the best thumbnail available for a video.

        Args:
            video_info: Video information

        Returns:
            URL of the best thumbnail or None if not found
        """
        # Check if thumbnails are available
        thumbnails = video_info.get('thumbnails', [])
        if not thumbnails:
            # Fallback to default YouTube thumbnail
            video_id = video_info.get('id')
            if video_id:
                return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
            return None

        # Sort thumbnails by resolution (width x height)
        sorted_thumbnails = sorted(
            thumbnails,
            key=lambda t: (t.get('width', 0) * t.get('height', 0)),
            reverse=True
        )

        # Return the URL of the best thumbnail
        return sorted_thumbnails[0].get('url') if sorted_thumbnails else None

    def _parse_duration(self, duration_str: str) -> Optional[int]:
        """
        Convert a duration string to minutes.

        Args:
            duration_str: Duration string (e.g., "PT1H30M15S" or "1:30:15")

        Returns:
            Duration in minutes or None if conversion is not possible
        """
        if not duration_str:
            return None

        # ISO 8601 format (PT1H30M15S)
        iso_match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?', duration_str)
        if iso_match:
            hours = int(iso_match.group(1) or 0)
            minutes = int(iso_match.group(2) or 0)
            seconds = int(iso_match.group(3) or 0)
            return hours * 60 + minutes + (1 if seconds > 30 else 0)

        # HH:MM:SS or MM:SS format
        time_match = re.match(r'(?:(\d+):)?(\d+):(\d+)', duration_str)
        if time_match:
            hours = int(time_match.group(1) or 0)
            minutes = int(time_match.group(2) or 0)
            seconds = int(time_match.group(3) or 0)
            return hours * 60 + minutes + (1 if seconds > 30 else 0)

        return None

    def _clean_subtopic(self, subtopic: str) -> str:
        """
        Clean a subtopic for better search results.
        Args:
            subtopic: Subtopic to clean

        Returns:
            Cleaned subtopic
        """
        clean_subtopic = subtopic

        # Remove common prefixes that might interfere with search
        for prefix in self.PREFIXES_TO_REMOVE:
            if clean_subtopic.startswith(prefix):
                clean_subtopic = clean_subtopic[len(prefix):].strip()
                break

        return clean_subtopic

    def _score_video(self, video: Dict[str, Any], query: str) -> float:
        """
        Calculate a relevance score for a video based on various factors.

        Args:
            video: Video information dictionary
            query: Original search query

        Returns:
            Relevance score (higher is better)
        """
        score = 0.0

        # Extract video data
        title = video.get('title', '').lower()
        description = video.get('description', '').lower()
        view_count = video.get('viewCount', 0) or 0
        duration_seconds = video.get('duration', 0) * 60 if video.get('duration') else 0
        like_count = video.get('likeCount', 0) or 0

        # Parse upload date
        upload_date_str = video.get('publishedAt', '')
        upload_date = None
        if upload_date_str:
            try:
                # YouTube API format: YYYYMMDD
                if len(upload_date_str) == 8:
                    upload_date = datetime.strptime(upload_date_str, '%Y%m%d')
                # ISO format
                elif 'T' in upload_date_str:
                    upload_date = datetime.fromisoformat(upload_date_str.replace('Z', '+00:00'))
                    # Drop tzinfo so subtraction against naive datetime.now() works
                    upload_date = upload_date.replace(tzinfo=None)
            except ValueError:
                pass

        # Calculate title match score
        query_terms = query.lower().split()
        title_match_count = sum(1 for term in query_terms if term in title)
        title_match_score = title_match_count / max(1, len(query_terms))
        score += title_match_score * self.TITLE_MATCH_WEIGHT

        # Calculate description match score
        description_match_count = sum(1 for term in query_terms if term in description)
        description_match_score = description_match_count / max(1, len(query_terms))
        score += description_match_score * self.DESCRIPTION_MATCH_WEIGHT

        # Calculate view count score (linear, capped at 10M views)
        if view_count > 0:
            view_score = min(1.0, max(0.0, (min(10000000, view_count) / 10000000)))
            score += view_score * self.VIEWS_WEIGHT

        # Calculate recency score
        if upload_date:
            days_old = (datetime.now() - upload_date).days
            recency_score = max(0.0, 1.0 - (days_old / self.MAX_AGE_DAYS))
            score += recency_score * self.RECENCY_WEIGHT

        # Calculate duration score (prefer videos between 5-30 minutes)
        if duration_seconds:
            if duration_seconds < 300:  # Less than 5 minutes
                duration_score = duration_seconds / 300
            elif duration_seconds <= 1800:  # 5-30 minutes (ideal)
                duration_score = 1.0
            else:  # More than 30 minutes
                duration_score = max(0.0, 1.0 - ((duration_seconds - 1800) / 1800))
            score += duration_score * self.DURATION_WEIGHT

        # Calculate like ratio score
        if view_count > 0 and like_count > 0:
            like_ratio = min(1.0, like_count / max(1, view_count / 10))  # Assuming 10% like rate is good
            score += like_ratio * self.LIKE_RATIO_WEIGHT

        return score

    def _filter_video_by_quality(self, video: Dict[str, Any]) -> bool:
        """
        Check if a video meets the quality criteria.
        Args:
            video: Video information dictionary

        Returns:
            True if the video meets quality criteria, False otherwise
        """
        # Check view count
        view_count = video.get('viewCount', 0) or 0
        if view_count < self.MIN_VIEWS:
            return False

        # Check duration
        duration_seconds = video.get('duration', 0) * 60 if video.get('duration') else 0
        if duration_seconds < self.MIN_DURATION_SECONDS or duration_seconds > self.MAX_DURATION_SECONDS:
            return False

        # Check age
        upload_date_str = video.get('publishedAt', '')
        if upload_date_str:
            try:
                # YouTube API format: YYYYMMDD
                if len(upload_date_str) == 8:
                    upload_date = datetime.strptime(upload_date_str, '%Y%m%d')
                    days_old = (datetime.now() - upload_date).days
                    if days_old > self.MAX_AGE_DAYS:
                        return False
            except ValueError:
                pass

        return True
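
A minimal usage sketch, assuming the project's infrastructure modules (config, cache, logger) are importable and configured; the topic, subtopic, and TTL values below are illustrative:

# Hypothetical demo driver - not part of the service above.
import asyncio

async def demo():
    service = YtDlpService(cache_ttl=3600)

    # Plain keyword search, returning ranked video dicts
    videos = await service.search_videos("python decorators", max_results=3, language="en")
    for v in videos:
        print(v['title'], v['url'])

    # Topic/subtopic search, returning Resource objects with difficulty labels
    resources = await service.search_videos_for_topic(
        "python", subtopic="Decorators", max_results=3, language="en"
    )
    for r in resources:
        print(r.title, r.url)

if __name__ == "__main__":
    asyncio.run(demo())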
