Skip to main content
Glama

mcp-pinterest

pinterest-mcp-server.ts25.2 kB
#!/usr/bin/env bun import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ErrorCode, ListToolsRequestSchema, McpError, CallToolRequest, } from '@modelcontextprotocol/sdk/types.js'; // @ts-ignore import PinterestScraper from './pinterest-scraper.js'; import { downloadImage, batchDownload } from './src/pinterest-download.js'; import { DEFAULT_FILENAME_TEMPLATE, validateTemplate, generateFileName } from './src/filename-template.js'; import fs from 'node:fs'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); // Default configuration constants const DEFAULT_SEARCH_LIMIT = 10; const DEFAULT_SEARCH_KEYWORD = 'landscape'; const DEFAULT_HEADLESS_MODE = true; // 从环境变量读取下载目录,如果未设置则使用默认值 const ENV_DOWNLOAD_DIR = process.env.MCP_PINTEREST_DOWNLOAD_DIR; const DEFAULT_DOWNLOAD_DIR = path.join(__dirname, '../downloads'); // 从环境变量读取文件名模板,如果未设置则使用默认值 const ENV_FILENAME_TEMPLATE = process.env.MCP_PINTEREST_FILENAME_TEMPLATE; // 检查和验证下载目录 function validateDownloadDirectory(dirPath: string): boolean { try { // 如果目录不存在,尝试创建 if (!fs.existsSync(dirPath)) { fs.mkdirSync(dirPath, { recursive: true }); console.log(`创建下载目录: ${dirPath}`); } // 验证目录是否可写 const testFile = path.join(dirPath, '.write-test'); fs.writeFileSync(testFile, 'test'); fs.unlinkSync(testFile); return true; } catch (error: any) { console.error(`下载目录验证失败: ${dirPath}, 错误: ${error.message}`); return false; } } // 获取有效的下载目录 function getValidDownloadDirectory(): string { if (ENV_DOWNLOAD_DIR) { if (validateDownloadDirectory(ENV_DOWNLOAD_DIR)) { console.log(`使用环境变量指定的下载目录: ${ENV_DOWNLOAD_DIR}`); return ENV_DOWNLOAD_DIR; } console.error('环境变量指定的下载目录无效,退出程序'); process.exit(1); } // 验证默认目录 if (validateDownloadDirectory(DEFAULT_DOWNLOAD_DIR)) { console.log(`使用默认下载目录: ${DEFAULT_DOWNLOAD_DIR}`); return DEFAULT_DOWNLOAD_DIR; } console.error('默认下载目录无效,退出程序'); process.exit(1); } // 获取有效的文件名模板 function getValidFilenameTemplate(): string { if (ENV_FILENAME_TEMPLATE) { const validationResult = validateTemplate(ENV_FILENAME_TEMPLATE); if (validationResult.isValid) { console.log(`使用环境变量指定的文件名模板: ${ENV_FILENAME_TEMPLATE}`); return ENV_FILENAME_TEMPLATE; } console.error(`环境变量指定的文件名模板无效: ${validationResult.error}`); console.log(`将使用默认文件名模板: ${DEFAULT_FILENAME_TEMPLATE}`); } return DEFAULT_FILENAME_TEMPLATE; } // 设置当前使用的下载目录 const CURRENT_DOWNLOAD_DIR = getValidDownloadDirectory(); // 设置当前使用的文件名模板 const CURRENT_FILENAME_TEMPLATE = getValidFilenameTemplate(); /** * 下载结果类型定义 */ interface DownloadResult { success: boolean; total: number; downloadedCount: number; failedCount: number; downloaded: Array<{ success: boolean; id: string; path: string; url: string; }>; failed: Array<{ url: string; error: string; }>; } /** * Pinterest MCP Server * Provides Pinterest image search functionality */ export class PinterestMcpServer { private server: Server; private scraper: PinterestScraper; constructor() { // Initialize MCP server this.server = new Server( { name: 'pinterest-mcp-server', version: '1.0.0', }, { capabilities: { tools: {}, }, } ); // Initialize Pinterest scraper this.scraper = new PinterestScraper(); // Set up tool handlers this.setupToolHandlers(); // Error handling this.server.onerror = (error) => console.error('[MCP Error]', error); // Handle process termination signals process.on('SIGINT', async () => { await this.cleanup(); process.exit(0); }); } /** * Clean up resources */ private async cleanup(): Promise<void> { await this.server.close(); } /** * Set up tool handlers */ private setupToolHandlers(): void { // Handle tool list requests this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [ { name: 'pinterest_search', description: 'Search for images on Pinterest by keyword', inputSchema: { type: 'object', properties: { keyword: { type: 'string', description: 'Search keyword', }, limit: { type: 'integer', description: `Number of images to return (default: ${DEFAULT_SEARCH_LIMIT})`, default: DEFAULT_SEARCH_LIMIT, }, headless: { type: 'boolean', description: `Whether to use headless browser mode (default: ${DEFAULT_HEADLESS_MODE})`, default: DEFAULT_HEADLESS_MODE, }, }, required: ['keyword'], }, }, { name: 'pinterest_get_image_info', description: 'Get Pinterest image information', inputSchema: { type: 'object', properties: { image_url: { type: 'string', description: 'Image URL', }, }, required: ['image_url'], }, }, { name: 'pinterest_search_and_download', description: 'Search for images on Pinterest by keyword and download them', inputSchema: { type: 'object', properties: { keyword: { type: 'string', description: 'Search keyword', }, limit: { type: 'integer', description: `Number of images to return and download (default: ${DEFAULT_SEARCH_LIMIT})`, default: DEFAULT_SEARCH_LIMIT, }, headless: { type: 'boolean', description: `Whether to use headless browser mode (default: ${DEFAULT_HEADLESS_MODE})`, default: DEFAULT_HEADLESS_MODE, }, }, required: ['keyword'], }, } ] })); // Handle tool call requests this.server.setRequestHandler(CallToolRequestSchema, async (request: CallToolRequest) => { try { // 打印完整的请求对象,帮助调试 // console.error('Received request:', JSON.stringify(request, null, 2)); // console.error('Request params:', JSON.stringify(request.params, null, 2)); switch (request.params.name) { case 'pinterest_search': return await this.handlePinterestSearch(request.params.args || request.params.arguments); case 'pinterest_get_image_info': return await this.handlePinterestGetImageInfo(request.params.args || request.params.arguments); case 'pinterest_search_and_download': return await this.handlePinterestSearchAndDownload(request.params.args || request.params.arguments); default: throw new McpError( ErrorCode.MethodNotFound, `Unknown tool: ${request.params.name}` ); } } catch (error: any) { console.error(`[Tool call error] ${request.params.name}:`, error); throw new McpError( ErrorCode.InternalError, `Tool call failed: ${error.message}` ); } }); } /** * Handle Pinterest search requests */ private async handlePinterestSearch(args: any) { try { // Extract keyword and limits from input let keyword = ''; let limit = DEFAULT_SEARCH_LIMIT; let headless = DEFAULT_HEADLESS_MODE; // Normalize args if it's a string with backticks if (typeof args === 'string') { // Replace backticks with double quotes args = args.replace(/`/g, '"'); // console.error('Normalized args string:', args); } // Handle different input types if (args) { if (typeof args === 'object') { // If object, try to get properties directly // console.error('Args is object with keys:', Object.keys(args)); // Check for keyword property if ('keyword' in args && typeof args.keyword === 'string') { keyword = args.keyword.trim(); // console.error('Found keyword in object:', keyword); } else if ('`keyword`' in args) { keyword = String(args['`keyword`']).trim(); // console.error('Found `keyword` in object:', keyword); } // Check for limit property if ('limit' in args && (typeof args.limit === 'number' || !isNaN(parseInt(String(args.limit))))) { limit = typeof args.limit === 'number' ? args.limit : parseInt(String(args.limit), 10); // console.error('Found limit in object:', limit); } else if ('`limit`' in args) { const limitValue = args['`limit`']; limit = typeof limitValue === 'number' ? limitValue : parseInt(String(limitValue), 10); // console.error('Found `limit` in object:', limit); } // Check for headless property if ('headless' in args && typeof args.headless === 'boolean') { headless = args.headless; // console.error('Found headless in object:', headless); } else if ('`headless`' in args) { headless = Boolean(args['`headless`']); // console.error('Found `headless` in object:', headless); } } else if (typeof args === 'string') { // console.error('Args is string type, attempting to parse'); // Try to parse as JSON try { // First try standard JSON parsing let parsed; try { parsed = JSON.parse(args); // console.error('Successfully parsed as standard JSON'); } catch (jsonError) { // If that fails, try to fix common JSON format issues console.error('Standard JSON parse failed, trying to fix format'); // Replace single quotes with double quotes const fixedJson = args .replace(/'/g, '"') .replace(/(\w+):/g, '"$1":'); // Convert unquoted keys to quoted keys console.error('Attempting to parse fixed JSON:', fixedJson); parsed = JSON.parse(fixedJson); console.error('Successfully parsed fixed JSON'); } // Extract values from parsed object if (parsed) { if (parsed.keyword && typeof parsed.keyword === 'string') { keyword = parsed.keyword.trim(); // console.error('Found keyword in parsed JSON:', keyword); } if (parsed.limit !== undefined) { if (typeof parsed.limit === 'number') { limit = parsed.limit; } else if (typeof parsed.limit === 'string' && !isNaN(parseInt(parsed.limit))) { limit = parseInt(parsed.limit, 10); } // console.error('Found limit in parsed JSON:', limit); } if (parsed.headless !== undefined && typeof parsed.headless === 'boolean') { headless = parsed.headless; // console.error('Found headless in parsed JSON:', headless); } } } catch (e) { // console.error('All JSON parsing attempts failed, trying regex'); // If can't parse as JSON, try to extract using regex const keywordMatch = args.match(/["`']?keyword["`']?\s*[:=]\s*["`']([^"`']+)["`']/i); if (keywordMatch && keywordMatch[1]) { keyword = keywordMatch[1].trim(); // console.error('Found keyword using regex:', keyword); } // Try to extract limit const limitMatch = args.match(/["`']?limit["`']?\s*[:=]\s*(\d+)/i); if (limitMatch && limitMatch[1]) { limit = parseInt(limitMatch[1], 10); // console.error('Found limit using regex:', limit); } } } } // If keyword is empty, use default keyword if (!keyword) { keyword = DEFAULT_SEARCH_KEYWORD; // console.error('No keyword provided, using default keyword:', keyword); } // Ensure limit is a positive number if (isNaN(limit) || limit <= 0) { limit = DEFAULT_SEARCH_LIMIT; // console.error('Invalid limit, using default limit:', limit); } // console.error('Final parameters - keyword:', keyword, 'limit:', limit, 'headless:', headless); // Execute search let results = []; try { // 创建不会触发取消的AbortController const controller = new AbortController(); results = await this.scraper.search(keyword, limit, headless, controller.signal); } catch (searchError) { // console.error('Search error:', searchError); results = []; } // Ensure results is an array const validResults = Array.isArray(results) ? results : []; // Validate and fix image URLs for (const result of validResults) { if (result.image_url) { // Check if URL contains thumbnail markers const thumbnailPatterns = ['/60x60/', '/236x/', '/474x/', '/736x/']; let needsFix = false; // Check if matches any thumbnail pattern for (const pattern of thumbnailPatterns) { if (result.image_url.includes(pattern)) { needsFix = true; break; } } // Use regex to check more generic thumbnail formats if (!needsFix && result.image_url.match(/\/\d+x\d*\//)) { needsFix = true; } // If needs fixing, replace with original image URL if (needsFix) { // console.error(`Fixing thumbnail URL: ${result.image_url}`); result.image_url = result.image_url.replace(/\/\d+x\d*\//, '/originals/'); // console.error(`Fixed URL: ${result.image_url}`); } } } // Return results in MCP protocol format const contentItems = [ { type: 'text', text: `Found ${validResults.length} images related to "${keyword}" on Pinterest` } ]; // Add a text content item for each image result validResults.forEach((result, index) => { contentItems.push({ type: 'text', text: `Image ${index + 1}: ${result.title || 'No title'}` }); // Add image link contentItems.push({ type: 'text', text: `Link: ${result.image_url || 'No link'}` }); // Add original page link (if available) if (result.link && result.link !== result.image_url) { contentItems.push({ type: 'text', text: `Original page: ${result.link}` }); } // Add separator (except for last result) if (index < validResults.length - 1) { contentItems.push({ type: 'text', text: `---` }); } }); return { content: contentItems }; } catch (error: any) { console.error('Pinterest search handling error:', error); return { content: [ { type: 'text', text: `Error during search: ${error.message}` } ] }; } } /** * Handle Pinterest get image info requests */ private async handlePinterestGetImageInfo(args: any) { try { // Extract parameters const imageUrl = args.image_url; // Return image info return { content: [ { type: 'text', text: `Pinterest Image Information`, }, { type: 'text', text: JSON.stringify({ image_url: imageUrl, source: 'Pinterest', timestamp: new Date().toISOString(), }, null, 2), }, ], }; } catch (error: any) { console.error('Error getting Pinterest image info:', error); return { content: [ { type: 'text', text: `Error getting image info: ${error.message}`, }, ], }; } } /** * Handle Pinterest search and download requests */ private async handlePinterestSearchAndDownload(args: any) { try { // Extract parameters let keyword = ''; let limit = DEFAULT_SEARCH_LIMIT; let headless = DEFAULT_HEADLESS_MODE; let downloadDir = CURRENT_DOWNLOAD_DIR; // Normalize args if it's a string with backticks if (typeof args === 'string') { args = args.replace(/`/g, '"'); } // Handle different input types if (args) { if (typeof args === 'object') { // Extract keyword if ('keyword' in args && typeof args.keyword === 'string') { keyword = args.keyword.trim(); } else if ('`keyword`' in args) { keyword = String(args['`keyword`']).trim(); } // Extract limit if ('limit' in args && (typeof args.limit === 'number' || !isNaN(parseInt(String(args.limit))))) { limit = typeof args.limit === 'number' ? args.limit : parseInt(String(args.limit), 10); } else if ('`limit`' in args) { const limitValue = args['`limit`']; limit = typeof limitValue === 'number' ? limitValue : parseInt(String(limitValue), 10); } // Extract headless if ('headless' in args && typeof args.headless === 'boolean') { headless = args.headless; } else if ('`headless`' in args) { headless = Boolean(args['`headless`']); } } else if (typeof args === 'string') { // Try to parse as JSON try { let parsed; try { parsed = JSON.parse(args); } catch (jsonError) { const fixedJson = args .replace(/'/g, '"') .replace(/(\w+):/g, '"$1":'); parsed = JSON.parse(fixedJson); } if (parsed) { if (parsed.keyword && typeof parsed.keyword === 'string') { keyword = parsed.keyword.trim(); } if (parsed.limit !== undefined) { if (typeof parsed.limit === 'number') { limit = parsed.limit; } else if (typeof parsed.limit === 'string' && !isNaN(parseInt(parsed.limit))) { limit = parseInt(parsed.limit, 10); } } if (parsed.headless !== undefined && typeof parsed.headless === 'boolean') { headless = parsed.headless; } } } catch (e) { // If can't parse as JSON, try to extract using regex const keywordMatch = args.match(/["`']?keyword["`']?\s*[:=]\s*["`']([^"`']+)["`']/i); if (keywordMatch && keywordMatch[1]) { keyword = keywordMatch[1].trim(); } // Try to extract limit const limitMatch = args.match(/["`']?limit["`']?\s*[:=]\s*(\d+)/i); if (limitMatch && limitMatch[1]) { limit = parseInt(limitMatch[1], 10); } } } } // Use defaults if values are missing if (!keyword) { keyword = DEFAULT_SEARCH_KEYWORD; } if (isNaN(limit) || limit <= 0) { limit = DEFAULT_SEARCH_LIMIT; } // 始终使用环境变量设置或默认值 downloadDir = CURRENT_DOWNLOAD_DIR; // 创建包含关键词的下载目录 const keywordDir = path.join(downloadDir, keyword.replace(/\s+/g, '_').replace(/[^a-zA-Z0-9_\u4e00-\u9fa5]/g, '')); // Create download directory if it doesn't exist try { if (!fs.existsSync(keywordDir)) { fs.mkdirSync(keywordDir, { recursive: true }); } } catch (dirError: any) { return { content: [ { type: 'text', text: `创建下载目录失败: ${dirError.message}` } ] }; } // Execute search let results = []; try { // 创建不会触发取消的AbortController const controller = new AbortController(); results = await this.scraper.search(keyword, limit, headless, controller.signal); } catch (searchError) { results = []; } // Ensure results is an array const validResults = Array.isArray(results) ? results : []; // 最大重试次数,提高稳定性 const maxRetries = 3; // 使用batchDownload函数进行批量下载,传入文件名模板 const downloadResult = await batchDownload(validResults, keywordDir, { filenameTemplate: CURRENT_FILENAME_TEMPLATE, maxRetries }) as DownloadResult; // Return results in MCP protocol format const contentItems = [ { type: 'text', text: `搜索并下载了 ${validResults.length} 张与"${keyword}"相关的图片` }, { type: 'text', text: `成功: ${downloadResult.downloadedCount}, 失败: ${downloadResult.failedCount}` } ]; // Add a text content item for each download result downloadResult.downloaded.forEach((result: {success: boolean; id: string; path: string; url: string}, index: number) => { contentItems.push({ type: 'text', text: `图片 ${index + 1}: ${validResults[index]?.title || 'Unknown Title'}` }); contentItems.push({ type: 'text', text: `链接: ${result.url}` }); contentItems.push({ type: 'text', text: `保存位置: ${result.path}` }); // Add separator (except for last result) if (index < downloadResult.downloaded.length - 1) { contentItems.push({ type: 'text', text: `---` }); } }); // Add failed downloads if any if (downloadResult.failedCount > 0) { contentItems.push({ type: 'text', text: `--- 下载失败的图片 ---` }); downloadResult.failed.forEach((failed: {url: string; error: string}, index: number) => { contentItems.push({ type: 'text', text: `失败 ${index + 1}: ${failed.url}` }); contentItems.push({ type: 'text', text: `错误: ${failed.error}` }); // Add separator (except for last result) if (index < downloadResult.failed.length - 1) { contentItems.push({ type: 'text', text: `---` }); } }); } return { content: contentItems }; } catch (error: any) { console.error('Pinterest search and download handling error:', error); return { content: [ { type: 'text', text: `搜索和下载过程中出错: ${error.message}` } ] }; } } /** * Run the server */ async run(): Promise<void> { try { const transport = new StdioServerTransport(); await this.server.connect(transport); console.log('Pinterest MCP server running via stdio'); } catch (error) { console.error('Failed to start server:', error); process.exit(1); } } } // Create and run server const server = new PinterestMcpServer(); server.run().catch(error => { console.error('Error running server:', error); process.exit(1); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/terryso/mcp-pinterest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server