get_video_enhanced_transcript
Extract and analyze YouTube video transcripts with filtering by time, search within content, segmentation, and multiple output formats for detailed processing.
Instructions
Advanced transcript extraction tool with filtering, search, and multi-video capabilities. Provides rich transcript data for detailed analysis and processing. Features: 1) Extract transcripts from multiple videos; 2) Filter by time ranges; 3) Search within transcripts; 4) Segment transcripts; 5) Format output in different ways; 6) Include video metadata.
Input Schema
TableJSON Schema
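| Name | Required | Description | Default |
|---|---|---|---|
| video_ids | Yes | List of YouTube video IDs (max 5) | |
| language | No | Language code for transcript | ko |
| start_time | No | Start time in seconds | |
| end_time | No | End time in seconds | |
| query | No | Search query | |
| case_sensitive | No | Whether to use case-sensitive search | false |
| segment_method | No | Segment method ("equal" or "smart") | equal |
| segment_count | No | Number of segments | 2 |
| format | No | Output format ("raw", "timestamped", "merged") | timestamped |
| include_metadata | No | Whether to include video details | false |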
Implementation Reference
# server.py:1294-1371 (handler) - MCP tool handler function that validates input
# parameters, constructs the options dictionary, and delegates to the
# YouTubeService.get_video_enhanced_transcript helper method for core processing.
@mcp.tool(
    name="get_video_enhanced_transcript",
    description="Advanced transcript extraction tool with filtering, search, and multi-video capabilities. Provides rich transcript data for detailed analysis and processing. Features: 1) Extract transcripts from multiple videos; 2) Filter by time ranges; 3) Search within transcripts; 4) Segment transcripts; 5) Format output in different ways; 6) Include video metadata.",
)
async def get_video_enhanced_transcript(
    video_ids: List[str],
    language: Optional[str] = 'ko',
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
    query: Optional[str] = None,
    case_sensitive: Optional[bool] = False,
    segment_method: Optional[str] = "equal",
    segment_count: Optional[int] = 2,
    format: Optional[str] = "timestamped",
    include_metadata: Optional[bool] = False,
) -> Dict[str, Any]:
    """
    Get enhanced transcript for one or more YouTube videos with advanced filtering and processing

    Validates the request, translates the flat tool parameters into the nested
    options dictionary and delegates to
    youtube_service.get_video_enhanced_transcript.

    Args:
        video_ids (List[str]): List of YouTube video IDs (max 5)
        language (str, optional): Language code for transcript
        start_time (int, optional): Start time in seconds
        end_time (int, optional): End time in seconds
        query (str, optional): Search query
        case_sensitive (bool, optional): Whether to use case-sensitive search
        segment_method (str, optional): Segment method ("equal" or "smart")
        segment_count (int, optional): Number of segments
        format (str, optional): Output format ("raw", "timestamped", "merged");
            None falls back to "timestamped"
        include_metadata (bool, optional): Whether to include video details

    Returns:
        Dict[str, Any]: Enhanced transcript data, or {'error': <message>} when
        validation fails or an unexpected exception is raised.
    """
    try:
        # Validate input
        if not video_ids:
            return {'error': "No video IDs provided"}

        if len(video_ids) > 5:
            return {'error': "Maximum 5 video IDs allowed"}

        # An explicit None means "use the default"; anything outside the three
        # supported formats would otherwise produce a result with no transcript.
        output_format = "timestamped" if format is None else format
        if output_format not in ("raw", "timestamped", "merged"):
            return {'error': f"Unsupported format '{output_format}' (expected 'raw', 'timestamped' or 'merged')"}

        # Build options from individual parameters
        options = {
            'language': language,
            'format': output_format,
            'includeMetadata': include_metadata,
        }

        # Add time range filter if specified
        if start_time is not None or end_time is not None:
            options['timeRange'] = {
                'start': start_time,
                'end': end_time,
            }

        # Add search filter if specified
        if query:
            options['search'] = {
                'query': query,
                'caseSensitive': case_sensitive,
                'contextLines': 2,  # Default context lines
            }

        # Add segment option only when both parts are actually provided; a
        # None count would otherwise be compared with an int downstream and
        # make every video fail.
        if segment_method and segment_count is not None:
            options['segment'] = {
                'method': segment_method,
                'count': segment_count,
            }

        # Call the enhanced transcript method
        transcript = youtube_service.get_video_enhanced_transcript(video_ids, options)

        return transcript
    except Exception as e:
        # Top-level tool boundary: log with traceback and report the message
        # to the caller instead of propagating.
        logger.exception(f"Error in get_video_enhanced_transcript: {e}")
        return {'error': str(e)}
# server.py:426-667 (helper) - Core helper method in the YouTubeService class
# implementing the full enhanced transcript logic: fetching transcripts for
# multiple videos, applying time/search/segment filters, formatting output,
# including metadata and statistics.
def get_video_enhanced_transcript(self, video_ids: List[str], options: Dict[str, Any]) -> Dict[str, Any]:
    """
    Get enhanced transcript for one or more YouTube videos with advanced filtering and processing

    Each video is processed independently; a failure for one video is recorded
    in that video's entry and does not abort the others.

    Args:
        video_ids (List[str]): List of YouTube video IDs
        options (Dict[str, Any]): Advanced options for transcript processing
            - language (str, optional): Language code
            - format (str, optional): Output format (raw, timestamped, merged).
              Any other value leaves the "transcript" key unset.
            - includeMetadata (bool, optional): Whether to include video details
            - timeRange (Dict, optional): Time range filter with start and end in seconds
            - search (Dict, optional): Search filter with query, caseSensitive, and contextLines
            - segment (Dict, optional): Segmentation options with method
              ("equal" or "smart") and count. Segmentation only happens when
              count is an integer greater than 1.

    Returns:
        Dict[str, Any]: {"videos": [...], "status": {...}} where each video
        entry holds "videoId" plus either "error" or the processed
        "transcript", "statistics" and optional "metadata" / "segments".
    """
    result = {
        "videos": [],
        "status": {
            "success": True,
            "message": "Transcripts processed successfully",
            "failedCount": 0,
            "successCount": 0
        }
    }

    # Process options
    language = options.get('language')
    format_type = options.get('format', 'timestamped')
    include_metadata = options.get('includeMetadata', False)
    time_range = options.get('timeRange')
    search_filter = options.get('search')
    segment_options = options.get('segment')

    def build_chunk(index, chunk_segments):
        """Package a run of transcript segments as one output chunk."""
        return {
            "index": index,
            "segments": chunk_segments,
            "text": " ".join(s['text'] for s in chunk_segments)
        }

    # Process each video
    for video_id in video_ids:
        video_result = {"videoId": video_id}

        try:
            # Get video details if metadata requested
            if include_metadata:
                video_data = self.get_video_details(video_id)
                if not video_data.get('items'):
                    video_result["error"] = f"Video with ID {video_id} not found"
                    result["videos"].append(video_result)
                    result["status"]["failedCount"] += 1
                    continue

                video = video_data['items'][0]
                snippet = video.get('snippet', {})
                video_result["metadata"] = {
                    'id': video.get('id'),
                    'title': snippet.get('title'),
                    'channelTitle': snippet.get('channelTitle'),
                    'publishedAt': snippet.get('publishedAt'),
                    'duration': video.get('contentDetails', {}).get('duration')
                }

            # Call the get_video_transcript method which returns transcript data
            raw_transcript_data = self.get_video_transcript(video_id, language)

            # Check if transcript was fetched successfully
            if not raw_transcript_data or (isinstance(raw_transcript_data, dict) and 'error' in raw_transcript_data):
                if isinstance(raw_transcript_data, dict):
                    error_msg = raw_transcript_data.get('error', "Failed to retrieve transcript")
                else:
                    error_msg = "Failed to retrieve transcript"
                video_result["error"] = error_msg
                result["videos"].append(video_result)
                result["status"]["failedCount"] += 1
                continue

            # Get transcript segments - adapt to different response formats
            if isinstance(raw_transcript_data, dict) and 'transcript' in raw_transcript_data:
                # Dictionary with a 'transcript' key: used as provided.
                # NOTE(review): presumably these entries already carry
                # 'text', 'start', 'duration' and 'timestamp' - confirm
                # against get_video_transcript.
                segments = raw_transcript_data['transcript']
            elif isinstance(raw_transcript_data, dict) and 'text' in raw_transcript_data:
                # A pre-formatted response that only carries 'text' cannot be
                # filtered or segmented, so it is reported as unsupported.
                video_result["error"] = "Transcript format not supported"
                result["videos"].append(video_result)
                result["status"]["failedCount"] += 1
                continue
            elif isinstance(raw_transcript_data, list):
                # A plain list of dict entries
                segments = [
                    {
                        'text': item.get('text', ''),
                        'start': item.get('start', 0),
                        'duration': item.get('duration', 0),
                        'timestamp': self.format_time(int(item.get('start', 0) * 1000))
                    }
                    for item in raw_transcript_data
                ]
            else:
                # Iterable of objects exposing attributes instead of .get()
                # (e.g. FetchedTranscript objects from YouTubeTranscriptApi)
                segments = []
                for entry in raw_transcript_data:
                    start = getattr(entry, 'start', 0)
                    segments.append({
                        'text': getattr(entry, 'text', ''),
                        'start': start,
                        'duration': getattr(entry, 'duration', 0),
                        'timestamp': self.format_time(int(start * 1000))
                    })

            # Apply time range filter if specified
            if time_range:
                start_time = time_range.get('start')
                end_time = time_range.get('end')

                # Keep every segment that overlaps the requested window.
                if start_time is not None:
                    segments = [s for s in segments if (s['start'] + s['duration']) >= start_time]

                if end_time is not None:
                    segments = [s for s in segments if s['start'] <= end_time]

            # Apply search filter if specified
            if search_filter and segments:
                query = search_filter.get('query', '')
                case_sensitive = search_filter.get('caseSensitive', False)
                context_lines = search_filter.get('contextLines', 0)

                if query:
                    search_query = query if case_sensitive else query.lower()
                    matched_indices = [
                        i for i, s in enumerate(segments)
                        if search_query in (s['text'] if case_sensitive else s['text'].lower())
                    ]

                    # Include context lines before and after each match
                    if context_lines > 0:
                        expanded_indices = set()
                        for idx in matched_indices:
                            lower = max(0, idx - context_lines)
                            upper = min(len(segments), idx + context_lines + 1)
                            expanded_indices.update(range(lower, upper))
                        matched_indices = sorted(expanded_indices)

                    # Filter segments by matched indices
                    segments = [segments[i] for i in matched_indices]

            # Apply segmentation if specified
            if segment_options and segments:
                method = segment_options.get('method', 'equal')
                count = segment_options.get('count', 1)

                # A missing/None/non-integer count means "do not segment"
                # rather than failing the whole video on a comparison error.
                if isinstance(count, int) and count > 1:
                    if method == 'equal':
                        # Balanced split: never more chunks than segments, and
                        # the remainder is spread one-per-chunk from the front
                        # so chunk sizes differ by at most one.
                        parts = min(count, len(segments))
                        base_size, remainder = divmod(len(segments), parts)

                        segmented_transcript = []
                        cursor = 0
                        for i in range(parts):
                            size = base_size + (1 if i < remainder else 0)
                            segmented_transcript.append(build_chunk(i, segments[cursor:cursor + size]))
                            cursor += size

                        video_result["segments"] = segmented_transcript
                    elif method == 'smart':
                        # Basic approach: cut whenever the accumulated
                        # character count reaches an equal share of the text.
                        total_chars = len(" ".join(s['text'] for s in segments))
                        chars_per_segment = total_chars // count

                        segmented_transcript = []
                        current_segment = []
                        current_chars = 0
                        segment_idx = 0

                        for s in segments:
                            current_segment.append(s)
                            current_chars += len(s['text'])

                            # The final chunk absorbs whatever is left over.
                            if current_chars >= chars_per_segment and segment_idx < count - 1:
                                segmented_transcript.append(build_chunk(segment_idx, current_segment))
                                segment_idx += 1
                                current_segment = []
                                current_chars = 0

                        # Add the last segment if not empty
                        if current_segment:
                            segmented_transcript.append(build_chunk(segment_idx, current_segment))

                        video_result["segments"] = segmented_transcript

            # Format transcript based on format type
            if format_type == 'raw':
                video_result["transcript"] = segments
            elif format_type == 'timestamped':
                video_result["transcript"] = [
                    f"[{s['timestamp']}] {s['text']}" for s in segments
                ]
            elif format_type == 'merged':
                video_result["transcript"] = " ".join(s['text'] for s in segments)

            # Store statistics (computed on the filtered segments)
            video_result["statistics"] = {
                "segmentCount": len(segments),
                "totalDuration": sum(s['duration'] for s in segments),
                "averageSegmentLength": sum(len(s['text']) for s in segments) / len(segments) if segments else 0
            }

            result["videos"].append(video_result)
            result["status"]["successCount"] += 1

        except Exception as e:
            # Per-video boundary: record the failure and continue with the
            # remaining videos.
            logger.exception(f"Error processing transcript for video {video_id}: {e}")
            video_result["error"] = str(e)
            result["videos"].append(video_result)
            result["status"]["failedCount"] += 1

    # Update overall status
    if result["status"]["failedCount"] > 0:
        if result["status"]["successCount"] == 0:
            result["status"]["success"] = False
            result["status"]["message"] = "All transcript requests failed"
        else:
            result["status"]["message"] = f"Partially successful ({result['status']['failedCount']} failed, {result['status']['successCount']} succeeded)"

    return result
- server.py:687-687 (registration) Tool name and description registered in the available-youtube-tools resource list: {"name": "get_video_enhanced_transcript", "description": "Advanced transcript extraction tool with filtering, search, and multi-video capabilities. Provides rich transcript data for detailed analysis and processing. Features: 1) Extract transcripts from multiple videos; 2) Filter by time ranges; 3) Search within transcripts; 4) Segment transcripts; 5) Format output in different ways; 6) Include video metadata."}