YouTube MCP Server
import { MCPFunction, MCPFunctionGroup } from "@modelcontextprotocol/sdk";
import { Translate } from "@google-cloud/translate/build/src/v2";
// Utility function for safe execution with error handling
function safelyExecute<T>(fn: () => Promise<T>): Promise<T> {
return fn().catch(error => {
throw new Error(`Operation failed: ${error instanceof Error ? error.message : String(error)}`);
});
}
interface VideoTranslation {
[language: string]: {
title?: string;
description?: string;
tags?: string[];
[key: string]: any;
};
}
interface LanguageDetection {
language: string;
confidence: number;
}
interface LanguageSegment extends LanguageDetection {
text: string;
}
export class TranslationManager implements MCPFunctionGroup {
private youtube: any;
private translate: Translate;
constructor() {
this.youtube = google.youtube({
version: "v3",
auth: process.env.YOUTUBE_API_KEY
});
this.translate = new Translate({
projectId: process.env.GOOGLE_PROJECT_ID,
key: process.env.GOOGLE_TRANSLATE_API_KEY
});
}
@MCPFunction({
description: "Translate video captions to multiple languages",
parameters: {
type: "object",
properties: {
videoId: { type: "string" },
targetLanguages: {
type: "array",
items: { type: "string" }
}
},
required: ["videoId", "targetLanguages"]
}
})
async translateCaptions({
videoId,
targetLanguages
}: {
videoId: string;
targetLanguages: string[];
}): Promise<Record<string, string[]>> {
return safelyExecute(async () => {
const captions = await this.youtube.captions.list({
part: ["snippet"],
videoId
});
if (!captions.data.items?.length) {
throw new Error(`No captions found for video: ${videoId}`);
}
const results: Record<string, string[]> = {};
for (const caption of captions.data.items) {
const track = await this.youtube.captions.download({
id: caption.id
});
for (const lang of targetLanguages) {
const [translation] = await this.translate.translate(track.data, lang);
if (!results[lang]) {
results[lang] = [];
}
results[lang].push(translation);
}
}
return results;
});
}
@MCPFunction({
description: "Translate video metadata to multiple languages",
parameters: {
type: "object",
properties: {
videoId: { type: "string" },
targetLanguages: {
type: "array",
items: { type: "string" }
},
fields: {
type: "array",
items: {
type: "string",
enum: ["title", "description", "tags"]
}
}
},
required: ["videoId", "targetLanguages"]
}
})
async translateMetadata({
videoId,
targetLanguages,
fields = ["title", "description", "tags"]
}: {
videoId: string;
targetLanguages: string[];
fields?: string[];
}): Promise<VideoTranslation> {
return safelyExecute(async () => {
const video = await this.youtube.videos.list({
part: ["snippet"],
id: [videoId]
});
if (!video.data.items?.length) {
throw new Error(`Video not found: ${videoId}`);
}
const translations: VideoTranslation = {};
for (const lang of targetLanguages) {
translations[lang] = {};
const snippet = video.data.items[0].snippet;
for (const field of fields) {
if (field === "tags" && snippet.tags) {
const [translatedTags] = await this.translate.translate(
snippet.tags,
lang
);
translations[lang].tags = Array.isArray(translatedTags)
? translatedTags
: [translatedTags];
} else {
const content = snippet[field];
if (content) {
const [translation] = await this.translate.translate(content, lang);
translations[lang][field] = translation;
}
}
}
}
return translations;
});
}
@MCPFunction({
description: "Detect spoken languages in video",
parameters: {
type: "object",
properties: {
videoId: { type: "string" },
segments: {
type: "boolean",
description: "Whether to detect languages in segments"
}
},
required: ["videoId"]
}
})
async detectLanguages({
videoId,
segments = false
}: {
videoId: string;
segments?: boolean;
}): Promise<LanguageDetection | LanguageSegment[]> {
return safelyExecute(async () => {
const captions = await this.youtube.captions.list({
part: ["snippet"],
videoId
});
if (!captions.data.items?.length) {
throw new Error(`No captions found for video: ${videoId}`);
}
if (segments) {
return this.detectLanguageSegments(captions.data.items);
}
const allText = await this.getAllCaptionText(captions.data.items);
const [detection] = await this.translate.detect(allText);
return {
language: detection.language,
confidence: detection.confidence
};
});
}
private async detectLanguageSegments(captions: any[]): Promise<LanguageSegment[]> {
const segments: LanguageSegment[] = [];
const segmentSize = 1000; // Characters per segment
for (const caption of captions) {
const track = await this.youtube.captions.download({
id: caption.id
});
let currentSegment = "";
const words = track.data.split(/\s+/);
for (const word of words) {
currentSegment += word + " ";
if (currentSegment.length >= segmentSize) {
const [detection] = await this.translate.detect(currentSegment);
segments.push({
text: currentSegment.trim(),
language: detection.language,
confidence: detection.confidence
});
currentSegment = "";
}
}
if (currentSegment) {
const [detection] = await this.translate.detect(currentSegment);
segments.push({
text: currentSegment.trim(),
language: detection.language,
confidence: detection.confidence
});
}
}
return this.mergeConsecutiveSegments(segments);
}
private async getAllCaptionText(captions: any[]): Promise<string> {
const texts = await Promise.all(
captions.map(async caption => {
const track = await this.youtube.captions.download({
id: caption.id
});
return track.data;
})
);
return texts.join(" ").trim();
}
private mergeConsecutiveSegments(segments: LanguageSegment[]): LanguageSegment[] {
const merged: LanguageSegment[] = [];
let current: LanguageSegment & { confidence: number[] } | null = null;
for (const segment of segments) {
if (!current || current.language !== segment.language) {
if (current) {
merged.push({
...current,
confidence: current.confidence.reduce((a, b) => a + b) / current.confidence.length
});
}
current = {
...segment,
confidence: [segment.confidence]
};
} else {
current.text += " " + segment.text;
current.confidence.push(segment.confidence);
}
}
if (current) {
merged.push({
...current,
confidence: current.confidence.reduce((a, b) => a + b) / current.confidence.length
});
}
return merged;
}
}