"""Category A: Core Discovery (Search) Tools
Provides video, channel, and playlist search capabilities using
youtube-search-python and yt-dlp.
"""
import asyncio
from typing import Any
from youtubesearchpython import VideosSearch, ChannelsSearch, PlaylistsSearch, CustomSearch
from youtubesearchpython.internal.constants import (
VideoUploadDateFilter,
VideoDurationFilter,
VideoSortOrder,
)
import yt_dlp
from ..middleware.rate_limiter import rate_limiter
@rate_limiter
async def search_videos(query: str, limit: int = 10) -> str:
    """
    Search YouTube for videos by keyword.

    The blocking ``VideosSearch`` request is executed in a worker thread via
    ``asyncio.to_thread``.

    Args:
        query: Search query string
        limit: Maximum number of results; clamped to the range 1-50
            (default: 10)

    Returns:
        Markdown-formatted list of videos with titles, channels, views,
        durations, URLs and (when available) a thumbnail, or a
        "No videos found" message when the search returns nothing.
    """
    limit = max(1, min(limit, 50))

    def _search():
        return VideosSearch(query, limit=limit).result()

    result = await asyncio.to_thread(_search)

    videos = (result or {}).get("result") or []
    if not videos:
        return f"No videos found for query: {query}"

    # Format for LLM consumption
    parts = [f"# Search Results for: {query}\n\n"]
    for idx, video in enumerate(videos, 1):
        # A key can be present with a None value, in which case the default
        # passed to .get() is not used; ``or`` covers both cases.
        title = video.get("title") or "N/A"
        channel = (video.get("channel") or {}).get("name") or "Unknown"
        views = (video.get("viewCount") or {}).get("short") or "0"
        duration = video.get("duration") or "N/A"
        url = video.get("link") or ""

        # Guard against a missing, None or empty thumbnail list before
        # indexing (same approach find_channels uses).
        thumbnails = video.get("thumbnails") or []
        thumbnail = (thumbnails[0].get("url") or "") if thumbnails else ""

        parts.append(f"## {idx}. {title}\n")
        parts.append(f"- **Channel**: {channel}\n")
        parts.append(f"- **Views**: {views}\n")
        parts.append(f"- **Duration**: {duration}\n")
        parts.append(f"- **URL**: {url}\n")
        if thumbnail:
            parts.append(f"- **Thumbnail**: {thumbnail}\n")
        parts.append("\n")

    return "".join(parts)
# Numeric codes placed inside YouTube's search-parameter token.
# NOTE(review): these values are taken from what the youtube-search-python
# preference constants appear to decode to (for example a relevance-sort
# token of 'CAASAhAB'); confirm against the installed library version.
_SORT_CODES = {"relevance": 0, "rating": 1, "upload_date": 2, "view_count": 3}
_UPLOAD_DATE_CODES = {"hour": 1, "today": 2, "week": 3, "month": 4, "year": 5}
_DURATION_CODES = {"short": 1, "long": 2, "medium": 3}


def _build_search_params(sort_code: int, upload_code: int = 0, duration_code: int = 0) -> str:
    """Encode sort order plus optional filters into one search-parameter token.

    Args:
        sort_code: Value from ``_SORT_CODES``.
        upload_code: Value from ``_UPLOAD_DATE_CODES``, or 0 for no date filter.
        duration_code: Value from ``_DURATION_CODES``, or 0 for no duration filter.

    Returns:
        Base64 text with any ``=`` padding percent-encoded.
    """
    import base64  # local import: only this helper needs it

    # Nested "filters" message: optional upload date, result type fixed to
    # video (1), optional duration.
    filters = bytearray()
    if upload_code:
        filters += bytes((0x08, upload_code))
    filters += bytes((0x10, 0x01))
    if duration_code:
        filters += bytes((0x18, duration_code))

    # Outer message: sort order followed by the length-prefixed filters.
    payload = bytes((0x08, sort_code, 0x12, len(filters))) + bytes(filters)
    token = base64.b64encode(payload).decode("ascii")
    # NOTE(review): padding is percent-encoded on the assumption that the
    # library's own search-mode constants are written that way — confirm.
    return token.replace("=", "%3D")


