// html-markdown-pool.ts (6.78 kB)
import { existsSync } from 'fs';
import { cpus } from 'os';
import { fileURLToPath } from 'url';
import { Worker } from 'worker_threads';
import { server_defaults } from '../default.js';
/** Bookkeeping for one queued or in-flight conversion: how to settle its promise, plus its timeout timer. */
interface PendingTask {
  /** Timer that rejects the task if no result arrives within the pool's task timeout. */
  timeoutHandle: NodeJS.Timeout;
  /** Settles the caller's promise with the converted Markdown. */
  resolve: (markdown: string) => void;
  /** Settles the caller's promise with a failure. */
  reject: (error: Error) => void;
}
/**
 * Fixed-size pool of worker threads that convert HTML to Markdown off the main thread.
 *
 * A task is posted to an idle worker when one exists; otherwise it waits in a bounded FIFO queue.
 * Every task has a timeout that starts when {@link HtmlMarkdownPool.convert} is called, so time spent
 * queued counts. Workers that crash, exit unexpectedly, or run a task past its timeout are removed
 * from the pool and replaced, so a misbehaving worker cannot shrink the pool permanently.
 */
export class HtmlMarkdownPool {
  /** Every live worker owned by the pool, busy or idle. */
  private workers: Worker[] = [];
  /** Idle workers ready to take a task (always a subset of `workers`). */
  private availableWorkers: Worker[] = [];
  /** Tasks posted to a worker and awaiting its reply, keyed by task id. */
  private pendingTasks = new Map<string, PendingTask>();
  /** Task id each busy worker is currently running, so the right task/worker can be failed or recycled. */
  private readonly workerTasks = new Map<Worker, string>();
  /** Tasks waiting for an idle worker, oldest first. */
  private taskQueue: Array<{ html: string; taskId: string; task: PendingTask }> = [];
  private nextTaskId = 0;
  private isShuttingDown = false;
  // os.cpus() can return an empty array when CPU information is unavailable. A pool of 0 workers
  // would leave every task queued until it times out, so always run at least one worker.
  private readonly poolSize = Math.max(1, Math.min(server_defaults.HTML_MARKDOWN_POOL.MAX_WORKERS, cpus().length));
  private readonly taskTimeout = server_defaults.HTML_MARKDOWN_POOL.TASK_TIMEOUT;
  private readonly maxQueueSize = server_defaults.HTML_MARKDOWN_POOL.MAX_QUEUE_SIZE;

  constructor() {
    this.initializeWorkers();
  }

