Skip to main content
Glama
ssv445

Lorem Ipsum MCP Server

by ssv445
sink.ts72 kB
import * as Arr from "../Array.js" import * as Cause from "../Cause.js" import type * as Channel from "../Channel.js" import * as Chunk from "../Chunk.js" import * as Clock from "../Clock.js" import type * as Context from "../Context.js" import * as Duration from "../Duration.js" import * as Effect from "../Effect.js" import * as Either from "../Either.js" import * as Exit from "../Exit.js" import { constTrue, dual, identity, pipe } from "../Function.js" import type { LazyArg } from "../Function.js" import * as HashMap from "../HashMap.js" import * as HashSet from "../HashSet.js" import type * as MergeDecision from "../MergeDecision.js" import * as Option from "../Option.js" import { pipeArguments } from "../Pipeable.js" import { hasProperty, type Predicate, type Refinement } from "../Predicate.js" import * as PubSub from "../PubSub.js" import * as Queue from "../Queue.js" import * as Ref from "../Ref.js" import * as Scope from "../Scope.js" import type * as Sink from "../Sink.js" import type * as Types from "../Types.js" import * as channel from "./channel.js" import * as mergeDecision from "./channel/mergeDecision.js" import * as core from "./core-stream.js" /** @internal */ export const SinkTypeId: Sink.SinkTypeId = Symbol.for("effect/Sink") as Sink.SinkTypeId const sinkVariance = { /* c8 ignore next */ _A: (_: never) => _, /* c8 ignore next */ _In: (_: unknown) => _, /* c8 ignore next */ _L: (_: never) => _, /* c8 ignore next */ _E: (_: never) => _, /* c8 ignore next */ _R: (_: never) => _ } /** @internal */ export class SinkImpl<out A, in In = unknown, out L = never, out E = never, out R = never> implements Sink.Sink<A, In, L, E, R> { readonly [SinkTypeId] = sinkVariance constructor( readonly channel: Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, A, unknown, R> ) { } pipe() { return pipeArguments(this, arguments) } } /** @internal */ export const isSink = (u: unknown): u is Sink.Sink<unknown, unknown, unknown, unknown, unknown> => hasProperty(u, SinkTypeId) /** @internal */ export const suspend = <A, In, L, E, R>(evaluate: LazyArg<Sink.Sink<A, In, L, E, R>>): Sink.Sink<A, In, L, E, R> => new SinkImpl(core.suspend(() => toChannel(evaluate()))) /** @internal */ export const as = dual< <A2>(a: A2) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In, L, E, R>, <A, In, L, E, R, A2>(self: Sink.Sink<A, In, L, E, R>, a: A2) => Sink.Sink<A2, In, L, E, R> >( 2, (self, a) => pipe(self, map(() => a)) ) /** @internal */ export const collectAll = <In>(): Sink.Sink<Chunk.Chunk<In>, In> => new SinkImpl(collectAllLoop(Chunk.empty())) /** @internal */ const collectAllLoop = <In>( acc: Chunk.Chunk<In> ): Channel.Channel<never, Chunk.Chunk<In>, never, never, Chunk.Chunk<In>, unknown> => core.readWithCause({ onInput: (chunk: Chunk.Chunk<In>) => collectAllLoop(pipe(acc, Chunk.appendAll(chunk))), onFailure: core.failCause, onDone: () => core.succeed(acc) }) /** @internal */ export const collectAllN = <In>(n: number): Sink.Sink<Chunk.Chunk<In>, In, In> => suspend(() => fromChannel(collectAllNLoop(n, Chunk.empty()))) /** @internal */ const collectAllNLoop = <In>( n: number, acc: Chunk.Chunk<In> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, Chunk.Chunk<In>, unknown> => core.readWithCause({ onInput: (chunk: Chunk.Chunk<In>) => { const [collected, leftovers] = Chunk.splitAt(chunk, n) if (collected.length < n) { return collectAllNLoop(n - collected.length, Chunk.appendAll(acc, collected)) } if (Chunk.isEmpty(leftovers)) { return core.succeed(Chunk.appendAll(acc, collected)) } return core.flatMap(core.write(leftovers), () => core.succeed(Chunk.appendAll(acc, collected))) }, onFailure: core.failCause, onDone: () => core.succeed(acc) }) /** @internal */ export const collectAllFrom = <A, In, L extends In, E, R>( self: Sink.Sink<A, In, L, E, R> ): Sink.Sink<Chunk.Chunk<A>, In, L, E, R> => collectAllWhileWith(self, { initial: Chunk.empty<A>(), while: constTrue, body: (chunk, a) => pipe(chunk, Chunk.append(a)) }) /** @internal */ export const collectAllToMap = <In, K>( key: (input: In) => K, merge: (x: In, y: In) => In ): Sink.Sink<HashMap.HashMap<K, In>, In> => { return pipe( foldLeftChunks(HashMap.empty<K, In>(), (map, chunk) => pipe( chunk, Chunk.reduce(map, (map, input) => { const k: K = key(input) const v: In = pipe(map, HashMap.has(k)) ? merge(pipe(map, HashMap.unsafeGet(k)), input) : input return pipe(map, HashMap.set(k, v)) }) )) ) } /** @internal */ export const collectAllToMapN = <In, K>( n: number, key: (input: In) => K, merge: (x: In, y: In) => In ): Sink.Sink<HashMap.HashMap<K, In>, In, In> => { return foldWeighted<HashMap.HashMap<K, In>, In>({ initial: HashMap.empty(), maxCost: n, cost: (acc, input) => pipe(acc, HashMap.has(key(input))) ? 0 : 1, body: (acc, input) => { const k: K = key(input) const v: In = pipe(acc, HashMap.has(k)) ? merge(pipe(acc, HashMap.unsafeGet(k)), input) : input return pipe(acc, HashMap.set(k, v)) } }) } /** @internal */ export const collectAllToSet = <In>(): Sink.Sink<HashSet.HashSet<In>, In> => foldLeftChunks<HashSet.HashSet<In>, In>( HashSet.empty(), (acc, chunk) => pipe(chunk, Chunk.reduce(acc, (acc, input) => pipe(acc, HashSet.add(input)))) ) /** @internal */ export const collectAllToSetN = <In>(n: number): Sink.Sink<HashSet.HashSet<In>, In, In> => foldWeighted<HashSet.HashSet<In>, In>({ initial: HashSet.empty(), maxCost: n, cost: (acc, input) => HashSet.has(acc, input) ? 0 : 1, body: (acc, input) => HashSet.add(acc, input) }) /** @internal */ export const collectAllUntil = <In>(p: Predicate<In>): Sink.Sink<Chunk.Chunk<In>, In, In> => { return pipe( fold<[Chunk.Chunk<In>, boolean], In>( [Chunk.empty(), true], (tuple) => tuple[1], ([chunk, _], input) => [pipe(chunk, Chunk.append(input)), !p(input)] ), map((tuple) => tuple[0]) ) } /** @internal */ export const collectAllUntilEffect = <In, E, R>(p: (input: In) => Effect.Effect<boolean, E, R>) => { return pipe( foldEffect<[Chunk.Chunk<In>, boolean], In, E, R>( [Chunk.empty(), true], (tuple) => tuple[1], ([chunk, _], input) => pipe(p(input), Effect.map((bool) => [pipe(chunk, Chunk.append(input)), !bool])) ), map((tuple) => tuple[0]) ) } /** @internal */ export const collectAllWhile: { <In, Out extends In>(refinement: Refinement<In, Out>): Sink.Sink<Chunk.Chunk<Out>, In, In> <In>(predicate: Predicate<In>): Sink.Sink<Chunk.Chunk<In>, In, In> } = <In>(predicate: Predicate<In>): Sink.Sink<Chunk.Chunk<In>, In, In> => fromChannel(collectAllWhileReader(predicate, Chunk.empty())) /** @internal */ const collectAllWhileReader = <In>( predicate: Predicate<In>, done: Chunk.Chunk<In> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, Chunk.Chunk<In>, unknown> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => { const [collected, leftovers] = pipe(Chunk.toReadonlyArray(input), Arr.span(predicate)) if (leftovers.length === 0) { return collectAllWhileReader( predicate, pipe(done, Chunk.appendAll(Chunk.unsafeFromArray(collected))) ) } return pipe( core.write(Chunk.unsafeFromArray(leftovers)), channel.zipRight(core.succeed(pipe(done, Chunk.appendAll(Chunk.unsafeFromArray(collected))))) ) }, onFailure: core.fail, onDone: () => core.succeed(done) }) /** @internal */ export const collectAllWhileEffect = <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R> ): Sink.Sink<Chunk.Chunk<In>, In, In, E, R> => fromChannel(collectAllWhileEffectReader(predicate, Chunk.empty())) /** @internal */ const collectAllWhileEffectReader = <In, R, E>( predicate: (input: In) => Effect.Effect<boolean, E, R>, done: Chunk.Chunk<In> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, never, Chunk.Chunk<In>, unknown, R> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( core.fromEffect(pipe(input, Effect.takeWhile(predicate), Effect.map(Chunk.unsafeFromArray))), core.flatMap((collected) => { const leftovers = pipe(input, Chunk.drop(collected.length)) if (Chunk.isEmpty(leftovers)) { return collectAllWhileEffectReader(predicate, pipe(done, Chunk.appendAll(collected))) } return pipe(core.write(leftovers), channel.zipRight(core.succeed(pipe(done, Chunk.appendAll(collected))))) }) ), onFailure: core.fail, onDone: () => core.succeed(done) }) /** @internal */ export const collectAllWhileWith: { <A, S>( options: { readonly initial: S readonly while: Predicate<A> readonly body: (s: S, a: A) => S } ): <In, L extends In, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<S, In, L, E, R> <A, In, L extends In, E, R, S>( self: Sink.Sink<A, In, L, E, R>, options: { readonly initial: S readonly while: Predicate<A> readonly body: (s: S, a: A) => S } ): Sink.Sink<S, In, L, E, R> } = dual( 2, <A, In, L extends In, E, R, S>( self: Sink.Sink<A, In, L, E, R>, options: { readonly initial: S readonly while: Predicate<A> readonly body: (s: S, a: A) => S } ): Sink.Sink<S, In, L, E, R> => { const refs = pipe( Ref.make(Chunk.empty<In>()), Effect.zip(Ref.make(false)) ) const newChannel = pipe( core.fromEffect(refs), core.flatMap(([leftoversRef, upstreamDoneRef]) => { const upstreamMarker: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, unknown, unknown> = core .readWith({ onInput: (input) => pipe(core.write(input), core.flatMap(() => upstreamMarker)), onFailure: core.fail, onDone: (done) => pipe(core.fromEffect(Ref.set(upstreamDoneRef, true)), channel.as(done)) }) return pipe( upstreamMarker, core.pipeTo(channel.bufferChunk(leftoversRef)), core.pipeTo( collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, options.initial, options.while, options.body) ) ) }) ) return new SinkImpl(newChannel) } ) const collectAllWhileWithLoop = <Z, In, L extends In, E, R, S>( self: Sink.Sink<Z, In, L, E, R>, leftoversRef: Ref.Ref<Chunk.Chunk<In>>, upstreamDoneRef: Ref.Ref<boolean>, currentResult: S, p: Predicate<Z>, f: (s: S, z: Z) => S ): Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, S, unknown, R> => { return pipe( toChannel(self), channel.doneCollect, channel.foldChannel({ onFailure: core.fail, onSuccess: ([leftovers, doneValue]) => p(doneValue) ? pipe( core.fromEffect( Ref.set(leftoversRef, Chunk.flatten(leftovers as Chunk.Chunk<Chunk.Chunk<In>>)) ), core.flatMap(() => pipe( core.fromEffect(Ref.get(upstreamDoneRef)), core.flatMap((upstreamDone) => { const accumulatedResult = f(currentResult, doneValue) return upstreamDone ? pipe(core.write(Chunk.flatten(leftovers)), channel.as(accumulatedResult)) : collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, accumulatedResult, p, f) }) ) ) ) : pipe(core.write(Chunk.flatten(leftovers)), channel.as(currentResult)) }) ) } /** @internal */ export const collectLeftover = <A, In, L, E, R>( self: Sink.Sink<A, In, L, E, R> ): Sink.Sink<[A, Chunk.Chunk<L>], In, never, E, R> => new SinkImpl(pipe(core.collectElements(toChannel(self)), channel.map(([chunks, z]) => [z, Chunk.flatten(chunks)]))) /** @internal */ export const mapInput = dual< <In0, In>(f: (input: In0) => In) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In0, L, E, R>, <A, In, L, E, R, In0>(self: Sink.Sink<A, In, L, E, R>, f: (input: In0) => In) => Sink.Sink<A, In0, L, E, R> >( 2, <A, In, L, E, R, In0>(self: Sink.Sink<A, In, L, E, R>, f: (input: In0) => In): Sink.Sink<A, In0, L, E, R> => pipe(self, mapInputChunks(Chunk.map(f))) ) /** @internal */ export const mapInputEffect = dual< <In0, In, E2, R2>( f: (input: In0) => Effect.Effect<In, E2, R2> ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In0, L, E2 | E, R2 | R>, <A, In, L, E, R, In0, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (input: In0) => Effect.Effect<In, E2, R2> ) => Sink.Sink<A, In0, L, E2 | E, R2 | R> >( 2, <A, In, L, E, R, In0, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (input: In0) => Effect.Effect<In, E2, R2> ): Sink.Sink<A, In0, L, E | E2, R | R2> => mapInputChunksEffect( self, (chunk) => Effect.map( Effect.forEach(chunk, (v) => f(v)), Chunk.unsafeFromArray ) ) ) /** @internal */ export const mapInputChunks = dual< <In0, In>( f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In> ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In0, L, E, R>, <A, In, L, E, R, In0>( self: Sink.Sink<A, In, L, E, R>, f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In> ) => Sink.Sink<A, In0, L, E, R> >( 2, <A, In, L, E, R, In0>( self: Sink.Sink<A, In, L, E, R>, f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In> ): Sink.Sink<A, In0, L, E, R> => { const loop: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In0>, never, never, unknown, unknown, R> = core.readWith({ onInput: (chunk) => pipe(core.write(f(chunk)), core.flatMap(() => loop)), onFailure: core.fail, onDone: core.succeed }) return new SinkImpl(pipe(loop, core.pipeTo(toChannel(self)))) } ) /** @internal */ export const mapInputChunksEffect = dual< <In0, In, E2, R2>( f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<Chunk.Chunk<In>, E2, R2> ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In0, L, E2 | E, R2 | R>, <A, In, L, E, R, In0, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<Chunk.Chunk<In>, E2, R2> ) => Sink.Sink<A, In0, L, E2 | E, R2 | R> >( 2, <A, In, L, E, R, In0, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<Chunk.Chunk<In>, E2, R2> ): Sink.Sink<A, In0, L, E | E2, R | R2> => { const loop: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In0>, E2, never, unknown, unknown, R | R2> = core .readWith({ onInput: (chunk) => pipe(core.fromEffect(f(chunk)), core.flatMap(core.write), core.flatMap(() => loop)), onFailure: core.fail, onDone: core.succeed }) return new SinkImpl(pipe(loop, channel.pipeToOrFail(toChannel(self)))) } ) /** @internal */ export const die = (defect: unknown): Sink.Sink<never, unknown> => failCause(Cause.die(defect)) /** @internal */ export const dieMessage = (message: string): Sink.Sink<never, unknown> => failCause(Cause.die(new Cause.RuntimeException(message))) /** @internal */ export const dieSync = (evaluate: LazyArg<unknown>): Sink.Sink<never, unknown> => failCauseSync(() => Cause.die(evaluate())) /** @internal */ export const dimap = dual< <In0, In, A, A2>( options: { readonly onInput: (input: In0) => In readonly onDone: (a: A) => A2 } ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In0, L, E, R>, <A, In, L, E, R, In0, A2>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onInput: (input: In0) => In readonly onDone: (a: A) => A2 } ) => Sink.Sink<A2, In0, L, E, R> >( 2, <A, In, L, E, R, In0, A2>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onInput: (input: In0) => In readonly onDone: (a: A) => A2 } ): Sink.Sink<A2, In0, L, E, R> => map(mapInput(self, options.onInput), options.onDone) ) /** @internal */ export const dimapEffect = dual< <In0, In, E2, R2, A, A2, E3, R3>( options: { readonly onInput: (input: In0) => Effect.Effect<In, E2, R2> readonly onDone: (a: A) => Effect.Effect<A2, E3, R3> } ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In0, L, E2 | E3 | E, R2 | R3 | R>, <A, In, L, E, R, In0, E2, R2, A2, E3, R3>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onInput: (input: In0) => Effect.Effect<In, E2, R2> readonly onDone: (a: A) => Effect.Effect<A2, E3, R3> } ) => Sink.Sink<A2, In0, L, E2 | E3 | E, R2 | R3 | R> >( 2, (self, options) => mapEffect( mapInputEffect(self, options.onInput), options.onDone ) ) /** @internal */ export const dimapChunks = dual< <In0, In, A, A2>( options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In> readonly onDone: (a: A) => A2 } ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In0, L, E, R>, <A, In, L, E, R, In0, A2>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In> readonly onDone: (a: A) => A2 } ) => Sink.Sink<A2, In0, L, E, R> >( 2, (self, options) => map( mapInputChunks(self, options.onInput), options.onDone ) ) /** @internal */ export const dimapChunksEffect = dual< <In0, In, E2, R2, A, A2, E3, R3>( options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<Chunk.Chunk<In>, E2, R2> readonly onDone: (a: A) => Effect.Effect<A2, E3, R3> } ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In0, L, E2 | E3 | E, R2 | R3 | R>, <A, In, L, E, R, In0, E2, R2, A2, E3, R3>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<Chunk.Chunk<In>, E2, R2> readonly onDone: (a: A) => Effect.Effect<A2, E3, R3> } ) => Sink.Sink<A2, In0, L, E2 | E3 | E, R2 | R3 | R> >( 2, (self, options) => mapEffect(mapInputChunksEffect(self, options.onInput), options.onDone) ) /** @internal */ export const drain: Sink.Sink<void, unknown> = new SinkImpl( channel.drain(channel.identityChannel()) ) /** @internal */ export const drop = <In>(n: number): Sink.Sink<unknown, In, In> => suspend(() => new SinkImpl(dropLoop(n))) /** @internal */ const dropLoop = <In>( n: number ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, unknown, unknown> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => { const dropped = pipe(input, Chunk.drop(n)) const leftover = Math.max(n - input.length, 0) const more = Chunk.isEmpty(input) || leftover > 0 if (more) { return dropLoop(leftover) } return pipe( core.write(dropped), channel.zipRight(channel.identityChannel<Chunk.Chunk<In>, never, unknown>()) ) }, onFailure: core.fail, onDone: () => core.void }) /** @internal */ export const dropUntil = <In>(predicate: Predicate<In>): Sink.Sink<unknown, In, In> => new SinkImpl( pipe(toChannel(dropWhile((input: In) => !predicate(input))), channel.pipeToOrFail(toChannel(drop<In>(1)))) ) /** @internal */ export const dropUntilEffect = <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R> ): Sink.Sink<unknown, In, In, E, R> => suspend(() => new SinkImpl(dropUntilEffectReader(predicate))) /** @internal */ const dropUntilEffectReader = <In, R, E>( predicate: (input: In) => Effect.Effect<boolean, E, R> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, unknown, unknown, R> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( input, Effect.dropUntil(predicate), Effect.map((leftover) => { const more = leftover.length === 0 return more ? dropUntilEffectReader(predicate) : pipe( core.write(Chunk.unsafeFromArray(leftover)), channel.zipRight(channel.identityChannel<Chunk.Chunk<In>, E, unknown>()) ) }), channel.unwrap ), onFailure: core.fail, onDone: () => core.void }) /** @internal */ export const dropWhile = <In>(predicate: Predicate<In>): Sink.Sink<unknown, In, In> => new SinkImpl(dropWhileReader(predicate)) /** @internal */ const dropWhileReader = <In>( predicate: Predicate<In> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, unknown, unknown> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => { const out = pipe(input, Chunk.dropWhile(predicate)) if (Chunk.isEmpty(out)) { return dropWhileReader(predicate) } return pipe(core.write(out), channel.zipRight(channel.identityChannel<Chunk.Chunk<In>, never, unknown>())) }, onFailure: core.fail, onDone: core.succeedNow }) /** @internal */ export const dropWhileEffect = <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R> ): Sink.Sink<unknown, In, In, E, R> => suspend(() => new SinkImpl(dropWhileEffectReader(predicate))) /** @internal */ const dropWhileEffectReader = <In, R, E>( predicate: (input: In) => Effect.Effect<boolean, E, R> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, unknown, unknown, R> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( input, Effect.dropWhile(predicate), Effect.map((leftover) => { const more = leftover.length === 0 return more ? dropWhileEffectReader(predicate) : pipe( core.write(Chunk.unsafeFromArray(leftover)), channel.zipRight(channel.identityChannel<Chunk.Chunk<In>, E, unknown>()) ) }), channel.unwrap ), onFailure: core.fail, onDone: () => core.void }) /** @internal */ export const ensuring = dual< <X, R2>( finalizer: Effect.Effect<X, never, R2> ) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E, R2 | R>, <A, In, L, E, R, X, R2>( self: Sink.Sink<A, In, L, E, R>, finalizer: Effect.Effect<X, never, R2> ) => Sink.Sink<A, In, L, E, R2 | R> >( 2, (self, finalizer) => new SinkImpl(pipe(self, toChannel, channel.ensuring(finalizer))) ) /** @internal */ export const ensuringWith = dual< <A, E, X, R2>( finalizer: (exit: Exit.Exit<A, E>) => Effect.Effect<X, never, R2> ) => <In, L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E, R2 | R>, <A, In, L, E, R, X, R2>( self: Sink.Sink<A, In, L, E, R>, finalizer: (exit: Exit.Exit<A, E>) => Effect.Effect<X, never, R2> ) => Sink.Sink<A, In, L, E, R2 | R> >( 2, (self, finalizer) => new SinkImpl(pipe(self, toChannel, core.ensuringWith(finalizer))) ) /** @internal */ export const context = <R>(): Sink.Sink<Context.Context<R>, unknown, never, never, R> => fromEffect(Effect.context<R>()) /** @internal */ export const contextWith = <R, Z>( f: (context: Context.Context<R>) => Z ): Sink.Sink<Z, unknown, never, never, R> => pipe(context<R>(), map(f)) /** @internal */ export const contextWithEffect = <R0, A, E, R>( f: (context: Context.Context<R0>) => Effect.Effect<A, E, R> ): Sink.Sink<A, unknown, never, E, R0 | R> => pipe(context<R0>(), mapEffect(f)) /** @internal */ export const contextWithSink = <R0, A, In, L, E, R>( f: (context: Context.Context<R0>) => Sink.Sink<A, In, L, E, R> ): Sink.Sink<A, In, L, E, R0 | R> => new SinkImpl(channel.unwrap(pipe(Effect.contextWith((context) => toChannel(f(context)))))) /** @internal */ export const every = <In>(predicate: Predicate<In>): Sink.Sink<boolean, In, In> => fold(true, identity, (acc, input) => acc && predicate(input)) /** @internal */ export const fail = <E>(e: E): Sink.Sink<never, unknown, never, E> => new SinkImpl(core.fail(e)) /** @internal */ export const failSync = <E>(evaluate: LazyArg<E>): Sink.Sink<never, unknown, never, E> => new SinkImpl(core.failSync(evaluate)) /** @internal */ export const failCause = <E>(cause: Cause.Cause<E>): Sink.Sink<never, unknown, never, E> => new SinkImpl(core.failCause(cause)) /** @internal */ export const failCauseSync = <E>(evaluate: LazyArg<Cause.Cause<E>>): Sink.Sink<never, unknown, never, E> => new SinkImpl(core.failCauseSync(evaluate)) /** @internal */ export const filterInput: { <In, In1 extends In, In2 extends In1>( f: Refinement<In1, In2> ): <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In2, L, E, R> <In, In1 extends In>(f: Predicate<In1>): <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In1, L, E, R> } = <In, In1 extends In>(f: Predicate<In1>) => { return <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>): Sink.Sink<A, In1, L, E, R> => pipe(self, mapInputChunks(Chunk.filter(f))) } /** @internal */ export const filterInputEffect = dual< <In, In1 extends In, E2, R2>( f: (input: In1) => Effect.Effect<boolean, E2, R2> ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In1, L, E2 | E, R2 | R>, <A, In, L, E, R, In1 extends In, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (input: In1) => Effect.Effect<boolean, E2, R2> ) => Sink.Sink<A, In1, L, E2 | E, R2 | R> >( 2, (self, f) => mapInputChunksEffect( self, (chunk) => Effect.map(Effect.filter(chunk, f), Chunk.unsafeFromArray) ) ) /** @internal */ export const findEffect = dual< <A, E2, R2>( f: (a: A) => Effect.Effect<boolean, E2, R2> ) => <In, L extends In, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<Option.Option<A>, In, L, E2 | E, R2 | R>, <A, In, L extends In, E, R, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (a: A) => Effect.Effect<boolean, E2, R2> ) => Sink.Sink<Option.Option<A>, In, L, E2 | E, R2 | R> >( 2, <A, In, L extends In, E, R, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (a: A) => Effect.Effect<boolean, E2, R2> ): Sink.Sink<Option.Option<A>, In, L, E2 | E, R2 | R> => { const newChannel = pipe( core.fromEffect(pipe( Ref.make(Chunk.empty<In>()), Effect.zip(Ref.make(false)) )), core.flatMap(([leftoversRef, upstreamDoneRef]) => { const upstreamMarker: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, unknown, unknown> = core .readWith({ onInput: (input) => pipe(core.write(input), core.flatMap(() => upstreamMarker)), onFailure: core.fail, onDone: (done) => pipe(core.fromEffect(Ref.set(upstreamDoneRef, true)), channel.as(done)) }) const loop: Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E | E2, never, Option.Option<A>, unknown, R | R2> = channel.foldChannel(core.collectElements(toChannel(self)), { onFailure: core.fail, onSuccess: ([leftovers, doneValue]) => pipe( core.fromEffect(f(doneValue)), core.flatMap((satisfied) => pipe( core.fromEffect(Ref.set(leftoversRef, Chunk.flatten(leftovers))), channel.zipRight( pipe( core.fromEffect(Ref.get(upstreamDoneRef)), core.flatMap((upstreamDone) => { if (satisfied) { return pipe(core.write(Chunk.flatten(leftovers)), channel.as(Option.some(doneValue))) } if (upstreamDone) { return pipe(core.write(Chunk.flatten(leftovers)), channel.as(Option.none())) } return loop }) ) ) ) ) ) }) return pipe(upstreamMarker, core.pipeTo(channel.bufferChunk(leftoversRef)), core.pipeTo(loop)) }) ) return new SinkImpl(newChannel) } ) /** @internal */ export const fold = <S, In>( s: S, contFn: Predicate<S>, f: (s: S, input: In) => S ): Sink.Sink<S, In, In> => suspend(() => new SinkImpl(foldReader(s, contFn, f))) /** @internal */ const foldReader = <S, In>( s: S, contFn: Predicate<S>, f: (z: S, input: In) => S ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, S, unknown> => { if (!contFn(s)) { return core.succeedNow(s) } return core.readWith({ onInput: (input: Chunk.Chunk<In>) => { const [nextS, leftovers] = foldChunkSplit(s, input, contFn, f, 0, input.length) if (Chunk.isNonEmpty(leftovers)) { return pipe(core.write(leftovers), channel.as(nextS)) } return foldReader(nextS, contFn, f) }, onFailure: core.fail, onDone: () => core.succeedNow(s) }) } /** @internal */ const foldChunkSplit = <S, In>( s: S, chunk: Chunk.Chunk<In>, contFn: Predicate<S>, f: (z: S, input: In) => S, index: number, length: number ): [S, Chunk.Chunk<In>] => { if (index === length) { return [s, Chunk.empty()] } const s1 = f(s, pipe(chunk, Chunk.unsafeGet(index))) if (contFn(s1)) { return foldChunkSplit(s1, chunk, contFn, f, index + 1, length) } return [s1, pipe(chunk, Chunk.drop(index + 1))] } /** @internal */ export const foldSink = dual< <E, A1, In, In1 extends In, L1, E1, R1, A, A2, In2 extends In, L2, E2, R2>( options: { readonly onFailure: (err: E) => Sink.Sink<A1, In1, L1, E1, R1> readonly onSuccess: (a: A) => Sink.Sink<A2, In2, L2, E2, R2> } ) => <L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A1 | A2, In1 & In2, L1 | L2, E1 | E2, R1 | R2 | R>, <A, In, L, E, R, A1, In1 extends In, L1, E1, R1, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onFailure: (err: E) => Sink.Sink<A1, In1, L1, E1, R1> readonly onSuccess: (a: A) => Sink.Sink<A2, In2, L2, E2, R2> } ) => Sink.Sink<A1 | A2, In1 & In2, L1 | L2, E1 | E2, R1 | R2 | R> >( 2, <A, In, L, E, R, A1, In1 extends In, L1, E1, R1, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, options: { readonly onFailure: (err: E) => Sink.Sink<A1, In1, L1, E1, R1> readonly onSuccess: (z: A) => Sink.Sink<A2, In2, L2, E2, R2> } ): Sink.Sink<A1 | A2, In1 & In2, L1 | L2, E1 | E2, R | R1 | R2> => { const newChannel: Channel.Channel< Chunk.Chunk<L1 | L2>, Chunk.Chunk<In1 & In2>, E1 | E2, never, A1 | A2, unknown, R | R1 | R2 > = pipe( toChannel(self), core.collectElements, channel.foldChannel({ onFailure: (error) => toChannel(options.onFailure(error)), onSuccess: ([leftovers, z]) => core.suspend(() => { const leftoversRef = { ref: pipe(leftovers, Chunk.filter(Chunk.isNonEmpty)) as Chunk.Chunk<Chunk.Chunk<L1 | L2>> } const refReader = pipe( core.sync(() => { const ref = leftoversRef.ref leftoversRef.ref = Chunk.empty() return ref }), // This cast is safe because of the L1 >: L <: In1 bound. It follows that // L <: In1 and therefore Chunk[L] can be safely cast to Chunk[In1]. core.flatMap((chunk) => channel.writeChunk(chunk as Chunk.Chunk<Chunk.Chunk<In1 & In2>>)) ) const passthrough = channel.identityChannel<Chunk.Chunk<In1 & In2>, never, unknown>() const continuationSink = pipe( refReader, channel.zipRight(passthrough), core.pipeTo(toChannel(options.onSuccess(z))) ) return core.flatMap( core.collectElements(continuationSink), ([newLeftovers, z1]) => pipe( core.succeed(leftoversRef.ref), core.flatMap(channel.writeChunk), channel.zipRight(channel.writeChunk(newLeftovers)), channel.as(z1) ) ) }) }) ) return new SinkImpl(newChannel) } ) /** @internal */ export const foldChunks = <S, In>( s: S, contFn: Predicate<S>, f: (s: S, chunk: Chunk.Chunk<In>) => S ): Sink.Sink<S, In> => suspend(() => new SinkImpl(foldChunksReader(s, contFn, f))) /** @internal */ const foldChunksReader = <S, In>( s: S, contFn: Predicate<S>, f: (s: S, chunk: Chunk.Chunk<In>) => S ): Channel.Channel<never, Chunk.Chunk<In>, never, never, S, unknown> => { if (!contFn(s)) { return core.succeedNow(s) } return core.readWith({ onInput: (input: Chunk.Chunk<In>) => foldChunksReader(f(s, input), contFn, f), onFailure: core.fail, onDone: () => core.succeedNow(s) }) } /** @internal */ export const foldChunksEffect = <S, In, E, R>( s: S, contFn: Predicate<S>, f: (s: S, chunk: Chunk.Chunk<In>) => Effect.Effect<S, E, R> ): Sink.Sink<S, In, In, E, R> => suspend(() => new SinkImpl(foldChunksEffectReader(s, contFn, f))) /** @internal */ const foldChunksEffectReader = <S, R, E, In>( s: S, contFn: Predicate<S>, f: (s: S, chunk: Chunk.Chunk<In>) => Effect.Effect<S, E, R> ): Channel.Channel<never, Chunk.Chunk<In>, E, E, S, unknown, R> => { if (!contFn(s)) { return core.succeedNow(s) } return core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( core.fromEffect(f(s, input)), core.flatMap((s) => foldChunksEffectReader(s, contFn, f)) ), onFailure: core.fail, onDone: () => core.succeedNow(s) }) } /** @internal */ export const foldEffect = <S, In, E, R>( s: S, contFn: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Sink.Sink<S, In, In, E, R> => suspend(() => new SinkImpl(foldEffectReader(s, contFn, f))) /** @internal */ const foldEffectReader = <S, In, R, E>( s: S, contFn: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, S, unknown, R> => { if (!contFn(s)) { return core.succeedNow(s) } return core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( core.fromEffect(foldChunkSplitEffect(s, input, contFn, f)), core.flatMap(([nextS, leftovers]) => pipe( leftovers, Option.match({ onNone: () => foldEffectReader(nextS, contFn, f), onSome: (leftover) => pipe(core.write(leftover), channel.as(nextS)) }) ) ) ), onFailure: core.fail, onDone: () => core.succeedNow(s) }) } /** @internal */ const foldChunkSplitEffect = <S, R, E, In>( s: S, chunk: Chunk.Chunk<In>, contFn: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Effect.Effect<[S, Option.Option<Chunk.Chunk<In>>], E, R> => foldChunkSplitEffectInternal(s, chunk, 0, chunk.length, contFn, f) /** @internal */ const foldChunkSplitEffectInternal = <S, R, E, In>( s: S, chunk: Chunk.Chunk<In>, index: number, length: number, contFn: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Effect.Effect<[S, Option.Option<Chunk.Chunk<In>>], E, R> => { if (index === length) { return Effect.succeed([s, Option.none()]) } return pipe( f(s, pipe(chunk, Chunk.unsafeGet(index))), Effect.flatMap((s1) => contFn(s1) ? foldChunkSplitEffectInternal(s1, chunk, index + 1, length, contFn, f) : Effect.succeed([s1, Option.some(pipe(chunk, Chunk.drop(index + 1)))]) ) ) } /** @internal */ export const foldLeft = <S, In>(s: S, f: (s: S, input: In) => S): Sink.Sink<S, In> => ignoreLeftover(fold(s, constTrue, f)) /** @internal */ export const foldLeftChunks = <S, In>( s: S, f: (s: S, chunk: Chunk.Chunk<In>) => S ): Sink.Sink<S, In> => foldChunks(s, constTrue, f) /** @internal */ export const foldLeftChunksEffect = <S, In, E, R>( s: S, f: (s: S, chunk: Chunk.Chunk<In>) => Effect.Effect<S, E, R> ): Sink.Sink<S, In, never, E, R> => ignoreLeftover(foldChunksEffect(s, constTrue, f)) /** @internal */ export const foldLeftEffect = <S, In, E, R>( s: S, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Sink.Sink<S, In, In, E, R> => foldEffect(s, constTrue, f) /** @internal */ export const foldUntil = <S, In>(s: S, max: number, f: (s: S, input: In) => S): Sink.Sink<S, In, In> => pipe( fold<[S, number], In>( [s, 0], (tuple) => tuple[1] < max, ([output, count], input) => [f(output, input), count + 1] ), map((tuple) => tuple[0]) ) /** @internal */ export const foldUntilEffect = <S, In, E, R>( s: S, max: number, f: (s: S, input: In) => Effect.Effect<S, E, R> ): Sink.Sink<S, In, In, E, R> => pipe( foldEffect( [s, 0 as number] as const, (tuple) => tuple[1] < max, ([output, count], input: In) => pipe(f(output, input), Effect.map((s) => [s, count + 1] as const)) ), map((tuple) => tuple[0]) ) /** @internal */ export const foldWeighted = <S, In>( options: { readonly initial: S readonly maxCost: number readonly cost: (s: S, input: In) => number readonly body: (s: S, input: In) => S } ): Sink.Sink<S, In, In> => foldWeightedDecompose({ ...options, decompose: Chunk.of }) /** @internal */ export const foldWeightedDecompose = <S, In>( options: { readonly initial: S readonly maxCost: number readonly cost: (s: S, input: In) => number readonly decompose: (input: In) => Chunk.Chunk<In> readonly body: (s: S, input: In) => S } ): Sink.Sink<S, In, In> => suspend(() => new SinkImpl( foldWeightedDecomposeLoop( options.initial, 0, false, options.maxCost, options.cost, options.decompose, options.body ) ) ) /** @internal */ const foldWeightedDecomposeLoop = <S, In>( s: S, cost: number, dirty: boolean, max: number, costFn: (s: S, input: In) => number, decompose: (input: In) => Chunk.Chunk<In>, f: (s: S, input: In) => S ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, never, never, S, unknown> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => { const [nextS, nextCost, nextDirty, leftovers] = foldWeightedDecomposeFold( input, 0, s, cost, dirty, max, costFn, decompose, f ) if (Chunk.isNonEmpty(leftovers)) { return pipe(core.write(leftovers), channel.zipRight(core.succeedNow(nextS))) } if (cost > max) { return core.succeedNow(nextS) } return foldWeightedDecomposeLoop(nextS, nextCost, nextDirty, max, costFn, decompose, f) }, onFailure: core.fail, onDone: () => core.succeedNow(s) }) /** @internal */ const foldWeightedDecomposeFold = <In, S>( input: Chunk.Chunk<In>, index: number, s: S, cost: number, dirty: boolean, max: number, costFn: (s: S, input: In) => number, decompose: (input: In) => Chunk.Chunk<In>, f: (s: S, input: In) => S ): [S, number, boolean, Chunk.Chunk<In>] => { if (index === input.length) { return [s, cost, dirty, Chunk.empty<In>()] } const elem = pipe(input, Chunk.unsafeGet(index)) const total = cost + costFn(s, elem) if (total <= max) { return foldWeightedDecomposeFold(input, index + 1, f(s, elem), total, true, max, costFn, decompose, f) } const decomposed = decompose(elem) if (decomposed.length <= 1 && !dirty) { // If `elem` cannot be decomposed, we need to cross the `max` threshold. To // minimize "injury", we only allow this when we haven't added anything else // to the aggregate (dirty = false). return [f(s, elem), total, true, pipe(input, Chunk.drop(index + 1))] } if (decomposed.length <= 1 && dirty) { // If the state is dirty and `elem` cannot be decomposed, we stop folding // and include `elem` in the leftovers. return [s, cost, dirty, pipe(input, Chunk.drop(index))] } // `elem` got decomposed, so we will recurse with the decomposed elements pushed // into the chunk we're processing and see if we can aggregate further. const next = pipe(decomposed, Chunk.appendAll(pipe(input, Chunk.drop(index + 1)))) return foldWeightedDecomposeFold(next, 0, s, cost, dirty, max, costFn, decompose, f) } /** @internal */ export const foldWeightedDecomposeEffect = <S, In, E, R, E2, R2, E3, R3>( options: { readonly initial: S readonly maxCost: number readonly cost: (s: S, input: In) => Effect.Effect<number, E, R> readonly decompose: (input: In) => Effect.Effect<Chunk.Chunk<In>, E2, R2> readonly body: (s: S, input: In) => Effect.Effect<S, E3, R3> } ): Sink.Sink<S, In, In, E | E2 | E3, R | R2 | R3> => suspend(() => new SinkImpl( foldWeightedDecomposeEffectLoop( options.initial, options.maxCost, options.cost, options.decompose, options.body, 0, false ) ) ) /** @internal */ export const foldWeightedEffect = <S, In, E, R, E2, R2>( options: { readonly initial: S readonly maxCost: number readonly cost: (s: S, input: In) => Effect.Effect<number, E, R> readonly body: (s: S, input: In) => Effect.Effect<S, E2, R2> } ): Sink.Sink<S, In, In, E | E2, R | R2> => foldWeightedDecomposeEffect({ ...options, decompose: (input) => Effect.succeed(Chunk.of(input)) }) const foldWeightedDecomposeEffectLoop = <S, In, E, R, E2, R2, E3, R3>( s: S, max: number, costFn: (s: S, input: In) => Effect.Effect<number, E, R>, decompose: (input: In) => Effect.Effect<Chunk.Chunk<In>, E2, R2>, f: (s: S, input: In) => Effect.Effect<S, E3, R3>, cost: number, dirty: boolean ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E | E2 | E3, E | E2 | E3, S, unknown, R | R2 | R3> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( core.fromEffect(foldWeightedDecomposeEffectFold(s, max, costFn, decompose, f, input, dirty, cost, 0)), core.flatMap(([nextS, nextCost, nextDirty, leftovers]) => { if (Chunk.isNonEmpty(leftovers)) { return pipe(core.write(leftovers), channel.zipRight(core.succeedNow(nextS))) } if (cost > max) { return core.succeedNow(nextS) } return foldWeightedDecomposeEffectLoop(nextS, max, costFn, decompose, f, nextCost, nextDirty) }) ), onFailure: core.fail, onDone: () => core.succeedNow(s) }) /** @internal */ const foldWeightedDecomposeEffectFold = <S, In, E, R, E2, R2, E3, R3>( s: S, max: number, costFn: (s: S, input: In) => Effect.Effect<number, E, R>, decompose: (input: In) => Effect.Effect<Chunk.Chunk<In>, E2, R2>, f: (s: S, input: In) => Effect.Effect<S, E3, R3>, input: Chunk.Chunk<In>, dirty: boolean, cost: number, index: number ): Effect.Effect<[S, number, boolean, Chunk.Chunk<In>], E | E2 | E3, R | R2 | R3> => { if (index === input.length) { return Effect.succeed([s, cost, dirty, Chunk.empty<In>()]) } const elem = pipe(input, Chunk.unsafeGet(index)) return pipe( costFn(s, elem), Effect.map((newCost) => cost + newCost), Effect.flatMap((total) => { if (total <= max) { return pipe( f(s, elem), Effect.flatMap((s) => foldWeightedDecomposeEffectFold(s, max, costFn, decompose, f, input, true, total, index + 1) ) ) } return pipe( decompose(elem), Effect.flatMap((decomposed) => { if (decomposed.length <= 1 && !dirty) { // If `elem` cannot be decomposed, we need to cross the `max` threshold. To // minimize "injury", we only allow this when we haven't added anything else // to the aggregate (dirty = false). return pipe( f(s, elem), Effect.map((s) => [s, total, true, pipe(input, Chunk.drop(index + 1))]) ) } if (decomposed.length <= 1 && dirty) { // If the state is dirty and `elem` cannot be decomposed, we stop folding // and include `elem` in th leftovers. return Effect.succeed([s, cost, dirty, pipe(input, Chunk.drop(index))]) } // `elem` got decomposed, so we will recurse with the decomposed elements pushed // into the chunk we're processing and see if we can aggregate further. const next = pipe(decomposed, Chunk.appendAll(pipe(input, Chunk.drop(index + 1)))) return foldWeightedDecomposeEffectFold(s, max, costFn, decompose, f, next, dirty, cost, 0) }) ) }) ) } /** @internal */ export const flatMap = dual< <A, A1, In, In1 extends In, L1, E1, R1>( f: (a: A) => Sink.Sink<A1, In1, L1, E1, R1> ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A1, In & In1, L | L1, E1 | E, R1 | R>, <A, In, L, E, R, A1, In1 extends In, L1, E1, R1>( self: Sink.Sink<A, In, L, E, R>, f: (a: A) => Sink.Sink<A1, In1, L1, E1, R1> ) => Sink.Sink<A1, In & In1, L | L1, E1 | E, R1 | R> >( 2, (self, f) => foldSink(self, { onFailure: fail, onSuccess: f }) ) /** @internal */ export const forEach = <In, X, E, R>(f: (input: In) => Effect.Effect<X, E, R>): Sink.Sink<void, In, never, E, R> => { const process: Channel.Channel<never, Chunk.Chunk<In>, E, E, void, unknown, R> = core.readWithCause({ onInput: (input: Chunk.Chunk<In>) => pipe(core.fromEffect(Effect.forEach(input, (v) => f(v), { discard: true })), core.flatMap(() => process)), onFailure: core.failCause, onDone: () => core.void }) return new SinkImpl(process) } /** @internal */ export const forEachChunk = <In, X, E, R>( f: (input: Chunk.Chunk<In>) => Effect.Effect<X, E, R> ): Sink.Sink<void, In, never, E, R> => { const process: Channel.Channel<never, Chunk.Chunk<In>, E, E, void, unknown, R> = core.readWithCause({ onInput: (input: Chunk.Chunk<In>) => pipe(core.fromEffect(f(input)), core.flatMap(() => process)), onFailure: core.failCause, onDone: () => core.void }) return new SinkImpl(process) } /** @internal */ export const forEachWhile = <In, E, R>( f: (input: In) => Effect.Effect<boolean, E, R> ): Sink.Sink<void, In, In, E, R> => { const process: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, void, unknown, R> = core.readWithCause({ onInput: (input: Chunk.Chunk<In>) => forEachWhileReader(f, input, 0, input.length, process), onFailure: core.failCause, onDone: () => core.void }) return new SinkImpl(process) } /** @internal */ const forEachWhileReader = <In, E, R>( f: (input: In) => Effect.Effect<boolean, E, R>, input: Chunk.Chunk<In>, index: number, length: number, cont: Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, void, unknown, R> ): Channel.Channel<Chunk.Chunk<In>, Chunk.Chunk<In>, E, E, void, unknown, R> => { if (index === length) { return cont } return pipe( core.fromEffect(f(pipe(input, Chunk.unsafeGet(index)))), core.flatMap((bool) => bool ? forEachWhileReader(f, input, index + 1, length, cont) : core.write(pipe(input, Chunk.drop(index))) ), channel.catchAll((error) => pipe(core.write(pipe(input, Chunk.drop(index))), channel.zipRight(core.fail(error)))) ) } /** @internal */ export const forEachChunkWhile = <In, E, R>( f: (input: Chunk.Chunk<In>) => Effect.Effect<boolean, E, R> ): Sink.Sink<void, In, In, E, R> => { const reader: Channel.Channel<never, Chunk.Chunk<In>, E, E, void, unknown, R> = core.readWith({ onInput: (input: Chunk.Chunk<In>) => pipe( core.fromEffect(f(input)), core.flatMap((cont) => cont ? reader : core.void) ), onFailure: core.fail, onDone: () => core.void }) return new SinkImpl(reader) } /** @internal */ export const fromChannel = <L, In, E, A, R>( channel: Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, A, unknown, R> ): Sink.Sink<A, In, L, E, R> => new SinkImpl(channel) /** @internal */ export const fromEffect = <A, E, R>(effect: Effect.Effect<A, E, R>): Sink.Sink<A, unknown, never, E, R> => new SinkImpl(core.fromEffect(effect)) /** @internal */ export const fromPubSub = <In>( pubsub: PubSub.PubSub<In>, options?: { readonly shutdown?: boolean | undefined } ): Sink.Sink<void, In> => fromQueue(pubsub, options) /** @internal */ export const fromPush = <In, L0, R0, L, R>( push: Effect.Effect< (_: Option.Option<Chunk.Chunk<In>>) => Effect.Effect<void, readonly [Either.Either<R0, L0>, Chunk.Chunk<L>], R>, never, R > ): Sink.Sink<R0, In, L, L0, Exclude<R, Scope.Scope>> => new SinkImpl(channel.unwrapScoped(pipe(push, Effect.map(fromPushPull)))) const fromPushPull = <In, Z, E, L, R>( push: ( option: Option.Option<Chunk.Chunk<In>> ) => Effect.Effect<void, readonly [Either.Either<Z, E>, Chunk.Chunk<L>], R> ): Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, Z, unknown, R> => core.readWith({ onInput: (input: Chunk.Chunk<In>) => channel.foldChannel(core.fromEffect(push(Option.some(input))), { onFailure: ([either, leftovers]) => Either.match(either, { onLeft: (error) => pipe(core.write(leftovers), channel.zipRight(core.fail(error))), onRight: (z) => pipe(core.write(leftovers), channel.zipRight(core.succeedNow(z))) }), onSuccess: () => fromPushPull(push) }), onFailure: core.fail, onDone: () => channel.foldChannel(core.fromEffect(push(Option.none())), { onFailure: ([either, leftovers]) => Either.match(either, { onLeft: (error) => pipe(core.write(leftovers), channel.zipRight(core.fail(error))), onRight: (z) => pipe(core.write(leftovers), channel.zipRight(core.succeedNow(z))) }), onSuccess: () => core.fromEffect( Effect.dieMessage( "BUG: Sink.fromPush - please report an issue at https://github.com/Effect-TS/effect/issues" ) ) }) }) /** @internal */ export const fromQueue = <In>( queue: Queue.Enqueue<In>, options?: { readonly shutdown?: boolean | undefined } ): Sink.Sink<void, In> => options?.shutdown ? unwrapScoped( Effect.map( Effect.acquireRelease(Effect.succeed(queue), Queue.shutdown), fromQueue ) ) : forEachChunk((input: Chunk.Chunk<In>) => pipe(Queue.offerAll(queue, input))) /** @internal */ export const head = <In>(): Sink.Sink<Option.Option<In>, In, In> => fold( Option.none() as Option.Option<In>, Option.isNone, (option, input) => Option.match(option, { onNone: () => Option.some(input), onSome: () => option }) ) /** @internal */ export const ignoreLeftover = <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>): Sink.Sink<A, In, never, E, R> => new SinkImpl(channel.drain(toChannel(self))) /** @internal */ export const last = <In>(): Sink.Sink<Option.Option<In>, In, In> => foldLeftChunks(Option.none<In>(), (s, input) => Option.orElse(Chunk.last(input), () => s)) /** @internal */ export const leftover = <L>(chunk: Chunk.Chunk<L>): Sink.Sink<void, unknown, L> => new SinkImpl(core.suspend(() => core.write(chunk))) /** @internal */ export const map = dual< <A, A2>(f: (a: A) => A2) => <In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In, L, E, R>, <A, In, L, E, R, A2>(self: Sink.Sink<A, In, L, E, R>, f: (a: A) => A2) => Sink.Sink<A2, In, L, E, R> >(2, (self, f) => { return new SinkImpl(pipe(toChannel(self), channel.map(f))) }) /** @internal */ export const mapEffect = dual< <A, A2, E2, R2>( f: (a: A) => Effect.Effect<A2, E2, R2> ) => <In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In, L, E2 | E, R2 | R>, <A, In, L, E, R, A2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, f: (a: A) => Effect.Effect<A2, E2, R2> ) => Sink.Sink<A2, In, L, E2 | E, R2 | R> >( 2, (self, f) => new SinkImpl(pipe(toChannel(self), channel.mapEffect(f))) ) /** @internal */ export const mapError = dual< <E, E2>(f: (error: E) => E2) => <A, In, L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E2, R>, <A, In, L, E, R, E2>(self: Sink.Sink<A, In, L, E, R>, f: (error: E) => E2) => Sink.Sink<A, In, L, E2, R> >( 2, (self, f) => new SinkImpl(pipe(toChannel(self), channel.mapError(f))) ) /** @internal */ export const mapLeftover = dual< <L, L2>(f: (leftover: L) => L2) => <A, In, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L2, E, R>, <A, In, L, E, R, L2>(self: Sink.Sink<A, In, L, E, R>, f: (leftover: L) => L2) => Sink.Sink<A, In, L2, E, R> >( 2, (self, f) => new SinkImpl(pipe(toChannel(self), channel.mapOut(Chunk.map(f)))) ) /** @internal */ export const never: Sink.Sink<never, unknown> = fromEffect(Effect.never) /** @internal */ export const orElse = dual< <A2, In2, L2, E2, R2>( that: LazyArg<Sink.Sink<A2, In2, L2, E2, R2>> ) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2 | A, In & In2, L2 | L, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: LazyArg<Sink.Sink<A2, In2, L2, E2, R2>> ) => Sink.Sink<A2 | A, In & In2, L2 | L, E2 | E, R2 | R> >( 2, <A, In, L, E, R, A2, In2, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: LazyArg<Sink.Sink<A2, In2, L2, E2, R2>> ): Sink.Sink<A | A2, In & In2, L | L2, E | E2, R | R2> => new SinkImpl<A | A2, In & In2, L | L2, E | E2, R | R2>( pipe(toChannel(self), channel.orElse(() => toChannel(that()))) ) ) /** @internal */ export const provideContext = dual< <R>(context: Context.Context<R>) => <A, In, L, E>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E>, <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>, context: Context.Context<R>) => Sink.Sink<A, In, L, E> >( 2, (self, context) => new SinkImpl(pipe(toChannel(self), core.provideContext(context))) ) /** @internal */ export const race = dual< <R1, E1, In1, L1, A1>( that: Sink.Sink<A1, In1, L1, E1, R1> ) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A1 | A, In & In1, L1 | L, E1 | E, R1 | R>, <A, In, L, E, R, A1, In1, L1, E1, R1>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A1, In1, L1, E1, R1> ) => Sink.Sink<A1 | A, In & In1, L1 | L, E1 | E, R1 | R> >( 2, (self, that) => pipe(self, raceBoth(that), map(Either.merge)) ) /** @internal */ export const raceBoth = dual< <A1, In1, L1, E1, R1>( that: Sink.Sink<A1, In1, L1, E1, R1>, options?: { readonly capacity?: number | undefined } ) => <A, In, L, E, R>( self: Sink.Sink<A, In, L, E, R> ) => Sink.Sink<Either.Either<A1, A>, In & In1, L1 | L, E1 | E, R1 | R>, <A, In, L, E, R, A1, In1, L1, E1, R1>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A1, In1, L1, E1, R1>, options?: { readonly capacity?: number | undefined } ) => Sink.Sink<Either.Either<A1, A>, In & In1, L1 | L, E1 | E, R1 | R> >( (args) => isSink(args[1]), (self, that, options) => raceWith(self, { other: that, onSelfDone: (selfDone) => mergeDecision.Done(Effect.map(selfDone, Either.left)), onOtherDone: (thatDone) => mergeDecision.Done(Effect.map(thatDone, Either.right)), capacity: options?.capacity ?? 16 }) ) /** @internal */ export const raceWith = dual< <A2, In2, L2, E2, R2, A, E, A3, A4>( options: { readonly other: Sink.Sink<A2, In2, L2, E2, R2> readonly onSelfDone: (exit: Exit.Exit<A, E>) => MergeDecision.MergeDecision<R2, E2, A2, E2 | E, A3> readonly onOtherDone: (exit: Exit.Exit<A2, E2>) => MergeDecision.MergeDecision<R2, E, A, E2 | E, A4> readonly capacity?: number | undefined } ) => <In, L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A3 | A4, In & In2, L2 | L, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2, L2, E2, R2, A3, A4>( self: Sink.Sink<A, In, L, E, R>, options: { readonly other: Sink.Sink<A2, In2, L2, E2, R2> readonly onSelfDone: (exit: Exit.Exit<A, E>) => MergeDecision.MergeDecision<R2, E2, A2, E2 | E, A3> readonly onOtherDone: (exit: Exit.Exit<A2, E2>) => MergeDecision.MergeDecision<R2, E, A, E2 | E, A4> readonly capacity?: number | undefined } ) => Sink.Sink<A3 | A4, In & In2, L2 | L, E2 | E, R2 | R> >( 2, <A, In, L, E, R, A2, In2, L2, E2, R2, A3, A4>( self: Sink.Sink<A, In, L, E, R>, options: { readonly other: Sink.Sink<A2, In2, L2, E2, R2> readonly onSelfDone: (exit: Exit.Exit<A, E>) => MergeDecision.MergeDecision<R2, E2, A2, E2 | E, A3> readonly onOtherDone: (exit: Exit.Exit<A2, E2>) => MergeDecision.MergeDecision<R2, E, A, E2 | E, A4> readonly capacity?: number | undefined } ): Sink.Sink<A3 | A4, In & In2, L2 | L, E2 | E, R2 | R> => { function race(scope: Scope.Scope) { return Effect.gen(function*() { const pubsub = yield* PubSub.bounded< Either.Either<Chunk.Chunk<In & In2>, Exit.Exit<unknown>> >(options?.capacity ?? 16) const subscription1 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) const subscription2 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) const reader = channel.toPubSub(pubsub) const writer = channel.fromQueue(subscription1).pipe( core.pipeTo(toChannel(self)), channel.zipLeft(core.fromEffect(Queue.shutdown(subscription1))), channel.mergeWith({ other: channel.fromQueue(subscription2).pipe( core.pipeTo(toChannel(options.other)), channel.zipLeft(core.fromEffect(Queue.shutdown(subscription2))) ), onSelfDone: options.onSelfDone, onOtherDone: options.onOtherDone }) ) const racedChannel = channel.mergeWith(reader, { other: writer, onSelfDone: () => mergeDecision.Await(identity), onOtherDone: (exit) => mergeDecision.Done(exit) }) as Channel.Channel< Chunk.Chunk<L | L2>, Chunk.Chunk<In & In2>, E | E2, never, A3 | A4, unknown, R | R2 > return new SinkImpl(racedChannel) }) } return unwrapScopedWith(race) } ) /** @internal */ export const refineOrDie = dual< <E, E2>( pf: (error: E) => Option.Option<E2> ) => <A, In, L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E2, R>, <A, In, L, E, R, E2>( self: Sink.Sink<A, In, L, E, R>, pf: (error: E) => Option.Option<E2> ) => Sink.Sink<A, In, L, E2, R> >( 2, (self, pf) => pipe(self, refineOrDieWith(pf, identity)) ) /** @internal */ export const refineOrDieWith = dual< <E, E2>( pf: (error: E) => Option.Option<E2>, f: (error: E) => unknown ) => <A, In, L, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, L, E2, R>, <A, In, L, E, R, E2>( self: Sink.Sink<A, In, L, E, R>, pf: (error: E) => Option.Option<E2>, f: (error: E) => unknown ) => Sink.Sink<A, In, L, E2, R> >( 3, (self, pf, f) => { const newChannel = pipe( self, toChannel, channel.catchAll((error) => Option.match(pf(error), { onNone: () => core.failCauseSync(() => Cause.die(f(error))), onSome: core.fail }) ) ) return new SinkImpl(newChannel) } ) /** @internal */ export const service = <I, S>( tag: Context.Tag<I, S> ): Sink.Sink<S, unknown, never, never, I> => serviceWith(tag, identity) /** @internal */ export const serviceWith = <I, S, Z>( tag: Context.Tag<I, S>, f: (service: Types.NoInfer<S>) => Z ): Sink.Sink<Z, unknown, never, never, I> => fromEffect(Effect.map(tag, f)) /** @internal */ export const serviceWithEffect = <I, S, R, E, Z>( tag: Context.Tag<I, S>, f: (service: Types.NoInfer<S>) => Effect.Effect<Z, E, R> ): Sink.Sink<Z, unknown, never, E, R | I> => fromEffect(Effect.flatMap(tag, f)) /** @internal */ export const serviceWithSink = <I, S, R, E, In, L, Z>( tag: Context.Tag<I, S>, f: (service: Types.NoInfer<S>) => Sink.Sink<Z, In, L, E, R> ): Sink.Sink<Z, In, L, E, R | I> => new SinkImpl(pipe(Effect.map(tag, (service) => toChannel(f(service))), channel.unwrap)) /** @internal */ export const some = <In>(predicate: Predicate<In>): Sink.Sink<boolean, In, In> => fold(false, (bool) => !bool, (acc, input) => acc || predicate(input)) /** @internal */ export const splitWhere = dual< <In>(f: Predicate<In>) => <A, L extends In, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In, In, E, R>, <A, In, L extends In, E, R>(self: Sink.Sink<A, In, L, E, R>, f: Predicate<In>) => Sink.Sink<A, In, In, E, R> >(2, <A, In, L extends In, E, R>(self: Sink.Sink<A, In, L, E, R>, f: Predicate<In>): Sink.Sink<A, In, In, E, R> => { const newChannel = pipe( core.fromEffect(Ref.make(Chunk.empty<In>())), core.flatMap((ref) => pipe( splitWhereSplitter<In, E>(false, ref, f), channel.pipeToOrFail(toChannel(self)), core.collectElements, core.flatMap(([leftovers, z]) => pipe( core.fromEffect(Ref.get(ref)), core.flatMap((leftover) => pipe( core.write<Chunk.Chunk<In>>(pipe(leftover, Chunk.appendAll(Chunk.flatten(leftovers)))), channel.zipRight(core.succeed(z)) ) ) ) ) ) ) ) return new SinkImpl(newChannel) }) /** @internal */ const splitWhereSplitter = <A, E>( written: boolean, leftovers: Ref.Ref<Chunk.Chunk<A>>, f: Predicate<A> ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E, never, unknown, unknown> => core.readWithCause({ onInput: (input) => { if (Chunk.isEmpty(input)) { return splitWhereSplitter(written, leftovers, f) } if (written) { const index = indexWhere(input, f) if (index === -1) { return channel.zipRight( core.write(input), splitWhereSplitter<A, E>(true, leftovers, f) ) } const [left, right] = Chunk.splitAt(input, index) return channel.zipRight( core.write(left), core.fromEffect(Ref.set(leftovers, right)) ) } const index = indexWhere(input, f, 1) if (index === -1) { return channel.zipRight( core.write(input), splitWhereSplitter<A, E>(true, leftovers, f) ) } const [left, right] = pipe(input, Chunk.splitAt(Math.max(index, 1))) return channel.zipRight(core.write(left), core.fromEffect(Ref.set(leftovers, right))) }, onFailure: core.failCause, onDone: core.succeed }) /** @internal */ const indexWhere = <A>(self: Chunk.Chunk<A>, predicate: Predicate<A>, from = 0): number => { const iterator = self[Symbol.iterator]() let index = 0 let result = -1 let next: IteratorResult<A, any> while (result < 0 && (next = iterator.next()) && !next.done) { const a = next.value if (index >= from && predicate(a)) { result = index } index = index + 1 } return result } /** @internal */ export const succeed = <A>(a: A): Sink.Sink<A, unknown> => new SinkImpl(core.succeed(a)) /** @internal */ export const sum: Sink.Sink<number, number> = foldLeftChunks( 0, (acc, chunk) => acc + Chunk.reduce(chunk, 0, (s, a) => s + a) ) /** @internal */ export const summarized = dual< <A2, E2, R2, A3>( summary: Effect.Effect<A2, E2, R2>, f: (start: A2, end: A2) => A3 ) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<[A, A3], In, L, E2 | E, R2 | R>, <A, In, L, E, R, A2, E2, R2, A3>( self: Sink.Sink<A, In, L, E, R>, summary: Effect.Effect<A2, E2, R2>, f: (start: A2, end: A2) => A3 ) => Sink.Sink<[A, A3], In, L, E2 | E, R2 | R> >( 3, (self, summary, f) => { const newChannel = pipe( core.fromEffect(summary), core.flatMap((start) => pipe( self, toChannel, core.flatMap((done) => pipe( core.fromEffect(summary), channel.map((end) => [done, f(start, end)]) ) ) ) ) ) return new SinkImpl(newChannel) } ) /** @internal */ export const sync = <A>(evaluate: LazyArg<A>): Sink.Sink<A, unknown> => new SinkImpl(core.sync(evaluate)) /** @internal */ export const take = <In>(n: number): Sink.Sink<Chunk.Chunk<In>, In, In> => pipe( foldChunks<Chunk.Chunk<In>, In>( Chunk.empty(), (chunk) => chunk.length < n, (acc, chunk) => pipe(acc, Chunk.appendAll(chunk)) ), flatMap((acc) => { const [taken, leftover] = pipe(acc, Chunk.splitAt(n)) return new SinkImpl(pipe(core.write(leftover), channel.zipRight(core.succeedNow(taken)))) }) ) /** @internal */ export const toChannel = <A, In, L, E, R>( self: Sink.Sink<A, In, L, E, R> ): Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, A, unknown, R> => Effect.isEffect(self) ? toChannel(fromEffect(self as Effect.Effect<A, E, R>)) : (self as SinkImpl<A, In, L, E, R>).channel /** @internal */ export const unwrap = <A, In, L, E2, R2, E, R>( effect: Effect.Effect<Sink.Sink<A, In, L, E2, R2>, E, R> ): Sink.Sink<A, In, L, E | E2, R | R2> => new SinkImpl( channel.unwrap(pipe(effect, Effect.map((sink) => toChannel(sink)))) ) /** @internal */ export const unwrapScoped = <A, In, L, E, R>( effect: Effect.Effect<Sink.Sink<A, In, L, E, R>, E, R> ): Sink.Sink<A, In, L, E, Exclude<R, Scope.Scope>> => new SinkImpl( channel.unwrapScoped(effect.pipe( Effect.map((sink) => toChannel(sink)) )) ) /** @internal */ export const unwrapScopedWith = <A, In, L, E, R>( f: (scope: Scope.Scope) => Effect.Effect<Sink.Sink<A, In, L, E, R>, E, R> ): Sink.Sink<A, In, L, E, R> => new SinkImpl( channel.unwrapScopedWith((scope) => f(scope).pipe( Effect.map((sink) => toChannel(sink)) ) ) ) /** @internal */ export const withDuration = <A, In, L, E, R>( self: Sink.Sink<A, In, L, E, R> ): Sink.Sink<[A, Duration.Duration], In, L, E, R> => pipe(self, summarized(Clock.currentTimeMillis, (start, end) => Duration.millis(end - start))) /** @internal */ export const zip = dual< <A2, In, In2 extends In, L2, E2, R2>( that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<[A, A2], In & In2, L | L2, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => Sink.Sink<[A, A2], In & In2, L | L2, E2 | E, R2 | R> >( (args) => isSink(args[1]), <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ): Sink.Sink<[A, A2], In & In2, L | L2, E2 | E, R2 | R> => zipWith(self, that, (z, z2) => [z, z2], options) ) /** @internal */ export const zipLeft = dual< <A2, In, In2 extends In, L2, E2, R2>( that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A, In & In2, L | L2, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => Sink.Sink<A, In & In2, L | L2, E2 | E, R2 | R> >( (args) => isSink(args[1]), <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ): Sink.Sink<A, In & In2, L | L2, E2 | E, R2 | R> => zipWith(self, that, (z, _) => z, options) ) /** @internal */ export const zipRight = dual< <A2, In, In2 extends In, L2, E2, R2>( that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => <A, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A2, In & In2, L | L2, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ) => Sink.Sink<A2, In & In2, L | L2, E2 | E, R2 | R> >( (args) => isSink(args[1]), <A, In, L, E, R, A2, In2 extends In, L2, E2, R2>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, options?: { readonly concurrent?: boolean | undefined } ): Sink.Sink<A2, In & In2, L | L2, E2 | E, R2 | R> => zipWith(self, that, (_, z2) => z2, options) ) /** @internal */ export const zipWith = dual< <A2, In, In2 extends In, L2, E2, R2, A, A3>( that: Sink.Sink<A2, In2, L2, E2, R2>, f: (a: A, a2: A2) => A3, options?: { readonly concurrent?: boolean | undefined } ) => <L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<A3, In & In2, L | L2, E2 | E, R2 | R>, <A, In, L, E, R, A2, In2 extends In, L2, E2, R2, A3>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, f: (a: A, a2: A2) => A3, options?: { readonly concurrent?: boolean | undefined } ) => Sink.Sink<A3, In & In2, L | L2, E2 | E, R2 | R> >( (args) => isSink(args[1]), <A, In, L, E, R, A2, In2 extends In, L2, E2, R2, A3>( self: Sink.Sink<A, In, L, E, R>, that: Sink.Sink<A2, In2, L2, E2, R2>, f: (a: A, a2: A2) => A3, options?: { readonly concurrent?: boolean | undefined } ): Sink.Sink<A3, In & In2, L | L2, E2 | E, R2 | R> => options?.concurrent ? raceWith(self, { other: that, onSelfDone: Exit.match({ onFailure: (cause) => mergeDecision.Done(Effect.failCause(cause)), onSuccess: (leftZ) => mergeDecision.Await<R | R2, E2, A2, E | E2, A3>( Exit.match({ onFailure: Effect.failCause, onSuccess: (rightZ) => Effect.succeed(f(leftZ, rightZ)) }) ) }), onOtherDone: Exit.match({ onFailure: (cause) => mergeDecision.Done(Effect.failCause(cause)), onSuccess: (rightZ) => mergeDecision.Await<R | R2, E, A, E | E2, A3>( Exit.match({ onFailure: Effect.failCause, onSuccess: (leftZ) => Effect.succeed(f(leftZ, rightZ)) }) ) }) }) : flatMap(self, (z) => map(that, (z2) => f(z, z2))) ) // Circular with Channel /** @internal */ export const channelToSink = <OutElem, InElem, OutErr, InErr, OutDone, Env>( self: Channel.Channel<Chunk.Chunk<OutElem>, Chunk.Chunk<InElem>, OutErr, InErr, OutDone, unknown, Env> ): Sink.Sink<OutDone, InElem, OutElem, OutErr, Env> => new SinkImpl(self) // Constants /** @internal */ export const count: Sink.Sink<number, unknown> = foldLeftChunks( 0, (acc, chunk) => acc + chunk.length ) /** @internal */ export const mkString: Sink.Sink<string, unknown> = suspend(() => { const strings: Array<string> = [] return pipe( foldLeftChunks<void, unknown>(void 0, (_, elems) => Chunk.map(elems, (elem) => { strings.push(String(elem)) })), map(() => strings.join("")) ) }) /** @internal */ export const timed: Sink.Sink<Duration.Duration, unknown> = pipe( withDuration(drain), map((tuple) => tuple[1]) )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssv445/lorem-ipsum-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server