// server/lib/code-worker-pool.ts — Persistent pre-forked worker pool for code steps
// Eliminates fork-per-execution overhead. Workers are recycled after MAX_USES executions.
import { fork, type ChildProcess } from "node:child_process";
import { join, dirname } from "node:path";
import { fileURLToPath } from "node:url";
const __dirname_pool = dirname(fileURLToPath(import.meta.url));
/** Worker entry script, resolved next to this module. */
const WORKER_PATH = join(__dirname_pool, "code-worker.js");
/**
 * Number of pre-forked workers, read from CODE_WORKER_POOL_SIZE (default 4).
 * 0 disables the pool: initWorkerPool starts no workers and every execution takes the
 * one-shot fork path. Values that do not parse to a non-negative integer fall back to the
 * default instead of becoming NaN, which would also start zero workers but without intent.
 */
const parsedPoolSize = parseInt(process.env.CODE_WORKER_POOL_SIZE || "4", 10);
const POOL_SIZE = Number.isInteger(parsedPoolSize) && parsedPoolSize >= 0 ? parsedPoolSize : 4;
const MAX_USES = 50; // Recycle worker after N executions (prevent memory leaks)
const ACQUIRE_TIMEOUT_MS = 10_000; // Max wait to acquire a worker
/** One pre-forked worker process plus the pool's bookkeeping for it. */
interface PoolWorker {
  /** Set once the child sends `{ type: "ready" }`; only ready workers are handed out. */
  ready: boolean;
  /** True while the worker is assigned to a caller. */
  busy: boolean;
  /** Completed executions; the worker is recycled once this reaches MAX_USES. */
  uses: number;
  /** The forked child process (IPC channel, JSON serialization). */
  child: ChildProcess;
}
/** A single code-step execution request. */
interface ExecuteRequest {
  /** Source code sent verbatim to the worker. */
  code: string;
  /** Context object sent to the worker alongside the code. */
  context: {
    steps: Record<string, unknown>;
    trigger: Record<string, unknown>;
    input?: unknown;
  };
  /** Forwarded to the worker; the parent gives up after timeoutMs + 3000 ms. */
  timeoutMs: number;
}
/** Outcome of one execution, either reported by the worker or synthesized on failure. */
interface ExecuteResult {
  /** True only when the worker reported success. */
  success: boolean;
  /** Output reported by the worker (set on success). */
  output?: unknown;
  /** Failure description: worker error, timeout, crash, or fork failure. */
  error?: string;
  /** Log lines reported by the worker, when present. */
  logs?: string[];
}
/** Live pooled workers; dead or recycled ones are spliced out and replacements pushed. */
const pool: Array<PoolWorker> = [];
/** FIFO of callers waiting for a worker; each entry's timer rejects it after ACQUIRE_TIMEOUT_MS. */
const waitQueue: Array<{ timer: ReturnType<typeof setTimeout>; resolve: (w: PoolWorker) => void }> = [];
/** Set by initWorkerPool(), cleared by shutdownPool(). */
let initialized = false;
// Allow-listed environment for worker processes: nothing else from process.env (API keys,
// DB URLs, other server secrets) is copied. Unset or empty values get a fixed fallback.
const WORKER_ENV: Record<string, string> = {};
WORKER_ENV["NODE_ENV"] = process.env.NODE_ENV || "production";
WORKER_ENV["PATH"] = process.env.PATH || "/usr/local/bin:/usr/bin:/bin";
/**
 * Fork one pooled code worker and wire up its lifecycle handlers.
 *
 * The worker starts with `ready: false` and becomes eligible for acquireWorker() once it
 * sends `{ type: "ready" }`. At that moment it is handed straight to the oldest queued
 * caller, if any, so callers that queued while it was starting do not wait for another release.
 *
 * If it exits while still in the pool it is removed. It is replaced only if it had reached
 * "ready". A worker script that dies on startup therefore shrinks the pool (eventually to the
 * one-shot fork fallback) instead of turning into an endless fork/exit loop.
 *
 * @returns the new worker; the caller pushes it into `pool`.
 * @throws whatever `fork()` throws synchronously.
 */