  /** Starts `poolSize` workers, all initially idle. Throws if no compiled worker script is found. */
  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      this.addWorker();
    }
  }

  /** Spawns one worker and registers it as idle. */
  private addWorker(): void {
    const worker = this.createWorker();
    this.workers.push(worker);
    this.availableWorkers.push(worker);
  }

  /** Creates a worker from the first compiled worker script that exists and wires up its event handlers. */
  private createWorker(): Worker {
    // For local dev and running tests, try the compiled JavaScript in dist first (checking for .ts
    // files here is pointless), then fall back to the sibling .js file used in production.
    const candidates = [
      new URL('../dist/utils/html-markdown-worker.js', import.meta.url),
      new URL('./html-markdown-worker.js', import.meta.url),
    ];
    for (const url of candidates) {
      if (!existsSync(fileURLToPath(url))) continue;
      const worker = new Worker(url);
      this.attachWorkerEvents(worker);
      return worker;
    }
    throw new Error('Worker file not found: No compiled .js file found in dist/utils/ or utils/. Run "npm run build" first.');
  }

  /** Routes a worker's results back to their tasks and reacts to the worker crashing or exiting. */
  private attachWorkerEvents(worker: Worker): void {
    worker.on('message', (result: { taskId: string; success: boolean; markdown?: string; error?: string }) => {
      // The worker finished its task, so it is free again even if that task already timed out.
      this.workerTasks.delete(worker);
      const task = this.pendingTasks.get(result.taskId);
      if (task) {
        clearTimeout(task.timeoutHandle);
        this.pendingTasks.delete(result.taskId);
        if (result.success) task.resolve(result.markdown ?? '');
        else task.reject(new Error(result.error || 'Worker error'));
      }
      // Only recycle workers the pool still owns: a retired (timed-out) worker may still deliver a late reply.
      if (!this.isShuttingDown && this.workers.includes(worker) && !this.availableWorkers.includes(worker)) {
        this.availableWorkers.push(worker);
        this.processQueue();
      }
    });
    worker.on('error', (error: Error) => {
      this.handleWorkerFailure(worker, error);
    });
    worker.on('exit', (code: number) => {
      // Any exit outside shutdown leaves a dead worker behind, whatever the exit code.
      if (!this.isShuttingDown) this.handleWorkerFailure(worker, new Error(`Worker exited with code ${code}`));
    });
  }

  /**
   * Removes a crashed or exited worker and immediately fails the task it was running, instead of
   * letting that task wait for its timeout. Then starts a replacement unless the pool is shutting down.
   */
  private handleWorkerFailure(failedWorker: Worker, error: Error): void {
    // Node emits 'exit' after 'error' for the same worker; only the first report is acted on.
    if (!this.workers.includes(failedWorker)) return;
    console.error('Worker failed', error);
    const taskId = this.detachWorker(failedWorker);
    if (taskId !== undefined) {
      const task = this.pendingTasks.get(taskId);
      if (task) {
        clearTimeout(task.timeoutHandle);
        this.pendingTasks.delete(taskId);
        task.reject(new Error(`HTML conversion failed: ${error.message}`));
      }
    }
    this.replenish();
  }

  /** Drops `worker` from all bookkeeping and returns the id of the task it was running, if any. */
  private detachWorker(worker: Worker): string | undefined {
    HtmlMarkdownPool.removeItem(this.workers, worker);
    HtmlMarkdownPool.removeItem(this.availableWorkers, worker);
    const taskId = this.workerTasks.get(worker);
    this.workerTasks.delete(worker);
    return taskId;
  }

  /** Tops the pool back up to `poolSize` workers, then hands queued tasks to any idle ones. */
  private replenish(): void {
    if (this.isShuttingDown) return;
    while (this.workers.length < this.poolSize) {
      try {
        this.addWorker();
      } catch (error: unknown) {
        // Runs from worker event handlers and timers, where a throw would become an uncaught exception.
        console.error('Failed to start replacement worker', error);
        break;
      }
    }
    this.processQueue();
  }

  /**
   * Converts an HTML string to Markdown on a worker thread.
   *
   * @param html - HTML source to convert.
   * @returns The Markdown produced by the worker ('' if it reports success without output).
   * @throws Error if the pool is shutting down, the queue is full, the task times out, the worker
   *   reports an error, or the worker dies while running the task.
   */
  async convert(html: string): Promise<string> {
    if (this.isShuttingDown) throw new Error('Worker pool is shutting down');
    return new Promise<string>((resolve, reject) => {
      const taskId = `task-${this.nextTaskId++}`;
      const timeoutHandle = setTimeout(() => {
        this.expireTask(taskId);
        reject(new Error('HTML conversion timeout'));
      }, this.taskTimeout);
      const task: PendingTask = { resolve, reject, timeoutHandle };
      const worker = this.availableWorkers.pop();
      if (worker) {
        this.dispatch(worker, taskId, html, task);
        return;
      }
      if (this.taskQueue.length >= this.maxQueueSize) {
        clearTimeout(timeoutHandle);
        reject(new Error(`Queue full (${this.maxQueueSize} tasks waiting)`));
        return;
      }
      this.taskQueue.push({ html, taskId, task });
    });
  }

  /**
   * Forgets a task whose timeout fired. A queued task is simply dropped. If a worker is running the
   * task, that worker is retired and terminated (it may be stuck) and a replacement is started.
   */
  private expireTask(taskId: string): void {
    const queuedIndex = this.taskQueue.findIndex((entry) => entry.taskId === taskId);
    if (queuedIndex > -1) {
      this.taskQueue.splice(queuedIndex, 1);
      return;
    }
    this.pendingTasks.delete(taskId);
    let busyWorker: Worker | undefined;
    for (const [worker, runningTaskId] of this.workerTasks) {
      if (runningTaskId === taskId) {
        busyWorker = worker;
        break;
      }
    }
    if (!busyWorker) return;
    this.detachWorker(busyWorker);
    busyWorker.terminate().catch((error: unknown) => {
      console.error('Failed to terminate timed-out worker', error);
    });
    this.replenish();
  }

  /** Records `task` as running on `worker` and posts the HTML to it. */
  private dispatch(worker: Worker, taskId: string, html: string, task: PendingTask): void {
    this.pendingTasks.set(taskId, task);
    this.workerTasks.set(worker, taskId);
    worker.postMessage({ html, taskId });
  }

  /** Hands queued tasks, oldest first, to idle workers until either runs out. */
  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const worker = this.availableWorkers.pop();
      const queued = this.taskQueue.shift();
      if (!worker || !queued) return;
      this.dispatch(worker, queued.taskId, queued.html, queued.task);
    }
  }

  /**
   * Rejects every queued and in-flight task, then terminates all workers.
   * From the moment this is called, `convert` rejects new work.
   */
  async shutdown(): Promise<void> {
    this.isShuttingDown = true;
    const rejectTask = (task: PendingTask): void => {
      clearTimeout(task.timeoutHandle);
      task.reject(new Error('Worker pool shutting down'));
    };
    for (const { task } of this.taskQueue) rejectTask(task);
    this.taskQueue = [];
    for (const task of this.pendingTasks.values()) rejectTask(task);
    this.pendingTasks.clear();
    this.workerTasks.clear();
    await Promise.all(this.workers.map((worker) => worker.terminate()));
    this.workers = [];
    this.availableWorkers = [];
  }

  /** Snapshot of pool occupancy, for monitoring. */
  getStats() {
    return {
      poolSize: this.poolSize,
      totalWorkers: this.workers.length,
      availableWorkers: this.availableWorkers.length,
      pendingTasks: this.pendingTasks.size,
      queuedTasks: this.taskQueue.length
    };
  }

  /** Removes the first occurrence of `item` from `list` in place, if present. */
  private static removeItem<T>(list: T[], item: T): void {
    const index = list.indexOf(item);
    if (index > -1) list.splice(index, 1);
  }
}
// Process-wide pool instance, created lazily on first request.
let sharedPool: HtmlMarkdownPool | null = null;

