import { createLogger } from './logger.js';
import { DatabaseError } from './errors.js';
// Module-scoped logger tagged with this file's component name.
const logger = createLogger({ component: 'BatchProcessor' });
/** Tuning options for a BatchProcessor. */
export interface BatchConfig {
  /** Maximum number of items handed to the process function in one batch. */
  maxBatchSize: number;
  /** How long (ms) to wait before flushing a partially-filled queue. */
  flushIntervalMs: number;
  /** Number of failed attempts after which an item's promise is rejected. */
  maxRetries: number;
  /** Intended upper bound on concurrently processed batch chunks. */
  concurrency: number;
}
/** Internal envelope wrapping one queued payload. */
export interface BatchItem<T> {
  /** Unique id used to correlate the item with its promise callbacks. */
  id: string;
  /** The caller-supplied payload. */
  data: T;
  /** Number of failed processing attempts so far. */
  retries: number;
}
/**
 * Outcome of a processed batch, split into per-item successes and failures.
 * NOTE(review): not referenced by any code visible in this file — presumably
 * consumed by external callers; confirm before removing.
 */
export interface BatchResult<T, R> {
  successful: Array<{ item: BatchItem<T>; result: R }>;
  failed: Array<{ item: BatchItem<T>; error: Error }>;
}
/**
 * Generic batch processor for database operations.
 *
 * Callers enqueue individual items via {@link add}; the processor groups
 * them into batches of at most `maxBatchSize` and hands each batch to the
 * supplied `processFn`. A batch is dispatched as soon as the queue fills,
 * or after `flushIntervalMs` for a partially-filled queue. Each item's
 * promise resolves with its positional result from `processFn`, or rejects
 * with a DatabaseError after `maxRetries` failed attempts.
 */
export class BatchProcessor<T, R> {
  private queue: BatchItem<T>[] = [];
  private processing = false;
  private flushTimer?: NodeJS.Timeout;
  private readonly config: BatchConfig;
  private readonly processFn: (items: T[]) => Promise<R[]>;
  // Promise settlers for in-flight items, keyed by BatchItem.id.
  private readonly callbacks = new Map<string, {
    resolve: (value: R) => void;
    reject: (error: Error) => void;
  }>();

  /**
   * @param processFn Receives a batch of payloads and must return results
   *                  in the same order, one per input item.
   * @param config Partial overrides of the defaults
   *               (100 items / 1000 ms / 3 retries / 5 concurrent chunks).
   */
  constructor(
    processFn: (items: T[]) => Promise<R[]>,
    config: Partial<BatchConfig> = {}
  ) {
    this.processFn = processFn;
    // ?? instead of || so explicit falsy overrides are honored; the two
    // chunking sizes are clamped to >= 1 because a size of 0 would make
    // createChunks() loop forever.
    this.config = {
      maxBatchSize: Math.max(1, config.maxBatchSize ?? 100),
      flushIntervalMs: config.flushIntervalMs ?? 1000,
      maxRetries: config.maxRetries ?? 3,
      concurrency: Math.max(1, config.concurrency ?? 5)
    };
    logger.info('Batch processor initialized', this.config);
  }

  /**
   * Add one item to the batch queue.
   * @returns Resolves with the item's result once its batch succeeds;
   *          rejects after `maxRetries` failed batch attempts.
   */
  add(data: T): Promise<R> {
    return new Promise((resolve, reject) => {
      const item: BatchItem<T> = {
        id: this.generateId(),
        data,
        retries: 0
      };
      // The promise is settled later by processChunk() or clear().
      this.callbacks.set(item.id, { resolve, reject });
      this.queue.push(item);
      logger.debug('Item added to batch', {
        id: item.id,
        queueSize: this.queue.length
      });
      if (this.queue.length >= this.config.maxBatchSize) {
        // Fire-and-forget: results are delivered through the callbacks map,
        // and flush() never rejects (processChunk catches internally).
        void this.flush();
      } else {
        this.scheduleFlush();
      }
    });
  }

  /**
   * Add multiple items to the batch queue.
   * @returns Resolves when every item has succeeded; rejects on the first
   *          item failure (Promise.all semantics).
   */
  addMany(items: T[]): Promise<R[]> {
    return Promise.all(items.map(item => this.add(item)));
  }

