ytdlp_mcp.py (22.2 kB)
import os
import tempfile
import json
from typing import Dict, Any, List, Optional

import yt_dlp
from pydantic import BaseModel

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool


# Pydantic models for input validation
class YouTubeChapterInput(BaseModel):
    url: str


class YouTubeChapter(BaseModel):
    start_time: str
    title: str


class YouTubeSubtitles(BaseModel):
    url: str
    language: str = "en"
    chapters: List[YouTubeChapter]


class YtDlpTools(str):
    YOUTUBE_CHAPTERS = "youtube_chapters"
    YOUTUBE_SUBTITLES = "youtube_subtitles"


class YtDlpService:
    """Service for extracting subtitles and chapters from YouTube videos using yt-dlp."""

    def __init__(self):
        """Initialize the YouTube extraction service."""
        # No need to check if yt-dlp is installed since we're importing it directly

    def _time_to_seconds(self, time_str: str) -> float:
        """
        Convert a time string (HH:MM:SS or MM:SS) to seconds.

        Args:
            time_str: Time string in format "HH:MM:SS", "MM:SS", or already a number

        Returns:
            Time in seconds as a float
        """
        # If already a number, return it
        if isinstance(time_str, (int, float)):
            return float(time_str)

        # Handle string time formats
        try:
            parts = time_str.split(":")
            if len(parts) == 3:  # HH:MM:SS
                return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])
            elif len(parts) == 2:  # MM:SS
                return int(parts[0]) * 60 + float(parts[1])
            else:
                # Try to convert directly to float
                return float(time_str)
        except (ValueError, AttributeError):
            # If conversion fails, return 0
            return 0.0

    def extract_subtitles(
        self,
        url: str,
        language: str = "en",
        chapters: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """
        Extract subtitles from a YouTube video, filtered by chapters if provided.

        Args:
            url: YouTube video URL
            language: Language code for subtitles (default: "en" for English)
            chapters: List of chapters with start_time and title to filter subtitles

        Returns:
            Dict containing success status, subtitle content, and any error information
        """
        try:
            # Convert any string timestamps in chapters to seconds
            if chapters:
                for chapter in chapters:
                    if "start_time" in chapter and not isinstance(
                        chapter["start_time"], (int, float)
                    ):
                        chapter["start_time"] = self._time_to_seconds(
                            chapter["start_time"]
                        )
                    if "end_time" in chapter and not isinstance(
                        chapter["end_time"], (int, float)
                    ):
                        chapter["end_time"] = self._time_to_seconds(chapter["end_time"])

            # Create a temporary directory to store the subtitle file
            with tempfile.TemporaryDirectory() as temp_dir:
                output_file = os.path.join(temp_dir, "subtitles")

                # Configure yt-dlp options
                ydl_opts = {
                    "skip_download": True,
                    "writesubtitles": True,
                    "writeautomaticsub": True,  # Also try auto-generated subs if regular ones aren't available
                    "subtitleslangs": [language],
                    "subtitlesformat": "vtt",
                    "outtmpl": output_file,
                    "quiet": True,
                }

                # Use yt-dlp library directly
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)

                    # Get all chapters for context if we're filtering by chapters
                    all_chapters = None
                    if chapters and not any(
                        chapter.get("end_time") for chapter in chapters
                    ):
                        # Get all chapters from video metadata
                        all_chapters = info.get("chapters", [])

                        # If no chapters in video metadata, create a single chapter for the whole video
                        if not all_chapters and info.get("duration"):
                            all_chapters = [
                                {
                                    "start_time": 0,
                                    "end_time": info.get("duration", 0),
                                    "title": info.get("title", "Full Video"),
                                }
                            ]

                        # Sort all chapters by start time
                        all_chapters = sorted(
                            all_chapters, key=lambda x: x.get("start_time", 0)
                        )

                    # Check if the subtitle file was created
                    subtitle_file = f"{output_file}.{language}.vtt"
                    if os.path.exists(subtitle_file):
                        with open(subtitle_file, "r", encoding="utf-8") as f:
                            subtitle_content = f.read()

                        # Get video duration if available
                        duration = info.get("duration", 0)

                        # Process the VTT content to make it more readable, filtered by chapters
                        processed_content = self._process_vtt_content(
                            subtitle_content, chapters, all_chapters, duration
                        )

                        return {
                            "success": True,
                            "content": processed_content,
                            "message": "Subtitles successfully extracted"
                            + (" for specified chapters" if chapters else ""),
                        }
                    else:
                        return {
                            "success": False,
                            "error": f"No subtitles found for language '{language}'",
                        }
        except Exception:
            raise

    def _process_vtt_content(
        self,
        vtt_content: str,
        selected_chapters: Optional[List[Dict[str, Any]]] = None,
        all_chapters: Optional[List[Dict[str, Any]]] = None,
        video_duration: int = 0,
    ) -> str:
        """
        Process VTT subtitle content to make it more readable, filtered by chapters if provided.

        Args:
            vtt_content: Raw VTT subtitle content
            selected_chapters: List of specific chapters to extract subtitles for
            all_chapters: Complete list of all chapters in the video (for context)
            video_duration: Total duration of the video in seconds

        Returns:
            Processed subtitle text
        """
        lines = vtt_content.split("\n")

        # If no chapters provided, process all subtitles
        if not selected_chapters:
            return self._process_all_subtitles(lines)

        # Create chapter ranges with start and end times
        chapter_ranges = []

        # Sort selected chapters by start time
        selected_chapters = sorted(
            selected_chapters, key=lambda x: x.get("start_time", 0)
        )

        for idx, chapter in enumerate(selected_chapters):
            start_time = chapter.get("start_time", 0)
            title = chapter.get("title", "Unnamed chapter")

            # If chapter has explicit end_time, use it
            if "end_time" in chapter:
                end_time = chapter["end_time"]
            else:
                # Find this chapter in all_chapters to determine its end time
                end_time = None
                if all_chapters:
                    # Find the current chapter in all_chapters
                    for i, full_chapter in enumerate(all_chapters):
                        if (
                            full_chapter.get("start_time") == start_time
                            and full_chapter.get("title") == title
                        ):
                            # If this is the last chapter, end time is video duration
                            if i == len(all_chapters) - 1:
                                end_time = video_duration
                            else:
                                # Otherwise, end time is the start time of the next chapter
                                end_time = all_chapters[i + 1].get("start_time")
                            break

                # If we couldn't find the end time, use the next chapter's start time or video duration
                if end_time is None:
                    # Find the next chapter in selected_chapters that has a later start time
                    next_chapters = [
                        c
                        for c in selected_chapters
                        if c.get("start_time", 0) > start_time
                    ]
                    if next_chapters:
                        end_time = min(
                            c.get("start_time", video_duration) for c in next_chapters
                        )
                    else:
                        end_time = video_duration

            # Add 2 seconds buffer to end time if it's not the last chapter
            is_last_chapter = idx == len(selected_chapters) - 1
            if not is_last_chapter and end_time < video_duration:
                end_time += 2

            chapter_ranges.append(
                {"start": start_time, "end": end_time, "title": title}
            )

        # Process subtitles by chapter
        result = []
        for chapter_range in chapter_ranges:
            chapter_subs = self._extract_subtitles_for_timerange(
                lines, chapter_range["start"], chapter_range["end"]
            )
            if chapter_subs:
                result.append(f"## {chapter_range['title']}")
                result.append(chapter_subs)

        return "\n\n".join(result)

    def _process_all_subtitles(self, lines: list) -> str:
        """Process all subtitles without chapter filtering."""
        processed_lines = []
        seen_sentences = set()  # Track unique sentences
        current_sentence = ""

        # Skip header lines
        start_processing = False
        for line in lines:
            # Skip empty lines
            if not line.strip():
                continue

            # Skip VTT header
            if not start_processing:
                if line.strip() and not line.startswith("WEBVTT"):
                    start_processing = True
                else:
                    continue

            # Skip timing lines (they contain --> )
            if "-->" in line:
                continue

            # Skip lines with just numbers (timestamp indices)
            if line.strip().isdigit():
                continue

            # Clean the line and add non-empty content
            cleaned_line = self._clean_subtitle_text(line.strip())
            if not cleaned_line:
                continue

            # Check if this line is a duplicate of the previous content
            # This handles the case where VTT repeats the same text in consecutive segments
            if processed_lines and cleaned_line in processed_lines[-1]:
                continue

            # If we have a complete sentence, add it
            if cleaned_line.endswith((".", "!", "?")):
                if current_sentence:
                    full_sentence = f"{current_sentence} {cleaned_line}".strip()
                else:
                    full_sentence = cleaned_line

                # Only add if not a duplicate
                if full_sentence not in seen_sentences:
                    seen_sentences.add(full_sentence)
                    processed_lines.append(full_sentence)
                current_sentence = ""
            else:
                # Add to current sentence
                if current_sentence:
                    current_sentence += f" {cleaned_line}"
                else:
                    current_sentence = cleaned_line

        # Add any remaining text
        if current_sentence and current_sentence not in seen_sentences:
            seen_sentences.add(current_sentence)
            processed_lines.append(current_sentence)

        # Final deduplication pass - remove lines that are substrings of other lines
        final_lines = []
        for i, line in enumerate(processed_lines):
            is_substring = False
            for j, other_line in enumerate(processed_lines):
                if i != j and line in other_line:
                    is_substring = True
                    break
            if not is_substring:
                final_lines.append(line)

        return "\n".join(final_lines)

    def _extract_subtitles_for_timerange(
        self, lines: list, start_seconds: float, end_seconds: float
    ) -> str:
        """
        Extract subtitles that fall within a specific time range.

        Args:
            lines: VTT subtitle lines
            start_seconds: Start time in seconds
            end_seconds: End time in seconds

        Returns:
            Processed subtitle text for the time range
        """
        # Ensure start_seconds and end_seconds are float values
        start_seconds = float(start_seconds)
        end_seconds = float(end_seconds)

        # First, collect all subtitle segments in the time range
        subtitle_segments = []
        current_time = 0
        current_segment = {"time": 0, "text": ""}

        i = 0
        while i < len(lines):
            line = lines[i]

            # Parse timestamp lines
            if "-->" in line:
                time_parts = line.split("-->")
                if len(time_parts) >= 1:
                    # Parse the start time
                    time_str = time_parts[0].strip()
                    try:
                        # Handle different time formats (00:00:00.000 or 00:00.000)
                        if time_str.count(":") == 2:
                            h, m, s = time_str.split(":")
                            current_time = int(h) * 3600 + int(m) * 60 + float(s)
                        else:
                            m, s = time_str.split(":")
                            current_time = int(m) * 60 + float(s)

                        # Check if this subtitle falls within our chapter range
                        if start_seconds <= current_time < end_seconds:
                            # Start a new segment
                            current_segment = {"time": current_time, "text": ""}

                            # Collect all text lines until next timestamp or empty line
                            j = i + 1
                            while (
                                j < len(lines)
                                and "-->" not in lines[j]
                                and lines[j].strip()
                            ):
                                if not lines[j].strip().isdigit():  # Skip index numbers
                                    cleaned_line = self._clean_subtitle_text(
                                        lines[j].strip()
                                    )
                                    if cleaned_line:
                                        current_segment["text"] += (
                                            " " + cleaned_line
                                            if current_segment["text"]
                                            else cleaned_line
                                        )
                                j += 1

                            # Only add non-empty segments
                            if current_segment["text"]:
                                subtitle_segments.append(current_segment)
                    except (ValueError, IndexError):
                        pass
            i += 1

        # Now process the segments to remove duplicates
        processed_lines = []
        seen_texts = set()

        # Sort segments by time
        subtitle_segments.sort(key=lambda x: x["time"])

        # Process segments
        for segment in subtitle_segments:
            text = segment["text"].strip()

            # Skip if we've seen this exact text before
            if text in seen_texts:
                continue

            # Skip if this text is a substring of the last added line
            if processed_lines and text in processed_lines[-1]:
                continue

            # Skip if the last added line is a substring of this text
            if processed_lines and processed_lines[-1] in text:
                # Replace the last line with this more complete one
                processed_lines[-1] = text
                seen_texts.add(text)
                continue

            # Add new unique text
            processed_lines.append(text)
            seen_texts.add(text)

        return "\n".join(processed_lines)

    def _clean_subtitle_text(self, text: str) -> str:
        """
        Remove HTML-like tags and timestamps from subtitle text.

        Args:
            text: Raw subtitle text with tags

        Returns:
            Clean text without tags
        """
        import re

        # Remove timestamp tags like <00:13:50.279>
        text = re.sub(r"<\d+:\d+:\d+\.\d+>", "", text)
        # Remove <c> and </c> tags
        text = re.sub(r"</?c>", "", text)
        # Remove any duplicate spaces created by tag removal
        text = re.sub(r"\s+", " ", text)

        return text.strip()

    def _format_time(self, seconds: float) -> str:
        """
        Format time in seconds to HH:MM:SS format.

        Args:
            seconds: Time in seconds

        Returns:
            Formatted time string
        """
        hours, remainder = divmod(int(seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
        else:
            return f"{minutes:02d}:{seconds:02d}"

    def extract_chapters(self, url: str) -> Dict[str, Any]:
        """
        Extract chapters from a YouTube video.

        Args:
            url: YouTube video URL

        Returns:
            Dict containing success status, chapters information, and any error information
        """
        try:
            # Configure yt-dlp options
            ydl_opts = {
                "skip_download": True,
                "quiet": True,
            }

            # Use yt-dlp library directly
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

                # Get video title
                video_title = info.get("title", "Untitled Video")

                # Check if chapters information is available
                if info and "chapters" in info and info["chapters"]:
                    chapters = info["chapters"]

                    # Format chapters information
                    formatted_chapters = []
                    for chapter in chapters:
                        start_time = self._format_time(chapter.get("start_time", 0))
                        title = chapter.get("title", "Unnamed chapter")
                        formatted_chapters.append(f"{start_time} - {title}")

                    chapters_text = "\n".join(formatted_chapters)

                    return {
                        "success": True,
                        "content": chapters_text,
                        "message": f"Successfully extracted {len(chapters)} chapters",
                        "raw_chapters": chapters,  # Include raw data for potential further processing
                    }
                else:
                    # No chapters found, return video title as a single chapter
                    formatted_chapter = f"00:00 - {video_title}"
                    return {
                        "success": True,
                        "content": formatted_chapter,
                        "message": "No chapters found. Returning video title as a single chapter.",
                        "raw_chapters": [{"start_time": 0, "title": video_title}],
                    }
        except Exception as e:
            return {"success": False, "error": f"Failed to extract chapters: {str(e)}"}


async def serve() -> None:
    server = Server("mcp-ytdlp")
    service = YtDlpService()

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name=YtDlpTools.YOUTUBE_CHAPTERS,
                description="Extract chapters from a YouTube video",
                inputSchema=YouTubeChapterInput.model_json_schema(),
            ),
            Tool(
                name=YtDlpTools.YOUTUBE_SUBTITLES,
                description="Extract subtitles from a YouTube video for specific chapters",
                inputSchema=YouTubeSubtitles.model_json_schema(),
            ),
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        try:
            if name == YtDlpTools.YOUTUBE_CHAPTERS:
                result = service.extract_chapters(arguments["url"])
                return [TextContent(type="text", text=result["content"])]
            elif name == YtDlpTools.YOUTUBE_SUBTITLES:
                if "chapters" not in arguments or len(arguments["chapters"]) == 0:
                    raise Exception("At least one chapter is required")
                result = service.extract_subtitles(
                    arguments["url"],
                    arguments.get("language", "en"),
                    arguments["chapters"],
                )
                return [TextContent(type="text", text=result["content"])]
            else:
                raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            return [TextContent(type="text", text=json.dumps({"error": str(e)}))]

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)


def main():
    import asyncio

    asyncio.run(serve())
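For a quick local check outside of an MCP client, the extraction service can also be driven directly from Python. The snippet below is a minimal sketch, not part of the listing above: the module name is assumed to match the file name ytdlp_mcp.py, and the video URL and chapter title are placeholders to replace with real values.

# Minimal sketch: call YtDlpService directly, bypassing the MCP stdio transport.
# VIDEO_ID and the chapter title "Intro" are placeholders, not real values.
from ytdlp_mcp import YtDlpService

service = YtDlpService()
url = "https://www.youtube.com/watch?v=VIDEO_ID"

# List chapters first; falls back to "00:00 - <video title>" when the video has none.
chapters_result = service.extract_chapters(url)
print(chapters_result["content"])

# Then pull subtitles for one chapter; start_time accepts "MM:SS" or "HH:MM:SS"
# strings because extract_subtitles converts them via _time_to_seconds.
subs_result = service.extract_subtitles(
    url,
    language="en",
    chapters=[{"start_time": "00:00", "title": "Intro"}],
)
print(subs_result["content"])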

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daltonnyx/userful-mcps'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.