"""YouTube transcript fetching tool for MCP server."""
import os
import re
from pathlib import Path
from typing import Literal, Optional
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import (
TranscriptsDisabled,
NoTranscriptFound,
VideoUnavailable,
InvalidVideoId,
)
def extract_video_id(url_or_id: str) -> Optional[str]:
    """Extract the 11-character video ID from a YouTube URL or a bare ID.

    Args:
        url_or_id: A YouTube URL (``watch?v=...``, ``youtu.be/...``,
            ``embed/...`` and similar) or a bare 11-character video ID.
            Leading and trailing whitespace is ignored.

    Returns:
        The video ID, or ``None`` when the input is longer than 500
        characters or contains no standalone 11-character ID token.
    """
    # Cheap guard so oversized input never reaches the regex engine.
    if len(url_or_id) > 500:
        return None
    candidate = url_or_id.strip()

    id_char = r'[0-9A-Za-z_-]'
    patterns = (
        # An ID directly after "v=" or any "/" (this also covers "embed/").
        # The negative lookahead rejects longer tokens such as 24-character
        # channel IDs, which would otherwise be truncated to their first
        # 11 characters and returned as if they were a video ID.
        rf'(?:v=|/)({id_char}{{11}})(?!{id_char})',
        # The whole input is a bare ID.
        rf'^({id_char}{{11}})$',
    )
    for pattern in patterns:
        match = re.search(pattern, candidate)
        if match:
            # The capture group only admits ID characters, so the result
            # needs no further validation or sanitising.
            return match.group(1)
    return None
def create_api_client() -> YouTubeTranscriptApi:
    """Create a YouTubeTranscriptApi client configured from the environment.

    Environment variables read:
        YOUTUBE_COOKIES: Path to a cookies file (``~`` is expanded). It is
            ignored when the path is not an existing regular file.
        YOUTUBE_PROXY_HTTP: Proxy URL for HTTP traffic.
        YOUTUBE_PROXY_HTTPS: Proxy URL for HTTPS traffic; falls back to
            YOUTUBE_PROXY_HTTP when unset.

    Returns:
        A ``YouTubeTranscriptApi`` instance built with whichever of the
        options above were set.
    """
    import inspect

    kwargs = {}
    cookies_path = os.environ.get('YOUTUBE_COOKIES')
    if cookies_path:
        expanded = Path(cookies_path).expanduser()
        if expanded.is_file():
            # NOTE(review): the constructor's cookie keyword appears to
            # differ between library releases (``cookie_path`` in some,
            # none at all in others) - confirm against the installed
            # version. Passing a keyword the constructor does not declare
            # raises TypeError and breaks every request, so only pass a
            # name the signature actually lists.
            try:
                accepted = inspect.signature(YouTubeTranscriptApi).parameters
            except (TypeError, ValueError):
                accepted = {}
            cookie_kwarg = next(
                (name for name in ('cookie_path', 'cookies') if name in accepted),
                None,
            )
            if cookie_kwarg is not None:
                kwargs[cookie_kwarg] = str(expanded)

    proxy_http = os.environ.get('YOUTUBE_PROXY_HTTP')
    proxy_https = os.environ.get('YOUTUBE_PROXY_HTTPS')
    if proxy_http or proxy_https:
        # Imported lazily so the proxies module is only loaded when a
        # proxy is actually configured.
        from youtube_transcript_api.proxies import GenericProxyConfig
        kwargs['proxy_config'] = GenericProxyConfig(
            http_url=proxy_http,
            https_url=proxy_https or proxy_http,
        )
    return YouTubeTranscriptApi(**kwargs)
