bulk_aggregate_multi_files
Aggregate data from the same column across multiple Excel or CSV files in parallel using operations like sum, average, count, min, or max. Process multiple files simultaneously to consolidate results or view per-file breakdowns.
Instructions
Aggregate same column across multiple files in parallel
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| filePaths | Yes | Array of file paths to process | |
| column | Yes | Column name or index (0-based) to aggregate | |
| operation | Yes | Aggregation operation | |
| consolidate | No | Whether to return consolidated result or per-file breakdown (default: true) | |
| sheet | No | Sheet name for Excel files (optional) | |
| filters | No | Optional filters to apply before aggregation |
Implementation Reference
- src/bulk/bulk-operations.ts:53-122 (handler)Core handler function implementing bulk aggregation across multiple files (CSV/Excel). Processes files in parallel batches, applies optional filters, performs sum/average/count/min/max operations per file, optionally consolidates results.async aggregateMultiFiles(args: BulkAggregateArgs): Promise<BulkResult> { const startTime = Date.now(); const errors: string[] = []; const fileResults: BulkResult['fileResults'] = []; // Process files in batches to avoid overwhelming the system const batches = this.createBatches(args.filePaths, this.maxConcurrency); for (const batch of batches) { const batchPromises = batch.map(async (filePath) => { try { const data = await this.readFileContent(filePath, args.sheet); if (data.length <= 1) { throw new Error('No data rows found'); } // Apply filters if specified let filteredData = data; if (args.filters && args.filters.length > 0) { filteredData = this.applyFilters(data, args.filters); } const result = await this.performAggregation(filteredData, args.column, args.operation); return { filePath, result: result.value, rowsProcessed: result.rowsProcessed, error: undefined }; } catch (error) { const errorMsg = `${filePath}: ${error instanceof Error ? error.message : 'Unknown error'}`; errors.push(errorMsg); return { filePath, result: 0, rowsProcessed: 0, error: errorMsg }; } }); const batchResults = await Promise.all(batchPromises); fileResults.push(...batchResults); } // Calculate consolidated result if requested let consolidatedResult: number | undefined; const validResults = fileResults.filter(r => !r.error); if (args.consolidate && validResults.length > 0) { consolidatedResult = this.consolidateResults(validResults, args.operation); } const totalRowsProcessed = fileResults.reduce((sum, r) => sum + r.rowsProcessed, 0); const processingTimeMs = Date.now() - startTime; return { operation: args.operation, column: args.column, consolidatedResult, fileResults: args.consolidate ? undefined : fileResults, totalFilesProcessed: validResults.length, totalRowsProcessed, processingTimeMs, errors }; }
- src/bulk/bulk-operations.ts:11-18 (schema)Input schema defining parameters for bulk aggregation: list of file paths, target column, operation type, optional consolidation, sheet name, and filters.export interface BulkAggregateArgs { filePaths: string[]; column: string; operation: 'sum' | 'average' | 'count' | 'min' | 'max'; consolidate?: boolean; sheet?: string; filters?: BulkFilter[]; }
- src/bulk/bulk-operations.ts:26-40 (schema)Output schema for bulk aggregation results, including per-file results or consolidated value, processing stats, and errors.export interface BulkResult { operation: string; column: string; consolidatedResult?: number; fileResults?: Array<{ filePath: string; result: number; rowsProcessed: number; error?: string; }>; totalFilesProcessed: number; totalRowsProcessed: number; processingTimeMs: number; errors: string[]; }
- src/bulk/bulk-operations.ts:20-24 (schema)Schema for individual filters used in bulk operations.export interface BulkFilter { column: string; condition: 'equals' | 'contains' | 'greater_than' | 'less_than' | 'not_equals'; value: string | number; }
- src/bulk/bulk-operations.ts:50-122 (helper)Main class containing the bulk operations engine with parallel processing, file reading for CSV/Excel, aggregation logic, filtering, and batching utilities.export class BulkOperations { private maxConcurrency = 10; // Limit concurrent file operations async aggregateMultiFiles(args: BulkAggregateArgs): Promise<BulkResult> { const startTime = Date.now(); const errors: string[] = []; const fileResults: BulkResult['fileResults'] = []; // Process files in batches to avoid overwhelming the system const batches = this.createBatches(args.filePaths, this.maxConcurrency); for (const batch of batches) { const batchPromises = batch.map(async (filePath) => { try { const data = await this.readFileContent(filePath, args.sheet); if (data.length <= 1) { throw new Error('No data rows found'); } // Apply filters if specified let filteredData = data; if (args.filters && args.filters.length > 0) { filteredData = this.applyFilters(data, args.filters); } const result = await this.performAggregation(filteredData, args.column, args.operation); return { filePath, result: result.value, rowsProcessed: result.rowsProcessed, error: undefined }; } catch (error) { const errorMsg = `${filePath}: ${error instanceof Error ? error.message : 'Unknown error'}`; errors.push(errorMsg); return { filePath, result: 0, rowsProcessed: 0, error: errorMsg }; } }); const batchResults = await Promise.all(batchPromises); fileResults.push(...batchResults); } // Calculate consolidated result if requested let consolidatedResult: number | undefined; const validResults = fileResults.filter(r => !r.error); if (args.consolidate && validResults.length > 0) { consolidatedResult = this.consolidateResults(validResults, args.operation); } const totalRowsProcessed = fileResults.reduce((sum, r) => sum + r.rowsProcessed, 0); const processingTimeMs = Date.now() - startTime; return { operation: args.operation, column: args.column, consolidatedResult, fileResults: args.consolidate ? undefined : fileResults, totalFilesProcessed: validResults.length, totalRowsProcessed, processingTimeMs, errors }; }