/**
* @file parallel-stats.ts
* @description Parallel statistics operations using WebWorkers
*
* This module provides high-level functions for parallel statistical calculations
* that automatically chunk data and distribute work across worker threads.
*
* @module workers/parallel-stats
* @since 3.0.0
*/
import { WorkerPool } from './worker-pool.js';
import { OperationType } from './worker-types.js';
import { chunkArray, getOptimalChunkCount } from './chunk-utils.js';
import { logger } from '../utils.js';
/**
 * Thresholds for when to use parallel processing.
 * Below these sizes, single-threaded WASM is faster due to overhead
 * (worker message passing and chunking cost more than the computation).
 *
 * Used by {@link shouldUseParallel} to pick a strategy per operation.
 *
 * @constant
 */
export const PARALLEL_THRESHOLDS = {
  /** Use parallel processing for mean/sum/min/max when array length >= 100,000 */
  BASIC_STATS: 100000,
  /** Use parallel processing for variance/std when array length >= 100,000 */
  VARIANCE_STD: 100000,
  /** Use parallel processing for median when array length >= 500,000 */
  MEDIAN: 500000,
} as const;
/**
 * Parallel mean calculation using worker pool.
 *
 * **Algorithm:**
 * 1. Split array into chunks
 * 2. Each worker calculates the sum of its chunk
 * 3. Combine: total_sum / total_count
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @returns {Promise<number>} The mean value
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const mean = await parallelStatsMean(data, workerPool);
 * ```
 */
export async function parallelStatsMean(
  data: number[],
  pool: WorkerPool
): Promise<number> {
  const t0 = performance.now();
  logger.debug('Starting parallel stats mean', {
    dataLength: data.length,
  });

  // Pick a chunk count that keeps every available worker busy while
  // keeping at least 10k elements per chunk (below that, dispatch
  // overhead dominates).
  const pieces = chunkArray(data, {
    numChunks: getOptimalChunkCount(
      data.length,
      pool.getAvailableWorkerCount(),
      10000 // Min 10k elements per chunk
    ),
  });

  // Dispatch one STATS_SUM task per chunk and await all partial sums.
  const partialSums = await Promise.all(
    pieces.map((piece) =>
      pool.execute<number>({
        operation: OperationType.STATS_SUM,
        data: {
          data: piece.data,
          startIndex: piece.startIndex,
          endIndex: piece.endIndex,
        },
      })
    )
  );

  // Combine: total sum divided by total element count.
  let total = 0;
  for (const partial of partialSums) {
    total += partial;
  }
  const mean = total / data.length;

  const duration = performance.now() - t0;
  logger.debug('Parallel stats mean completed', {
    duration: `${duration.toFixed(2)}ms`,
    chunks: pieces.length,
    result: mean,
  });
  return mean;
}
/**
 * Parallel sum calculation using worker pool.
 *
 * **Algorithm:**
 * 1. Split array into chunks
 * 2. Each worker calculates the sum of its chunk
 * 3. Sum all partial sums
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @returns {Promise<number>} The sum
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const sum = await parallelStatsSum(data, workerPool);
 * ```
 */
export async function parallelStatsSum(
  data: number[],
  pool: WorkerPool
): Promise<number> {
  const startedAt = performance.now();
  logger.debug('Starting parallel stats sum', {
    dataLength: data.length,
  });

  // Chunk count balances worker utilization against per-task overhead
  // (never less than 10k elements per chunk).
  const numChunks = getOptimalChunkCount(
    data.length,
    pool.getAvailableWorkerCount(),
    10000
  );

  // Queue one STATS_SUM task per chunk.
  const tasks: Promise<number>[] = [];
  for (const chunk of chunkArray(data, { numChunks })) {
    tasks.push(
      pool.execute<number>({
        operation: OperationType.STATS_SUM,
        data: {
          data: chunk.data,
          startIndex: chunk.startIndex,
          endIndex: chunk.endIndex,
        },
      })
    );
  }

  // Await every partial sum, then fold them into the grand total.
  const partials = await Promise.all(tasks);
  const totalSum = partials.reduce((acc, part) => acc + part, 0);

  const duration = performance.now() - startedAt;
  logger.debug('Parallel stats sum completed', {
    duration: `${duration.toFixed(2)}ms`,
    chunks: tasks.length,
    result: totalSum,
  });
  return totalSum;
}
/**
 * Parallel minimum calculation using worker pool.
 *
 * **Algorithm:**
 * 1. Split array into chunks
 * 2. Each worker finds the minimum of its chunk
 * 3. Take the smallest of all per-chunk minima
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @returns {Promise<number>} The minimum value
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const min = await parallelStatsMin(data, workerPool);
 * ```
 */