@rate_limiter
async def search_filtered(
    query: str,
    upload_date: str = "any",
    duration: str = "any",
    sort: str = "relevance",
    limit: int = 10
) -> str:
    """
    Advanced video search with filters.

    Upload date, duration and sort order are combined into a single search
    preference so that all three are sent with the request. Unrecognised
    values fall back to 'any' (filters) or 'relevance' (sort), and the
    header of the returned text shows the values actually applied.

    Args:
        query: Search query string
        upload_date: Filter by upload date - 'hour', 'today', 'week', 'month', 'year', 'any'
        duration: Filter by duration - 'short' (<4min), 'medium' (4-20min), 'long' (>20min), 'any'
        sort: Sort order - 'relevance', 'upload_date', 'view_count', 'rating'
        limit: Maximum number of results; clamped to the range 1-50
            (default: 10)

    Returns:
        Markdown-formatted filtered search results, or a "No videos found"
        message when nothing matches.
    """
    limit = max(1, min(limit, 50))

    # Normalise the string inputs; anything unknown degrades to the default
    # instead of raising.
    upload_key = str(upload_date).strip().lower()
    if upload_key not in _UPLOAD_DATE_CODES:
        upload_key = "any"
    duration_key = str(duration).strip().lower()
    if duration_key not in _DURATION_CODES:
        duration_key = "any"
    sort_key = str(sort).strip().lower()
    if sort_key not in _SORT_CODES:
        sort_key = "relevance"

    # With no filters and the default sort this yields 'CAASAhAB'.
    search_params = _build_search_params(
        _SORT_CODES[sort_key],
        _UPLOAD_DATE_CODES.get(upload_key, 0),
        _DURATION_CODES.get(duration_key, 0),
    )

    def _search():
        search = CustomSearch(
            query,
            searchPreferences=search_params,
            limit=limit
        )
        return search.result()

    result = await asyncio.to_thread(_search)

    videos = (result or {}).get("result") or []
    if not videos:
        return f"No videos found matching filters for: {query}"

    parts = [
        f"# Filtered Search: {query}\n",
        f"**Filters**: Upload: {upload_key}, Duration: {duration_key}, Sort: {sort_key}\n\n",
    ]
    for idx, video in enumerate(videos, 1):
        # ``or`` fallbacks also cover keys that are present but None.
        title = video.get("title") or "N/A"
        channel = (video.get("channel") or {}).get("name") or "Unknown"
        views = (video.get("viewCount") or {}).get("short") or "0"
        published = video.get("publishedTime") or "N/A"
        url = video.get("link") or ""

        parts.append(f"## {idx}. {title}\n")
        parts.append(f"- **Channel**: {channel}\n")
        parts.append(f"- **Views**: {views}\n")
        parts.append(f"- **Published**: {published}\n")
        parts.append(f"- **URL**: {url}\n\n")

    return "".join(parts)
@rate_limiter
async def get_trending_videos(limit: int = 20) -> str:
    """
    Fetch currently trending videos from YouTube.

    Uses yt-dlp to extract the trending feed in flat mode (no download); the
    blocking extraction runs in a worker thread via ``asyncio.to_thread``.

    Args:
        limit: Maximum number of trending videos; clamped to the range 1-50
            (default: 20)

    Returns:
        Markdown-formatted list of trending videos, "No trending videos
        found." when the feed yields no usable entries, or an
        "Error fetching trending videos: ..." message if anything raises.
    """
    limit = max(1, min(limit, 50))

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
        'playlist_items': f'1-{limit}',
        'skip_download': True,
    }

    def _fetch():
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info('https://www.youtube.com/feed/trending', download=False)

    # This tool reports failures as text rather than raising, so the whole
    # fetch-and-format path stays inside the try block.
    try:
        info = await asyncio.to_thread(_fetch)

        # Tolerate a None result or a missing/None 'entries' value, and drop
        # empty entries before numbering so the list has no gaps.
        raw_entries = (info or {}).get('entries') or []
        entries = [entry for entry in raw_entries if entry][:limit]
        if not entries:
            return "No trending videos found."

        parts = ["# Trending Videos on YouTube\n\n"]
        for idx, entry in enumerate(entries, 1):
            title = entry.get('title') or 'N/A'
            video_id = entry.get('id') or ''
            url = f"https://www.youtube.com/watch?v={video_id}" if video_id else "N/A"
            parts.append(f"## {idx}. {title}\n")
            parts.append(f"- **URL**: {url}\n\n")

        return "".join(parts)
    except Exception as e:
        return f"Error fetching trending videos: {e}"