function spawnWorker(): PoolWorker {
  // NOTE(review): fork() inherits process.execArgv by default. If the server is launched with
  // flags like --env-file or -r dotenv/config, the child may reload secrets that WORKER_ENV
  // deliberately omits — confirm how the server is started.
  const child = fork(WORKER_PATH, [], {
    stdio: "pipe",
    serialization: "json",
    env: WORKER_ENV,
  });
  // stdout/stderr are piped but nothing reads them; drain them so a chatty worker cannot
  // fill the OS pipe buffer and block on write.
  child.stdout?.resume();
  child.stderr?.resume();

  const worker: PoolWorker = { child, busy: false, uses: 0, ready: false };

  child.on("message", (msg: any) => {
    if (msg?.type !== "ready") return;
    worker.ready = true;
    // Serve a queued caller with the freshly started worker.
    if (worker.busy || !pool.includes(worker)) return;
    const next = waitQueue.shift();
    if (!next) return;
    clearTimeout(next.timer);
    worker.busy = true;
    next.resolve(worker);
  });

  child.on("exit", (code, signal) => {
    const idx = pool.indexOf(worker);
    // Not in the pool: it was recycled, retired, or shut down, and that path owns the slot.
    if (idx < 0) return;
    pool.splice(idx, 1);
    if (!worker.ready) {
      console.error(
        `[worker-pool] Worker exited before becoming ready (code ${code}, signal ${signal}); not respawning`,
      );
      return;
    }
    try {
      pool.push(spawnWorker());
    } catch {
      // Can't respawn — pool shrinks
    }
  });

  child.on("error", () => {
    // A listener is required: an unhandled 'error' event would throw in the server process.
    // Node may or may not emit 'exit' afterwards; if it does, the handler above runs.
  });

  return worker;
}
/**
 * Start POOL_SIZE workers. Idempotent: later calls do nothing until shutdownPool() resets
 * the initialized flag. A worker that fails to fork is logged and skipped.
 */
export function initWorkerPool(): void {
  if (initialized) return;
  initialized = true;
  let slot = 0;
  while (slot < POOL_SIZE) {
    slot++;
    try {
      pool.push(spawnWorker());
    } catch {
      console.error(`[worker-pool] Failed to spawn worker ${slot}/${POOL_SIZE}`);
    }
  }
  console.log(`[worker-pool] Initialized with ${pool.length} workers`);
}
/**
 * Claim the first idle, ready worker (marking it busy). If none is available, enqueue the
 * caller until a worker is handed over. The promise rejects after ACQUIRE_TIMEOUT_MS.
 */
function acquireWorker(): Promise<PoolWorker> {
  for (const candidate of pool) {
    if (candidate.ready && !candidate.busy) {
      candidate.busy = true;
      return Promise.resolve(candidate);
    }
  }
  return new Promise<PoolWorker>((resolve, reject) => {
    const onTimeout = () => {
      // Drop our queue entry so a later release does not hand a worker to a dead waiter.
      const pos = waitQueue.findIndex((entry) => entry.resolve === resolve);
      if (pos !== -1) waitQueue.splice(pos, 1);
      reject(new Error("Timed out waiting for available code worker"));
    };
    waitQueue.push({ resolve, timer: setTimeout(onTimeout, ACQUIRE_TIMEOUT_MS) });
  });
}
/**
 * Return a worker after an execution that completed normally.
 *
 * - A worker no longer in the pool (it exited and was replaced, it was retired after a
 *   timeout, or shutdownPool() ran) is left alone. It is neither reused nor replaced, so
 *   a dead process is never handed to a waiter and the pool never grows past its slots.
 * - A worker whose IPC channel is already disconnected is not handed out; its exit
 *   handler deals with it.
 * - On reaching MAX_USES the worker is SIGTERMed and replaced in the same slot.
 * - Otherwise it goes straight to the oldest queued caller, if any.
 */
