"""Category C: Channel & Playlist Forensics Tools
Provides deep channel and playlist analysis using scrapetube
for efficient listing operations.
"""
import asyncio
import scrapetube
from youtubesearchpython import Channel
from ..middleware.rate_limiter import rate_limiter
def extract_channel_id(url_or_id: str) -> str:
    """Extract a channel identifier from a YouTube URL, or return the input unchanged.

    Supports the ``/channel/<id>``, ``/@handle``, ``/c/<name>`` and
    ``/user/<name>`` URL forms. Bare IDs/handles pass through as-is.

    Args:
        url_or_id: A YouTube channel URL, channel ID, or @handle.

    Returns:
        The extracted identifier segment (path/query suffix stripped),
        or the original string when no known URL marker is present.
    """
    if "youtube.com" in url_or_id:
        # Check the most specific markers first; "/channel/" cannot be
        # shadowed by "/c/" (the character after "/c" is "h", not "/").
        for marker in ("/channel/", "/@", "/c/", "/user/"):
            if marker in url_or_id:
                # Drop any trailing path segment or query string.
                return url_or_id.split(marker)[1].split("/")[0].split("?")[0]
    return url_or_id
def extract_playlist_id(url_or_id: str) -> str:
    """Extract a playlist ID from a YouTube URL, or return the input unchanged.

    Handles both ``youtube.com`` URLs (playlist and watch pages) and
    ``youtu.be`` share links that carry a ``list=`` query parameter.

    Args:
        url_or_id: A YouTube playlist URL or a bare playlist ID.

    Returns:
        The playlist ID, with any trailing query parameters or URL
        fragment stripped; the original string when no ``list=`` is found.
    """
    looks_like_url = "youtube.com" in url_or_id or "youtu.be" in url_or_id
    if looks_like_url and "list=" in url_or_id:
        # Strip following query params ("&...") and any "#fragment".
        return url_or_id.split("list=")[1].split("&")[0].split("#")[0]
    return url_or_id
@rate_limiter
async def get_channel_videos(channel_id: str, sort_by: str = "newest", limit: int = 30) -> str:
    """
    List all videos from a channel with sorting options.

    Args:
        channel_id: YouTube channel ID, @handle, or channel URL
        sort_by: Sort order - 'newest', 'oldest', 'popular'
            (unrecognized values fall back to 'newest')
        limit: Maximum number of videos (1-100, default: 30)

    Returns:
        Markdown-formatted channel video list, or an error message
        string when the fetch fails.
    """
    channel_id = extract_channel_id(channel_id)
    limit = max(1, min(limit, 100))  # clamp to the documented 1-100 range
    # scrapetube only accepts these three sort orders; anything else
    # silently falls back to "newest" (the previous identity-dict did
    # the same thing with more ceremony).
    sort_order = sort_by if sort_by in ("newest", "oldest", "popular") else "newest"

    def _fetch():
        # scrapetube returns a lazy generator; materialize it inside the
        # worker thread so all network I/O stays off the event loop.
        return list(scrapetube.get_channel(channel_id, sort_by=sort_order, limit=limit))

    try:
        videos = await asyncio.to_thread(_fetch)
        if not videos:
            return f"No videos found for channel: {channel_id}"
        # Build the report in a list and join once (avoids quadratic +=).
        parts = [
            f"# Channel Videos: {channel_id}\n",
            f"**Sort**: {sort_by} | **Count**: {len(videos)}\n\n",
        ]
        for idx, video in enumerate(videos, 1):
            video_id = video.get('videoId', '')
            title = video.get('title', {}).get('runs', [{}])[0].get('text', 'N/A')
            views = video.get('viewCountText', {}).get('simpleText', 'N/A')
            published = video.get('publishedTimeText', {}).get('simpleText', 'N/A')
            parts.append(f"## {idx}. {title}\n")
            parts.append(f"- **Video ID**: {video_id}\n")
            parts.append(f"- **URL**: https://www.youtube.com/watch?v={video_id}\n")
            parts.append(f"- **Views**: {views}\n")
            parts.append(f"- **Published**: {published}\n\n")
        return "".join(parts)
    except Exception as e:
        return f"❌ Error fetching channel videos for {channel_id}: {str(e)}"
@rate_limiter
async def get_channel_shorts(channel_id: str, limit: int = 30) -> str:
    """
    List YouTube Shorts from a specific channel.

    Args:
        channel_id: YouTube channel ID, @handle, or channel URL
        limit: Maximum number of shorts (1-100, default: 30)

    Returns:
        Markdown-formatted list of channel shorts, or an error message
        string when the fetch fails.
    """
    channel_id = extract_channel_id(channel_id)
    limit = max(1, min(limit, 100))  # keep within the documented 1-100 window

    def _scrape():
        # Consume the scrapetube generator inside the worker thread so
        # the blocking network I/O never touches the event loop.
        return list(scrapetube.get_channel(channel_id, content_type="shorts", limit=limit))

    try:
        results = await asyncio.to_thread(_scrape)
        if not results:
            return f"No shorts found for channel: {channel_id}"
        # Accumulate report fragments and join once at the end.
        lines = [
            f"# Channel Shorts: {channel_id}\n",
            f"**Count**: {len(results)}\n\n",
        ]
        for position, entry in enumerate(results, 1):
            vid = entry.get('videoId', '')
            name = entry.get('title', {}).get('runs', [{}])[0].get('text', 'N/A')
            view_text = entry.get('viewCountText', {}).get('simpleText', 'N/A')
            lines.append(f"## {position}. {name}\n")
            lines.append(f"- **Video ID**: {vid}\n")
            lines.append(f"- **URL**: https://www.youtube.com/shorts/{vid}\n")
            lines.append(f"- **Views**: {view_text}\n\n")
        return "".join(lines)
    except Exception as e:
        return f"❌ Error fetching shorts for {channel_id}: {str(e)}"
