// YouTube MCP Integration
// by spolepaka
// Verified
import express, { Request, Response } from 'express';
import { z } from 'zod';
import * as cheerio from 'cheerio';
/**
 * One video entry as produced by the search tool in this file
 * (interface copied from index.ts).
 */
interface VideoResult {
// Video identifier as reported by the search result.
videoId: string;
// Video title text.
title: string;
// Watch-page URL built from the video identifier.
url: string;
// URL of a thumbnail image; empty string when the result carries none.
thumbnailUrl: string;
// Description snippet shown with the search result; empty string when absent.
description: string;
// Uploading channel.
channel: {
// Channel display name; empty string when absent.
name: string;
// Channel page URL.
url: string;
};
// Display string for the view count (not a number); optional.
viewCount?: string;
// Display string for the publication time; optional.
publishedTime?: string;
}
// Common headers for requests (copied from index.ts).
// These are spread into the YouTube page fetches in this file, with each
// caller adding its own 'Referer'. The values describe a desktop Chrome 121
// on Windows performing a top-level document navigation.
const commonHeaders = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Accept-Language': 'en-US,en;q=0.9',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
// Fetch-metadata headers describing a user-initiated document navigation.
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
// Client-hint headers; kept in step with the Chrome 121 User-Agent above.
'Sec-Ch-Ua': '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0',
};
// Initialize the Express app that hosts the tool endpoints below.
const app = express();
// Register the JSON body parser. The routes visible in this file are all GET
// handlers that read their parameters from the query string.
app.use(express.json());
/**
 * Extracts a YouTube video ID from a bare ID or from a video URL
 * (helper originally copied from index.ts).
 *
 * Accepted inputs (surrounding whitespace is ignored):
 *  - a bare 11-character ID made of `A-Z a-z 0-9 _ -`
 *  - `youtube.com/watch?...v=<id>` on any youtube.com host (www, m, music);
 *    `v` does not have to be the first query parameter
 *  - `youtu.be/<id>`
 *  - `youtube.com/embed/<id>`, `/shorts/<id>`, `/live/<id>`
 *    (and `youtube-nocookie.com` for the same paths)
 *
 * @param input Video ID or URL supplied by the caller.
 * @returns The 11-character ID, or `null` when none can be found.
 */
