mcp-youtube-transcript
by sinco-lab
Verified
#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { McpError } from "@modelcontextprotocol/sdk/types.js";
import { YouTubeTranscriptFetcher, YouTubeUtils, YouTubeTranscriptError, TranscriptOptions, Transcript } from './youtube.js';
import { z } from "zod";
class YouTubeTranscriptExtractor {
/**
* Extracts YouTube video ID from various URL formats or direct ID input
*/
extractYoutubeId(input: string): string {
return YouTubeTranscriptFetcher.extractVideoId(input);
}
/**
* Retrieves transcripts for a given video ID and language
*/
async getTranscripts({ videoID, lang }: TranscriptOptions): Promise<{ transcripts: Transcript[], title: string }> {
try {
const result = await YouTubeTranscriptFetcher.fetchTranscripts(videoID, { lang });
if (result.transcripts.length === 0) {
throw new YouTubeTranscriptError('No transcripts found');
}
return result;
} catch (error) {
if (error instanceof YouTubeTranscriptError || error instanceof McpError) {
throw error;
}
throw new YouTubeTranscriptError(`Failed to fetch transcripts: ${(error as Error).message}`);
}
}
}
class TranscriptServer {
private extractor: YouTubeTranscriptExtractor;
private server: McpServer;
constructor() {
this.extractor = new YouTubeTranscriptExtractor();
this.server = new McpServer({
name: "mcp-youtube-transcript",
version: "0.0.1",
description: "A server built on the Model Context Protocol (MCP) that enables direct downloading of YouTube video transcripts, supporting AI and video analysis workflows."
});
this.setupTools();
this.setupErrorHandling();
}
private setupErrorHandling(): void {
process.on('SIGINT', async () => {
await this.stop();
process.exit(0);
});
}
private setupTools(): void {
this.server.tool(
"get_transcripts",
"Extract and process transcripts from a YouTube video",
{
url: z.string().describe("YouTube video URL or ID"),
lang: z.string().default("en").describe("Language code for transcripts, default 'en' (e.g. 'en', 'uk', 'ja', 'ru', 'zh')"),
enableParagraphs: z.boolean().default(false).describe("Enable automatic paragraph breaks, default `false`")
},
async (input) => {
try {
const videoId = this.extractor.extractYoutubeId(input.url);
console.error(`Processing transcripts for video: ${videoId}`);
const { transcripts, title } = await this.extractor.getTranscripts({
videoID: videoId,
lang: input.lang
});
// Format text with optional paragraph breaks
const formattedText = YouTubeUtils.formatTranscriptText(transcripts, {
enableParagraphs: input.enableParagraphs
});
console.error(`Successfully extracted transcripts for "${title}" (${formattedText.length} chars)`);
return {
content: [{
type: "text",
text: `# ${title}\n\n${formattedText}`,
metadata: {
videoId,
title,
language: input.lang,
timestamp: new Date().toISOString(),
charCount: formattedText.length,
transcriptCount: transcripts.length,
totalDuration: YouTubeUtils.calculateTotalDuration(transcripts),
paragraphsEnabled: input.enableParagraphs
}
}]
};
} catch (error) {
if (error instanceof YouTubeTranscriptError || error instanceof McpError) {
throw error;
}
throw new YouTubeTranscriptError(`Failed to process transcripts: ${(error as Error).message}`);
}
}
);
}
async start(): Promise<void> {
const transport = new StdioServerTransport();
await this.server.connect(transport);
}
async stop(): Promise<void> {
await this.server.close();
}
}
async function main() {
try {
const server = new TranscriptServer();
await server.start();
} catch (error) {
console.error('Server error:', error);
process.exit(1);
}
}
main();