@rate_limiter
async def get_channel_streams(channel_id: str, limit: int = 20) -> str:
    """
    List live streams (past and present) from a channel.

    Args:
        channel_id: YouTube channel ID, @handle, or channel URL
        limit: Maximum number of streams (1-50, default: 20)

    Returns:
        Markdown-formatted list of channel streams, or an error message
        string when the fetch fails.
    """
    channel_id = extract_channel_id(channel_id)
    limit = max(1, min(limit, 50))  # keep within the documented 1-50 window

    def _scrape():
        # Materialize the lazy scrapetube generator on the worker thread.
        return list(scrapetube.get_channel(channel_id, content_type="streams", limit=limit))

    try:
        results = await asyncio.to_thread(_scrape)
        if not results:
            return f"No streams found for channel: {channel_id}"
        # Collect fragments first, then join once.
        lines = [
            f"# Channel Streams: {channel_id}\n",
            f"**Count**: {len(results)}\n\n",
        ]
        for position, entry in enumerate(results, 1):
            vid = entry.get('videoId', '')
            name = entry.get('title', {}).get('runs', [{}])[0].get('text', 'N/A')
            view_text = entry.get('viewCountText', {}).get('simpleText', 'N/A')
            when = entry.get('publishedTimeText', {}).get('simpleText', 'N/A')
            lines.append(f"## {position}. {name}\n")
            lines.append(f"- **Video ID**: {vid}\n")
            lines.append(f"- **URL**: https://www.youtube.com/watch?v={vid}\n")
            lines.append(f"- **Views**: {view_text}\n")
            lines.append(f"- **Published**: {when}\n\n")
        return "".join(lines)
    except Exception as e:
        return f"❌ Error fetching streams for {channel_id}: {str(e)}"
@rate_limiter
async def get_playlist_items(playlist_id: str, limit: int = 50) -> str:
    """
    Flatten a playlist into a list of video IDs and titles.

    Args:
        playlist_id: YouTube playlist ID or URL
        limit: Maximum number of videos (1-500, default: 50)

    Returns:
        Markdown-formatted playlist contents, or an error message string
        when the fetch fails.
    """
    playlist_id = extract_playlist_id(playlist_id)
    limit = max(1, min(limit, 500))  # clamp to the documented 1-500 window

    def _scrape():
        # Consume the scrapetube generator inside the worker thread.
        return list(scrapetube.get_playlist(playlist_id, limit=limit))

    try:
        entries = await asyncio.to_thread(_scrape)
        if not entries:
            return f"No videos found in playlist: {playlist_id}"
        # Gather fragments up front; a single join beats repeated +=.
        pieces = [
            f"# Playlist Contents: {playlist_id}\n",
            f"**URL**: https://www.youtube.com/playlist?list={playlist_id}\n",
            f"**Video Count**: {len(entries)}\n\n",
        ]
        for position, entry in enumerate(entries, 1):
            vid = entry.get('videoId', '')
            name = entry.get('title', {}).get('runs', [{}])[0].get('text', 'N/A')
            pieces.append(f"{position}. **{name}**\n")
            pieces.append(f"   - ID: `{vid}`\n")
            pieces.append(f"   - URL: https://www.youtube.com/watch?v={vid}\n\n")
        return "".join(pieces)
    except Exception as e:
        return f"❌ Error fetching playlist items for {playlist_id}: {str(e)}"
@rate_limiter
async def get_channel_about(channel_id: str) -> str:
    """
    Get detailed channel information including description and statistics.

    Args:
        channel_id: YouTube channel ID, @handle, or channel URL

    Returns:
        Markdown-formatted channel about information, or an error message
        string when the fetch fails.
    """
    channel_id = extract_channel_id(channel_id)

    def _fetch():
        # youtubesearchpython's Channel.get is blocking network I/O;
        # it runs on a worker thread via asyncio.to_thread below.
        return Channel.get(channel_id)

    try:
        info = await asyncio.to_thread(_fetch)
        if not info:
            return f"Channel not found: {channel_id}"
        output = f"# Channel Information: {info.get('title', 'N/A')}\n\n"
        output += "## Basic Info\n"
        output += f"- **Channel ID**: {info.get('id', 'N/A')}\n"
        output += f"- **Channel URL**: {info.get('url', 'N/A')}\n"
        # "or {}" guards against the 'subscribers' key being present
        # with a None value, which would crash the chained .get().
        output += f"- **Subscribers**: {(info.get('subscribers') or {}).get('simpleText', 'N/A')}\n\n"
        description = info.get('description', 'No description available')
        output += "## Description\n"
        output += f"{description}\n\n"
        thumbnails = info.get('thumbnails') or []
        if thumbnails:
            # Thumbnail lists are ordered smallest-first, so take the last
            # entry (highest resolution) rather than the tiny first one.
            # NOTE(review): these appear to be channel avatar thumbnails
            # rather than the banner image — confirm against the
            # youtubesearchpython Channel.get payload.
            output += "## Channel Banner\n"
            output += f"{thumbnails[-1].get('url', '')}\n\n"
        return output
    except Exception as e:
        return f"❌ Error fetching channel info for {channel_id}: {str(e)}"