/**
* Background Processor for LearnMCP
* Handles async content extraction and processing with queue management
*/
import { createLearnLogger } from './utils/custom-logger.js';
/**
 * In-memory, priority-ordered task queue with bounded concurrency, per-task
 * timeouts and delayed retries.
 *
 * Tasks are plain objects carrying a `processor` function. The queue is polled
 * on a fixed interval while the processor is running; up to `maxConcurrent`
 * tasks are executed at the same time.
 */
export class BackgroundProcessor {
  /**
   * @param {object} [options]
   * @param {number} [options.maxQueueSize=100] Maximum number of queued tasks accepted by addTask().
   * @param {number} [options.processingInterval=5000] Queue polling period in milliseconds.
   * @param {number} [options.workerTimeout=300000] Default per-task timeout in milliseconds.
   * @param {number} [options.maxConcurrent=3] Maximum number of tasks executing at once.
   * @param {number} [options.retryAttempts=3] Default number of retries per task (0 disables retries).
   * @param {number} [options.retryDelay=5000] Delay in milliseconds before a failed task is re-queued.
   */
  constructor(options = {}) {
    this.logger = createLearnLogger('BackgroundProcessor');

    // Configuration. `||` is kept where 0 is not a usable value (it would
    // block the queue entirely); `??` is used where 0 is a legitimate choice.
    this.maxQueueSize = options.maxQueueSize || 100;
    this.processingInterval = options.processingInterval || 5000; // 5 seconds
    this.workerTimeout = options.workerTimeout || 300000; // 5 minutes
    this.maxConcurrent = options.maxConcurrent || 3;
    this.retryAttempts = options.retryAttempts ?? 3;
    this.retryDelay = options.retryDelay ?? 5000; // 5 seconds

    // State
    this.taskQueue = [];
    this.processingTasks = new Map();
    this.isRunning = false;
    // Handle returned by setInterval(); kept separate from the configured
    // `processingInterval` so the configuration value is never clobbered.
    this.intervalTimer = null;
    // task object -> pending retry timer handle (keyed by object so that
    // duplicate task ids cannot overwrite each other).
    this.retryTimers = new Map();
    // Exact sum of successful processing times; the rounded average is derived from it.
    this.totalProcessingTime = 0;

    // Metrics
    this.metrics = BackgroundProcessor.createEmptyMetrics();
  }

  /**
   * Build a zeroed metrics record.
   * @returns {object} Fresh metrics object with every counter set to 0.
   */
  static createEmptyMetrics() {
    return {
      tasksProcessed: 0,
      tasksQueued: 0,
      tasksFailed: 0,
      averageProcessingTime: 0,
      queueOverflows: 0,
      timeouts: 0,
      retries: 0,
    };
  }

  /**
   * Turn any thrown value into a printable message.
   * @param {*} error Value caught from a throw or a rejected promise.
   * @returns {string} The error's message, or its string form when it has none.
   */
  static describeError(error) {
    return error?.message ?? String(error);
  }

  /**
   * Start the background processor. Calling it while already running only
   * logs a warning.
   */
  start() {
    if (this.isRunning) {
      this.logger.warn('Background processor already running');
      return;
    }
    this.isRunning = true;
    this.intervalTimer = setInterval(() => {
      this.processQueue().catch(error => {
        this.logger.error('Queue processing failed', {
          error: BackgroundProcessor.describeError(error),
        });
      });
    }, this.processingInterval);
    this.logger.info('Background processor started', {
      maxQueueSize: this.maxQueueSize,
      maxConcurrent: this.maxConcurrent,
      processingInterval: this.processingInterval,
    });
  }

  /**
   * Stop the background processor: stop polling, put tasks that were waiting
   * for a retry back on the queue, then wait for in-flight tasks to settle.
   * Queued tasks are kept and are picked up again by a later start().
   * @returns {Promise<void>}
   */
  async stop() {
    if (!this.isRunning) {
      return;
    }
    this.isRunning = false;
    if (this.intervalTimer) {
      clearInterval(this.intervalTimer);
      this.intervalTimer = null;
    }
    // Pending retry timers would otherwise keep the event loop alive and
    // touch the queue after shutdown.
    this.requeuePendingRetries();
    // Wait for current tasks to complete
    await this.waitForCompletion();
    this.logger.info('Background processor stopped');
  }

