Skip to main content
Glama
ooples

MCP Console Automation Server

WorkerPool.ts12.5 kB
/** * WorkerPool - Manages worker threads for parallel test execution */ import { Worker } from 'worker_threads'; import * as path from 'path'; import { EventEmitter } from 'events'; export interface WorkerTask { id: string; test: any; // TestDefinition timeout: number; } export interface WorkerResult { taskId: string; result: any; // TestResult error?: Error; } export interface WorkerInfo { id: number; worker: Worker; busy: boolean; currentTask?: WorkerTask; taskTimeout?: NodeJS.Timeout; tasksCompleted: number; startTime: number; lastHeartbeat: number; } export interface WorkerPoolConfig { maxWorkers: number; workerTimeout: number; heartbeatInterval: number; gracefulShutdownTimeout: number; } export class WorkerPool extends EventEmitter { private workers: Map<number, WorkerInfo> = new Map(); private taskQueue: WorkerTask[] = []; private nextWorkerId = 0; private shuttingDown = false; private config: WorkerPoolConfig; private heartbeatTimer?: NodeJS.Timeout; constructor(config: Partial<WorkerPoolConfig> = {}) { super(); this.config = { maxWorkers: config.maxWorkers || 4, workerTimeout: config.workerTimeout || 300000, // 5 minutes heartbeatInterval: config.heartbeatInterval || 5000, gracefulShutdownTimeout: config.gracefulShutdownTimeout || 30000, }; } /** * Initialize worker pool */ async initialize(): Promise<void> { if (this.workers.size > 0) { throw new Error('Worker pool already initialized'); } // Create initial workers for (let i = 0; i < this.config.maxWorkers; i++) { await this.createWorker(); } // Start heartbeat monitoring this.startHeartbeatMonitoring(); this.emit('initialized', { workerCount: this.workers.size }); } /** * Create a new worker */ private async createWorker(): Promise<WorkerInfo> { const workerId = this.nextWorkerId++; // Determine the correct worker path // Worker threads require compiled JavaScript, so we need to find the dist directory let workerPath: string; if (__filename.endsWith('.ts')) { // Running in TypeScript (e.g., Jest with ts-jest) // Need to point to the compiled JavaScript in dist/ const projectRoot = path.resolve(__dirname, '../..'); workerPath = path.join(projectRoot, 'dist', 'testing', 'test-worker.js'); } else { // Already running compiled JavaScript workerPath = path.join(__dirname, 'test-worker.js'); } const worker = new Worker(workerPath, { workerData: { workerId }, }); const workerInfo: WorkerInfo = { id: workerId, worker, busy: false, tasksCompleted: 0, startTime: Date.now(), lastHeartbeat: Date.now(), }; // Handle worker messages worker.on('message', (message) => { this.handleWorkerMessage(workerId, message); }); // Handle worker errors worker.on('error', (error) => { this.handleWorkerError(workerId, error); }); // Handle worker exit worker.on('exit', (code) => { this.handleWorkerExit(workerId, code); }); this.workers.set(workerId, workerInfo); this.emit('worker-created', { workerId }); return workerInfo; } /** * Execute a task on an available worker */ async executeTask(task: WorkerTask): Promise<WorkerResult> { if (this.shuttingDown) { throw new Error('Worker pool is shutting down'); } return new Promise((resolve, reject) => { // Add to queue this.taskQueue.push(task); // Set up one-time listener for this task const resultHandler = (result: WorkerResult) => { if (result.taskId === task.id) { this.removeListener('task-complete', resultHandler); this.removeListener('task-error', errorHandler); resolve(result); } }; const errorHandler = (error: { taskId: string; error: Error }) => { if (error.taskId === task.id) { this.removeListener('task-complete', resultHandler); this.removeListener('task-error', errorHandler); reject(error.error); } }; this.on('task-complete', resultHandler); this.on('task-error', errorHandler); // Try to assign task immediately this.assignTasks(); }); } /** * Assign queued tasks to available workers */ private assignTasks(): void { while (this.taskQueue.length > 0) { const availableWorker = this.findAvailableWorker(); if (!availableWorker) { break; // No available workers } const task = this.taskQueue.shift()!; this.assignTaskToWorker(availableWorker, task); } } /** * Find an available worker */ private findAvailableWorker(): WorkerInfo | undefined { for (const worker of this.workers.values()) { if (!worker.busy) { return worker; } } return undefined; } /** * Assign a task to a specific worker */ private assignTaskToWorker(workerInfo: WorkerInfo, task: WorkerTask): void { workerInfo.busy = true; workerInfo.currentTask = task; workerInfo.lastHeartbeat = Date.now(); // Send task to worker workerInfo.worker.postMessage({ type: 'execute', task, }); this.emit('task-assigned', { workerId: workerInfo.id, taskId: task.id }); // Set timeout for task - STORE THE HANDLE workerInfo.taskTimeout = setTimeout(() => { if (workerInfo.currentTask?.id === task.id) { this.handleTaskTimeout(workerInfo, task); } }, task.timeout || this.config.workerTimeout); } /** * Handle worker message */ private handleWorkerMessage(workerId: number, message: any): void { const workerInfo = this.workers.get(workerId); if (!workerInfo) return; workerInfo.lastHeartbeat = Date.now(); switch (message.type) { case 'heartbeat': // Worker is alive break; case 'result': this.handleTaskComplete(workerInfo, message.result); break; case 'error': this.handleTaskError(workerInfo, message.error); break; default: console.warn( `Unknown message type from worker ${workerId}:`, message.type ); } } /** * Handle task completion */ private handleTaskComplete(workerInfo: WorkerInfo, result: any): void { const task = workerInfo.currentTask; if (!task) return; // CLEAR THE TIMEOUT if (workerInfo.taskTimeout) { clearTimeout(workerInfo.taskTimeout); workerInfo.taskTimeout = undefined; } workerInfo.busy = false; workerInfo.currentTask = undefined; workerInfo.tasksCompleted++; const workerResult: WorkerResult = { taskId: task.id, result, }; this.emit('task-complete', workerResult); // Assign next task if available this.assignTasks(); } /** * Handle task error */ private handleTaskError(workerInfo: WorkerInfo, error: any): void { const task = workerInfo.currentTask; if (!task) return; // CLEAR THE TIMEOUT if (workerInfo.taskTimeout) { clearTimeout(workerInfo.taskTimeout); workerInfo.taskTimeout = undefined; } workerInfo.busy = false; workerInfo.currentTask = undefined; this.emit('task-error', { taskId: task.id, error: error instanceof Error ? error : new Error(String(error)), }); // Assign next task if available this.assignTasks(); } /** * Handle task timeout */ private handleTaskTimeout(workerInfo: WorkerInfo, task: WorkerTask): void { console.warn(`Task ${task.id} timed out on worker ${workerInfo.id}`); // Terminate the worker and create a new one this.terminateWorker(workerInfo.id, true); this.emit('task-error', { taskId: task.id, error: new Error(`Task timed out after ${task.timeout}ms`), }); // Create replacement worker this.createWorker().catch((err) => { console.error('Failed to create replacement worker:', err); }); } /** * Handle worker error */ private handleWorkerError(workerId: number, error: Error): void { console.error(`Worker ${workerId} error:`, error); const workerInfo = this.workers.get(workerId); if (workerInfo?.currentTask) { this.emit('task-error', { taskId: workerInfo.currentTask.id, error, }); } this.emit('worker-error', { workerId, error }); } /** * Handle worker exit */ private handleWorkerExit(workerId: number, code: number): void { console.log(`Worker ${workerId} exited with code ${code}`); const workerInfo = this.workers.get(workerId); if (workerInfo?.currentTask) { this.emit('task-error', { taskId: workerInfo.currentTask.id, error: new Error(`Worker exited with code ${code}`), }); } this.workers.delete(workerId); this.emit('worker-exit', { workerId, code }); // Create replacement worker if not shutting down if (!this.shuttingDown && this.workers.size < this.config.maxWorkers) { this.createWorker().catch((err) => { console.error('Failed to create replacement worker:', err); }); } } /** * Start heartbeat monitoring */ private startHeartbeatMonitoring(): void { this.heartbeatTimer = setInterval(() => { const now = Date.now(); for (const [workerId, workerInfo] of this.workers.entries()) { const timeSinceHeartbeat = now - workerInfo.lastHeartbeat; if (timeSinceHeartbeat > this.config.heartbeatInterval * 3) { console.warn( `Worker ${workerId} missed heartbeat (${timeSinceHeartbeat}ms)` ); this.handleWorkerError( workerId, new Error('Worker heartbeat timeout') ); this.terminateWorker(workerId, true); } } }, this.config.heartbeatInterval); } /** * Stop heartbeat monitoring */ private stopHeartbeatMonitoring(): void { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = undefined; } } /** * Terminate a specific worker */ private async terminateWorker( workerId: number, force: boolean = false ): Promise<void> { const workerInfo = this.workers.get(workerId); if (!workerInfo) return; try { if (force) { await workerInfo.worker.terminate(); } else { workerInfo.worker.postMessage({ type: 'shutdown' }); // Wait a bit for graceful shutdown await new Promise((resolve) => setTimeout(resolve, 1000)); await workerInfo.worker.terminate(); } } catch (error) { console.error(`Error terminating worker ${workerId}:`, error); } this.workers.delete(workerId); } /** * Get pool statistics */ getStatistics() { const workers = Array.from(this.workers.values()); return { totalWorkers: workers.length, busyWorkers: workers.filter((w) => w.busy).length, idleWorkers: workers.filter((w) => !w.busy).length, queuedTasks: this.taskQueue.length, totalTasksCompleted: workers.reduce( (sum, w) => sum + w.tasksCompleted, 0 ), averageTasksPerWorker: workers.length > 0 ? workers.reduce((sum, w) => sum + w.tasksCompleted, 0) / workers.length : 0, }; } /** * Gracefully shutdown the worker pool */ async shutdown(): Promise<void> { if (this.shuttingDown) { return; } this.shuttingDown = true; this.stopHeartbeatMonitoring(); // Wait for active tasks to complete or timeout const shutdownStart = Date.now(); while (this.workers.size > 0) { const busyWorkers = Array.from(this.workers.values()).filter( (w) => w.busy ); if (busyWorkers.length === 0) { break; // All workers idle } const elapsed = Date.now() - shutdownStart; if (elapsed > this.config.gracefulShutdownTimeout) { console.warn('Graceful shutdown timeout, forcing termination'); break; } await new Promise((resolve) => setTimeout(resolve, 100)); } // Terminate all remaining workers const terminationPromises = Array.from(this.workers.keys()).map( (workerId) => this.terminateWorker(workerId, true) ); await Promise.all(terminationPromises); this.emit('shutdown'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ooples/mcp-console-automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server