/**
 * Returns the process-wide HTML→Markdown pool. It is created on first use, and again on the first
 * call after the shared instance has been cleared.
 */
export function getSharedHtmlMarkdownPool(): HtmlMarkdownPool {
  return (sharedPool ??= new HtmlMarkdownPool());
}
/**
 * Shuts down the process-wide pool, if one exists, and clears it so the next
 * `getSharedHtmlMarkdownPool()` call builds a fresh instance. A no-op when no pool exists.
 */
export async function shutdownSharedHtmlMarkdownPool(): Promise<void> {
  if (sharedPool === null) return;
  await sharedPool.shutdown();
  sharedPool = null;
}
// Release the shared pool's worker threads when the process receives SIGINT/SIGTERM or emits 'beforeExit'.
// NOTE(review): per the Node.js docs, installing a SIGINT/SIGTERM listener removes Node's default
// exit-on-signal behaviour. After this handler runs, the process only exits if nothing else keeps the
// event loop alive — confirm the host application handles these signals itself.
// NOTE(review): live Worker threads keep the event loop busy, so 'beforeExit' is presumably not emitted
// while the shared pool still has workers — verify whether that listener is ever reached.
let isShuttingDown = false;

/** Shuts the shared pool down at most once, however many of the registered events fire. */
async function shutdownHandler(): Promise<void> {
  if (isShuttingDown) return;
  isShuttingDown = true;
  await shutdownSharedHtmlMarkdownPool();
}

process.once('SIGINT', shutdownHandler);
process.once('SIGTERM', shutdownHandler);
process.once('beforeExit', shutdownHandler);