Skip to main content
Glama
task-executor.js (23.6 kB)
/**
 * Task executor - manages and runs the Xiaohongshu-related tasks.
 * Supports concurrent execution, retries, status tracking and result collection.
 */
import { EventEmitter } from 'events';
import Bull from 'bull';
import cron from 'node-cron';
import { logger } from '../utils/logger.js';

/** Queue names; one Bull queue is created per entry. */
const QUEUE_TYPES = [
  'account_login',
  'post_publish',
  'content_search',
  'user_info',
  'post_info',
  'comments_fetch',
  'trending_fetch',
  'data_analysis',
  'account_sync'
];

/** Statuses after which a task row receives a `completed_time` stamp. */
const FINISHED_STATUSES = ['completed', 'failed', 'cancelled'];

/** Largest delay setTimeout honours (2^31 - 1 ms); anything larger fires almost immediately. */
const MAX_TIMER_DELAY_MS = 2147483647;

/** How often due pending tasks are polled from the database (one minute). */
const PENDING_CHECK_INTERVAL_MS = 60000;

/** How often finished tasks older than seven days are purged (one hour). */
const CLEANUP_INTERVAL_MS = 3600000;

/**
 * Decode a JSON column value.
 * Strings, Buffers and null go through JSON.parse exactly as before; values the
 * database driver has already decoded into an object/array are returned untouched
 * (JSON.parse would otherwise stringify them to "[object Object]" and throw).
 *
 * @param {*} value - Raw column value.
 * @returns {*} The decoded value.
 * @throws {SyntaxError} When a string/Buffer value is not valid JSON.
 */
function parseJsonColumn(value) {
  const alreadyDecoded =
    value !== null && typeof value === 'object' && !Buffer.isBuffer(value);
  return alreadyDecoded ? value : JSON.parse(value);
}

class TaskExecutor extends EventEmitter {
  /**
   * @param {object} options
   * @param {object} options.dbManager - Exposes `query(sql, params)`.
   * @param {object} options.browserManager - Exposes `createContext` / `getContext`.
   * @param {object} options.config - Reads `redis` and `maxConcurrency`.
   */
  constructor(options) {
    super();

    this.dbManager = options.dbManager;
    this.browserManager = options.browserManager;
    this.config = options.config;
    this.logger = logger;

    this.queues = new Map();
    this.workers = new Map();
    // taskId -> handle exposing stop(); holds cron jobs and delayed-task timers.
    this.scheduledTasks = new Map();
    // Handles of the background setInterval loops, kept so stop() can clear them.
    this.intervalTimers = [];

    this.initializeQueues();
  }

  /**
   * Create one Bull queue per task type and attach its processor.
   */
  initializeQueues() {
    for (const type of QUEUE_TYPES) {
      const queue = new Bull(type, {
        redis: this.config.redis,
        defaultJobOptions: {
          removeOnComplete: 50,
          removeOnFail: 100,
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 2000
          }
        }
      });

      this.queues.set(type, queue);
      this.setupQueueProcessor(type, queue);
    }

