#!/usr/bin/env python3
"""
YouTube Insights MCP Server
Extracts transcripts, metadata, and insights from YouTube videos.
"""
import re
import json
from typing import Any
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import (
TranscriptsDisabled,
NoTranscriptFound,
VideoUnavailable,
)
import requests
from mcp.server import Server
from mcp.types import Tool, TextContent
def extract_video_id(url: str) -> str:
"""Extract YouTube video ID from various URL formats."""
# Handle different YouTube URL formats
patterns = [
r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/)([a-zA-Z0-9_-]{11})',
r'youtube\.com\/shorts\/([a-zA-Z0-9_-]{11})',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
# If no pattern matches, assume it's already a video ID
if re.match(r'^[a-zA-Z0-9_-]{11}$', url):
return url
raise ValueError(f"Could not extract video ID from: {url}")
def get_video_transcript(video_id: str, languages: list[str] = None) -> dict[str, Any]:
"""Get the transcript for a YouTube video."""
if languages is None:
languages = ['en', 'en-US', 'en-GB']
try:
# Create API instance and get list of available transcripts
api = YouTubeTranscriptApi()
transcript_list = api.list(video_id)
# Try to find a transcript in the preferred languages
transcript = None
for lang in languages:
try:
transcript = transcript_list.find_transcript([lang])
break
except NoTranscriptFound:
continue
# If no preferred language found, get any available transcript
if transcript is None:
try:
transcript = transcript_list.find_generated_transcript(['en'])
except:
# Get first available transcript
available = list(transcript_list)
if available:
transcript = available[0]
if transcript is None:
raise NoTranscriptFound(video_id, languages, None)
# Fetch the actual transcript data
fetched_transcript = transcript.fetch()
# Convert transcript snippets to dict format
transcript_data = []
for snippet in fetched_transcript:
transcript_data.append({
'text': snippet.text,
'start': snippet.start,
'duration': snippet.duration
})
# Format the transcript
full_text = " ".join([entry['text'] for entry in transcript_data])
return {
"success": True,
"video_id": video_id,
"language": transcript.language_code,
"is_generated": transcript.is_generated,
"transcript": transcript_data,
"full_text": full_text,
"duration": transcript_data[-1]['start'] + transcript_data[-1]['duration'] if transcript_data else 0,
}
except TranscriptsDisabled:
return {
"success": False,
"error": "Transcripts are disabled for this video",
"video_id": video_id,
}
except NoTranscriptFound:
return {
"success": False,
"error": "No transcript found for this video",
"video_id": video_id,
}
except VideoUnavailable:
return {
"success": False,
"error": "Video is unavailable",
"video_id": video_id,
}
except Exception as e:
return {
"success": False,
"error": f"Error fetching transcript: {str(e)}",
"video_id": video_id,
}
def get_video_metadata(video_id: str) -> dict[str, Any]:
"""Get metadata for a YouTube video using oEmbed API."""
try:
# Use YouTube's oEmbed endpoint
url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
return {
"success": True,
"video_id": video_id,
"title": data.get("title", ""),
"author": data.get("author_name", ""),
"author_url": data.get("author_url", ""),
"thumbnail_url": data.get("thumbnail_url", ""),
"width": data.get("width"),
"height": data.get("height"),
}
else:
return {
"success": False,
"error": f"Failed to fetch metadata: HTTP {response.status_code}",
"video_id": video_id,
}
except Exception as e:
return {
"success": False,
"error": f"Error fetching metadata: {str(e)}",
"video_id": video_id,
}
def summarize_transcript(transcript_text: str, max_sentences: int = 5) -> str:
"""Create a simple summary by extracting key sentences."""
# Split into sentences
sentences = re.split(r'[.!?]+', transcript_text)
sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
# Simple extraction: take sentences at regular intervals
if len(sentences) <= max_sentences:
return ". ".join(sentences) + "."
step = len(sentences) // max_sentences
key_sentences = [sentences[i * step] for i in range(max_sentences)]
return ". ".join(key_sentences) + "."
# Create MCP server
app = Server("youtube-insights-mcp")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="get_youtube_transcript",
description="Extract the full transcript from a YouTube video. Supports various YouTube URL formats or direct video IDs.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "YouTube video URL or video ID",
},
"languages": {
"type": "array",
"items": {"type": "string"},
"description": "Preferred languages for transcript (e.g., ['en', 'es']). Defaults to English.",
"default": ["en"],
},
},
"required": ["url"],
},
),
Tool(
name="get_youtube_metadata",
description="Get metadata for a YouTube video including title, author, and thumbnail.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "YouTube video URL or video ID",
},
},
"required": ["url"],
},
),
Tool(
name="get_youtube_insights",
description="Get comprehensive insights from a YouTube video including transcript, metadata, and a brief summary.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "YouTube video URL or video ID",
},
"languages": {
"type": "array",
"items": {"type": "string"},
"description": "Preferred languages for transcript. Defaults to English.",
"default": ["en"],
},
"summary_sentences": {
"type": "integer",
"description": "Number of sentences for the summary (default: 5)",
"default": 5,
},
},
"required": ["url"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls."""
if name == "get_youtube_transcript":
url = arguments.get("url")
languages = arguments.get("languages", ["en"])
try:
video_id = extract_video_id(url)
result = get_video_transcript(video_id, languages)
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": str(e)
}, indent=2)
)]
elif name == "get_youtube_metadata":
url = arguments.get("url")
try:
video_id = extract_video_id(url)
result = get_video_metadata(video_id)
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": str(e)
}, indent=2)
)]
elif name == "get_youtube_insights":
url = arguments.get("url")
languages = arguments.get("languages", ["en"])
summary_sentences = arguments.get("summary_sentences", 5)
try:
video_id = extract_video_id(url)
# Get both transcript and metadata
transcript_result = get_video_transcript(video_id, languages)
metadata_result = get_video_metadata(video_id)
# Create comprehensive result
result = {
"success": transcript_result["success"] and metadata_result["success"],
"video_id": video_id,
"metadata": metadata_result if metadata_result["success"] else None,
}
if transcript_result["success"]:
result["transcript"] = {
"language": transcript_result["language"],
"is_generated": transcript_result["is_generated"],
"full_text": transcript_result["full_text"],
"duration_seconds": transcript_result["duration"],
"word_count": len(transcript_result["full_text"].split()),
}
# Add summary
result["summary"] = summarize_transcript(
transcript_result["full_text"],
summary_sentences
)
else:
result["error"] = transcript_result.get("error", "Unknown error")
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": str(e)
}, indent=2)
)]
else:
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": f"Unknown tool: {name}"
}, indent=2)
)]
async def main():
"""Run the MCP server."""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())