export async function parallelStatsMin(
  data: number[],
  pool: WorkerPool
): Promise<number> {
  const begin = performance.now();
  logger.debug('Starting parallel stats min', {
    dataLength: data.length,
  });

  // Split the input so each worker gets at least 10k elements.
  const chunks = chunkArray(data, {
    numChunks: getOptimalChunkCount(
      data.length,
      pool.getAvailableWorkerCount(),
      10000
    ),
  });

  // One STATS_MIN task per chunk; each worker reports its local minimum.
  const localMins = await Promise.all(
    chunks.map((c) =>
      pool.execute<number>({
        operation: OperationType.STATS_MIN,
        data: { data: c.data, startIndex: c.startIndex, endIndex: c.endIndex },
      })
    )
  );

  // The global minimum is the smallest of the per-chunk minima
  // (Infinity seed matches Math.min() semantics for an empty list).
  const globalMin = localMins.reduce((a, b) => Math.min(a, b), Infinity);

  const duration = performance.now() - begin;
  logger.debug('Parallel stats min completed', {
    duration: `${duration.toFixed(2)}ms`,
    chunks: chunks.length,
    result: globalMin,
  });
  return globalMin;
}
/**
 * Parallel maximum calculation using worker pool.
 *
 * **Algorithm:**
 * 1. Split array into chunks
 * 2. Each worker finds the maximum of its chunk
 * 3. Take the largest of all per-chunk maxima
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @returns {Promise<number>} The maximum value
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const max = await parallelStatsMax(data, workerPool);
 * ```
 */
export async function parallelStatsMax(
  data: number[],
  pool: WorkerPool
): Promise<number> {
  const begin = performance.now();
  logger.debug('Starting parallel stats max', {
    dataLength: data.length,
  });

  // Split the input so each worker gets at least 10k elements.
  const chunks = chunkArray(data, {
    numChunks: getOptimalChunkCount(
      data.length,
      pool.getAvailableWorkerCount(),
      10000
    ),
  });

  // One STATS_MAX task per chunk; each worker reports its local maximum.
  const localMaxs = await Promise.all(
    chunks.map((c) =>
      pool.execute<number>({
        operation: OperationType.STATS_MAX,
        data: { data: c.data, startIndex: c.startIndex, endIndex: c.endIndex },
      })
    )
  );

  // The global maximum is the largest of the per-chunk maxima
  // (-Infinity seed matches Math.max() semantics for an empty list).
  const globalMax = localMaxs.reduce((a, b) => Math.max(a, b), -Infinity);

  const duration = performance.now() - begin;
  logger.debug('Parallel stats max completed', {
    duration: `${duration.toFixed(2)}ms`,
    chunks: chunks.length,
    result: globalMax,
  });
  return globalMax;
}
/**
 * Parallel variance calculation using worker pool.
 *
 * **Algorithm (pooled combination, single pass over chunks):**
 * 1. Split array into chunks
 * 2. For each chunk, compute in parallel its sum (STATS_SUM) and its
 *    *population* variance (STATS_VARIANCE)
 * 3. Derive the global mean from the partial sums, convert each chunk's
 *    population variance into its sum of squared deviations
 *    (SS_i = popVar_i * n_i), and combine with the exact pooled formula:
 *      SS_total = Σ [ SS_i + n_i * (mean_i - mean_global)² ]
 * 4. Divide by n-1 (sample) or n (population)
 *
 * This replaces the previous implementation, which averaged per-chunk
 * variances — an approximation the code itself flagged as statistically
 * incorrect. The pooled formula above is exact.
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @param {boolean} [sample=true] - Use sample variance (n-1) if true, population variance (n) if false
 * @returns {Promise<number>} The variance (NaN when the divisor is zero,
 *   i.e. empty input, or single-element input with sample=true)
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const variance = await parallelStatsVariance(data, workerPool);
 * ```
 */
