Skip to main content
Glama
ssv445

Lorem Ipsum MCP Server

by ssv445
groupBy.ts (17.6 kB)
import * as Cause from "../Cause.js"
import type * as Channel from "../Channel.js"
import * as Chunk from "../Chunk.js"
import * as Deferred from "../Deferred.js"
import * as Effect from "../Effect.js"
import * as Effectable from "../Effectable.js"
import * as Exit from "../Exit.js"
import { dual, pipe } from "../Function.js"
import type * as GroupBy from "../GroupBy.js"
import * as Option from "../Option.js"
import { pipeArguments } from "../Pipeable.js"
import { hasProperty, type Predicate } from "../Predicate.js"
import * as Queue from "../Queue.js"
import * as Ref from "../Ref.js"
import * as Scope from "../Scope.js"
import type * as Stream from "../Stream.js"
import type * as Take from "../Take.js"
import type { NoInfer } from "../Types.js"
import * as channel from "./channel.js"
import * as channelExecutor from "./channel/channelExecutor.js"
import * as core from "./core-stream.js"
import * as stream from "./stream.js"
import * as take from "./take.js"

/**
 * String key passed to `Symbol.for` to obtain the `GroupBy` type id.
 *
 * @internal
 */
const GroupBySymbolKey = "effect/GroupBy"

/**
 * Globally registered symbol (`Symbol.for("effect/GroupBy")`) used as the
 * brand property of `GroupBy` values; `isGroupBy` checks for its presence.
 *
 * @internal
 */
export const GroupByTypeId: GroupBy.GroupByTypeId = Symbol.for(
  GroupBySymbolKey
) as GroupBy.GroupByTypeId

// Value stored under `GroupByTypeId` on every object built by `make`. Each
// member is an identity function over `never`; nothing in this file calls
// them, so they appear to exist only to carry the four type parameters.
const groupByVariance = {
  /* c8 ignore next */
  _R: (_: never) => _,
  /* c8 ignore next */
  _E: (_: never) => _,
  /* c8 ignore next */
  _K: (_: never) => _,
  /* c8 ignore next */
  _V: (_: never) => _
}

/**
 * Type guard: returns `true` when `u` has a `GroupByTypeId` property.
 *
 * @internal
 */
export const isGroupBy = (u: unknown): u is GroupBy.GroupBy<unknown, unknown, unknown, unknown> =>
  hasProperty(u, GroupByTypeId)

/**
 * Turns a `GroupBy` back into a single stream by applying `f` to every
 * `(key, stream)` pair and flat-mapping over the results.
 *
 * Each group's queue is exposed to `f` as a stream built with
 * `stream.fromQueue(queue, { shutdown: true })` and `stream.flattenTake`.
 * The flat-map runs with `concurrency: "unbounded"` and a `bufferSize` of
 * `options.bufferSize`, defaulting to 16.
 *
 * Dual API: the data-first form is selected when the first argument is a
 * `GroupBy` (see the `isGroupBy` check).
 *
 * @internal
 */
export const evaluate = dual<
  <K, V, E, A, E2, R2>(
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <R>(self: GroupBy.GroupBy<K, V, E, R>) => Stream.Stream<A, E | E2, R2 | R>,
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => Stream.Stream<A, E | E2, R2 | R>
>(
  (args) => isGroupBy(args[0]),
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A, E | E2, R2 | R> =>
    stream.flatMap(
      self.grouped,
      ([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))),
      { concurrency: "unbounded", bufferSize: options?.bufferSize ?? 16 }
    )
)

/**
 * Keeps only the groups whose key satisfies `predicate`.
 *
 * For a rejected group the group's queue is shut down before the group is
 * dropped from the resulting `GroupBy`.
 *
 * @internal
 */
export const filter = dual<
  <K>(predicate: Predicate<NoInfer<K>>) => <V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>): GroupBy.GroupBy<K, V, E, R> =>
  make(
    pipe(
      self.grouped,
      stream.filterEffect((tuple) => {
        // tuple is [key, queue]
        if (predicate(tuple[0])) {
          return pipe(Effect.succeed(tuple), Effect.as(true))
        }
        // Rejected key: shut the group's queue down, then filter it out.
        return pipe(Queue.shutdown(tuple[1]), Effect.as(false))
      })
    )
  ))

