/**
* Worker thread for isolated RxJS code execution
* This runs in a separate thread to prevent main process pollution
*/
import { parentPort, workerData } from 'worker_threads';
import { WorkerInput, WorkerResult } from '../types.js';
import { createExecutionContext, Observable, EMPTY } from '../data/rxjs-context.js';
import { take, tap, catchError, finalize } from 'rxjs';
/**
 * Execute user-provided RxJS code and record what the resulting stream does.
 *
 * `input.code` is compiled as a function body whose parameters are the keys
 * of `createExecutionContext()`; the code must `return` an Observable. The
 * stream is limited with `take(input.takeCount)` and is torn down after
 * `input.timeoutMs` milliseconds if it has not finished by then.
 *
 * @param input - The code to run plus its `takeCount` and `timeoutMs` limits.
 * @returns The emitted values, error messages, event timeline, elapsed time
 *   and heap usage samples. Failures raised inside the execution (bad return
 *   value, timeout, thrown errors) are reported through `errors`/`hasError`.
 */
async function executeCode(input: WorkerInput): Promise<WorkerResult> {
  const result: WorkerResult = {
    values: [],
    errors: [],
    completed: false,
    hasError: false,
    timeline: [],
    executionTime: 0,
    memoryUsage: {
      before: 0,
      after: 0,
      peak: 0,
    },
  };

  const startTime = Date.now();
  const startMemory = process.memoryUsage().heapUsed;
  result.memoryUsage.before = startMemory;

  try {
    // Names exposed to the user code; each key becomes a function parameter.
    const context = createExecutionContext();

    // NOTE(review): `new Function` evaluates `input.code` with the privileges
    // of this thread (globals such as `process` remain reachable). It is not
    // a sandbox; treat the code as trusted or isolate it further.
    const func = new Function(
      ...Object.keys(context),
      `\n"use strict";\n${input.code}\n`
    );

    const observable$ = func(...Object.values(context));
    if (!(observable$ instanceof Observable)) {
      throw new Error('Code must return an Observable');
    }

    await new Promise<void>((resolve, reject) => {
      let errorOccurred = false;
      let timedOut = false;
      // Assigned once subscribe() returns; the timer below only fires later.
      let subscription: { unsubscribe(): void } | undefined;

      // Single timeout timer: it both fails the run and tears the stream
      // down. It is cleared in finalize(), so no timer is left pending (and
      // keeping the worker's event loop alive) after a normal completion.
      const timeoutHandle = setTimeout(() => {
        timedOut = true;
        // Reject first so the promise settles even if teardown throws.
        reject(new Error(`Stream execution timeout after ${input.timeoutMs}ms`));
        subscription?.unsubscribe();
      }, input.timeoutMs);

      subscription = observable$
        .pipe(
          take(input.takeCount),
          tap(value => {
            const time = Date.now() - startTime;
            result.values.push(value);
            result.timeline.push({ time, type: 'next', value });
          }),
          catchError(error => {
            const time = Date.now() - startTime;
            const errorMessage = error instanceof Error ? error.message : String(error);
            result.errors.push(errorMessage);
            result.timeline.push({ time, type: 'error', value: errorMessage });
            result.hasError = true;
            errorOccurred = true;
            // Swallow the error so the downstream completes normally.
            return EMPTY;
          }),
          finalize(() => {
            clearTimeout(timeoutHandle);
            // finalize also runs on unsubscribe; a stream that was torn down
            // by the timeout did not complete and must not be recorded as such.
            if (timedOut) {
              return;
            }
            if (!errorOccurred) {
              result.completed = true;
            }
            // Recorded after a swallowed error as well, because the stream
            // then completes via EMPTY.
            const time = Date.now() - startTime;
            result.timeline.push({ time, type: 'complete' });
          })
        )
        .subscribe({
          complete: () => resolve(),
          error: (err) => {
            clearTimeout(timeoutHandle);
            reject(err);
          }
        });
    });
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    result.errors.push(errorMessage);
    result.hasError = true;
  }

  result.executionTime = Date.now() - startTime;
  const endMemory = process.memoryUsage().heapUsed;
  result.memoryUsage.after = endMemory;
  // Only two samples are taken, so "peak" is the larger of start and end.
  result.memoryUsage.peak = Math.max(startMemory, endMemory);
  return result;
}
// Worker bootstrap: run the code passed in through workerData and report the
// outcome to the parent thread as a `{ success, ... }` message.
const input = workerData as WorkerInput;

void (async () => {
  try {
    const result = await executeCode(input);
    parentPort?.postMessage({ success: true, result });
  } catch (error) {
    // Reached if executeCode rejects or if posting the result itself throws.
    const message = error instanceof Error ? error.message : 'Unknown error';
    parentPort?.postMessage({ success: false, error: message });
  }
})();