@rate_limiter
async def find_channels(query: str, limit: int = 10) -> str:
    """
    Search for YouTube channels by name or topic.

    The blocking ``ChannelsSearch`` request is executed in a worker thread
    via ``asyncio.to_thread``.

    Args:
        query: Channel search query
        limit: Maximum number of results; clamped to the range 1-30
            (default: 10)

    Returns:
        Markdown-formatted channel information (subscribers, description
        truncated to 150 characters, URL, optional thumbnail), or a
        "No channels found" message when the search returns nothing.
    """
    limit = max(1, min(limit, 30))

    def _search():
        return ChannelsSearch(query, limit=limit).result()

    result = await asyncio.to_thread(_search)

    channels = (result or {}).get("result") or []
    if not channels:
        return f"No channels found for: {query}"

    parts = [f"# Channel Search: {query}\n\n"]
    for idx, channel in enumerate(channels, 1):
        title = channel.get("title") or "N/A"

        # NOTE(review): the library appears to return the subscriber count as
        # a plain string; the dict form the original code expected is still
        # accepted. Confirm against the installed version.
        subscribers_raw = channel.get("subscribers")
        if isinstance(subscribers_raw, dict):
            subscribers = subscribers_raw.get("simpleText") or "N/A"
        else:
            subscribers = subscribers_raw or "N/A"

        # The snippet is a list of text runs; it may be None or empty, so
        # join whatever runs exist instead of indexing the first one.
        snippet_runs = channel.get("descriptionSnippet") or []
        description = "".join(
            (run.get("text") or "") for run in snippet_runs if isinstance(run, dict)
        ) or "No description"
        if len(description) > 150:
            description = f"{description[:150]}..."

        url = channel.get("link") or ""
        thumbnails = channel.get("thumbnails") or []
        thumbnail = (thumbnails[0].get("url") or "") if thumbnails else ""

        parts.append(f"## {idx}. {title}\n")
        parts.append(f"- **Subscribers**: {subscribers}\n")
        parts.append(f"- **Description**: {description}\n")
        parts.append(f"- **URL**: {url}\n")
        if thumbnail:
            parts.append(f"- **Thumbnail**: {thumbnail}\n")
        parts.append("\n")

    return "".join(parts)
@rate_limiter
async def find_playlists(query: str, limit: int = 10) -> str:
    """
    Look up YouTube playlists matching a keyword.

    Args:
        query: Playlist search query
        limit: Maximum number of results; clamped to 1-30 (default: 10)

    Returns:
        Markdown-formatted playlist information, or a "No playlists found"
        message when the search returns nothing.
    """
    limit = max(1, min(limit, 30))

    # The search client blocks, so it runs in a worker thread.
    response = await asyncio.to_thread(
        lambda: PlaylistsSearch(query, limit=limit).result()
    )

    found = response.get("result", [])
    if not found:
        return f"No playlists found for: {query}"

    def _render(position, item):
        # One markdown section per playlist.
        name = item.get("title", "N/A")
        owner = item.get("channel", {}).get("name", "Unknown")
        count = item.get("videoCount", "N/A")
        link = item.get("link", "")
        return (
            f"## {position}. {name}\n"
            f"- **Channel**: {owner}\n"
            f"- **Videos**: {count}\n"
            f"- **URL**: {link}\n\n"
        )

    sections = [f"# Playlist Search: {query}\n\n"]
    sections.extend(_render(pos, item) for pos, item in enumerate(found, 1))
    return "".join(sections)