Skip to main content
Glama
ssv445

Lorem Ipsum MCP Server

by ssv445
mailbox.ts18.3 kB
import * as Arr from "../Array.js" import type { Cause } from "../Cause.js" import { NoSuchElementException } from "../Cause.js" import type { Channel } from "../Channel.js" import * as Chunk from "../Chunk.js" import type { Effect } from "../Effect.js" import * as Effectable from "../Effectable.js" import type { Exit } from "../Exit.js" import { dual } from "../Function.js" import * as Inspectable from "../Inspectable.js" import * as Iterable from "../Iterable.js" import type * as Api from "../Mailbox.js" import * as Option from "../Option.js" import { pipeArguments } from "../Pipeable.js" import { hasProperty } from "../Predicate.js" import type { Scheduler } from "../Scheduler.js" import type { Scope } from "../Scope.js" import type { Stream } from "../Stream.js" import * as channel from "./channel.js" import * as channelExecutor from "./channel/channelExecutor.js" import * as coreChannel from "./core-stream.js" import * as core from "./core.js" import * as circular from "./effect/circular.js" import * as fiberRuntime from "./fiberRuntime.js" import * as stream from "./stream.js" /** @internal */ export const TypeId: Api.TypeId = Symbol.for("effect/Mailbox") as Api.TypeId /** @internal */ export const ReadonlyTypeId: Api.ReadonlyTypeId = Symbol.for("effect/Mailbox/ReadonlyMailbox") as Api.ReadonlyTypeId /** @internal */ export const isMailbox = (u: unknown): u is Api.Mailbox<unknown, unknown> => hasProperty(u, TypeId) /** @internal */ export const isReadonlyMailbox = (u: unknown): u is Api.ReadonlyMailbox<unknown, unknown> => hasProperty(u, ReadonlyTypeId) type MailboxState<A, E> = { readonly _tag: "Open" readonly takers: Set<(_: Effect<void, E>) => void> readonly offers: Set<OfferEntry<A>> readonly awaiters: Set<(_: Effect<void, E>) => void> } | { readonly _tag: "Closing" readonly takers: Set<(_: Effect<void, E>) => void> readonly offers: Set<OfferEntry<A>> readonly awaiters: Set<(_: Effect<void, E>) => void> readonly exit: Exit<void, E> } | { readonly _tag: "Done" readonly exit: Exit<void, E> } type OfferEntry<A> = { readonly _tag: "Array" readonly remaining: Array<A> offset: number readonly resume: (_: Effect<Chunk.Chunk<A>>) => void } | { readonly _tag: "Single" readonly message: A readonly resume: (_: Effect<boolean>) => void } const empty = Chunk.empty() const exitEmpty = core.exitSucceed(empty) const exitFalse = core.exitSucceed(false) const exitTrue = core.exitSucceed(true) const constDone = [empty, true] as const class MailboxImpl<A, E> extends Effectable.Class<readonly [messages: Chunk.Chunk<A>, done: boolean], E> implements Api.Mailbox<A, E> { readonly [TypeId]: Api.TypeId = TypeId readonly [ReadonlyTypeId]: Api.ReadonlyTypeId = ReadonlyTypeId private state: MailboxState<A, E> = { _tag: "Open", takers: new Set(), offers: new Set(), awaiters: new Set() } private messages: Array<A> = [] private messagesChunk = Chunk.empty<A>() constructor( readonly scheduler: Scheduler, private capacity: number, readonly strategy: "suspend" | "dropping" | "sliding" ) { super() } offer(message: A): Effect<boolean> { return core.suspend(() => { if (this.state._tag !== "Open") { return exitFalse } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { switch (this.strategy) { case "dropping": return exitFalse case "suspend": if (this.capacity <= 0 && this.state.takers.size > 0) { this.messages.push(message) this.releaseTaker() return exitTrue } return this.offerRemainingSingle(message) case "sliding": this.unsafeTake() this.messages.push(message) return exitTrue } } this.messages.push(message) this.scheduleReleaseTaker() return exitTrue }) } unsafeOffer(message: A): boolean { if (this.state._tag !== "Open") { return false } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { if (this.strategy === "sliding") { this.unsafeTake() this.messages.push(message) return true } else if (this.capacity <= 0 && this.state.takers.size > 0) { this.messages.push(message) this.releaseTaker() return true } return false } this.messages.push(message) this.scheduleReleaseTaker() return true } offerAll(messages: Iterable<A>): Effect<Chunk.Chunk<A>> { return core.suspend(() => { if (this.state._tag !== "Open") { return core.succeed(Chunk.fromIterable(messages)) } const remaining = this.unsafeOfferAllArray(messages) if (remaining.length === 0) { return exitEmpty } else if (this.strategy === "dropping") { return core.succeed(Chunk.unsafeFromArray(remaining)) } return this.offerRemainingArray(remaining) }) } unsafeOfferAll(messages: Iterable<A>): Chunk.Chunk<A> { return Chunk.unsafeFromArray(this.unsafeOfferAllArray(messages)) } unsafeOfferAllArray(messages: Iterable<A>): Array<A> { if (this.state._tag !== "Open") { return Arr.fromIterable(messages) } else if (this.capacity === Number.POSITIVE_INFINITY || this.strategy === "sliding") { if (this.messages.length > 0) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) } if (this.strategy === "sliding") { this.messagesChunk = this.messagesChunk.pipe( Chunk.appendAll(Chunk.fromIterable(messages)), Chunk.takeRight(this.capacity) ) } else if (Chunk.isChunk(messages)) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, messages) } else { this.messages = Arr.fromIterable(messages) } this.scheduleReleaseTaker() return [] } const free = this.capacity <= 0 ? this.state.takers.size : this.capacity - this.messages.length - this.messagesChunk.length if (free === 0) { return Arr.fromIterable(messages) } const remaining: Array<A> = [] let i = 0 for (const message of messages) { if (i < free) { this.messages.push(message) } else { remaining.push(message) } i++ } this.scheduleReleaseTaker() return remaining } fail(error: E) { return this.done(core.exitFail(error)) } failCause(cause: Cause<E>) { return this.done(core.exitFailCause(cause)) } unsafeDone(exit: Exit<void, E>): boolean { if (this.state._tag !== "Open") { return false } else if (this.state.offers.size === 0 && this.messages.length === 0 && this.messagesChunk.length === 0) { this.finalize(exit) return true } this.state = { ...this.state, _tag: "Closing", exit } return true } shutdown: Effect<boolean> = core.sync(() => { if (this.state._tag === "Done") { return true } this.messages = [] this.messagesChunk = empty const offers = this.state.offers this.finalize(this.state._tag === "Open" ? core.exitVoid : this.state.exit) if (offers.size > 0) { for (const entry of offers) { if (entry._tag === "Single") { entry.resume(exitFalse) } else { entry.resume(core.exitSucceed(Chunk.unsafeFromArray(entry.remaining.slice(entry.offset)))) } } offers.clear() } return true }) done(exit: Exit<void, E>) { return core.sync(() => this.unsafeDone(exit)) } end = this.done(core.exitVoid) clear: Effect<Chunk.Chunk<A>, E> = core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, empty) } const messages = this.unsafeTakeAll() this.releaseCapacity() return core.succeed(messages) }) takeAll: Effect<readonly [messages: Chunk.Chunk<A>, done: boolean], E> = core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, constDone) } const messages = this.unsafeTakeAll() if (messages.length === 0) { return core.zipRight(this.awaitTake, this.takeAll) } return core.succeed([messages, this.releaseCapacity()]) }) takeN(n: number): Effect<readonly [messages: Chunk.Chunk<A>, done: boolean], E> { return core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, constDone) } else if (n <= 0) { return core.succeed([empty, false]) } n = Math.min(n, this.capacity) let messages: Chunk.Chunk<A> if (n <= this.messagesChunk.length) { messages = Chunk.take(this.messagesChunk, n) this.messagesChunk = Chunk.drop(this.messagesChunk, n) } else if (n <= this.messages.length + this.messagesChunk.length) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) this.messages = [] messages = Chunk.take(this.messagesChunk, n) this.messagesChunk = Chunk.drop(this.messagesChunk, n) } else { return core.zipRight(this.awaitTake, this.takeN(n)) } return core.succeed([messages, this.releaseCapacity()]) }) } unsafeTake(): Exit<A, E | NoSuchElementException> | undefined { if (this.state._tag === "Done") { return core.exitZipRight(this.state.exit, core.exitFail(new NoSuchElementException())) } let message: A if (this.messagesChunk.length > 0) { message = Chunk.unsafeHead(this.messagesChunk) this.messagesChunk = Chunk.drop(this.messagesChunk, 1) } else if (this.messages.length > 0) { message = this.messages[0] this.messagesChunk = Chunk.drop(Chunk.unsafeFromArray(this.messages), 1) this.messages = [] } else if (this.capacity <= 0 && this.state.offers.size > 0) { this.capacity = 1 this.releaseCapacity() this.capacity = 0 return this.messages.length > 0 ? core.exitSucceed(this.messages.pop()!) : undefined } else { return undefined } this.releaseCapacity() return core.exitSucceed(message) } take: Effect<A, E | NoSuchElementException> = core.suspend(() => this.unsafeTake() ?? core.zipRight(this.awaitTake, this.take) ) await: Effect<void, E> = core.asyncInterrupt<void, E>((resume) => { if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.awaiters.add(resume) return core.sync(() => { if (this.state._tag !== "Done") { this.state.awaiters.delete(resume) } }) }) unsafeSize(): Option.Option<number> { const size = this.messages.length + this.messagesChunk.length return this.state._tag === "Done" ? Option.none() : Option.some(size) } size = core.sync(() => this.unsafeSize()) commit() { return this.takeAll } pipe() { return pipeArguments(this, arguments) } toJSON() { return { _id: "effect/Mailbox", state: this.state._tag, size: this.unsafeSize().toJSON() } } toString(): string { return Inspectable.format(this) } [Inspectable.NodeInspectSymbol]() { return Inspectable.format(this) } private offerRemainingSingle(message: A) { return core.asyncInterrupt<boolean>((resume) => { if (this.state._tag !== "Open") { return resume(exitFalse) } const entry: OfferEntry<A> = { _tag: "Single", message, resume } this.state.offers.add(entry) return core.sync(() => { if (this.state._tag === "Open") { this.state.offers.delete(entry) } }) }) } private offerRemainingArray(remaining: Array<A>) { return core.asyncInterrupt<Chunk.Chunk<A>>((resume) => { if (this.state._tag !== "Open") { return resume(core.exitSucceed(Chunk.unsafeFromArray(remaining))) } const entry: OfferEntry<A> = { _tag: "Array", remaining, offset: 0, resume } this.state.offers.add(entry) return core.sync(() => { if (this.state._tag === "Open") { this.state.offers.delete(entry) } }) }) } private releaseCapacity(): boolean { if (this.state._tag === "Done") { return this.state.exit._tag === "Success" } else if (this.state.offers.size === 0) { if (this.state._tag === "Closing" && this.messages.length === 0 && this.messagesChunk.length === 0) { this.finalize(this.state.exit) return this.state.exit._tag === "Success" } return false } let n = this.capacity - this.messages.length - this.messagesChunk.length for (const entry of this.state.offers) { if (n === 0) return false else if (entry._tag === "Single") { this.messages.push(entry.message) n-- entry.resume(exitTrue) this.state.offers.delete(entry) } else { for (; entry.offset < entry.remaining.length; entry.offset++) { if (n === 0) return false this.messages.push(entry.remaining[entry.offset]) n-- } entry.resume(exitEmpty) this.state.offers.delete(entry) } } return false } private awaitTake = core.asyncInterrupt<void, E>((resume) => { if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.takers.add(resume) return core.sync(() => { if (this.state._tag !== "Done") { this.state.takers.delete(resume) } }) }) private scheduleRunning = false private scheduleReleaseTaker() { if (this.scheduleRunning) { return } this.scheduleRunning = true this.scheduler.scheduleTask(this.releaseTaker, 0) } private releaseTaker = () => { this.scheduleRunning = false if (this.state._tag === "Done") { return } else if (this.state.takers.size === 0) { return } const taker = Iterable.unsafeHead(this.state.takers) this.state.takers.delete(taker) taker(core.exitVoid) } private unsafeTakeAll() { if (this.messagesChunk.length > 0) { const messages = this.messages.length > 0 ? Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) : this.messagesChunk this.messagesChunk = empty this.messages = [] return messages } else if (this.messages.length > 0) { const messages = Chunk.unsafeFromArray(this.messages) this.messages = [] return messages } else if (this.state._tag !== "Done" && this.state.offers.size > 0) { this.capacity = 1 this.releaseCapacity() this.capacity = 0 return Chunk.of(this.messages.pop()!) } return empty } private finalize(exit: Exit<void, E>) { if (this.state._tag === "Done") { return } const openState = this.state this.state = { _tag: "Done", exit } for (const taker of openState.takers) { taker(exit) } openState.takers.clear() for (const awaiter of openState.awaiters) { awaiter(exit) } openState.awaiters.clear() } } /** @internal */ export const make = <A, E = never>( capacity?: number | { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } | undefined ): Effect<Api.Mailbox<A, E>> => core.withFiberRuntime((fiber) => core.succeed( new MailboxImpl<A, E>( fiber.currentScheduler, typeof capacity === "number" ? capacity : capacity?.capacity ?? Number.POSITIVE_INFINITY, typeof capacity === "number" ? "suspend" : capacity?.strategy ?? "suspend" ) ) ) /** @internal */ export const into: { <A, E>( self: Api.Mailbox<A, E> ): <AX, EX extends E, RX>(effect: Effect<AX, EX, RX>) => Effect<boolean, never, RX> <AX, E, EX extends E, RX, A>( effect: Effect<AX, EX, RX>, self: Api.Mailbox<A, E> ): Effect<boolean, never, RX> } = dual( 2, <AX, E, EX extends E, RX, A>( effect: Effect<AX, EX, RX>, self: Api.Mailbox<A, E> ): Effect<boolean, never, RX> => core.uninterruptibleMask((restore) => core.matchCauseEffect(restore(effect), { onFailure: (cause) => self.failCause(cause), onSuccess: (_) => self.end }) ) ) /** @internal */ export const toChannel = <A, E>(self: Api.ReadonlyMailbox<A, E>): Channel<Chunk.Chunk<A>, unknown, E> => { const loop: Channel<Chunk.Chunk<A>, unknown, E> = coreChannel.flatMap(self.takeAll, ([messages, done]) => done ? messages.length === 0 ? coreChannel.void : coreChannel.write(messages) : channel.zipRight(coreChannel.write(messages), loop)) return loop } /** @internal */ export const toStream = <A, E>(self: Api.ReadonlyMailbox<A, E>): Stream<A, E> => stream.fromChannel(toChannel(self)) /** @internal */ export const fromStream: { (options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined }): <A, E, R>(self: Stream<A, E, R>) => Effect<Api.ReadonlyMailbox<A, E>, never, R | Scope> <A, E, R>( self: Stream<A, E, R>, options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } ): Effect<Api.ReadonlyMailbox<A, E>, never, R | Scope> } = dual((args) => stream.isStream(args[0]), <A, E, R>( self: Stream<A, E, R>, options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } ): Effect<Api.ReadonlyMailbox<A, E>, never, R | Scope> => core.tap( fiberRuntime.acquireRelease( make<A, E>(options), (mailbox) => mailbox.shutdown ), (mailbox) => { const writer: Channel<never, Chunk.Chunk<A>, never, E> = coreChannel.readWithCause({ onInput: (input: Chunk.Chunk<A>) => coreChannel.flatMap(mailbox.offerAll(input), () => writer), onFailure: (cause: Cause<E>) => mailbox.failCause(cause), onDone: () => mailbox.end }) return fiberRuntime.scopeWith((scope) => stream.toChannel(self).pipe( coreChannel.pipeTo(writer), channelExecutor.runIn(scope), circular.forkIn(scope) ) ) } ))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssv445/lorem-ipsum-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server