// generator.js (3.75 kB)
import {Transform, getDefaultHighWaterMark} from 'node:stream';
import {isAsyncGenerator} from '../stdio/type.js';
import {getSplitLinesGenerator, getAppendNewlineGenerator} from './split.js';
import {getValidateTransformInput, getValidateTransformReturn} from './validate.js';
import {getEncodingTransformGenerator} from './encoding-transform.js';
import {
	pushChunks,
	transformChunk,
	finalChunks,
	destroyTransform,
} from './run-async.js';
import {
	pushChunksSync,
	transformChunkSync,
	finalChunksSync,
	runTransformSync,
} from './run-sync.js';
/*
Generators can be used to transform/filter standard streams.
Generators have a simple syntax, yet allow all of the following:
- Sharing `state` between chunks
- Flushing logic, by using a `final` function
- Asynchronous logic
- Emitting multiple chunks from a single source chunk, even if spaced in time, by using multiple `yield`
- Filtering, by using no `yield`
Therefore, there is no need to allow Node.js or web transform streams.
The `highWaterMark` is kept as the default value, since this is what `subprocess.std*` uses.
Chunks are currently processed serially. We could add a `concurrency` option to parallelize in the future.
`generatorToStream()` turns a user generator (its `transform` and optional `final` functions), wrapped with the internal generators, into a `Transform` stream.
`Duplex.from(generator)` cannot be used because it does not allow setting the `objectMode` and `highWaterMark`.
*/
/**
 * Wraps one generator stdio item into a `Transform` stream.
 *
 * @param {object} generatorItem - `value` holds the user `transform`/`final` functions and the
 * `writableObjectMode`/`readableObjectMode` flags; `value` and `optionName` are also forwarded
 * to `addInternalGenerators()`.
 * @param {{encoding: string}} options - `encoding` is forwarded to `addInternalGenerators()`.
 * @returns {{stream: Transform}} the stream running the user generator plus the internal ones.
 */
export const generatorToStream = ({
	value,
	value: {transform, final, writableObjectMode, readableObjectMode},
	optionName,
}, {encoding}) => {
	// One state object shared by the async chunk pushers and `destroyTransform()`.
	const asyncState = {};
	const generators = addInternalGenerators(value, encoding, optionName);

	const isTransformAsync = isAsyncGenerator(transform);
	const isFinalAsync = isAsyncGenerator(final);
	// As soon as either user function is async, flushing and destruction use the async helpers.
	const usesAsyncHelpers = isTransformAsync || isFinalAsync;

	const runTransform = isTransformAsync
		? (...args) => pushChunks(transformChunk, asyncState, ...args)
		: (...args) => pushChunksSync(transformChunkSync, ...args);
	const runFinal = usesAsyncHelpers
		? (...args) => pushChunks(finalChunks, asyncState, ...args)
		: (...args) => pushChunksSync(finalChunksSync, ...args);

	const streamOptions = {
		writableObjectMode,
		writableHighWaterMark: getDefaultHighWaterMark(writableObjectMode),
		readableObjectMode,
		readableHighWaterMark: getDefaultHighWaterMark(readableObjectMode),
		transform(chunk, _chunkEncoding, done) {
			runTransform([chunk, generators, 0], this, done);
		},
		flush(done) {
			runFinal([generators], this, done);
		},
		// `undefined` leaves `Transform`'s default destroy behavior in place for fully-sync generators.
		destroy: usesAsyncHelpers
			? (...args) => destroyTransform(asyncState, ...args)
			: undefined,
	};

	return {stream: new Transform(streamOptions)};
};
/**
 * Applies every `type: 'generator'` stdio item to `chunks` synchronously, feeding each
 * generator's output into the next one.
 *
 * @param {unknown[]} chunks - chunks given to the first generator.
 * @param {object[]} stdioItems - stdio items; only those with `type === 'generator'` are applied.
 * @param {string} encoding - forwarded to `addInternalGenerators()`.
 * @param {boolean} isInput - when truthy, the generators run in reverse order.
 * @returns {unknown[]} the output of the last generator, or `chunks` itself when there is none.
 */
export const runGeneratorsSync = (chunks, stdioItems, encoding, isInput) => {
	const generatorItems = stdioItems.filter(({type}) => type === 'generator');
	if (isInput) {
		// `filter()` returned a new array, so reversing in place leaves `stdioItems` untouched.
		generatorItems.reverse();
	}

	return generatorItems.reduce(
		(currentChunks, {value, optionName}) => runTransformSync(
			addInternalGenerators(value, encoding, optionName),
			currentChunks,
		),
		chunks,
	);
};
// Generators used internally to convert the chunk type, validate it, and split into lines
const addInternalGenerators = (
	{transform, final, binary, writableObjectMode, readableObjectMode, preserveNewlines},
	encoding,
	optionName,
) => {
	const state = {};
	return [
		{transform: getValidateTransformInput(writableObjectMode, optionName)},
		getEncodingTransformGenerator(binary, encoding, writableObjectMode),
		getSplitLinesGenerator(binary, preserveNewlines, writableObjectMode, state),
		{transform, final},
		{transform: getValidateTransformReturn(readableObjectMode, optionName)},
		getAppendNewlineGenerator({
			binary,
			preserveNewlines,
			readableObjectMode,
			state,
		}),
	].filter(Boolean);
};