function extractVideoId(input: string): string | null {
  const candidate = input.trim();

  // A bare ID is returned as-is.
  if (/^[a-zA-Z0-9_-]{11}$/.test(candidate)) {
    return candidate;
  }

  // URL forms use the same ID alphabet as the bare-ID check above, so a
  // fragment or stray punctuation can never be captured as part of the ID.
  const patterns = [
    // The optional group skips over parameters that precede `v=`.
    /youtube\.com\/watch\?(?:[^#\s]*?&)?v=([a-zA-Z0-9_-]{11})/,
    /youtu\.be\/([a-zA-Z0-9_-]{11})/,
    /youtube(?:-nocookie)?\.com\/(?:embed|shorts|live)\/([a-zA-Z0-9_-]{11})/,
  ];

  for (const pattern of patterns) {
    const id = pattern.exec(candidate)?.[1];
    if (id) {
      return id;
    }
  }
  return null;
}
/**
 * Finds the `}` that closes the JSON object opening at `text[start]`.
 *
 * Braces inside double-quoted JSON strings are ignored, and a backslash
 * inside a string skips the following character so `\"` does not end it.
 *
 * @returns Index of the closing brace, or -1 when the object never closes.
 */
function findJsonObjectEnd(text: string, start: number): number {
  let depth = 0;
  let inString = false;
  for (let i = start; i < text.length; i++) {
    const ch = text[i];
    if (inString) {
      if (ch === '\\') {
        i++;
      } else if (ch === '"') {
        inString = false;
      }
    } else if (ch === '"') {
      inString = true;
    } else if (ch === '{') {
      depth++;
    } else if (ch === '}') {
      depth--;
      if (depth === 0) {
        return i;
      }
    }
  }
  return -1;
}

/**
 * Extracts and parses the object assigned to `var ytInitialData` in a
 * YouTube page (helper originally copied from index.ts).
 *
 * The object is delimited by brace matching rather than by searching for the
 * first `};`, because that two-character sequence can also occur inside a
 * string value and would cut the JSON short.
 *
 * @param html Raw HTML of the page.
 * @returns The parsed object, or `null` when the assignment is missing, the
 *          object is unterminated, or parsing fails (parse failures are
 *          logged with `console.error`).
 */
function extractInitialData(html: string): any {
  try {
    // The lookahead keeps the opening brace out of the match so `start`
    // lands exactly on it.
    const marker = /var ytInitialData = (?=\{)/.exec(html);
    if (!marker) {
      return null;
    }
    const start = marker.index + marker[0].length;
    const end = findJsonObjectEnd(html, start);
    if (end === -1) {
      return null;
    }
    return JSON.parse(html.slice(start, end + 1));
  } catch (error) {
    console.error('Error parsing initial data:', error);
    return null;
  }
}
/**
 * Runs a YouTube search by fetching the results page and reading the data
 * embedded in it (adapted from index.ts).
 *
 * @param query Search terms.
 * @param limit Maximum number of videos to return (default 5).
 * @returns Up to `limit` entries that have both a video ID and a title.
 * @throws Error prefixed with "Failed to perform YouTube search: " when the
 *         request fails or the page data cannot be extracted.
 */
async function performYouTubeSearch(query: string, limit: number = 5): Promise<VideoResult[]> {
  // Text fields arrive as a list of "runs"; the full text is all runs joined.
  // Reading only the first run truncates text that is split into several.
  const joinRuns = (runs: unknown): string =>
    Array.isArray(runs) ? runs.map((run: any) => run?.text ?? '').join('') : '';

  try {
    const searchUrl = 'https://www.youtube.com/results?' + new URLSearchParams({
      search_query: query,
      // NOTE(review): presumably an encoded YouTube search-filter token — confirm its meaning.
      sp: 'CAISAhAB',
    }).toString();

    const response = await fetch(searchUrl, {
      headers: { ...commonHeaders, 'Referer': 'https://www.youtube.com/' },
    });
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    const html = await response.text();
    const initialData = extractInitialData(html);
    if (!initialData) {
      throw new Error('Could not extract video data from page');
    }

    // Collect items from every section rather than only the first one, so
    // results are still found when another section precedes them. Items of
    // the first section keep their original position at the front.
    const sections = initialData.contents?.twoColumnSearchResultsRenderer?.primaryContents?.sectionListRenderer?.contents;
    const items: any[] = Array.isArray(sections)
      ? sections.flatMap((section: any) => section?.itemSectionRenderer?.contents ?? [])
      : [];

    const results: VideoResult[] = [];
    for (const item of items) {
      if (results.length >= limit) break;

      // Entries that are not plain videos are skipped.
      const videoRenderer = item?.videoRenderer;
      if (!videoRenderer) continue;

      const result: VideoResult = {
        videoId: videoRenderer.videoId,
        title: joinRuns(videoRenderer.title?.runs),
        url: `https://youtube.com/watch?v=${videoRenderer.videoId}`,
        thumbnailUrl: videoRenderer.thumbnail?.thumbnails?.[0]?.url || '',
        description: joinRuns(videoRenderer.descriptionSnippet?.runs),
        channel: {
          name: videoRenderer.ownerText?.runs?.[0]?.text || '',
          url: `https://youtube.com${videoRenderer.ownerText?.runs?.[0]?.navigationEndpoint?.commandMetadata?.webCommandMetadata?.url || ''}`,
        },
        viewCount: videoRenderer.viewCountText?.simpleText || '',
        publishedTime: videoRenderer.publishedTimeText?.simpleText || '',
      };

      if (result.videoId && result.title) {
        results.push(result);
      }
    }
    return results;
  } catch (error) {
    throw new Error(`Failed to perform YouTube search: ${error instanceof Error ? error.message : 'Unknown error'}`);
  }
}
/**
 * Fetches a video's watch page, locates a caption track and downloads it as
 * a list of timed text entries (helper originally copied from index.ts).
 *
 * The English track is preferred; otherwise the first listed track is used.
 *
 * @param videoId YouTube video ID.
 * @returns `transcript`: entries of `{ time, text }` where `time` is the
 *          start in seconds formatted with two decimals; `videoInfo`: title,
 *          channel name and duration taken from the player response.
 * @throws Error when the page or transcript request fails, the player
 *         response cannot be located or parsed, or no caption track exists.
 */
async function extractTranscript(videoId: string): Promise<{ transcript: any[]; videoInfo: any }> {
  // Returns the JSON object text starting at text[start], delimited by brace
  // matching. Braces inside double-quoted strings are ignored, so a "};" in
  // free text such as a description cannot cut the object short.
  const sliceJsonObject = (text: string, start: number): string | null => {
    let depth = 0;
    let inString = false;
    for (let i = start; i < text.length; i++) {
      const ch = text[i];
      if (inString) {
        if (ch === '\\') {
          i++; // skip the escaped character so \" does not end the string
        } else if (ch === '"') {
          inString = false;
        }
      } else if (ch === '"') {
        inString = true;
      } else if (ch === '{') {
        depth++;
      } else if (ch === '}') {
        depth--;
        if (depth === 0) {
          return text.slice(start, i + 1);
        }
      }
    }
    return null;
  };

  const response = await fetch(`https://www.youtube.com/watch?v=${encodeURIComponent(videoId)}`, {
    headers: { ...commonHeaders, 'Referer': 'https://www.youtube.com/results' },
  });
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }
  const html = await response.text();

  // The lookahead leaves the opening brace unconsumed so the slice starts on it.
  const marker = /ytInitialPlayerResponse\s*=\s*(?=\{)/.exec(html);
  const playerResponseJson = marker ? sliceJsonObject(html, marker.index + marker[0].length) : null;
  if (!playerResponseJson) {
    throw new Error('Could not find player response data');
  }
  const playerResponse = JSON.parse(playerResponseJson);

  const captions = playerResponse?.captions?.playerCaptionsTracklistRenderer?.captionTracks;
  if (!captions || captions.length === 0) {
    throw new Error('No transcript available for this video');
  }
  const captionTrack = captions.find((track: any) => track.languageCode === 'en') || captions[0];
  if (!captionTrack?.baseUrl) {
    throw new Error('Could not find caption track URL');
  }

  // Append the format parameter with the separator that matches the URL.
  const baseUrl = String(captionTrack.baseUrl);
  const transcriptUrl = `${baseUrl}${baseUrl.includes('?') ? '&' : '?'}fmt=json3`;
  const transcriptResponse = await fetch(transcriptUrl);
  if (!transcriptResponse.ok) {
    throw new Error('Failed to fetch transcript');
  }
  const transcriptData = await transcriptResponse.json();
  const transcriptEvents: any[] = Array.isArray(transcriptData?.events) ? transcriptData.events : [];

  const processedTranscript = transcriptEvents
    // Only events that carry text segments contribute to the transcript.
    .filter((event: any) => Array.isArray(event?.segs))
    .map((event: any) => {
      // A missing start time is treated as 0 rather than producing "NaN".
      const startTime = (event.tStartMs ?? 0) / 1000;
      // Segments are joined with a space; runs of whitespace (including
      // newlines and spaces already present in a segment) collapse to one.
      const text = event.segs
        .map((seg: any) => seg?.utf8 ?? '')
        .join(' ')
        .replace(/\s+/g, ' ')
        .trim();
      return { time: startTime.toFixed(2), text };
    })
    // Whitespace-only events would otherwise yield empty entries.
    .filter((entry: { time: string; text: string }) => entry.text.length > 0);

  const videoInfo = {
    title: playerResponse.videoDetails?.title || '',
    channel: { name: playerResponse.videoDetails?.author || '' },
    duration: playerResponse.videoDetails?.lengthSeconds || '',
  };
  return { transcript: processedTranscript, videoInfo };
}
// SSE endpoint for the 'search' tool.
// Query parameters: `query` (non-empty string) and optional `limit`
// (digits only, 1-10, defaults to 5). Exactly one `data:` event is written —
// the result list, a "No results found" message, or an error object — and the
// stream is then closed.
app.get('/tools/search', async (req: Request, res: Response) => {
  // Open the event stream before any work is done.
  const sseHeaders: Array<[string, string]> = [
    ['Content-Type', 'text/event-stream'],
    ['Cache-Control', 'no-cache'],
    ['Connection', 'keep-alive'],
  ];
  for (const [name, value] of sseHeaders) {
    res.setHeader(name, value);
  }
  res.flushHeaders();

  // `limit` arrives as a string; it is checked for digits, converted to a
  // number and range-checked. The default is given in its string form.
  const limitSchema = z.string().regex(/^\d+$/).transform(Number).pipe(z.number().min(1).max(10)).optional().default('5');
  const querySchema = z.object({
    query: z.string().min(1),
    limit: limitSchema,
  });

  // Writes one SSE data event carrying the JSON-encoded payload.
  const sendEvent = (payload: unknown): void => {
    res.write(`data: ${JSON.stringify(payload)}\n\n`);
  };

  try {
    const params = querySchema.parse(req.query);
    const videos = await performYouTubeSearch(params.query, params.limit);
    if (videos.length > 0) {
      sendEvent(videos);
    } else {
      sendEvent({ message: 'No results found' });
    }
  } catch (err) {
    const reason = err instanceof Error ? err.message : 'Unknown error';
    sendEvent({ error: `Error performing search: ${reason}` });
  } finally {
    res.end();
  }
});
// SSE endpoint for the 'get-video-info' tool.
// Query parameter: `input` — a YouTube video ID or URL. Exactly one `data:`
// event is written (the video details or an error object) and the stream is
// then closed by the `finally` block.
app.get('/tools/get-video-info', async (req: Request, res: Response) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  const schema = z.object({
    input: z.string().min(1).describe('YouTube video ID or URL'),
  });

  try {
    const { input } = schema.parse(req.query);
    const videoId = extractVideoId(input);
    if (!videoId) {
      res.write(`data: ${JSON.stringify({ error: `Invalid YouTube video ID or URL: ${input}` })}\n\n`);
      // No res.end() here: `finally` below ends the response exactly once.
      return;
    }

    const response = await fetch(`https://www.youtube.com/watch?v=${videoId}`, {
      headers: { ...commonHeaders, 'Referer': 'https://www.youtube.com/results' },
    });
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    const html = await response.text();
    const initialData = extractInitialData(html);
    if (!initialData) {
      throw new Error('Could not extract video data from page');
    }

    // Look the two renderers up by key instead of assuming they sit at
    // positions 0 and 1, so an extra entry ahead of them does not hide them.
    const rawContents = initialData.contents?.twoColumnWatchNextResults?.results?.results?.contents;
    const watchContents: any[] = Array.isArray(rawContents) ? rawContents : [];
    const videoData = watchContents.find((entry: any) => entry?.videoPrimaryInfoRenderer)?.videoPrimaryInfoRenderer;
    const channelData = watchContents.find((entry: any) => entry?.videoSecondaryInfoRenderer)?.videoSecondaryInfoRenderer;
    if (!videoData) {
      throw new Error('Could not find video data');
    }

    // Return the channel URL in absolute form, like the `url` field below and
    // the channel URL produced by the search tool.
    const channelPath: string =
      channelData?.owner?.videoOwnerRenderer?.navigationEndpoint?.commandMetadata?.webCommandMetadata?.url || '';
    let channelUrl = '';
    if (channelPath) {
      channelUrl = /^https?:\/\//.test(channelPath) ? channelPath : `https://youtube.com${channelPath}`;
    }

    const result = {
      videoId,
      // Join every run so a title split into several runs is not truncated.
      title: videoData.title?.runs?.map((run: any) => run.text).join('') || '',
      description: channelData?.description?.runs?.map((run: any) => run.text).join('') || '',
      viewCount: videoData.viewCount?.videoViewCountRenderer?.viewCount?.simpleText || '',
      publishDate: videoData.dateText?.simpleText || '',
      channel: {
        name: channelData?.owner?.videoOwnerRenderer?.title?.runs?.[0]?.text || '',
        url: channelUrl,
      },
      thumbnailUrl: `https://i.ytimg.com/vi/${videoId}/hqdefault.jpg`,
      url: `https://youtube.com/watch?v=${videoId}`,
    };
    res.write(`data: ${JSON.stringify(result)}\n\n`);
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    res.write(`data: ${JSON.stringify({ error: `Error fetching video info: ${errorMessage}` })}\n\n`);
  } finally {
    res.end();
  }
});
// SSE endpoint for the 'get-transcript' tool.
// Query parameter: `input` — a YouTube video ID or URL. Exactly one `data:`
// event is written (the transcript with basic video info, or an error object)
// and the stream is then closed by the `finally` block.
app.get('/tools/get-transcript', async (req: Request, res: Response) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  const schema = z.object({
    input: z.string().min(1).describe('YouTube video ID or URL'),
  });

  try {
    const { input } = schema.parse(req.query);
    const videoId = extractVideoId(input);
    if (!videoId) {
      res.write(`data: ${JSON.stringify({ error: `Invalid YouTube video ID or URL: ${input}` })}\n\n`);
      // No res.end() here: `finally` below ends the response exactly once.
      return;
    }

    const { transcript, videoInfo } = await extractTranscript(videoId);
    const result = { videoId, videoInfo, transcript };
    res.write(`data: ${JSON.stringify(result)}\n\n`);
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    res.write(`data: ${JSON.stringify({ error: `Error fetching transcript: ${errorMessage}` })}\n\n`);
  } finally {
    res.end();
  }
});
// Start the server.
// The port is taken from the PORT environment variable when it holds a valid
// TCP port number (1-65535); otherwise the default 3000 is used.
const requestedPort = Number.parseInt(process.env.PORT ?? '', 10);
const PORT = Number.isInteger(requestedPort) && requestedPort > 0 && requestedPort < 65536 ? requestedPort : 3000;
app.listen(PORT, () => {
  console.log(`SSE YouTube Server running on http://localhost:${PORT}`);
  console.log('Endpoints:');
  console.log(' - Search: /tools/search?query=<query>&limit=<number>');
  console.log(' - Video Info: /tools/get-video-info?input=<video_id_or_url>');
  console.log(' - Transcript: /tools/get-transcript?input=<video_id_or_url>');
});