    this.logger.info('任务队列初始化完成', { types: QUEUE_TYPES });
  }

  /**
   * Register the job processor and event listeners for one queue.
   *
   * @param {string} type - Task type handled by the queue.
   * @param {object} queue - Bull queue instance.
   */
  setupQueueProcessor(type, queue) {
    // Fall back to a single worker when maxConcurrency is missing or invalid;
    // a numeric string is converted so Bull does not read it as a job name.
    const configured = Number(this.config.maxConcurrency);
    const concurrency = Number.isInteger(configured) && configured > 0 ? configured : 1;

    queue.process(concurrency, async (job) => {
      const { taskId, accountId, ...data } = job.data;

      try {
        await this.updateTaskStatus(taskId, 'running');

        const result = await this.executeTask(type, { taskId, accountId, ...data });

        await this.updateTaskStatus(taskId, 'completed', result);
        return result;
      } catch (error) {
        await this.updateTaskStatus(taskId, 'failed', null, error.message);
        await this.logTaskError(taskId, error);

        // Rethrow so Bull applies its attempts/backoff policy.
        throw error;
      }
    });

    queue.on('completed', (job, result) => {
      this.emit('task:completed', { job, result });
    });

    queue.on('failed', (job, error) => {
      this.emit('task:failed', { job, error });
    });

    queue.on('stalled', (job) => {
      this.logger.warn('任务停滞', { jobId: job.id, taskId: job.data.taskId });
    });
  }

  /**
   * Load due tasks and start the background checker and cleaner loops.
   *
   * @throws Rethrows any error raised while starting up.
   */
  async initialize() {
    try {
      this.logger.info('⚙️ 初始化任务执行器...');

      await this.loadPendingTasks();
      this.startScheduledTaskChecker();
      this.startTaskCleaner();

      this.logger.info('✅ 任务执行器初始化完成');
    } catch (error) {
      this.logger.error('❌ 任务执行器初始化失败', { error: error.message });
      throw error;
    }
  }

  /**
   * Persist a task and either schedule it (cron / future time) or enqueue it now.
   *
   * @param {object} taskData - `type`, `accountId`, optional `priority` (default 1),
   *   `scheduledTime`, `cronExpression`; all remaining keys become the task payload.
   * @returns {Promise<number>} The inserted task id.
   * @throws {Error} For an unsupported type, an invalid cron expression, or a DB/queue failure.
   */
  async createTask(taskData) {
    const {
      type,
      accountId,
      priority = 1,
      // Default to null so no `undefined` bind parameter reaches the driver.
      scheduledTime = null,
      cronExpression = null,
      ...data
    } = taskData;

    try {
      // Validate before inserting so a bad request does not leave an orphan row.
      if (!this.queues.has(type)) {
        throw new Error(`任务类型不支持: ${type}`);
      }
      if (cronExpression && !cron.validate(cronExpression)) {
        throw new Error(`无效的 cron 表达式: ${cronExpression}`);
      }

      const result = await this.dbManager.query(
        `INSERT INTO idea_xiaohongshu_tasks
          (task_type, account_id, task_data, cron_expression, priority, scheduled_time)
          VALUES (?, ?, ?, ?, ?, ?)`,
        [type, accountId, JSON.stringify(data), cronExpression, priority, scheduledTime]
      );

      const taskId = result.insertId;

      if (cronExpression) {
        this.scheduleCronTask(taskId, cronExpression, taskData);
      } else if (scheduledTime && new Date(scheduledTime) > new Date()) {
        this.scheduleDelayedTask(taskId, scheduledTime, taskData);
      } else {
        await this.enqueueTask(type, { taskId, accountId, ...data }, priority);
      }

      this.emit('task:created', { taskId, type, accountId });

      return taskId;
    } catch (error) {
      this.logger.error('创建任务失败', { error: error.message, taskData });
      throw error;
    }
  }

  /**
   * Add a job to the queue matching the task type.
   *
   * @param {string} type - Task type / queue name.
   * @param {object} data - Job payload.
   * @param {number} [priority=1] - Bull job priority.
   * @throws {Error} When no queue exists for `type`.
   */
  async enqueueTask(type, data, priority = 1) {
    const queue = this.queues.get(type);
    if (!queue) {
      throw new Error(`任务类型不支持: ${type}`);
    }

    await queue.add(data, { priority });
  }

  /**
   * Dispatch a task to the handler for its type.
   *
   * @param {string} type - Task type.
   * @param {object} taskData - Payload including `taskId` and `accountId`.
   * @returns {Promise<object>} The handler's result.
   * @throws {Error} For an unknown type, or whatever the handler throws.
   */
  async executeTask(type, taskData) {
    switch (type) {
      case 'account_login':
        return this.executeAccountLogin(taskData);
      case 'post_publish':
        return this.executePostPublish(taskData);
      case 'content_search':
        return this.executeContentSearch(taskData);
      case 'user_info':
        return this.executeUserInfo(taskData);
      case 'post_info':
        return this.executePostInfo(taskData);
      case 'comments_fetch':
        return this.executeCommentsFetch(taskData);
      case 'trending_fetch':
        return this.executeTrendingFetch(taskData);
      case 'data_analysis':
        return this.executeDataAnalysis(taskData);
      case 'account_sync':
        return this.executeAccountSync(taskData);
      default:
        throw new Error(`未知任务类型: ${type}`);
    }
  }

  /**
   * Fetch the account row, requiring it to exist and be logged in.
   *
   * @param {number|string} accountId
   * @returns {Promise<object>} The account row.
   * @throws {Error} When no logged-in account matches.
   */
  async getLoggedInAccount(accountId) {
    const accounts = await this.dbManager.query(
      'SELECT * FROM idea_xiaohongshu_accounts WHERE id = ? AND login_status = TRUE',
      [accountId]
    );

    if (accounts.length === 0) {
      throw new Error('账号不存在或未登录');
    }

    return accounts[0];
  }

  /**
   * Log an account in and store the resulting cookies.
   *
   * @param {object} task - `accountId`, `method`, `credentials`.
   * @returns {Promise<object>} `{ success, message, accountId, username }`.
   * @throws {Error} When the account does not exist or the login fails.
   */
  async executeAccountLogin({ accountId, method, credentials }) {
    try {
      const accounts = await this.dbManager.query(
        'SELECT * FROM idea_xiaohongshu_accounts WHERE id = ?',
        [accountId]
      );

      if (accounts.length === 0) {
        throw new Error('账号不存在');
      }

      const account = accounts[0];

      // The two lookups are independent, so run them concurrently.
      const [proxies, fingerprints] = await Promise.all([
        this.dbManager.query(
          'SELECT * FROM idea_xiaohongshu_proxies WHERE id = ?',
          [account.proxy_id]
        ),
        this.dbManager.query(
          'SELECT * FROM idea_xiaohongshu_fingerprints WHERE id = ?',
          [account.fingerprint_id]
        )
      ]);

      // Either may be undefined when the account has no proxy/fingerprint row.
      const context = await this.browserManager.createContext({
        proxy: proxies[0],
        fingerprint: fingerprints[0],
        accountId
      });

      const loginResult = await this.performLogin(context, account, method, credentials);

      // NOTE(review): the column is named cookies_encrypted but the value written
      // here is plain JSON - confirm whether encryption happens elsewhere.
      await this.dbManager.query(
        'UPDATE idea_xiaohongshu_accounts SET login_status = ?, cookies_encrypted = ?, last_login_time = NOW() WHERE id = ?',
        [true, JSON.stringify(loginResult.cookies), accountId]
      );

      return {
        success: true,
        message: '登录成功',
        accountId,
        username: account.username
      };
    } catch (error) {
      this.logger.error('账号登录失败', { error: error.message, accountId });
      throw error;
    }
  }

  /**
   * Publish a stored post with the given account.
   *
   * @param {object} task - `accountId`, `postId`.
   * @returns {Promise<object>} `{ success, message, postId, url }`.
   * @throws {Error} When the account/post is missing or publishing fails.
   */
  async executePostPublish({ accountId, postId }) {
    try {
      await this.getLoggedInAccount(accountId);

      const posts = await this.dbManager.query(
        'SELECT * FROM idea_xiaohongshu_posts WHERE id = ? AND account_id = ?',
        [postId, accountId]
      );

      if (posts.length === 0) {
        throw new Error('笔记不存在');
      }

      const context = await this.browserManager.getContext(accountId);
      const publishResult = await this.performPublish(context, posts[0]);

      await this.dbManager.query(
        'UPDATE idea_xiaohongshu_posts SET status = "published", post_id = ?, published_time = NOW() WHERE id = ?',
        [publishResult.postId, postId]
      );

      return {
        success: true,
        message: '发布成功',
        postId: publishResult.postId,
        url: publishResult.url
      };
    } catch (error) {
      // Best-effort status update: a failure here must not mask the original error.
      try {
        await this.dbManager.query(
          'UPDATE idea_xiaohongshu_posts SET status = "failed" WHERE id = ?',
          [postId]
        );
      } catch (updateError) {
        this.logger.error('更新笔记状态失败', { error: updateError.message, postId });
      }

      this.logger.error('笔记发布失败', { error: error.message, postId });
      throw error;
    }
  }

  /**
   * Run a content search and save every result.
   *
   * @param {object} task - `taskId`, `accountId`, `keyword`, `searchType`, `limit`, `sort`.
   * @returns {Promise<object>} `{ success, message, resultsCount, keyword }`.
   * @throws {Error} When the account is not logged in or the search fails.
   */
  async executeContentSearch({ taskId, accountId, keyword, searchType, limit, sort }) {
    try {
      await this.getLoggedInAccount(accountId);

      const context = await this.browserManager.getContext(accountId);
      const searchResults = await this.performSearch(context, {
        keyword,
        type: searchType,
        limit,
        sort
      });

      for (const result of searchResults) {
        await this.saveSearchResult(taskId, result);
      }

      return {
        success: true,
        message: '搜索完成',
        resultsCount: searchResults.length,
        keyword
      };
    } catch (error) {
      this.logger.error('内容搜索失败', { error: error.message, keyword });
      throw error;
    }
  }

  /**
   * Fetch and save a user's profile information.
   *
   * @param {object} task - `accountId`, `userId`.
   * @returns {Promise<object>} `{ success, message, userId, username }`.
   * @throws {Error} When the account is not logged in or the fetch fails.
   */
  async executeUserInfo({ accountId, userId }) {
    try {
      await this.getLoggedInAccount(accountId);

      const context = await this.browserManager.getContext(accountId);
      const userInfo = await this.performGetUserInfo(context, userId);

      await this.saveUserInfo(userInfo);

      return {
        success: true,
        message: '用户信息获取完成',
        userId,
        username: userInfo.nickname
      };
    } catch (error) {
      this.logger.error('获取用户信息失败', { error: error.message, userId });
      throw error;
    }
  }

  /**
   * Fetch and save the details of a post.
   *
   * @param {object} task - `accountId`, `postId`.
   * @returns {Promise<object>} `{ success, message, postId, title }`.
   * @throws {Error} When the account is not logged in or the fetch fails.
   */
  async executePostInfo({ accountId, postId }) {
    try {
      await this.getLoggedInAccount(accountId);

      const context = await this.browserManager.getContext(accountId);
      const postInfo = await this.performGetPostInfo(context, postId);

      await this.savePostInfo(postInfo);

      return {
        success: true,
        message: '笔记详情获取完成',
        postId,
        title: postInfo.title
      };
    } catch (error) {
      this.logger.error('获取笔记详情失败', { error: error.message, postId });
      throw error;
    }
  }

  /**
   * Fetch and save the comments of a post.
   *
   * @param {object} task - `taskId`, `accountId`, `postId`, `limit`, `offset`.
   * @returns {Promise<object>} `{ success, message, postId, commentsCount }`.
   * @throws {Error} When the account is not logged in or the fetch fails.
   */
  async executeCommentsFetch({ taskId, accountId, postId, limit, offset }) {
    try {
      await this.getLoggedInAccount(accountId);

      const context = await this.browserManager.getContext(accountId);
      const comments = await this.performGetComments(context, postId, limit, offset);

      await this.saveComments(taskId, comments);

      return {
        success: true,
        message: '评论获取完成',
        postId,
        commentsCount: comments.length
      };
    } catch (error) {
      this.logger.error('获取评论失败', { error: error.message, postId });
      throw error;
    }
  }

  /**
   * Fetch and save trending posts for a category.
   *
   * @param {object} task - `taskId`, `accountId`, `category`, `limit`.
   * @returns {Promise<object>} `{ success, message, category, postsCount }`.
   * @throws {Error} When the account is not logged in or the fetch fails.
   */
  async executeTrendingFetch({ taskId, accountId, category, limit }) {
    try {
      await this.getLoggedInAccount(accountId);

      const context = await this.browserManager.getContext(accountId);
      const trendingPosts = await this.performGetTrending(context, category, limit);

      await this.saveTrendingPosts(taskId, trendingPosts);

      return {
        success: true,
        message: '热门内容获取完成',
        category,
        postsCount: trendingPosts.length
      };
    } catch (error) {
      this.logger.error('获取热门内容失败', { error: error.message, category });
      throw error;
    }
  }

  /**
   * Placeholder for the `data_analysis` task type, which has a queue and a
   * dispatch entry but no implementation in this file.
   *
   * @throws {Error} Always - fails with an explicit message instead of a TypeError.
   */
  async executeDataAnalysis() {
    throw new Error('任务类型尚未实现: data_analysis');
  }

  /**
   * Placeholder for the `account_sync` task type, which has a queue and a
   * dispatch entry but no implementation in this file.
   *
   * @throws {Error} Always - fails with an explicit message instead of a TypeError.
   */
  async executeAccountSync() {
    throw new Error('任务类型尚未实现: account_sync');
  }

  // ===== Helper methods =====

  /**
   * Enqueue every pending task whose scheduled time has passed and mark it queued.
   * A task that cannot be decoded or enqueued is logged and skipped so it does
   * not block the rest of the batch. Errors are logged, never thrown.
   */
  async loadPendingTasks() {
    try {
      const pendingTasks = await this.dbManager.query(
        'SELECT * FROM idea_xiaohongshu_tasks WHERE status = "pending" AND scheduled_time <= NOW()'
      );

      let queuedCount = 0;

      for (const task of pendingTasks) {
        try {
          const taskData = parseJsonColumn(task.task_data);

          await this.enqueueTask(task.task_type, {
            taskId: task.id,
            accountId: task.account_id,
            ...taskData
          }, task.priority);

          await this.updateTaskStatus(task.id, 'queued');
          queuedCount += 1;
        } catch (taskError) {
          this.logger.error('加载待处理任务失败', { error: taskError.message, taskId: task.id });
        }
      }

      this.logger.info('加载待处理任务完成', { count: queuedCount });
    } catch (error) {
      this.logger.error('加载待处理任务失败', { error: error.message });
    }
  }

  /**
   * Write a task's status (plus timestamps, result and error message) and emit
   * `task:statusChanged`. Database failures are logged, never thrown.
   *
   * @param {number|string} taskId
   * @param {string} status - New status value.
   * @param {object|null} [result=null] - Stored as JSON for finished statuses.
   * @param {string|null} [error=null] - Error message for finished statuses.
   */
  async updateTaskStatus(taskId, status, result = null, error = null) {
    try {
      const updates = ['status = ?'];
      const values = [status];

      if (status === 'running') {
        updates.push('started_time = NOW()');
      } else if (FINISHED_STATUSES.includes(status)) {
        // 'cancelled' is stamped too: the cleaner filters on completed_time,
        // so rows without it would never be purged.
        updates.push('completed_time = NOW()');

        if (result) {
          updates.push('result_data = ?');
          values.push(JSON.stringify(result));
        }
        if (error) {
          updates.push('error_message = ?');
          values.push(error);
        }
      }

      values.push(taskId);

      await this.dbManager.query(
        `UPDATE idea_xiaohongshu_tasks SET ${updates.join(', ')} WHERE id = ?`,
        values
      );

      this.emit('task:statusChanged', { taskId, status, result, error });
    } catch (updateError) {
      this.logger.error('更新任务状态失败', { error: updateError.message, taskId, status });
    }
  }

  /**
   * Increment the task's retry counter. Database failures are logged, never thrown.
   *
   * @param {number|string} taskId
   * @param {Error} error - The failure; currently not persisted by this method.
   */
  async logTaskError(taskId, error) {
    try {
      await this.dbManager.query(
        'UPDATE idea_xiaohongshu_tasks SET retry_count = retry_count + 1 WHERE id = ?',
        [taskId]
      );
    } catch (updateError) {
      this.logger.error('记录任务错误失败', { error: updateError.message, taskId });
    }
  }

  /**
   * Register a cron job that creates one immediate run of the task on every tick.
   *
   * @param {number|string} taskId - Id of the row holding the schedule.
   * @param {string} cronExpression - node-cron expression.
   * @param {object} taskData - Original task definition.
   */
  scheduleCronTask(taskId, cronExpression, taskData) {
    // Strip the scheduling fields: passing them back to createTask would make
    // every tick register another cron job, multiplying schedules without bound.
    const runData = { ...taskData };
    delete runData.cronExpression;
    delete runData.scheduledTime;

    const job = cron.schedule(cronExpression, async () => {
      try {
        await this.createTask(runData);
      } catch (error) {
        this.logger.error('定时任务执行失败', { error: error.message, taskId });
      }
    });

    this.scheduledTasks.set(taskId, job);
  }

  /**
   * Arm a timer that enqueues the already-persisted task once its time arrives.
   * The timer is registered in `scheduledTasks`, so cancelTask() and stop()
   * clear it through the same stop() interface used for cron jobs.
   *
   * @param {number|string} taskId - Id of the persisted task row.
   * @param {string|number|Date} scheduledTime - When the task becomes due.
   * @param {object} taskData - Original task definition.
   */
  scheduleDelayedTask(taskId, scheduledTime, taskData) {
    const dueAt = new Date(scheduledTime).getTime();

    // Rebuild the payload createTask would have enqueued: accountId plus data.
    const payload = { ...taskData };
    const { type, priority = 1 } = payload;
    delete payload.type;
    delete payload.priority;
    delete payload.scheduledTime;
    delete payload.cronExpression;

    let timer = null;

    const arm = () => {
      // Clamp to the timer maximum; fire() re-arms if it wakes up early.
      const remaining = Math.max(dueAt - Date.now(), 0);
      timer = setTimeout(fire, Math.min(remaining, MAX_TIMER_DELAY_MS));
    };

    const fire = async () => {
      if (dueAt - Date.now() > 0) {
        arm();
        return;
      }

      this.scheduledTasks.delete(taskId);

      try {
        // Enqueue the existing row rather than creating a second one, and only
        // while it is still pending: the periodic checker may have queued it
        // already, or it may have been cancelled.
        const rows = await this.dbManager.query(
          'SELECT status FROM idea_xiaohongshu_tasks WHERE id = ?',
          [taskId]
        );
        if (rows.length === 0 || rows[0].status !== 'pending') {
          return;
        }

        await this.enqueueTask(type, { taskId, ...payload }, priority);
        await this.updateTaskStatus(taskId, 'queued');
      } catch (error) {
        this.logger.error('延迟任务执行失败', { error: error.message, taskId });
      }
    };

    arm();
    this.scheduledTasks.set(taskId, { stop: () => clearTimeout(timer) });
  }

  /**
   * Start the loop that enqueues due pending tasks once a minute.
   */
  startScheduledTaskChecker() {
    const timer = setInterval(async () => {
      try {
        await this.loadPendingTasks();
      } catch (error) {
        this.logger.error('定时任务检查失败', { error: error.message });
      }
    }, PENDING_CHECK_INTERVAL_MS);

    this.intervalTimers.push(timer);
  }

  /**
   * Start the hourly loop that deletes finished tasks completed over 7 days ago.
   */
  startTaskCleaner() {
    const timer = setInterval(async () => {
      try {
        await this.dbManager.query(
          'DELETE FROM idea_xiaohongshu_tasks WHERE status IN ("completed", "failed", "cancelled") AND completed_time < DATE_SUB(NOW(), INTERVAL 7 DAY)'
        );

        this.logger.info('任务清理完成');
      } catch (error) {
        this.logger.error('任务清理失败', { error: error.message });
      }
    }, CLEANUP_INTERVAL_MS);

    this.intervalTimers.push(timer);
  }

  /**
   * Mark a task cancelled and stop its cron job or delayed timer, if any.
   * A job already sitting in a Bull queue is not removed by this method.
   *
   * @param {number|string} taskId
   * @throws Rethrows any error raised while cancelling.
   */
  async cancelTask(taskId) {
    try {
      await this.updateTaskStatus(taskId, 'cancelled');

      const scheduled = this.scheduledTasks.get(taskId);
      if (scheduled) {
        scheduled.stop();
        this.scheduledTasks.delete(taskId);
      }

      this.logger.info('任务已取消', { taskId });
    } catch (error) {
      this.logger.error('取消任务失败', { error: error.message, taskId });
      throw error;
    }
  }

  /**
   * Read a task row with its JSON columns decoded.
   *
   * @param {number|string} taskId
   * @returns {Promise<object>} Task row; `task_data` / `result_data` are parsed or null.
   * @throws {Error} When the task does not exist or the query fails.
   */
  async getTaskStatus(taskId) {
    try {
      const tasks = await this.dbManager.query(
        'SELECT * FROM idea_xiaohongshu_tasks WHERE id = ?',
        [taskId]
      );

      if (tasks.length === 0) {
        throw new Error('任务不存在');
      }

      const task = tasks[0];
      task.task_data = task.task_data ? parseJsonColumn(task.task_data) : null;
      task.result_data = task.result_data ? parseJsonColumn(task.result_data) : null;

      return task;
    } catch (error) {
      this.logger.error('获取任务状态失败', { error: error.message, taskId });
      throw error;
    }
  }

  /**
   * Stop the background loops, scheduled jobs and timers, then close every queue.
   */
  async stop() {
    this.logger.info('⚙️ 停止任务执行器...');

    // Stop producers first so nothing is enqueued into a closing queue.
    for (const timer of this.intervalTimers) {
      clearInterval(timer);
    }
    this.intervalTimers = [];

    for (const task of this.scheduledTasks.values()) {
      task.stop();
    }

    await Promise.all([...this.queues.values()].map((queue) => queue.close()));

    this.queues.clear();
    this.workers.clear();
    this.scheduledTasks.clear();

    this.logger.info('✅ 任务执行器已停止');
  }

  /**
   * Report per-queue job counts.
   *
   * @returns {Promise<object>} `{ status, details, timestamp }`; status is 'warning'
   *   when any queue holds more than 100 failed jobs, or `{ status: 'unhealthy', error,
   *   timestamp }` when the counts cannot be read.
   */
  async healthCheck() {
    try {
      // getJobCounts returns the totals without loading every job object.
      const entries = await Promise.all(
        [...this.queues].map(async ([type, queue]) => {
          const counts = await queue.getJobCounts();
          return [type, {
            waiting: counts.waiting,
            active: counts.active,
            completed: counts.completed,
            failed: counts.failed
          }];
        })
      );

      const details = {};
      let status = 'healthy';

      for (const [type, counts] of entries) {
        details[type] = counts;
        if (counts.failed > 100) {
          status = 'warning';
        }
      }

      return {
        status,
        details,
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
        timestamp: new Date().toISOString()
      };
    }
  }

  // ===== Mock execution methods (the real implementation will call Xiaohongshu) =====

  /** Mock login; returns fixed cookies. */
  async performLogin(context, account, method, credentials) {
    return {
      success: true,
      cookies: [
        { name: 'a1', value: 'xxx', domain: '.xiaohongshu.com' },
        { name: 'webId', value: 'xxx', domain: '.xiaohongshu.com' }
      ]
    };
  }

  /** Mock publish; returns a generated post id and its URL. */
  async performPublish(context, post) {
    // Build the id once so the URL always refers to the same id.
    const postId = `post_${Date.now()}`;

    return {
      postId,
      url: `https://www.xiaohongshu.com/explore/${postId}`
    };
  }

  /** Mock search; returns two fixed results. */
  async performSearch(context, params) {
    return [
      { id: 'note1', title: '搜索结果1', author: 'user1' },
      { id: 'note2', title: '搜索结果2', author: 'user2' }
    ];
  }

  /** Mock user lookup; returns fixed profile data for `userId`. */
  async performGetUserInfo(context, userId) {
    return {
      userId,
      nickname: '用户昵称',
      followerCount: 1000,
      followingCount: 500,
      postCount: 50
    };
  }

  /** Mock post lookup; returns fixed post data for `postId`. */
  async performGetPostInfo(context, postId) {
    return {
      postId,
      title: '笔记标题',
      content: '笔记内容',
      likeCount: 100,
      commentCount: 20,
      shareCount: 5
    };
  }

  /** Mock comment fetch; returns two fixed comments. */
  async performGetComments(context, postId, limit, offset) {
    return [
      { id: 'comment1', content: '评论1', author: 'user1' },
      { id: 'comment2', content: '评论2', author: 'user2' }
    ];
  }

  /** Mock trending fetch; returns two fixed posts tagged with `category`. */
  async performGetTrending(context, category, limit) {
    return [
      { id: 'trending1', title: '热门内容1', category },
      { id: 'trending2', title: '热门内容2', category }
    ];
  }

  // ===== Persistence methods =====

  /** Save one search result. Not implemented yet. */
  async saveSearchResult(taskId, result) {
    // Implementation omitted.
  }

  /**
   * Insert a user's profile, or update it when the user already exists.
   *
   * @param {object} userInfo - `userId`, `nickname`, `followerCount`,
   *   `followingCount`, `postCount`.
   */
  async saveUserInfo(userInfo) {
    await this.dbManager.query(
      `INSERT INTO idea_xiaohongshu_users
        (user_id, nickname, follower_count, following_count, post_count)
        VALUES (?, ?, ?, ?, ?)
        ON DUPLICATE KEY UPDATE
        nickname = VALUES(nickname),
        follower_count = VALUES(follower_count),
        following_count = VALUES(following_count),
        post_count = VALUES(post_count)`,
      [
        userInfo.userId,
        userInfo.nickname,
        userInfo.followerCount,
        userInfo.followingCount,
        userInfo.postCount
      ]
    );
  }

  /** Save post details. Not implemented yet. */
  async savePostInfo(postInfo) {
    // Implementation omitted.
  }

  /** Save fetched comments. Not implemented yet. */
  async saveComments(taskId, comments) {
    // Implementation omitted.
  }

  /** Save trending posts. Not implemented yet. */
  async saveTrendingPosts(taskId, posts) {
    // Implementation omitted.
  }
}

export default TaskExecutor;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyangbc/xiaohongshu-mcp-nodejs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.