# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/walksoda/crawl-mcp'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
YouTube Processing Module for Transcript Extraction
Handles YouTube video transcript retrieval using youtube-transcript-api v1.1.0+
Simple and reliable transcript extraction without complex authentication
"""
import asyncio
import re
import logging
import os
from typing import Dict, List, Optional, Any, Union
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from youtube_transcript_api.formatters import TextFormatter
class YouTubeProcessor:
    """Extract transcripts from YouTube videos via ``youtube-transcript-api``.

    The processor recognises common YouTube URL shapes, lists the transcripts
    a video offers, fetches (optionally translated) transcript text and can
    pass that text to the project's ``LLMClient`` for summarisation.  The
    coroutine entry points report failures as ``{'success': False, ...}``
    dictionaries instead of raising.
    """

    # Languages tried when the caller does not name any.
    _DEFAULT_LANGUAGES = ('ja', 'en', 'en-US', 'en-GB')
    # Common source languages tried when the preferred ones cannot be translated.
    _FALLBACK_SOURCE_LANGUAGES = ('en', 'ja', 'es', 'fr', 'de', 'it', 'pt', 'ru')
    # Version label echoed in result dictionaries.
    _API_VERSION = 'youtube-transcript-api-1.1.0+'
    # URL shapes carrying an 11-character video ID in capture group 1.  The
    # patterns are searched (not anchored), so other sub-domains of
    # youtube.com match as well.
    _URL_PATTERNS = (
        # Watch page; ``v`` may come after other query parameters.
        r'(?:https?://)?(?:www\.)?youtube\.com/watch\?(?:[^#\s]*?&)?v=([a-zA-Z0-9_-]{11})',
        r'(?:https?://)?(?:www\.)?youtu\.be/([a-zA-Z0-9_-]{11})',
        r'(?:https?://)?(?:www\.)?youtube\.com/embed/([a-zA-Z0-9_-]{11})',
        r'(?:https?://)?(?:www\.)?youtube\.com/v/([a-zA-Z0-9_-]{11})',
        r'(?:https?://)?(?:www\.)?youtube\.com/shorts/([a-zA-Z0-9_-]{11})',
        r'(?:https?://)?(?:www\.)?youtube\.com/live/([a-zA-Z0-9_-]{11})',
    )

    def __init__(self):
        """Create the text formatter and the per-instance URL pattern list."""
        self.formatter = TextFormatter()
        self.youtube_patterns = list(self._URL_PATTERNS)

    def is_youtube_url(self, url: str) -> bool:
        """Return True when a video ID can be extracted from ``url``."""
        return self.extract_video_id(url) is not None

    def extract_video_id(self, url: str) -> Optional[str]:
        """Return the video ID found in ``url``, or None.

        Non-string input and invalid patterns in ``self.youtube_patterns``
        yield None rather than an exception.
        """
        if not isinstance(url, str):
            return None
        try:
            for pattern in self.youtube_patterns:
                match = re.search(pattern, url)
                if match:
                    return match.group(1)
        except (TypeError, re.error):
            return None
        return None

    def _get_transcript_list(self, video_id):
        """Return the transcript listing for ``video_id``.

        Uses the instance-based ``YouTubeTranscriptApi().list()`` call; a new
        API object is created on every call.  Library errors propagate.
        """
        return YouTubeTranscriptApi().list(video_id)

    @staticmethod
    def _describe_transcript(transcript) -> Dict[str, Any]:
        """Summarise one transcript entry as a plain dictionary."""
        return {
            'language': transcript.language,
            'language_code': transcript.language_code,
            'is_generated': transcript.is_generated,
            'is_translatable': transcript.is_translatable,
        }

    def get_video_info(self, video_id: str) -> Dict[str, Any]:
        """Report which transcripts are available for ``video_id``.

        Returns:
            A dictionary with transcript counts and per-language descriptors.
            When listing fails, counts are zero, the lists are empty and an
            ``'error'`` key carries a readable message.
        """
        try:
            available = [
                self._describe_transcript(transcript)
                for transcript in self._get_transcript_list(video_id)
            ]
        except Exception as e:
            error_message = str(e)
            lowered = error_message.lower()
            # Map known failure signatures to clearer messages.
            if "no element found" in lowered or "parseerror" in lowered:
                error_message = "Video transcript parsing failed - this may be a temporary YouTube API issue"
            elif "video unavailable" in lowered:
                error_message = "Video is unavailable, private, or does not exist"
            elif isinstance(e, TranscriptsDisabled) or "transcripts disabled" in lowered:
                # Check the exception type as well: the substring test alone
                # depends on the library's exact wording.
                error_message = "Transcripts are disabled for this video"
            elif "http error" in lowered:
                error_message = f"Network error accessing video: {error_message}"
            return {
                'video_id': video_id,
                'has_transcripts': False,
                'total_transcripts': 0,
                'manual_transcripts': 0,
                'auto_transcripts': 0,
                'available_languages': [],
                'manual_languages': [],
                'auto_languages': [],
                'error': error_message,
                'api_version': self._API_VERSION
            }

        manual = [info for info in available if not info['is_generated']]
        generated = [info for info in available if info['is_generated']]
        return {
            'video_id': video_id,
            'has_transcripts': len(available) > 0,
            'total_transcripts': len(available),
            'manual_transcripts': len(manual),
            'auto_transcripts': len(generated),
            'available_languages': available,
            'manual_languages': manual,
            'auto_languages': generated,
            'api_version': self._API_VERSION
        }

    def get_available_transcript_languages(self, video_id: str) -> List[Dict[str, Any]]:
        """Return a descriptor per available transcript, or [] on any error.

        The failure is logged so that an empty result can be told apart from
        a video that simply has no transcripts.
        """
        try:
            return [
                self._describe_transcript(transcript)
                for transcript in self._get_transcript_list(video_id)
            ]
        except Exception as e:
            logging.getLogger(__name__).warning(
                "Could not list transcripts for %s: %s", video_id, e
            )
            return []

    async def extract_transcript(
        self,
        video_id: str,
        languages: Optional[List[str]] = None,
        translate_to: Optional[str] = None,
        include_timestamps: bool = True,
        preserve_formatting: bool = True
    ) -> Dict[str, Any]:
        """Fetch the transcript of ``video_id``.

        Args:
            video_id: YouTube video ID.
            languages: Preferred language codes in priority order; defaults
                to ``['ja', 'en', 'en-US', 'en-GB']``.
            translate_to: When set, the transcript is translated to this
                language code.
            include_timestamps: Together with ``preserve_formatting``,
                prefixes every line of ``full_text`` with ``[MM:SS]``.
            preserve_formatting: See ``include_timestamps``; when either flag
                is false, ``full_text`` is space-joined text without stamps.

        Returns:
            On success, a dictionary with ``'success': True``, language info
            and ``'transcript_data'``; otherwise ``'success': False`` plus an
            ``'error'`` message.
        """
        if languages is None:
            languages = list(self._DEFAULT_LANGUAGES)
        # The transcript library performs blocking HTTP requests; run them in
        # a worker thread so the event loop (and other concurrent
        # extractions) keep making progress.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: self._extract_transcript_blocking(
                video_id, languages, translate_to,
                include_timestamps, preserve_formatting
            )
        )

    def _fetch_translated(self, transcript_list, languages, translate_to) -> tuple:
        """Return ``(source_transcript, translated_data)``.

        Tries the preferred languages, then a list of common languages, then
        the first translatable transcript of any language.  Re-raises the
        first error when every attempt fails.
        """
        def first_translatable():
            for candidate in transcript_list:
                if candidate.is_translatable:
                    return candidate
            raise LookupError('no translatable transcript available')

        pickers = (
            lambda: transcript_list.find_transcript(languages),
            lambda: transcript_list.find_transcript(list(self._FALLBACK_SOURCE_LANGUAGES)),
            first_translatable,
        )
        first_error = None
        for pick in pickers:
            try:
                source = pick()
                return source, source.translate(translate_to).fetch()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        raise first_error

    def _render_segments(self, transcript_data, include_timestamps, preserve_formatting) -> tuple:
        """Return ``(full_text, segments)`` built from the fetched entries.

        Entries may be objects with ``text``/``start``/``duration``
        attributes or dictionaries with those keys.  Malformed entries are
        skipped in both outputs.
        """
        stamped = include_timestamps and preserve_formatting
        pieces = []
        segments = []
        for entry in transcript_data:
            try:
                if hasattr(entry, 'text'):
                    text = entry.text
                    start_time = entry.start
                    duration = entry.duration
                else:
                    text = entry.get('text', '')
                    start_time = entry.get('start', 0)
                    duration = entry.get('duration', 0)
                if stamped:
                    piece = f"[{self._format_timestamp(start_time)}] {text}\n"
                else:
                    piece = f"{text} "
                end_time = start_time + duration
            except Exception:
                # Skip malformed entries but continue processing.
                continue
            # Append to both outputs only once the entry is known to be
            # well-formed, so text and segments never disagree.
            pieces.append(piece)
            segments.append({
                'text': text,
                'start': start_time,
                'duration': duration,
                'end': end_time
            })
        return "".join(pieces).strip(), segments

    @staticmethod
    def _normalize_extraction_error(raw_message: str) -> str:
        """Map a raw exception message to a user-facing explanation."""
        lowered = raw_message.lower()
        if "no element found" in lowered or "parseerror" in lowered:
            return "YouTube transcript parsing failed - this may be a temporary issue with YouTube's servers"
        if "http error" in lowered:
            return f"Network error accessing video: {raw_message}"
        if "video unavailable" in lowered:
            return "Video is unavailable, private, or has been removed"
        if "could not retrieve" in lowered:
            return "Could not retrieve transcript data from YouTube"
        if "transcripts disabled" in lowered:
            return "Transcripts are disabled for this video"
        if "transcript not found" in lowered:
            return "No transcript found for the requested languages"
        if "list index out of range" in lowered:
            return "Video parsing error - transcript data structure unexpected"
        if "connection" in lowered or "timeout" in lowered:
            return "Network connection issue - please try again later"
        return raw_message

    def _extract_transcript_blocking(
        self,
        video_id,
        languages,
        translate_to,
        include_timestamps,
        preserve_formatting
    ) -> Dict[str, Any]:
        """Synchronous worker behind ``extract_transcript``."""
        try:
            transcript_list = self._get_transcript_list(video_id)

            if translate_to:
                try:
                    source, transcript_data = self._fetch_translated(
                        transcript_list, languages, translate_to
                    )
                except Exception as e:
                    return {
                        'success': False,
                        'error': f'No transcripts available for translation to {translate_to}. Original error: {str(e)}',
                        'video_id': video_id,
                        'available_transcripts': [t.language_code for t in transcript_list],
                        'suggestion': 'Try get_youtube_video_info to see available transcript languages'
                    }
                source_language = source.language_code
                final_language = translate_to
                is_translated = True
            else:
                try:
                    source = transcript_list.find_transcript(languages)
                    transcript_data = source.fetch()
                except Exception as e:
                    return {
                        'success': False,
                        'error': f'No transcripts found in requested languages {languages}. Error: {str(e)}',
                        'video_id': video_id,
                        'available_transcripts': [t.language_code for t in transcript_list],
                        'suggestion': 'Try get_youtube_video_info to see available transcript languages, or use batch_extract_youtube_transcripts for alternative methods'
                    }
                source_language = source.language_code
                final_language = source.language_code
                is_translated = False

            if not transcript_data:
                return {
                    'success': False,
                    'error': 'No transcript data found',
                    'video_id': video_id
                }

            full_text, segments = self._render_segments(
                transcript_data, include_timestamps, preserve_formatting
            )

            # Clean text without timestamps; fall back to a manual join when
            # the formatter cannot handle the data.
            try:
                clean_text = self.formatter.format_transcript(transcript_data)
            except Exception:
                clean_texts = []
                for entry in transcript_data:
                    if hasattr(entry, 'text'):
                        clean_texts.append(entry.text)
                    elif hasattr(entry, 'get') and entry.get('text'):
                        clean_texts.append(entry.get('text'))
                clean_text = " ".join(clean_texts)

            total_duration = max((seg['end'] for seg in segments), default=0)
            return {
                'success': True,
                'video_id': video_id,
                'source_language': source_language,
                'final_language': final_language,
                'is_translated': is_translated,
                'transcript_data': {
                    'full_text': full_text,
                    'clean_text': clean_text,
                    'segments': segments,
                    'segment_count': len(segments),
                    'word_count': len(clean_text.split()),
                    'duration_seconds': total_duration,
                    'duration_formatted': self._format_duration(total_duration)
                }
            }
        except TranscriptsDisabled:
            return {
                'success': False,
                'error': 'Transcripts are disabled for this video',
                'video_id': video_id
            }
        except NoTranscriptFound:
            return {
                'success': False,
                'error': f'No transcript found in languages: {languages}',
                'video_id': video_id,
                'requested_languages': languages
            }
        except Exception as e:
            error_message = self._normalize_extraction_error(str(e))
            lowered = error_message.lower()
            return {
                'success': False,
                'error': f'Transcript extraction failed: {error_message}',
                'video_id': video_id,
                'api_version': self._API_VERSION,
                'suggestion': self._get_error_suggestion(error_message),
                'retry_recommended': "connection" in lowered or "timeout" in lowered or "temporary" in lowered
            }

    def _format_timestamp(self, seconds: float) -> str:
        """Format seconds as ``MM:SS``, or ``HH:MM:SS`` from one hour up."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"

    def _format_duration(self, seconds: float) -> str:
        """Format a duration such as ``1h 2m 3s``; zero-valued parts are
        omitted, except that a zero duration renders as ``0s``."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        parts = []
        if hours > 0:
            parts.append(f"{hours}h")
        if minutes > 0:
            parts.append(f"{minutes}m")
        if secs > 0 or not parts:
            parts.append(f"{secs}s")
        return " ".join(parts)

    def _get_error_suggestion(self, error_message: str) -> str:
        """Return a remediation hint chosen by keywords in ``error_message``."""
        error_lower = error_message.lower()
        if "transcript not found" in error_lower or "no transcript" in error_lower:
            return "This video may not have transcripts available. Try a different video or check if captions are enabled."
        # Also match the normalised wording "Transcripts are disabled ...",
        # which is the form extract_transcript passes in.
        if (
            "transcripts disabled" in error_lower
            or "transcripts are disabled" in error_lower
            or "subtitles are disabled" in error_lower
        ):
            return "The video owner has disabled transcripts. Try a different video."
        if "video unavailable" in error_lower or "private" in error_lower:
            return "Video is not accessible. Check if the video exists and is publicly available."
        if "network" in error_lower or "connection" in error_lower or "timeout" in error_lower:
            return "Network issue detected. Check your internet connection and try again."
        if "parsing" in error_lower or "temporary" in error_lower:
            return "This appears to be a temporary issue with YouTube's servers. Try again in a few minutes."
        return "Try using a different video or check if the video has publicly available transcripts."

    async def process_youtube_url(
        self,
        url: str,
        languages: Optional[List[str]] = None,
        translate_to: Optional[str] = None,
        include_timestamps: bool = True,
        preserve_formatting: bool = True,
        include_metadata: bool = True
    ) -> Dict[str, Any]:
        """Validate ``url``, extract its transcript and optionally attach the
        transcript-availability info as ``'metadata'``.

        Returns:
            A result dictionary; ``'success'`` is False for unrecognised
            URLs, for extraction failures (the extraction result is returned
            unchanged) and for unexpected errors.
        """
        if not self.is_youtube_url(url):
            return {
                'success': False,
                'error': 'URL is not a valid YouTube video URL',
                'url': url
            }
        video_id = self.extract_video_id(url)
        if not video_id:
            return {
                'success': False,
                'error': 'Could not extract video ID from URL',
                'url': url
            }
        try:
            transcript_result = await self.extract_transcript(
                video_id=video_id,
                languages=languages,
                translate_to=translate_to,
                include_timestamps=include_timestamps,
                preserve_formatting=preserve_formatting
            )
            if not transcript_result['success']:
                return transcript_result

            video_metadata = None
            if include_metadata:
                # get_video_info does blocking network I/O; keep it off the
                # event loop.
                loop = asyncio.get_running_loop()
                video_metadata = await loop.run_in_executor(
                    None, self.get_video_info, video_id
                )

            return {
                'success': True,
                'url': url,
                'video_id': video_id,
                'processing_method': 'youtube_transcript_api_v1.1.0+',
                'transcript': transcript_result['transcript_data'],
                'language_info': {
                    'source_language': transcript_result['source_language'],
                    'final_language': transcript_result['final_language'],
                    'is_translated': transcript_result['is_translated']
                },
                'metadata': video_metadata,
                'api_version': self._API_VERSION
            }
        except Exception as e:
            return {
                'success': False,
                'error': f'YouTube processing failed: {str(e)}',
                'url': url,
                'video_id': video_id
            }

    async def batch_extract_transcripts(
        self,
        urls: List[str],
        languages: Optional[List[str]] = None,
        translate_to: Optional[str] = None,
        include_timestamps: bool = True,
        max_concurrent: int = 3
    ) -> List[Dict[str, Any]]:
        """Process several URLs concurrently.

        Args:
            urls: YouTube URLs to process.
            languages: Preferred language codes, passed to every extraction.
            translate_to: Optional translation target for every extraction.
            include_timestamps: Passed to every extraction.
            max_concurrent: Upper bound on simultaneous extractions; values
                below 1 are treated as 1.

        Returns:
            One result dictionary per URL, in input order.
        """
        # A semaphore created with 0 would never admit any task.
        limit = asyncio.Semaphore(max(1, int(max_concurrent)))

        async def guarded(url):
            async with limit:
                return await self.process_youtube_url(
                    url=url,
                    languages=languages,
                    translate_to=translate_to,
                    include_timestamps=include_timestamps
                )

        outcomes = await asyncio.gather(
            *(guarded(url) for url in urls), return_exceptions=True
        )

        processed_results = []
        for url, outcome in zip(urls, outcomes):
            if isinstance(outcome, Exception):
                processed_results.append({
                    'success': False,
                    'url': url,
                    'error': f'Processing failed: {str(outcome)}'
                })
            else:
                processed_results.append(outcome)
        return processed_results

    async def summarize_transcript(
        self,
        transcript_text: str,
        summary_length: str = "medium",
        include_timestamps: bool = True,
        llm_provider: Optional[str] = None,
        llm_model: Optional[str] = None,
        video_metadata: Optional[Dict[str, Any]] = None,
        target_tokens: Optional[int] = None
    ) -> Dict[str, Any]:
        """Summarize a transcript with the project's ``LLMClient``.

        Args:
            transcript_text: The full transcript text to summarize.
            summary_length: "short", "medium", or "long".
            include_timestamps: When False, ``key_timestamps`` in the result
                is always empty.
            llm_provider: LLM provider passed to the client.
            llm_model: Model name passed to the client.
            video_metadata: Optional dictionary; the keys ``title``, ``url``,
                ``video_id``, ``channel`` and ``description`` are read.
            target_tokens: Target token count passed to the client.

        Returns:
            A dictionary with the summary and metadata, or
            ``'success': False`` with an ``'error'`` message.
        """
        try:
            from .utils.llm_client import LLMClient

            # Treat missing and explicitly-None metadata values alike.
            meta = video_metadata or {}
            video_title = meta.get('title') or ""
            video_url = meta.get('url') or ""
            video_id = meta.get('video_id') or ""
            channel_name = meta.get('channel') or ""
            video_description = meta.get('description') or ""

            metadata = {
                "video_title": video_title,
                "video_url": video_url,
                "video_id": video_id,
                "channel_name": channel_name,
                "include_timestamps": include_timestamps,
            }
            if video_description:
                # Only the first 200 characters of the description are sent.
                metadata["description"] = video_description[:200]

            client = LLMClient()
            result = await client.summarize(
                content=transcript_text,
                title=video_title,
                url=video_url,
                summary_length=summary_length,
                content_type="video",
                llm_provider=llm_provider,
                llm_model=llm_model,
                target_tokens=target_tokens,
                metadata=metadata
            )

            if not result.get("success"):
                return {
                    "success": False,
                    "error": result.get("error", "Summarization failed"),
                    "summary_length": summary_length
                }

            # Reshape the client result into the YouTube-specific format.
            return {
                "success": True,
                "summary": result.get("summary", "Summary generation failed"),
                "video_title": video_title or result.get("title", ""),
                "video_url": video_url or result.get("source_url", ""),
                "video_id": video_id,
                "channel_name": channel_name,
                "key_topics": result.get("key_topics", []),
                "key_timestamps": [] if not include_timestamps else result.get("key_timestamps", []),
                "content_type": result.get("content_type", "video"),
                "summary_length": summary_length,
                "target_tokens": result.get("target_tokens", 800),
                "estimated_summary_tokens": result.get("estimated_summary_tokens", 0),
                "original_length": result.get("original_length", len(transcript_text)),
                "compression_ratio": result.get("compression_ratio", 0),
                "llm_provider": result.get("llm_provider", "unknown"),
                "llm_model": result.get("llm_model", "unknown"),
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Summarization failed: {str(e)}",
                "summary_length": summary_length
            }