/**
 * Keeps only the groups whose position in `self.grouped` (as numbered by
 * `stream.zipWithIndex`) is below `n`.
 *
 * Groups at index `n` or later have their queue shut down and are dropped.
 *
 * @internal
 */
export const first = dual<
  (n: number) => <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number): GroupBy.GroupBy<K, V, E, R> =>
  make(
    pipe(
      stream.zipWithIndex(self.grouped),
      stream.filterEffect((tuple) => {
        // tuple is [[key, queue], index]
        const index = tuple[1]
        const queue = tuple[0][1]
        if (index < n) {
          return pipe(Effect.succeed(tuple), Effect.as(true))
        }
        return pipe(Queue.shutdown(queue), Effect.as(false))
      }),
      // Strip the index again so the element shape is [key, queue].
      stream.map((tuple) => tuple[0])
    )
  ))

/**
 * Wraps a stream of `[key, dequeue]` pairs into a `GroupBy` value: an object
 * carrying the `GroupByTypeId` brand, a `pipe` method, and the `grouped`
 * stream itself.
 *
 * @internal
 */
export const make = <K, V, E, R>(
  grouped: Stream.Stream<readonly [K, Queue.Dequeue<Take.Take<V, E>>], E, R>
): GroupBy.GroupBy<K, V, E, R> => ({
  [GroupByTypeId]: groupByVariance,
  pipe() {
    return pipeArguments(this, arguments)
  },
  grouped
})

// Circular with Stream

/**
 * Groups the elements of `self` by a key computed effectfully: `f` maps each
 * element to a `[key, value]` pair.
 *
 * The resulting `GroupBy` emits one `[key, dequeue]` entry the first time a
 * key is seen. `options.bufferSize` (default 16) sizes both the `output`
 * queue and the argument passed to `stream.distributedWithDynamicCallback`.
 *
 * Dual API: the data-first form is selected when the first argument is a
 * stream (`stream.isStream`).
 *
 * @internal
 */
