main.py•3.21 kB
import re
from typing import Any
from mcp.server.stdio import stdio_server
from youtube_transcript_api import YouTubeTranscriptApi
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("youtube")
def extract_video_id(url: str) -> str:
    """Extract the 11-character YouTube video ID from a URL or a bare ID.

    Recognised inputs:
      * ``youtube.com/watch?v=<id>`` (``v`` may follow other query parameters)
      * ``youtu.be/<id>``
      * ``youtube.com/embed/<id>`` and ``youtube-nocookie.com/embed/<id>``
      * ``youtube.com/shorts/<id>``, ``youtube.com/live/<id>``, ``youtube.com/v/<id>``
      * a bare 11-character video ID

    Args:
        url: YouTube video URL or video ID. Surrounding whitespace is ignored.

    Returns:
        The 11-character video ID.

    Raises:
        ValueError: If no video ID can be found in ``url``.
    """
    candidate = url.strip()

    # The first two patterns are the original ones and are tried first, so any
    # input that used to resolve keeps resolving to the same ID. The third
    # pattern adds path-style URLs that were previously rejected.
    patterns = (
        r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})',
        r'youtube\.com/watch\?.*v=([a-zA-Z0-9_-]{11})',
        r'youtube(?:-nocookie)?\.com/(?:embed|shorts|live|v)/([a-zA-Z0-9_-]{11})',
    )
    for pattern in patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)

    # If no pattern matches, maybe the input is already a video ID.
    # fullmatch anchors both ends, so no separate length check is needed.
    if re.fullmatch(r'[a-zA-Z0-9_-]{11}', candidate):
        return candidate

    raise ValueError(f"Could not extract video ID from URL: {url}")
def format_timestamp(seconds: float) -> str:
    """Render a duration in seconds as ``MM:SS``, or ``HH:MM:SS`` from one hour up.

    Fractional seconds are dropped, not rounded.

    Args:
        seconds: Offset in seconds.

    Returns:
        The zero-padded timestamp string.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    hh = int(whole_hours)
    mm = int(within_hour // 60)
    ss = int(seconds % 60)

    # The hour field is shown only when it is positive.
    if hh > 0:
        return f"{hh:02d}:{mm:02d}:{ss:02d}"
    return f"{mm:02d}:{ss:02d}"
def _fetch_english_segments(video_id: str) -> list[dict[str, Any]]:
    """Fetch raw English transcript segments using whichever library API exists.

    NOTE(review): per the youtube-transcript-api documentation, the 1.x series
    replaced the static ``get_transcript`` call with an instance ``fetch``
    method whose result converts to the old list-of-dicts shape through
    ``to_raw_data()``. Confirm against the installed version.
    """
    legacy_get_transcript = getattr(YouTubeTranscriptApi, 'get_transcript', None)
    if legacy_get_transcript is not None:
        # Older releases: behave exactly as before.
        return legacy_get_transcript(video_id, ['en'])
    # Newer releases: the static method is gone, so use the instance API.
    return YouTubeTranscriptApi().fetch(video_id, languages=['en']).to_raw_data()


def get_youtube_transcript(video_id: str) -> dict[str, Any]:
    """Get a YouTube transcript using youtube-transcript-api (English only).

    Args:
        video_id: The YouTube video ID to fetch the transcript for.

    Returns:
        On success, a dict with ``success`` (True), ``video_id``,
        ``transcript`` (one ``[timestamp] text`` line per segment, joined by
        newlines) and ``segments_count``. On failure, a dict with ``success``
        (False), ``error`` (the exception message) and ``video_id``.
        Exceptions are reported through the returned dict, never raised.
    """
    try:
        segments = _fetch_english_segments(video_id)

        # Prefix every segment with its start time.
        timestamped_lines = [
            f"[{format_timestamp(segment['start'])}] {segment['text']}"
            for segment in segments
        ]

        return {
            'success': True,
            'video_id': video_id,
            'transcript': '\n'.join(timestamped_lines),
            'segments_count': len(segments),
        }
    except Exception as e:
        # Deliberate catch-all: callers branch on 'success' instead of
        # handling library-specific exception types.
        return {
            'success': False,
            'error': str(e),
            'video_id': video_id,
        }
# NOTE: the docstring below is kept verbatim because FastMCP surfaces a tool's
# docstring to clients as the tool description (per the MCP Python SDK docs).
@mcp.tool()
async def get_youtube_transcript_tool(url: str) -> str:
    """Extract English transcript with timestamps from a YouTube video using its URL or video ID.
    Args:
    url: YouTube video URL or video ID
    Returns:
    The timestamped transcript text
    """
    # Local import keeps this block self-contained; asyncio is standard library.
    import asyncio

    try:
        video_id = extract_video_id(url)
        # The transcript fetch is a blocking network call. Running it in a
        # worker thread keeps the server's event loop free for other requests.
        result = await asyncio.to_thread(get_youtube_transcript, video_id)
        if result['success']:
            response = f"✅ Successfully extracted English transcript for video: {video_id}\n\n"
            response += f"📄 Transcript with timestamps:\n{result['transcript']}\n\n"
            response += f"🔍 Total segments: {result['segments_count']}"
            return response
        else:
            return f"❌ Failed to extract transcript: {result['error']}"
    except Exception as e:
        # Tool boundary: report every failure as text rather than raising.
        return f"❌ Error processing video: {str(e)}"
if __name__ == "__main__":
    import sys

    # With the stdio transport, stdout carries the JSON-RPC protocol stream.
    # Status messages must go to stderr, otherwise they corrupt the stream the
    # client is parsing.
    print("✅ Server is now running and ready to process requests", file=sys.stderr)
    mcp.run(transport='stdio')