/**
* Worker Manager - Spawns and manages stage-based workers
*
* Each worker runs in its own process on a dedicated port:
* - Research: Port 3001
* - Planning: Port 3002
* - Implementation: Port 3003
* - Testing: Port 3004
*/
import { spawn } from 'child_process';
import { EventEmitter } from 'events';
import path from 'path';
import { fileURLToPath } from 'url';
// ES modules do not provide __filename/__dirname, so derive them from this
// module's URL. __dirname is used below to locate worker.js next to this file.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
/**
 * Port assigned to each stage's worker process, keyed by stage name.
 * The keys of this object are the only stage names a worker can be started for.
 */
export const STAGE_PORTS = {
  research: 3001,
  planning: 3002,
  implementation: 3003,
  testing: 3004,
};
/**
 * Manages one child worker process per stage.
 *
 * Running workers are tracked in `this.workers`, keyed by stage name.
 *
 * Events emitted:
 * - `worker_started` `{ stage, port, pid }` - synchronously after `spawn()`
 *   returns (the child is not yet known to be serving requests).
 * - `worker_stopped` `{ stage, code, signal }` - when the child process exits.
 * - `worker_error` `{ stage, error }` - when the child process reports an error.
 */
class WorkerManager extends EventEmitter {
  constructor() {
    super();
    // stage -> { process, port, stage, status, startedAt, pid }
    this.workers = new Map();
    // Passed to every worker through its SERVER_URL environment variable.
    this.serverUrl = process.env.SERVER_URL || 'http://localhost:3000';
  }

  /**
   * Start a worker for a specific stage.
   *
   * If a worker is already registered for the stage, it is returned as-is and
   * no new process is spawned.
   *
   * @param {string} stage - Stage name (must be an own key of STAGE_PORTS)
   * @returns {object} Worker info: { process, port, stage, status, startedAt, pid }
   * @throws {Error} If the stage is not a known stage
   */
  startWorker(stage) {
    if (this.workers.has(stage)) {
      console.error(`[WorkerManager] Worker for ${stage} already running`);
      return this.workers.get(stage);
    }

    // Own-property lookup only: inherited keys such as "constructor" or
    // "toString" resolve to truthy functions on a plain object and must not
    // be accepted as stage names.
    const port = Object.prototype.hasOwnProperty.call(STAGE_PORTS, stage)
      ? STAGE_PORTS[stage]
      : undefined;
    if (!port) {
      throw new Error(`Unknown stage: ${stage}`);
    }

    console.error(`[WorkerManager] Starting ${stage} worker on port ${port}...`);

    // Spawn worker.js (located next to this file) with its stage config in env.
    const workerProcess = spawn('node', [path.join(__dirname, 'worker.js')], {
      env: {
        ...process.env,
        WORKER_STAGE: stage,
        WORKER_PORT: port.toString(),
        SERVER_URL: this.serverUrl
      },
      stdio: ['pipe', 'pipe', 'pipe']
    });

    const workerInfo = {
      process: workerProcess,
      port,
      stage,
      status: 'starting',
      startedAt: new Date(),
      pid: workerProcess.pid
    };

    // Forward the worker's stdout to this process's stderr, tagged by stage.
    workerProcess.stdout.on('data', (data) => {
      console.error(`[Worker:${stage}] ${data.toString().trim()}`);
    });

    // Forward the worker's stderr as well, with a distinct tag.
    workerProcess.stderr.on('data', (data) => {
      console.error(`[Worker:${stage}:err] ${data.toString().trim()}`);
    });

    workerProcess.on('exit', (code, signal) => {
      console.error(`[WorkerManager] ${stage} worker exited (code: ${code}, signal: ${signal})`);
      workerInfo.status = 'stopped';
      // Only drop the registry entry if it still belongs to this process.
      // After stopWorker() + startWorker() for the same stage, a late exit of
      // the old process must not remove the replacement worker.
      if (this.workers.get(stage) === workerInfo) {
        this.workers.delete(stage);
      }
      this.emit('worker_stopped', { stage, code, signal });
    });

    workerProcess.on('error', (error) => {
      console.error(`[WorkerManager] ${stage} worker error:`, error);
      workerInfo.status = 'error';
      // No pid means the process never started. An 'exit' event is not
      // guaranteed in that case, so remove the dead entry here; otherwise the
      // stage could never be started again.
      if (workerProcess.pid === undefined && this.workers.get(stage) === workerInfo) {
        this.workers.delete(stage);
      }
      this.emit('worker_error', { stage, error });
    });

    workerInfo.status = 'running';
    this.workers.set(stage, workerInfo);
    this.emit('worker_started', { stage, port, pid: workerProcess.pid });
    return workerInfo;
  }

  /**
   * Stop the worker for a specific stage by sending it SIGTERM.
   *
   * The stage is unregistered immediately; `worker_stopped` is emitted later,
   * when the process actually exits.
   *
   * @param {string} stage
   * @returns {boolean} true if a worker was registered and signalled, false otherwise
   */
  stopWorker(stage) {
    const workerInfo = this.workers.get(stage);
    if (!workerInfo) {
      console.error(`[WorkerManager] No worker running for ${stage}`);
      return false;
    }
    console.error(`[WorkerManager] Stopping ${stage} worker...`);
    workerInfo.process.kill('SIGTERM');
    this.workers.delete(stage);
    return true;
  }

  /**
   * Start workers for the selected stages.
   *
   * A failure to start one stage does not prevent the others from starting.
   *
   * @param {string[]} stages - Array of stages to start workers for
   * @returns {object} Map of stage -> worker info, or `{ error: message }` on failure
   */
  startWorkers(stages) {
    const results = {};
    for (const stage of stages) {
      try {
        results[stage] = this.startWorker(stage);
      } catch (error) {
        results[stage] = { error: error.message };
      }
    }
    return results;
  }

  /**
   * Start a worker for every stage in STAGE_PORTS (full pipeline).
   *
   * @returns {object} Same shape as startWorkers()
   */
  startAllWorkers() {
    return this.startWorkers(Object.keys(STAGE_PORTS));
  }

  /**
   * Stop all registered workers.
   *
   * @returns {string[]} Stages whose workers were signalled
   */
  stopAllWorkers() {
    const stopped = [];
    // Iterate over a snapshot: stopWorker() removes entries from the Map.
    for (const stage of [...this.workers.keys()]) {
      if (this.stopWorker(stage)) {
        stopped.push(stage);
      }
    }
    return stopped;
  }

  /**
   * Get the status of all registered workers.
   *
   * @returns {{ activeWorkers: number, workers: object }} Count of registered
   *   workers plus, per stage, its { port, status, pid, startedAt }
   */
  getStatus() {
    const workers = {};
    for (const [stage, info] of this.workers) {
      workers[stage] = {
        port: info.port,
        status: info.status,
        pid: info.pid,
        startedAt: info.startedAt
      };
    }
    return {
      activeWorkers: this.workers.size,
      workers
    };
  }

  /**
   * Check whether a worker is registered for a stage with status 'running'.
   *
   * @param {string} stage
   * @returns {boolean}
   */
  isWorkerRunning(stage) {
    const worker = this.workers.get(stage);
    return worker?.status === 'running';
  }
}
// Singleton instance
const workerManager = new WorkerManager();
export { WorkerManager, workerManager };
export default workerManager;