def format_transcript(
transcript_data,
format_type: Literal["plain", "structured", "srt", "vtt"] = "plain"
) -> str | dict:
"""Format transcript data according to requested format."""
if format_type == "plain":
return '\n'.join(entry.text for entry in transcript_data)
elif format_type == "structured":
return {
"entries": [
{
"text": entry.text,
"start": entry.start,
"duration": entry.duration
}
for entry in transcript_data
]
}
elif format_type == "srt":
# SRT subtitle format
srt_output = []
for idx, entry in enumerate(transcript_data, 1):
start = _format_timestamp_srt(entry.start)
end = _format_timestamp_srt(entry.start + entry.duration)
srt_output.append(f"{idx}\n{start} --> {end}\n{entry.text}\n")
return '\n'.join(srt_output)
elif format_type == "vtt":
# WebVTT format
vtt_output = ["WEBVTT\n"]
for entry in transcript_data:
start = _format_timestamp_vtt(entry.start)
end = _format_timestamp_vtt(entry.start + entry.duration)
vtt_output.append(f"{start} --> {end}\n{entry.text}\n")
return '\n'.join(vtt_output)
return '\n'.join(entry.text for entry in transcript_data)
def _format_timestamp_srt(seconds: float) -> str:
"""Format seconds to SRT timestamp (HH:MM:SS,mmm)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def _format_timestamp_vtt(seconds: float) -> str:
"""Format seconds to WebVTT timestamp (HH:MM:SS.mmm)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
def get_transcript(
    video_url_or_id: str,
    languages: Optional[list[str]] = None,
    format_type: Literal["plain", "structured", "srt", "vtt"] = "plain"
) -> dict:
    """
    Fetch a YouTube video transcript.

    Args:
        video_url_or_id: YouTube URL or 11-character video ID
        languages: Preferred languages in order (default: ["en"]). A single
            language code given as a plain string is also accepted. When
            none of them is available, the first transcript the video
            offers is used instead.
        format_type: Output format (plain, structured, srt, vtt)

    Returns:
        On success:
            {
                "success": True,
                "video_id": str,
                "transcript": str | dict,
                "language": str,
                "is_generated": bool,
                "format": str
            }
        On failure ("video_id" is omitted when no ID could be extracted):
            {
                "success": False,
                "video_id": str,
                "error": str,
                "message": str
            }
    """
    video_id = extract_video_id(video_url_or_id)
    if not video_id:
        return {
            "success": False,
            "error": "INVALID_VIDEO_ID",
            "message": "Could not extract valid YouTube video ID from input"
        }

    # A bare string would otherwise be iterated character by character.
    if isinstance(languages, str):
        languages = [languages]
    languages = languages or ['en']

    try:
        api = create_api_client()
        transcript_list = api.list(video_id)

        # Codes are tried in the order given, so one call covers the
        # whole preference list.
        try:
            transcript_obj = transcript_list.find_transcript(languages)
        except NoTranscriptFound:
            # Fall back to the first available transcript. The default
            # avoids a StopIteration when the listing is empty.
            transcript_obj = next(iter(transcript_list), None)

        if transcript_obj is None:
            return {
                "success": False,
                "video_id": video_id,
                "error": "NO_TRANSCRIPT_FOUND",
                "message": "No transcripts are available for this video"
            }

        # Fetch the actual transcript data, then format it as requested.
        transcript_data = transcript_obj.fetch()
        formatted = format_transcript(transcript_data, format_type)
        return {
            "success": True,
            "video_id": video_id,
            "transcript": formatted,
            "language": transcript_obj.language_code,
            "is_generated": transcript_obj.is_generated,
            "format": format_type
        }
    except TranscriptsDisabled:
        return {
            "success": False,
            "video_id": video_id,
            "error": "TRANSCRIPTS_DISABLED",
            "message": "Transcripts are disabled for this video"
        }
    except NoTranscriptFound:
        return {
            "success": False,
            "video_id": video_id,
            "error": "NO_TRANSCRIPT_FOUND",
            "message": f"No transcript found for languages: {', '.join(languages)}"
        }
    except VideoUnavailable:
        return {
            "success": False,
            "video_id": video_id,
            "error": "VIDEO_UNAVAILABLE",
            "message": "Video is unavailable or private"
        }
    except InvalidVideoId:
        return {
            "success": False,
            "video_id": video_id,
            "error": "INVALID_VIDEO_ID",
            "message": "Invalid YouTube video ID"
        }
    except Exception as e:
        # Tool boundary: report unexpected failures in the result dict
        # instead of raising to the caller.
        return {
            "success": False,
            "video_id": video_id,
            "error": "UNKNOWN_ERROR",
            "message": f"Unexpected error: {type(e).__name__}: {str(e)}"
        }
def list_transcript_languages(video_url_or_id: str) -> dict:
    """
    List all available transcript languages for a video.

    Args:
        video_url_or_id: YouTube URL or 11-character video ID

    Returns:
        On success:
            {
                "success": True,
                "video_id": str,
                "languages": [
                    {
                        "code": str,
                        "name": str,
                        "is_generated": bool,
                        "is_translatable": bool
                    }
                ]
            }
        On failure ("video_id" is omitted when no ID could be extracted):
            {
                "success": False,
                "video_id": str,
                "error": str,
                "message": str
            }
    """
    video_id = extract_video_id(video_url_or_id)
    if not video_id:
        return {
            "success": False,
            "error": "INVALID_VIDEO_ID",
            "message": "Could not extract valid YouTube video ID from input"
        }

    # Underscore-separated codes, matching the "INVALID_VIDEO_ID" returned
    # above and the codes get_transcript reports. Upper-casing the class
    # name would give "INVALIDVIDEOID" and similar.
    error_codes = (
        (TranscriptsDisabled, "TRANSCRIPTS_DISABLED"),
        (NoTranscriptFound, "NO_TRANSCRIPT_FOUND"),
        (VideoUnavailable, "VIDEO_UNAVAILABLE"),
        (InvalidVideoId, "INVALID_VIDEO_ID"),
    )

    try:
        api = create_api_client()
        transcript_list = api.list(video_id)
        languages = [
            {
                "code": transcript.language_code,
                "name": transcript.language,
                "is_generated": transcript.is_generated,
                "is_translatable": transcript.is_translatable
            }
            for transcript in transcript_list
        ]
        return {
            "success": True,
            "video_id": video_id,
            "languages": languages
        }
    except (TranscriptsDisabled, NoTranscriptFound, VideoUnavailable, InvalidVideoId) as e:
        error_code = next(
            code for exc_type, code in error_codes if isinstance(e, exc_type)
        )
        return {
            "success": False,
            "video_id": video_id,
            "error": error_code,
            "message": str(e)
        }
    except Exception as e:
        # Tool boundary: report unexpected failures in the result dict
        # instead of raising to the caller.
        return {
            "success": False,
            "video_id": video_id,
            "error": "UNKNOWN_ERROR",
            "message": f"Unexpected error: {type(e).__name__}: {str(e)}"
        }