function releaseWorker(worker: PoolWorker): void {
  worker.busy = false;
  worker.uses++;
  const idx = pool.indexOf(worker);
  if (idx < 0 || !worker.child.connected) return;
  // Recycle if too many uses (prevent memory leaks in vm)
  if (worker.uses >= MAX_USES) {
    pool.splice(idx, 1);
    try { worker.child.kill("SIGTERM"); } catch { /* process already dead */ }
    try { pool.push(spawnWorker()); } catch { /* can't respawn — pool shrinks */ }
    return;
  }
  // Service waiting queue
  const next = waitQueue.shift();
  if (next) {
    clearTimeout(next.timer);
    worker.busy = true;
    next.resolve(worker);
  }
}
/**
 * Retire a stuck or dead pooled worker: remove it from the pool, SIGKILL it, and spawn a
 * replacement. A worker that is no longer in the pool is only killed. This covers two cases:
 * its exit handler already replaced it, or shutdownPool() emptied the pool. So a slot is
 * never filled twice and nothing is respawned after shutdown.
 */
function retireWorker(worker: PoolWorker): void {
  const idx = pool.indexOf(worker);
  if (idx >= 0) pool.splice(idx, 1);
  try { worker.child.kill("SIGKILL"); } catch { /* process already dead */ }
  if (idx >= 0) {
    try { pool.push(spawnWorker()); } catch { /* can't respawn — pool shrinks */ }
  }
}

/**
 * Execute code using the worker pool. Falls back to a one-shot fork in three cases:
 * - the pool is not initialized or is empty;
 * - no worker frees up within ACQUIRE_TIMEOUT_MS;
 * - the acquired worker has already disconnected.
 *
 * The worker is returned to the pool (releaseWorker) only after it answers with a `result`
 * message. On timeout (timeoutMs + 3000 ms), process exit, or an IPC send failure it is
 * retired instead. A stuck or dead worker is therefore never handed to the next queued caller.
 * Failures are reported as `{ success: false, error }`.
 */
export async function executeWithPool(req: ExecuteRequest): Promise<ExecuteResult> {
  if (!initialized || pool.length === 0) {
    // Pool not available — fall back to one-shot fork
    return executeDirectFork(req);
  }
  let worker: PoolWorker;
  try {
    worker = await acquireWorker();
  } catch {
    // No worker freed up in time — run in a dedicated process instead of failing the step
    return executeDirectFork(req);
  }
  const { child } = worker;
  if (!child.connected) {
    // The worker died after being handed out. Its 'exit' may already have fired, in which case
    // the listener below would never hear about it.
    retireWorker(worker);
    return executeDirectFork(req);
  }

  return new Promise<ExecuteResult>((resolve) => {
    let resolved = false;
    // Settle exactly once. `reusable` is true only when the worker answered normally.
    const finish = (result: ExecuteResult, reusable: boolean): void => {
      if (resolved) return;
      resolved = true;
      clearTimeout(killTimer);
      child.removeListener("message", onMessage);
      child.removeListener("exit", onExit);
      if (reusable) {
        releaseWorker(worker);
      } else {
        retireWorker(worker);
      }
      resolve(result);
    };
    const onMessage = (msg: any): void => {
      if (msg?.type !== "result") return;
      finish(
        msg.success
          ? { success: true, output: msg.output, logs: msg.logs }
          : { success: false, error: msg.error, logs: msg.logs },
        true,
      );
    };
    // Fail fast when the worker dies mid-execution instead of waiting for killTimer.
    const onExit = (code: number | null): void => {
      finish({ success: false, error: `Worker crashed (exit ${code})` }, false);
    };
    const killTimer = setTimeout(() => {
      finish({ success: false, error: `Code execution timed out after ${req.timeoutMs}ms` }, false);
    }, req.timeoutMs + 3000);

    child.on("message", onMessage);
    child.on("exit", onExit);
    try {
      child.send(
        { type: "execute", code: req.code, context: req.context, timeoutMs: req.timeoutMs },
        (err: Error | null) => {
          // Asynchronous send failure (e.g. IPC channel closed): the worker is unusable.
          if (err) finish({ success: false, error: `Worker error: ${err.message}` }, false);
        },
      );
    } catch (err) {
      // Synchronous failure (e.g. unserializable context). Retire rather than reuse, since we
      // cannot be sure nothing reached the worker.
      const message = err instanceof Error ? err.message : String(err);
      finish({ success: false, error: `Worker error: ${message}` }, false);
    }
  });
}
/**
 * One-shot fork fallback: run a single request in a dedicated worker process, then SIGKILL it.
 *
 * Used when the pool is unavailable or no pooled worker frees up in time. Results carry the
 * worker's `logs`, like the pooled path. Fork/IPC failures, crashes, and timeouts resolve
 * to `{ success: false, error }`.
 *
 * Two timers guard a hung worker:
 * - fork's `timeout` option kills the child after timeoutMs + 2000 ms;
 * - killTimer gives up after timeoutMs + 3000 ms.
 * The first normally fires first, so its kill is reported as a timeout, not as a crash.
 */
