Skip to main content
Glama

mcp-pinterest

/** * Pinterest Scraper 取消功能测试 * * 专注于测试搜索和下载的取消功能 */ import { jest, describe, expect, it, beforeEach, beforeAll, afterAll } from '@jest/globals'; import fs from 'fs'; import path from 'path'; import os from 'os'; import EventEmitter from 'events'; // 创建模拟的浏览器请求处理 class MockRequest extends EventEmitter { abort: jest.Mock; constructor() { super(); this.abort = jest.fn(); } } // 创建模拟的浏览器页面 class MockPage extends EventEmitter { setViewport: jest.Mock; setUserAgent: jest.Mock; setDefaultNavigationTimeout: jest.Mock; setDefaultTimeout: jest.Mock; setRequestInterception: jest.Mock; goto: jest.Mock; waitForSelector: jest.Mock; evaluate: jest.Mock; constructor() { super(); this.setViewport = jest.fn().mockResolvedValue(undefined); this.setUserAgent = jest.fn().mockResolvedValue(undefined); this.setDefaultNavigationTimeout = jest.fn(); this.setDefaultTimeout = jest.fn(); this.setRequestInterception = jest.fn().mockResolvedValue(undefined); this.goto = jest.fn().mockResolvedValue(undefined); this.waitForSelector = jest.fn().mockResolvedValue(undefined); this.evaluate = jest.fn().mockResolvedValue([]); } } // 创建模拟的浏览器 class MockBrowser extends EventEmitter { newPage: jest.Mock; close: jest.Mock; constructor() { super(); this.newPage = jest.fn().mockResolvedValue(new MockPage()); this.close = jest.fn().mockResolvedValue(undefined); } } // 为模拟puppeteer创建 const puppeteerMock = { launch: jest.fn().mockResolvedValue(new MockBrowser()) }; // 模拟AbortController class MockAbortController { signal: { aborted: boolean }; abort: jest.Mock; constructor() { this.signal = { aborted: false }; this.abort = jest.fn().mockImplementation(() => { this.signal.aborted = true; }); } } // 模拟全局fetch global.fetch = jest.fn().mockImplementation((url) => { return Promise.resolve({ ok: true, status: 200, arrayBuffer: () => Promise.resolve(Buffer.from('测试图片数据').buffer) }); }) as unknown as typeof global.fetch; // 模拟fs模块 jest.mock('fs', () => { const originalFs = jest.requireActual<typeof fs>('fs'); return { writeFileSync: jest.fn(), existsSync: jest.fn().mockReturnValue(true), mkdirSync: originalFs.mkdirSync, promises: { ...originalFs.promises, mkdir: jest.fn().mockResolvedValue(undefined) } }; }); // 模拟puppeteer-core jest.mock('puppeteer-core', () => puppeteerMock); // 模拟全局AbortController global.AbortController = MockAbortController as any; describe('PinterestScraper搜索和下载取消测试', () => { let PinterestScraper: any; let scraper: any; let tempDir: string; let mockPage: MockPage; let mockBrowser: MockBrowser; let mockRequest: MockRequest; let abortController: MockAbortController; beforeAll(async () => { // 动态导入真实模块,但完全控制导入的模块 const module = await import('../pinterest-scraper.js'); PinterestScraper = module.default; // 创建临时目录用于下载测试 tempDir = path.join(os.tmpdir(), 'pinterest-test-' + Date.now()); if (!fs.existsSync(tempDir)) { fs.mkdirSync(tempDir, { recursive: true }); } // 设置更长的测试超时时间 - 全局设置30秒超时 jest.setTimeout(30000); }); beforeEach(() => { // 重置所有模拟 jest.clearAllMocks(); // 创建模拟的页面、浏览器和请求对象 mockRequest = new MockRequest(); mockPage = new MockPage(); mockBrowser = new MockBrowser(); // 重置浏览器启动模拟 puppeteerMock.launch.mockResolvedValue(mockBrowser); mockBrowser.newPage.mockResolvedValue(mockPage); // 创建AbortController实例 abortController = new MockAbortController(); // 创建新的爬虫实例 scraper = new PinterestScraper(); }); afterAll(() => { jest.restoreAllMocks(); }); describe('取消搜索操作', () => { // 增加单个测试的超时时间 it('应能取消正在进行的搜索', async () => { // 修复页面模拟,不再依赖puppeteer-core jest.spyOn(scraper, 'search').mockImplementation(async (keyword, limit, headless, signal) => { // 等待一段时间以模拟搜索过程 await new Promise(resolve => setTimeout(resolve, 200)); // 检查是否被取消 if (signal && signal.aborted) { throw new Error('操作被取消'); } return []; }); // 启动搜索,但不等待它完成 const searchPromise = scraper.search('测试', 10, true, abortController.signal); // 模拟用户取消操作 setTimeout(() => { abortController.abort(); }, 100); // 验证搜索被取消 try { await searchPromise; // 如果没有抛出错误,则测试失败 expect(true).toBe(false); } catch (error) { expect(error.message).toBe('操作被取消'); } }, 6000); // 增加超时时间到6秒 it('应正确处理搜索完成前的取消', async () => { // 使用模拟实现替代原始方法 jest.spyOn(scraper, 'search').mockImplementation(async (keyword, limit, headless, signal) => { await new Promise<void>((resolve, reject) => { const timeoutId = setTimeout(() => { resolve(); }, 500); // 检查是否被取消 const checkInterval = setInterval(() => { if (signal && signal.aborted) { clearTimeout(timeoutId); clearInterval(checkInterval); reject(new Error('操作被取消')); } }, 100); }); return []; }); // 启动搜索,但不等待它完成 const searchPromise = scraper.search('测试', 10, true, abortController.signal); // 模拟用户取消操作 setTimeout(() => { abortController.abort(); }, 200); // 验证搜索被取消 try { await searchPromise; // 如果没有抛出错误,则测试失败 expect(true).toBe(false); } catch (error) { expect(error.message).toBe('操作被取消'); } }); }); describe('取消下载操作', () => { it('应能取消正在进行的图片下载', async () => { // 模拟一个长时间运行的fetch请求,但现在使用自己的实现替换 jest.spyOn(scraper, 'downloadImage').mockImplementation(async (imageUrl, outputPath, signal) => { if (!imageUrl || !outputPath) { return false; } try { // 模拟下载过程 await new Promise<void>((resolve, reject) => { const timeoutId = setTimeout(() => { resolve(); }, 500); // 检查是否被取消 const checkInterval = setInterval(() => { if (signal && signal.aborted) { clearTimeout(timeoutId); clearInterval(checkInterval); reject(new Error('操作被取消')); } }, 50); }); // 如果到达这里,说明没有被取消 return true; } catch (error) { // 处理取消情况 if (error.message === '操作被取消') { console.log('下载被取消了'); return false; } throw error; } }); // 启动下载,但不等待它完成 const imageUrl = 'https://i.pinimg.com/originals/test.jpg'; const outputPath = path.join(tempDir, 'test.jpg'); // 确保异步操作的正确顺序 const downloadPromise = scraper.downloadImage(imageUrl, outputPath, abortController.signal); // 等待一段时间确保下载开始 await new Promise(resolve => setTimeout(resolve, 50)); // 模拟用户取消操作 abortController.abort(); // 验证下载被取消 const result = await downloadPromise; expect(result).toBe(false); }); it('应处理多个并行下载的取消', async () => { // 修改测试,更可靠地模拟取消行为 const mockDownload = jest.fn().mockImplementation(async (url, path, signal, delay) => { return new Promise((resolve, reject) => { const timer = setTimeout(() => { resolve({ status: 'fulfilled', value: true }); }, delay); // 检查信号 const checkInterval = setInterval(() => { if (signal && signal.aborted) { clearTimeout(timer); clearInterval(checkInterval); reject(new Error('操作被取消')); } }, 50); }); }); // 创建多个下载任务 const imageUrls = [ 'https://i.pinimg.com/originals/test1.jpg', 'https://i.pinimg.com/originals/test2.jpg', 'https://i.pinimg.com/originals/test3.jpg' ]; const outputPaths = imageUrls.map((_, index) => path.join(tempDir, `test${index + 1}.jpg`)); // 并行启动所有下载 const downloadPromises = imageUrls.map((url, index) => mockDownload(url, outputPaths[index], abortController.signal, 500 + index * 500) ); // 模拟用户取消操作 setTimeout(() => { abortController.abort(); }, 300); // 等待所有下载完成或被取消 try { await Promise.all(downloadPromises); // 如果没有错误,则测试失败 expect(true).toBe(false); } catch (error) { expect(error.message).toBe('操作被取消'); } }); }); describe('取消整个搜索和下载流程', () => { it('应能在搜索和下载过程中的任何阶段取消操作', async () => { // 模拟搜索结果 - 使用可控制的模拟 jest.spyOn(scraper, 'search').mockImplementation(async (keyword, limit, headless, signal) => { // 模拟延迟 await new Promise<void>((resolve, reject) => { const timeoutId = setTimeout(() => { resolve(); }, 500); // 检查是否被取消 const checkInterval = setInterval(() => { if (signal && signal.aborted) { clearTimeout(timeoutId); clearInterval(checkInterval); reject(new Error('操作被取消')); } }, 50); }); // 返回模拟数据 return [ { title: '图片1', image_url: 'https://i.pinimg.com/236x/img1.jpg', link: 'https://pinterest.com/pin/1', source: 'pinterest' }, { title: '图片2', image_url: 'https://i.pinimg.com/236x/img2.jpg', link: 'https://pinterest.com/pin/2', source: 'pinterest' } ]; }); // 模拟下载实现 jest.spyOn(scraper, 'downloadImage').mockImplementation(async (imageUrl, outputPath, signal) => { // 模拟延迟 await new Promise<void>((resolve, reject) => { const timeoutId = setTimeout(() => { resolve(); }, 500); // 检查是否被取消 const checkInterval = setInterval(() => { if (signal && signal.aborted) { clearTimeout(timeoutId); clearInterval(checkInterval); reject(new Error('操作被取消')); } }, 50); }); return true; }); // 模拟完整流程: 搜索+下载 const processImagesAndDownload = async () => { // 搜索图片 const results = await scraper.search('测试', 2, true, abortController.signal); // 如果搜索成功,开始下载 if (results && results.length > 0) { for (const result of results) { // 检查是否已取消 if (abortController.signal.aborted) { throw new Error('操作已取消'); } // 下载图片 await scraper.downloadImage( result.image_url, path.join(tempDir, path.basename(result.image_url)), abortController.signal ); } } return results; }; // 启动流程 const processPromise = processImagesAndDownload(); // 模拟用户取消 setTimeout(() => { abortController.abort(); }, 300); // 验证流程被取消 try { await processPromise; // 如果没有抛出错误,则测试失败 expect(true).toBe(false); } catch (error) { expect(error.message).toMatch(/操作(已|被)取消/); } }); }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/terryso/mcp-pinterest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server