  /**
   * Add task to processing queue.
   * @param {object} task
   * @param {string} [task.id] Identifier; generated when omitted.
   * @param {string} [task.type] Free-form label used in log output.
   * @param {*} [task.data] Argument passed to the processor.
   * @param {Function} task.processor Called with `task.data`; may return a promise.
   * @param {number} [task.priority=0] Higher values are dequeued first.
   * @param {number} [task.maxRetries] Overrides the default retry count (0 disables retries).
   * @param {number} [task.timeout] Overrides the default timeout in milliseconds.
   * @returns {Promise<string>} Id of the queued task.
   * @throws {Error} When the queue already holds `maxQueueSize` tasks.
   */
  async addTask(task) {
    if (this.taskQueue.length >= this.maxQueueSize) {
      this.metrics.queueOverflows++;
      throw new Error(`Queue is full (${this.maxQueueSize} tasks)`);
    }
    const queuedTask = {
      id: task.id || `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      type: task.type,
      data: task.data,
      processor: task.processor,
      priority: task.priority || 0,
      retryCount: 0,
      maxRetries: task.maxRetries ?? this.retryAttempts,
      timeout: task.timeout || this.workerTimeout,
      addedAt: new Date().toISOString(),
      status: 'queued',
    };
    // Insert before the first lower-priority task: higher priority first,
    // FIFO among tasks of equal priority.
    const insertIndex = this.taskQueue.findIndex(t => t.priority < queuedTask.priority);
    if (insertIndex === -1) {
      this.taskQueue.push(queuedTask);
    } else {
      this.taskQueue.splice(insertIndex, 0, queuedTask);
    }
    this.metrics.tasksQueued++;
    this.logger.debug('Task added to queue', {
      taskId: queuedTask.id,
      type: queuedTask.type,
      priority: queuedTask.priority,
      queueSize: this.taskQueue.length,
    });
    return queuedTask.id;
  }

  /**
   * Move as many tasks from the queue into execution as there are free
   * concurrency slots. Does not wait for the started tasks to finish.
   * @returns {Promise<void>}
   */
  async processQueue() {
    if (!this.isRunning) {
      return;
    }
    // Check if we can process more tasks
    const availableSlots = this.maxConcurrent - this.processingTasks.size;
    if (availableSlots <= 0 || this.taskQueue.length === 0) {
      return;
    }
    // Process up to available slots
    const tasksToProcess = this.taskQueue.splice(0, availableSlots);
    for (const task of tasksToProcess) {
      // Deliberately not awaited so tasks run concurrently; the handler
      // keeps an unexpected failure from becoming an unhandled rejection.
      this.processTask(task).catch(error => {
        this.logger.error('Unexpected error while processing task', {
          taskId: task.id,
          error: BackgroundProcessor.describeError(error),
        });
      });
    }
  }

  /**
   * Run a single task under its timeout, then record success, schedule a
   * retry, or mark it permanently failed.
   *
   * Note: a timed-out processor is not cancelled; only its result is ignored.
   * @param {object} task Task record created by addTask().
   * @returns {Promise<void>}
   */
  async processTask(task) {
    const startTime = Date.now();
    task.status = 'processing';
    task.startedAt = new Date().toISOString();
    this.processingTasks.set(task.id, task);
    this.logger.debug('Processing task', {
      taskId: task.id,
      type: task.type,
      attempt: task.retryCount + 1,
    });

    let timeoutHandle = null;
    let timedOut = false;
    try {
      // Set up timeout
      const timeoutPromise = new Promise((_resolve, reject) => {
        timeoutHandle = setTimeout(() => {
          timedOut = true;
          reject(new Error('Task timeout'));
        }, task.timeout);
      });
      // Process task with timeout
      const result = await Promise.race([task.processor(task.data), timeoutPromise]);
      // Task completed successfully
      task.status = 'completed';
      task.completedAt = new Date().toISOString();
      task.result = result;
      const processingTime = Date.now() - startTime;
      this.updateMetrics(processingTime, 'success');
      this.logger.info('Task completed', {
        taskId: task.id,
        type: task.type,
        processingTime: `${processingTime}ms`,
      });
    } catch (error) {
      const processingTime = Date.now() - startTime;
      const message = BackgroundProcessor.describeError(error);
      // Use the flag rather than the message text so a processor error that
      // happens to read "Task timeout" is not miscounted.
      if (timedOut) {
        this.metrics.timeouts++;
      }
      // Check if we should retry
      if (task.retryCount < task.maxRetries) {
        task.retryCount++;
        task.status = 'retrying';
        task.lastError = message;
        this.metrics.retries++;
        this.logger.warn('Task failed, retrying', {
          taskId: task.id,
          type: task.type,
          attempt: task.retryCount,
          maxRetries: task.maxRetries,
          error: message,
        });
        this.scheduleRetry(task);
      } else {
        // Task failed permanently
        task.status = 'failed';
        task.failedAt = new Date().toISOString();
        task.error = message;
        this.updateMetrics(processingTime, 'failure');
        this.logger.error('Task failed permanently', {
          taskId: task.id,
          type: task.type,
          attempts: task.retryCount + 1,
          error: message,
        });
      }
    } finally {
      // Always release the timer: a leaked timer keeps the process alive for
      // the whole timeout and, if the processor threw synchronously, would
      // later reject a promise nobody is listening to.
      if (timeoutHandle !== null) {
        clearTimeout(timeoutHandle);
      }
      // Always free the concurrency slot. A task waiting for its retry is
      // tracked in `retryTimers`, not here; keeping it in `processingTasks`
      // would starve the queue once `maxConcurrent` tasks are retrying.
      this.processingTasks.delete(task.id);
    }
  }

  /**
   * Put a failed task back at the front of the queue after `retryDelay`.
   * When the processor is already stopped the task is re-queued immediately
   * so that no timer outlives stop(). Retries bypass `maxQueueSize` because
   * the task was already admitted once.
   * @param {object} task Task whose status has been set to 'retrying'.
   */
  scheduleRetry(task) {
    if (!this.isRunning) {
      this.taskQueue.unshift(task);
      return;
    }
    const timer = setTimeout(() => {
      this.retryTimers.delete(task);
      this.taskQueue.unshift(task); // Add to front for retry
    }, this.retryDelay);
    this.retryTimers.set(task, timer);
  }

  /**
   * Cancel every pending retry timer and put the affected tasks back at the
   * front of the queue right away.
   */
  requeuePendingRetries() {
    for (const [task, timer] of this.retryTimers) {
      clearTimeout(timer);
      this.taskQueue.unshift(task);
    }
    this.retryTimers.clear();
  }

  /**
   * Update processing metrics.
   * @param {number} processingTime Duration of the attempt in milliseconds.
   * @param {string} outcome 'success' or 'failure'; any other value is ignored.
   */
  updateMetrics(processingTime, outcome) {
    if (outcome === 'success') {
      this.metrics.tasksProcessed++;
      // Derive the average from the exact running total; recomputing it from
      // the previously rounded average lets rounding errors accumulate.
      this.totalProcessingTime += processingTime;
      this.metrics.averageProcessingTime = Math.round(
        this.totalProcessingTime / this.metrics.tasksProcessed
      );
    } else if (outcome === 'failure') {
      this.metrics.tasksFailed++;
    }
  }

  /**
   * Get current status.
   * @returns {object} Snapshot of run state, queue sizes, a copy of the
   *   metrics and the effective configuration.
   */
  getStatus() {
    return {
      isRunning: this.isRunning,
      queueSize: this.taskQueue.length,
      processingCount: this.processingTasks.size,
      pendingRetries: this.retryTimers.size,
      metrics: { ...this.metrics },
      configuration: {
        maxQueueSize: this.maxQueueSize,
        maxConcurrent: this.maxConcurrent,
        processingInterval: this.processingInterval,
        workerTimeout: this.workerTimeout,
      },
    };
  }

  /**
   * Wait until no task is executing, or until the timeout elapses.
   * @param {number} [timeout=30000] Maximum time to wait in milliseconds.
   * @returns {Promise<void>} Resolves in both cases; a warning is logged when
   *   tasks are still executing at the deadline.
   */
  async waitForCompletion(timeout = 30000) {
    const deadline = Date.now() + timeout;
    while (this.processingTasks.size > 0) {
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        break;
      }
      // Short poll, capped by the time left, so the wait neither overshoots
      // the deadline nor delays shutdown by a full second.
      await new Promise(resolve => setTimeout(resolve, Math.min(100, remaining)));
    }
    if (this.processingTasks.size > 0) {
      this.logger.warn('Timeout waiting for task completion', {
        remainingTasks: this.processingTasks.size,
      });
    }
  }

  /**
   * Clear the queue, forget in-flight and retry-pending tasks, and reset
   * metrics. Does not change the running state.
   */
  reset() {
    for (const timer of this.retryTimers.values()) {
      clearTimeout(timer);
    }
    this.retryTimers.clear();
    this.taskQueue = [];
    this.processingTasks.clear();
    this.totalProcessingTime = 0;
    this.metrics = BackgroundProcessor.createEmptyMetrics();
    this.logger.info('Background processor reset');
  }
}