  /**
   * Process all pending items immediately. Safe to call at any time; it is
   * a no-op when a flush is already running or the queue is empty.
   */
  async flush(): Promise<void> {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = undefined;
    }
    if (this.processing || this.queue.length === 0) {
      return;
    }
    this.processing = true;
    try {
      // Split the queue into maxBatchSize chunks, then run up to
      // `concurrency` chunks in parallel. (Previously the loop was purely
      // sequential and the configured concurrency was never used.)
      const chunks = this.createChunks(this.queue, this.config.maxBatchSize);
      for (const group of this.createChunks(chunks, this.config.concurrency)) {
        await Promise.all(group.map(chunk => this.processChunk(chunk)));
      }
    } finally {
      this.processing = false;
      // Failed-but-retryable items remain queued; without rescheduling here
      // they would only be retried when the next add() happened to arrive.
      if (this.queue.length > 0) {
        this.scheduleFlush();
      }
    }
  }

  /**
   * Process one chunk of items: invoke processFn, settle each item's
   * promise, and remove finished items from the queue. Items whose batch
   * failed stay queued until they exhaust `maxRetries`.
   */
  private async processChunk(chunk: BatchItem<T>[]): Promise<void> {
    logger.debug('Processing batch chunk', { size: chunk.length });
    try {
      const results = await this.processFn(chunk.map(item => item.data));
      chunk.forEach((item, index) => {
        const callback = this.callbacks.get(item.id);
        if (!callback) {
          return;
        }
        if (index < results.length && results[index] !== undefined) {
          callback.resolve(results[index]);
        } else {
          // processFn returned no result for this position: reject instead
          // of leaving the caller's promise pending forever (the old code
          // dropped the item from the queue but never settled its promise).
          callback.reject(new DatabaseError(
            'Batch operation returned no result for item',
            { id: item.id }
          ));
        }
        this.callbacks.delete(item.id);
      });
      // Remove every item of this chunk from the queue (all are settled).
      const processed = new Set(chunk.map(c => c.id));
      this.queue = this.queue.filter(item => !processed.has(item.id));
    } catch (error) {
      logger.error('Batch processing failed', error as Error);
      // The whole batch failed: bump each item's retry count and give up on
      // the ones that have exhausted their retries.
      for (const item of chunk) {
        item.retries++;
        if (item.retries >= this.config.maxRetries) {
          const callback = this.callbacks.get(item.id);
          if (callback) {
            callback.reject(new DatabaseError(
              'Batch operation failed after max retries',
              { retries: item.retries, error: (error as Error).message }
            ));
            this.callbacks.delete(item.id);
          }
          this.queue = this.queue.filter(q => q.id !== item.id);
        }
      }
    }
  }

  /** Arm the flush timer unless one is already pending. */
  private scheduleFlush(): void {
    if (this.flushTimer) {
      return;
    }
    this.flushTimer = setTimeout(() => {
      this.flushTimer = undefined;
      void this.flush();
    }, this.config.flushIntervalMs);
  }

  /** Split `array` into consecutive slices of at most `size` elements. */
  private createChunks<U>(array: U[], size: number): U[][] {
    const chunks: U[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  /** Generate a unique-enough id (timestamp + random base36 suffix). */
  private generateId(): string {
    // slice() replaces the deprecated String.prototype.substr().
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Snapshot of current queue/processing state, for monitoring. */
  getStats(): {
    queueSize: number;
    processing: boolean;
    pendingCallbacks: number;
  } {
    return {
      queueSize: this.queue.length,
      processing: this.processing,
      pendingCallbacks: this.callbacks.size
    };
  }

  /**
   * Drop all pending work. Every outstanding promise is rejected so no
   * caller is left awaiting forever.
   */
  clear(): void {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = undefined;
    }
    this.callbacks.forEach(callback => {
      callback.reject(new Error('Batch processor cleared'));
    });
    this.queue = [];
    this.callbacks.clear();
    this.processing = false;
    logger.info('Batch processor cleared');
  }
}
/**
 * Factory helpers that build BatchProcessor instances preconfigured for
 * common database operations.
 */
export const batchProcessors = {
  /**
   * Build a processor that batches row inserts into `tableName`.
   */
  createInsertProcessor<T>(
    tableName: string,
    insertFn: (items: T[]) => Promise<any[]>
  ): BatchProcessor<T, any> {
    // Log each batch before delegating to the caller-supplied insert.
    const runBatch = async (items: T[]): Promise<any[]> => {
      logger.debug(`Batch inserting ${items.length} items into ${tableName}`);
      return insertFn(items);
    };
    const insertConfig = {
      maxBatchSize: 100,
      flushIntervalMs: 500,
      maxRetries: 3
    };
    return new BatchProcessor(runBatch, insertConfig);
  },

  /**
   * Build a processor that batches row updates in `tableName`.
   */
  createUpdateProcessor<T>(
    tableName: string,
    updateFn: (items: T[]) => Promise<any[]>
  ): BatchProcessor<T, any> {
    // Log each batch before delegating to the caller-supplied update.
    const runBatch = async (items: T[]): Promise<any[]> => {
      logger.debug(`Batch updating ${items.length} items in ${tableName}`);
      return updateFn(items);
    };
    const updateConfig = {
      maxBatchSize: 50,
      flushIntervalMs: 1000,
      maxRetries: 3
    };
    return new BatchProcessor(runBatch, updateConfig);
  }
};