/**
* Task Queue System with Stage-Based Pipeline Support
*
* Manages task execution through configurable pipeline stages:
* - Research → Planning → Implementation → Testing
* - Users can select which stages to run at execution time
*/
import { EventEmitter } from 'events';
// Pipeline stages in execution order
export const STAGES = ['research', 'planning', 'implementation', 'testing'];
// Stage presets for quick selection
export const PRESETS = {
quick: ['implementation'],
standard: ['planning', 'implementation', 'testing'],
full: ['research', 'planning', 'implementation', 'testing']
};
/**
* Represents a task in the pipeline
*/
class PipelineTask {
constructor(taskId, stages, taskData = {}) {
this.taskId = taskId;
this.stages = stages; // Array of stages to execute
this.currentStageIndex = 0;
this.status = 'queued'; // queued, in_progress, completed, failed
this.stageResults = {}; // Results from each stage
this.taskData = taskData; // Original task data from DB
this.createdAt = new Date();
this.updatedAt = new Date();
}
get currentStage() {
return this.stages[this.currentStageIndex] || null;
}
get isComplete() {
return this.currentStageIndex >= this.stages.length;
}
advanceStage(result) {
this.stageResults[this.currentStage] = result;
this.currentStageIndex++;
this.updatedAt = new Date();
if (this.isComplete) {
this.status = 'completed';
}
}
fail(error) {
this.status = 'failed';
this.error = error;
this.updatedAt = new Date();
}
}
/**
* Stage-based Task Queue
* Each stage has its own queue that workers poll from
*/
class TaskQueue extends EventEmitter {
constructor() {
super();
// Queue per stage
this.stageQueues = {
research: [],
planning: [],
implementation: [],
testing: []
};
// Track all pipeline tasks by ID
this.pipelineTasks = new Map();
}
/**
* Add a task to the pipeline with selected stages
* @param {string} taskId - Task ID from database
* @param {string[]} stages - Array of stages to execute (e.g., ['research', 'planning'])
* @param {object} taskData - Task data from database
* @returns {PipelineTask}
*/
enqueue(taskId, stages, taskData = {}) {
// Validate stages
const validStages = stages.filter(s => STAGES.includes(s));
if (validStages.length === 0) {
throw new Error('At least one valid stage is required');
}
// Sort stages in correct order
const orderedStages = STAGES.filter(s => validStages.includes(s));
// Create pipeline task
const pipelineTask = new PipelineTask(taskId, orderedStages, taskData);
this.pipelineTasks.set(taskId, pipelineTask);
// Add to first stage's queue
const firstStage = orderedStages[0];
this.stageQueues[firstStage].push(taskId);
this.emit('enqueued', { taskId, stage: firstStage, pipelineTask });
return pipelineTask;
}
/**
* Get next task for a specific stage (called by workers)
* @param {string} stage - Stage name (research, planning, etc.)
* @returns {PipelineTask|null}
*/
dequeue(stage) {
if (!this.stageQueues[stage]) {
return null;
}
const taskId = this.stageQueues[stage].shift();
if (!taskId) {
return null;
}
const pipelineTask = this.pipelineTasks.get(taskId);
if (pipelineTask) {
pipelineTask.status = 'in_progress';
this.emit('dequeued', { taskId, stage, pipelineTask });
}
return pipelineTask;
}
/**
* Complete a stage and move task to next stage
* @param {string} taskId
* @param {string} stage - Current stage that completed
* @param {object} result - Result from the stage
*/
completeStage(taskId, stage, result) {
const pipelineTask = this.pipelineTasks.get(taskId);
if (!pipelineTask) {
throw new Error(`Task ${taskId} not found`);
}
if (pipelineTask.currentStage !== stage) {
throw new Error(`Task ${taskId} is not in stage ${stage}`);
}
// Advance to next stage
pipelineTask.advanceStage(result);
if (pipelineTask.isComplete) {
// Task is done!
this.emit('completed', { taskId, pipelineTask });
} else {
// Queue for next stage
const nextStage = pipelineTask.currentStage;
this.stageQueues[nextStage].push(taskId);
this.emit('stage_advanced', { taskId, fromStage: stage, toStage: nextStage, pipelineTask });
}
return pipelineTask;
}
/**
* Mark a task as failed
* @param {string} taskId
* @param {string} error
*/
failTask(taskId, error) {
const pipelineTask = this.pipelineTasks.get(taskId);
if (!pipelineTask) {
throw new Error(`Task ${taskId} not found`);
}
pipelineTask.fail(error);
this.emit('failed', { taskId, error, pipelineTask });
return pipelineTask;
}
/**
* Get status of all queues
*/
getStatus() {
const queueLengths = {};
for (const [stage, queue] of Object.entries(this.stageQueues)) {
queueLengths[stage] = queue.length;
}
return {
queueLengths,
totalTasks: this.pipelineTasks.size,
tasks: Array.from(this.pipelineTasks.values()).map(t => ({
taskId: t.taskId,
status: t.status,
currentStage: t.currentStage,
stages: t.stages,
stageResults: t.stageResults
}))
};
}
/**
* Get a specific pipeline task
* @param {string} taskId
*/
getTask(taskId) {
return this.pipelineTasks.get(taskId);
}
/**
* Check if a stage has pending work
* @param {string} stage
*/
hasWork(stage) {
return this.stageQueues[stage]?.length > 0;
}
}
// Singleton instance
const taskQueue = new TaskQueue();
export { TaskQueue, PipelineTask, taskQueue };
export default taskQueue;