export async function parallelStatsVariance(
  data: number[],
  pool: WorkerPool,
  sample: boolean = true
): Promise<number> {
  const startTime = performance.now();
  logger.debug('Starting parallel stats variance', {
    dataLength: data.length,
    sample,
  });

  // Degenerate divisor (n=0, or n=1 with sample variance): the variance
  // is undefined — return NaN without dispatching any work.
  const divisor = data.length - (sample ? 1 : 0);
  if (divisor <= 0) {
    return NaN;
  }

  // Determine optimal chunk count (min 10k elements per chunk).
  const numChunks = getOptimalChunkCount(
    data.length,
    pool.getAvailableWorkerCount(),
    10000
  );
  const chunks = chunkArray(data, { numChunks });

  // For each chunk request its sum and its population variance in parallel.
  // NOTE(review): normalization 1 is assumed to mean population (divide by
  // n), mirroring the original `sample ? 0 : 1` mapping — confirm against
  // the worker implementation.
  const partials = await Promise.all(
    chunks.map(async (chunk) => {
      const payload = {
        data: chunk.data,
        startIndex: chunk.startIndex,
        endIndex: chunk.endIndex,
      };
      const [sum, popVariance] = await Promise.all([
        pool.execute<number>({
          operation: OperationType.STATS_SUM,
          data: payload,
        }),
        pool.execute<number>({
          operation: OperationType.STATS_VARIANCE,
          data: { ...payload, params: { normalization: 1 } },
        }),
      ]);
      return { count: chunk.data.length, sum, popVariance };
    })
  );

  // Global mean from the partial sums (no separate parallel-mean pass).
  const totalSum = partials.reduce((acc, p) => acc + p.sum, 0);
  const globalMean = totalSum / data.length;

  // Pooled sum of squared deviations about the global mean:
  // SS_total = Σ [ popVar_i * n_i + n_i * (mean_i - globalMean)² ]
  let sumSquaredDev = 0;
  for (const p of partials) {
    const chunkMean = p.sum / p.count;
    const delta = chunkMean - globalMean;
    sumSquaredDev += p.popVariance * p.count + p.count * delta * delta;
  }
  const variance = sumSquaredDev / divisor;

  const duration = performance.now() - startTime;
  logger.debug('Parallel stats variance completed', {
    duration: `${duration.toFixed(2)}ms`,
    chunks: chunks.length,
    result: variance,
  });
  return variance;
}
/**
 * Parallel standard deviation calculation using worker pool.
 *
 * Delegates to {@link parallelStatsVariance} and takes the square root.
 *
 * @param {number[]} data - Array of numbers
 * @param {WorkerPool} pool - Worker pool instance
 * @param {boolean} [sample=true] - Use sample std if true, population std if false
 * @returns {Promise<number>} The standard deviation
 *
 * @example
 * ```typescript
 * const data = new Array(1000000).fill(0).map(() => Math.random());
 * const std = await parallelStatsStd(data, workerPool);
 * ```
 */
export async function parallelStatsStd(
  data: number[],
  pool: WorkerPool,
  sample: boolean = true
): Promise<number> {
  // Standard deviation is the square root of the variance.
  return Math.sqrt(await parallelStatsVariance(data, pool, sample));
}
/**
 * Determines if an operation should use parallel processing.
 *
 * Looks up the per-operation threshold from {@link PARALLEL_THRESHOLDS}
 * and compares it against the array length.
 *
 * @param {number} dataLength - Array length
 * @param {'mean' | 'sum' | 'min' | 'max' | 'variance' | 'std' | 'median'} operation - Operation type
 * @returns {boolean} True if parallel processing is beneficial
 */
export function shouldUseParallel(
  dataLength: number,
  operation: 'mean' | 'sum' | 'min' | 'max' | 'variance' | 'std' | 'median'
): boolean {
  // Threshold per operation; unknown operations fall through to false.
  const thresholds: Record<string, number> = {
    mean: PARALLEL_THRESHOLDS.BASIC_STATS,
    sum: PARALLEL_THRESHOLDS.BASIC_STATS,
    min: PARALLEL_THRESHOLDS.BASIC_STATS,
    max: PARALLEL_THRESHOLDS.BASIC_STATS,
    variance: PARALLEL_THRESHOLDS.VARIANCE_STD,
    std: PARALLEL_THRESHOLDS.VARIANCE_STD,
    median: PARALLEL_THRESHOLDS.MEDIAN,
  };
  const threshold = thresholds[operation];
  return threshold !== undefined && dataLength >= threshold;
}