function executeDirectFork(req: ExecuteRequest): Promise<ExecuteResult> {
  return new Promise<ExecuteResult>((resolve) => {
    const timeoutError = `Code execution timed out after ${req.timeoutMs}ms`;
    let forked: ChildProcess;
    try {
      forked = fork(WORKER_PATH, [], { timeout: req.timeoutMs + 2000, stdio: "pipe", serialization: "json", env: WORKER_ENV });
    } catch {
      resolve({ success: false, error: "Failed to fork code worker" });
      return;
    }
    const child = forked;
    // stdout/stderr are piped but never read; drain them so the child cannot block on a full pipe.
    child.stdout?.resume();
    child.stderr?.resume();

    let resolved = false;
    const finish = (result: ExecuteResult): void => {
      if (resolved) return;
      resolved = true;
      clearTimeout(killTimer);
      try { child.kill("SIGKILL"); } catch { /* process already dead */ }
      resolve(result);
    };
    const killTimer = setTimeout(() => finish({ success: false, error: timeoutError }), req.timeoutMs + 3000);

    child.on("message", (msg: any) => {
      if (msg?.type !== "result") return;
      finish(msg.success
        ? { success: true, output: msg.output, logs: msg.logs }
        : { success: false, error: msg.error, logs: msg.logs });
    });
    child.on("error", (err: Error) => finish({ success: false, error: `Worker error: ${err.message}` }));
    child.on("exit", (code: number | null) => {
      // `killed` is set only by ChildProcess.kill(). Before we resolve, the only caller is fork's
      // own `timeout` option, so a killed child here means the execution timed out.
      finish(child.killed
        ? { success: false, error: timeoutError }
        : { success: false, error: `Worker crashed (exit ${code})` });
    });
    try {
      child.send(
        { type: "execute", code: req.code, context: req.context, timeoutMs: req.timeoutMs },
        (err: Error | null) => {
          if (err) finish({ success: false, error: `Worker error: ${err.message}` });
        },
      );
    } catch (err) {
      // e.g. context not serializable — report instead of rejecting the promise
      const message = err instanceof Error ? err.message : String(err);
      finish({ success: false, error: `Worker error: ${message}` });
    }
  });
}
/**
 * Send SIGTERM to every pooled worker, empty the pool, and clear the initialized flag so
 * initWorkerPool() can start a fresh pool later. Kill failures (already-dead processes) are ignored.
 */
export function shutdownPool(): void {
  pool.forEach((member) => {
    try {
      member.child.kill("SIGTERM");
    } catch {
      /* process already dead */
    }
  });
  pool.splice(0, pool.length);
  initialized = false;
}
/**
 * Snapshot of pool health for monitoring.
 * `idle` counts only workers that are both free and ready; a worker that is still starting is
 * included in `total` but in neither `busy` nor `idle`.
 */
export function getPoolStats(): { total: number; busy: number; idle: number; queue: number } {
  let busy = 0;
  let idle = 0;
  for (const member of pool) {
    if (member.busy) {
      busy++;
    } else if (member.ready) {
      idle++;
    }
  }
  return { total: pool.length, busy, idle, queue: waitQueue.length };
}