process-manager.ts•3.29 kB
import { spawn } from "node:child_process";
import { ErrorCode, VoicepeakError } from "./errors.js";
import { CONFIG } from "./types.js";
interface ProcessOptions {
timeout?: number;
stdin?: string;
}
export class ProcessManager {
private activeProcesses = new Set<ReturnType<typeof spawn>>();
private readonly maxConcurrent: number;
constructor(maxConcurrent = CONFIG.PROCESS.MAX_CONCURRENT) {
this.maxConcurrent = maxConcurrent;
// Register cleanup handlers
process.on("SIGINT", () => this.cleanup());
process.on("SIGTERM", () => this.cleanup());
process.on("exit", () => this.cleanup());
}
async spawn(
command: string,
args: string[],
options: ProcessOptions = {},
): Promise<string> {
if (this.activeProcesses.size >= this.maxConcurrent) {
throw new VoicepeakError(
`Too many concurrent processes (max ${this.maxConcurrent})`,
ErrorCode.TOO_MANY_PROCESSES,
);
}
const timeout = options.timeout || CONFIG.PROCESS.TIMEOUT_MS;
const proc = spawn(command, args, {
stdio: ["pipe", "pipe", "pipe"],
});
this.activeProcesses.add(proc);
// Set up timeout
const timeoutId = setTimeout(() => {
proc.kill("SIGTERM");
// Force kill after grace period
setTimeout(() => {
if (proc.killed === false) {
proc.kill("SIGKILL");
}
}, 5000);
}, timeout);
try {
return await new Promise<string>((resolve, reject) => {
let stdout = "";
let stderr = "";
let processExited = false;
// Handle stdout
proc.stdout?.on("data", (data) => {
stdout += data.toString();
});
// Handle stderr
proc.stderr?.on("data", (data) => {
stderr += data.toString();
});
// Handle process exit
proc.on("exit", (code, signal) => {
processExited = true;
clearTimeout(timeoutId);
if (signal === "SIGTERM" || signal === "SIGKILL") {
reject(
new VoicepeakError(
`Process timeout after ${timeout}ms`,
ErrorCode.PROCESS_TIMEOUT,
),
);
} else if (code !== 0) {
// Filter out debug messages from stderr
const cleanStderr = stderr
.split("\n")
.filter((line) => !line.includes("[debug]"))
.join("\n");
reject(
new VoicepeakError(
`Process exited with code ${code}: ${cleanStderr}`,
ErrorCode.PROCESS_FAILED,
{ code, stderr: cleanStderr },
),
);
} else {
resolve(stdout);
}
});
// Handle process error
proc.on("error", (error) => {
if (!processExited) {
clearTimeout(timeoutId);
reject(
new VoicepeakError(
`Failed to spawn process: ${error.message}`,
ErrorCode.PROCESS_FAILED,
error,
),
);
}
});
// Write stdin if provided
if (options.stdin) {
proc.stdin?.write(options.stdin);
proc.stdin?.end();
}
});
} finally {
this.activeProcesses.delete(proc);
}
}
async cleanup(): Promise<void> {
for (const proc of this.activeProcesses) {
try {
proc.kill("SIGTERM");
} catch {
// Ignore errors during cleanup
}
}
this.activeProcesses.clear();
}
get activeCount(): number {
return this.activeProcesses.size;
}
}
// Singleton instance
export const processManager = new ProcessManager();