export const groupBy = dual<
  <A, K, V, E2, R2>(
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>,
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>
>(
  (args) => stream.isStream(args[0]),
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, V, E | E2, R | R2> =>
    make(
      stream.unwrapScoped(
        Effect.gen(function*() {
          // Holds the routing function. It is created empty and completed
          // further down because the routing function needs `add`, while the
          // pipeline that produces `add` already needs to refer to the
          // routing function.
          const decider = yield* Deferred.make<(key: K, value: V) => Effect.Effect<Predicate<number>>>()
          // Queue of `[key, dequeue]` announcements (wrapped in Exit); it is
          // shut down when the enclosing scope releases it.
          const output = yield* Effect.acquireRelease(
            Queue.bounded<Exit.Exit<readonly [K, Queue.Dequeue<Take.Take<V, E | E2>>], Option.Option<E | E2>>>(
              options?.bufferSize ?? 16
            ),
            (queue) => Queue.shutdown(queue)
          )
          // key -> index of the queue handed out by `add` for that key.
          const ref = yield* Ref.make<Map<K, number>>(new Map())
          // `add` is used below as an effect yielding `[index, queue]`.
          // NOTE(review): presumably each run registers a fresh downstream
          // queue — confirm against `stream.distributedWithDynamicCallback`.
          const add = yield* pipe(
            stream.mapEffectSequential(self, f),
            stream.distributedWithDynamicCallback(
              options?.bufferSize ?? 16,
              ([key, value]) => Effect.flatMap(Deferred.await(decider), (f) => f(key, value)),
              (exit) => Queue.offer(output, exit)
            )
          )
          yield* Deferred.succeed(decider, (key, _) =>
            pipe(
              Ref.get(ref),
              Effect.map((map) => Option.fromNullable(map.get(key))),
              Effect.flatMap(Option.match({
                // First occurrence of `key`: obtain a queue via `add`,
                // remember its index, announce `[key, dequeue]` on `output`,
                // and route to exactly that index.
                onNone: () =>
                  Effect.flatMap(add, ([index, queue]) =>
                    Effect.zipRight(
                      Ref.update(ref, (map) => map.set(key, index)),
                      pipe(
                        Queue.offer(
                          output,
                          Exit.succeed(
                            [
                              key,
                              // Expose the queue's Exit values as Takes that
                              // carry a single-element chunk holding only the
                              // value (tuple[1]), not the key.
                              mapDequeue(queue, (exit) =>
                                new take.TakeImpl(pipe(
                                  exit,
                                  Exit.map((tuple) => Chunk.of(tuple[1]))
                                )))
                            ] as const
                          )
                        ),
                        Effect.as<Predicate<number>>((n: number) => n === index)
                      )
                    )),
                // Known key: route to the index recorded earlier.
                onSome: (index) => Effect.succeed<Predicate<number>>((n: number) => n === index)
              }))
            ))
          return stream.flattenExitOption(stream.fromQueue(output, { shutdown: true }))
        })
      )
    )
)

/**
 * Maps every element of `self` through the effectful function `f`, with the
 * execution strategy chosen by `options`:
 *
 * - `options.key` present: the stream is partitioned with `groupByKey`
 *   (forwarding `options.bufferSize`) and each group is processed with
 *   `stream.mapEffectSequential`; the groups are recombined via `evaluate`.
 * - otherwise: dispatches through `stream.matchConcurrency` on
 *   `options.concurrency`, using `stream.mapEffectSequential` for the first
 *   handler and, for the second handler (which receives `n`), either
 *   `stream.flatMap` with `{ concurrency: n }` when `options.unordered` is
 *   truthy or `stream.mapEffectPar(self, n, f)`.
 *
 * Dual API: the data-first form is selected when the first argument is not a
 * function. The implementation is cast to `any` to satisfy the overload set.
 *
 * @internal
 */
export const mapEffectOptions = dual<
  {
    <A, A2, E2, R2>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
    <A, A2, E2, R2, K>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
  },
  {
    <A, E, R, A2, E2, R2>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
    <A, E, R, A2, E2, R2, K>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
  }
>(
  (args) => typeof args[0] !== "function",
  (<A, E, R, A2, E2, R2, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<A2, E2, R2>,
    options?: {
      readonly key?: ((a: A) => K) | undefined
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A2, E2 | E, R2 | R> => {
    // Keyed mode: `concurrency` / `unordered` are not consulted on this path.
    if (options?.key) {
      return evaluate(
        groupByKey(self, options.key, { bufferSize: options.bufferSize }),
        (_, s) => stream.mapEffectSequential(s, f)
      )
    }

    // Un-keyed mode: `bufferSize` is not consulted on this path.
    return stream.matchConcurrency(
      options?.concurrency,
      () => stream.mapEffectSequential(self, f),
      (n) =>
        options?.unordered ?
          stream.flatMap(self, (a) => stream.fromEffect(f(a)), { concurrency: n }) :
          stream.mapEffectPar(self, n, f)
    )
  }) as any
)

/**
 * For every element `k` of `self`, runs `f(k)` and emits a shallow copy of
 * `k` with the result stored under the property `tag`. The mapping itself is
 * delegated to `mapEffectOptions`, to which `options` is forwarded as-is.
 *
 * Dual API: the data-first form is selected when the first argument is not a
 * string.
 *
 * NOTE(review): the data-last overload declares `bufferSize` in `options`
 * while the data-first overload and the implementation declare `unordered`.
 * `mapEffectOptions` only reads `bufferSize` when `key` is set, which never
 * happens from here — confirm whether the data-last overload should expose
 * `unordered` instead.
 *
 * @internal
 */
export const bindEffect = dual<
  <N extends string, A, B, E2, R2>(
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >,
  <A, E, R, N extends string, B, E2, R2>(
    self: Stream.Stream<A, E, R>,
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
    }
  ) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >
>((args) => typeof args[0] !== "string", <A, E, R, N extends string, B, E2, R2>(
  self: Stream.Stream<A, E, R>,
  tag: Exclude<N, keyof A>,
  f: (_: A) => Effect.Effect<B, E2, R2>,
  options?: {
    readonly concurrency?: number | "unbounded" | undefined
    readonly unordered?: boolean | undefined
  }
) =>
  mapEffectOptions(self, (k) =>
    Effect.map(
      f(k),
      (a) => ({ ...k, [tag]: a } as { [K in keyof A | N]: K extends keyof A ? A[K] : B })
    ), options))

// Wraps `dequeue` so that every element taken from it is passed through `f`.
const mapDequeue = <A, B>(dequeue: Queue.Dequeue<A>, f: (a: A) => B): Queue.Dequeue<B> => new MapDequeue(dequeue, f)

// `Queue.Dequeue<B>` view over a `Queue.Dequeue<A>`.
//
// State queries and shutdown are forwarded unchanged to the wrapped dequeue;
// every operation that removes elements (`take`, `takeAll`, `takeUpTo`,
// `takeBetween`, `takeN`, `poll`) maps the removed elements through `f`.
// Using the instance itself as an Effect is equivalent to `take` (see
// `commit`).
class MapDequeue<in out A, out B> extends Effectable.Class<B> implements Queue.Dequeue<B> {
  readonly [Queue.DequeueTypeId] = {
    _Out: (_: never) => _
  }

  constructor(
    readonly dequeue: Queue.Dequeue<A>,
    readonly f: (a: A) => B
  ) {
    super()
  }

  // --- forwarded unchanged to the wrapped dequeue ---

  capacity(): number {
    return Queue.capacity(this.dequeue)
  }

  get size(): Effect.Effect<number> {
    return Queue.size(this.dequeue)
  }

  unsafeSize(): Option.Option<number> {
    return this.dequeue.unsafeSize()
  }

  get awaitShutdown(): Effect.Effect<void> {
    return Queue.awaitShutdown(this.dequeue)
  }

  isActive(): boolean {
    return this.dequeue.isActive()
  }

  get isShutdown(): Effect.Effect<boolean> {
    return Queue.isShutdown(this.dequeue)
  }

  get shutdown(): Effect.Effect<void> {
    return Queue.shutdown(this.dequeue)
  }

  get isFull(): Effect.Effect<boolean> {
    return Queue.isFull(this.dequeue)
  }

  get isEmpty(): Effect.Effect<boolean> {
    return Queue.isEmpty(this.dequeue)
  }

  // --- removing operations: results are mapped through `f` ---

  get take(): Effect.Effect<B> {
    return pipe(Queue.take(this.dequeue), Effect.map((a) => this.f(a)))
  }

  get takeAll(): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeAll(this.dequeue), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeUpTo(max: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeUpTo(this.dequeue, max), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeBetween(this.dequeue, min, max), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeN(n: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeN(this.dequeue, n), Effect.map(Chunk.map((a) => this.f(a))))
  }

  poll(): Effect.Effect<Option.Option<B>> {
    return pipe(Queue.poll(this.dequeue), Effect.map(Option.map((a) => this.f(a))))
  }

  pipe() {
    return pipeArguments(this, arguments)
  }

  // Running the dequeue as an Effect performs a single `take`.
  commit() {
    return this.take
  }
}

/**
 * Groups the elements of `self` by the synchronously computed key `f(a)`.
 *
 * A channel (`loop`) reads the source chunk by chunk and maintains one
 * bounded queue per key (capacity `options.bufferSize`, default 16). The
 * first time a key appears, `[key, queue]` is offered to an unbounded outer
 * queue, which backs the `grouped` stream of the returned `GroupBy`.
 *
 * Dual API: the data-first form is selected when the first argument is not a
 * function.
 *
 * @internal
 */
export const groupByKey = dual<
  <A, K>(
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, A, E, R>,
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, A, E, R>
>(
  (args) => typeof args[0] !== "function",
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, A, E, R> => {
    // Recursive consumer channel. `map` (key -> inner queue) is mutated in
    // place and passed along unchanged on every iteration.
    const loop = (
      map: Map<K, Queue.Queue<Take.Take<A, E>>>,
      outerQueue: Queue.Queue<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>
    ): Channel.Channel<never, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
      core.readWithCause({
        onInput: (input: Chunk.Chunk<A>) =>
          core.flatMap(
            core.fromEffect(
              // Split the incoming chunk into per-key sub-chunks, then
              // deliver each sub-chunk to that key's queue.
              Effect.forEach(groupByIterable(input, f), ([key, values]) => {
                const innerQueue = map.get(key)
                if (innerQueue === undefined) {
                  // New key: create its queue, register it, announce the
                  // group on the outer queue, then deliver the values.
                  return pipe(
                    Queue.bounded<Take.Take<A, E>>(options?.bufferSize ?? 16),
                    Effect.flatMap((innerQueue) =>
                      pipe(
                        Effect.sync(() => {
                          map.set(key, innerQueue)
                        }),
                        Effect.zipRight(
                          Queue.offer(outerQueue, take.of([key, innerQueue] as const))
                        ),
                        Effect.zipRight(
                          pipe(
                            Queue.offer(innerQueue, take.chunk(values)),
                            // An offer that fails purely with interruption is
                            // treated as a no-op; any other cause propagates.
                            Effect.catchSomeCause((cause) =>
                              Cause.isInterruptedOnly(cause) ?
                                Option.some(Effect.void) :
                                Option.none()
                            )
                          )
                        )
                      )
                    )
                  )
                }
                // Known key: deliver to the existing queue, with the same
                // interruption-only recovery as above.
                return Effect.catchSomeCause(
                  Queue.offer(innerQueue, take.chunk(values)),
                  (cause) =>
                    Cause.isInterruptedOnly(cause) ?
                      Option.some(Effect.void) :
                      Option.none()
                )
              }, { discard: true })
            ),
            () => loop(map, outerQueue)
          ),
        // Upstream failure: reported on the outer queue only; the inner
        // queues are not written to here.
        onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
        // Upstream completion: signal end on every inner queue first, then
        // on the outer queue.
        onDone: () =>
          pipe(
            core.fromEffect(
              pipe(
                Effect.forEach(map.entries(), ([_, innerQueue]) =>
                  pipe(
                    Queue.offer(innerQueue, take.end),
                    Effect.catchSomeCause((cause) =>
                      Cause.isInterruptedOnly(cause) ?
                        Option.some(Effect.void) :
                        Option.none()
                    )
                  ), { discard: true }),
                Effect.zipRight(Queue.offer(outerQueue, take.end))
              )
            )
          )
      })
    return make(stream.unwrapScopedWith((scope) =>
      Effect.gen(function*() {
        const map = new Map<K, Queue.Queue<Take.Take<A, E>>>()
        const queue = yield* Queue.unbounded<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>()
        // Shut the outer queue down when the scope closes.
        yield* Scope.addFinalizer(scope, Queue.shutdown(queue))
        // Run the source through `loop` in a fiber forked into `scope`, and
        // hand back the stream that reads the outer queue.
        return yield* stream.toChannel(self).pipe(
          core.pipeTo(loop(map, queue)),
          channel.drain,
          channelExecutor.runIn(scope),
          Effect.forkIn(scope),
          Effect.as(stream.flattenTake(stream.fromQueue(queue, { shutdown: true })))
        )
      })
    ))
  }
)

/**
 * A variant of `groupBy` that retains the insertion order of keys.
 *
 * Iterates `iterable` once, computing `f(value)` for each element, and
 * returns a chunk of `[key, values]` pairs ordered by the first appearance of
 * each key; within a pair, values keep their iteration order. Keys are
 * matched using a JavaScript `Map`.
 *
 * @internal
 */
const groupByIterable = dual<
  <V, K>(f: (value: V) => K) => (iterable: Iterable<V>) => Chunk.Chunk<[K, Chunk.Chunk<V>]>,
  <V, K>(iterable: Iterable<V>, f: (value: V) => K) => Chunk.Chunk<[K, Chunk.Chunk<V>]>
>(2, <V, K>(iterable: Iterable<V>, f: (value: V) => K): Chunk.Chunk<[K, Chunk.Chunk<V>]> => {
  // `builder` fixes the output order; `map` gives O(1) access to the array
  // already stored in `builder` for a given key (both hold the same array).
  const builder: Array<[K, Array<V>]> = []
  const iterator = iterable[Symbol.iterator]()
  const map = new Map<K, Array<V>>()
  let next: IteratorResult<V, any>
  while ((next = iterator.next()) && !next.done) {
    const value = next.value
    const key = f(value)
    if (map.has(key)) {
      const innerBuilder = map.get(key)!
      innerBuilder.push(value)
    } else {
      const innerBuilder: Array<V> = [value]
      builder.push([key, innerBuilder])
      map.set(key, innerBuilder)
    }
  }
  return Chunk.unsafeFromArray(
    builder.map((tuple) => [tuple[0], Chunk.unsafeFromArray(tuple[1])])
  )
})

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssv445/lorem-ipsum-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.