Skip to main content
Glama
ssv445

Lorem Ipsum MCP Server

by ssv445
tPubSub.ts19.5 kB
import * as RA from "../../Array.js" import * as Effect from "../../Effect.js" import { dual, identity, pipe } from "../../Function.js" import * as HashSet from "../../HashSet.js" import * as Option from "../../Option.js" import type * as Scope from "../../Scope.js" import type * as STM from "../../STM.js" import type * as TPubSub from "../../TPubSub.js" import type * as TQueue from "../../TQueue.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" import * as OpCodes from "./opCodes/strategy.js" import * as stm from "./stm.js" import * as tQueue from "./tQueue.js" import * as tRef from "./tRef.js" /** @internal */ const TPubSubSymbolKey = "effect/TPubSub" /** @internal */ export const TPubSubTypeId: TPubSub.TPubSubTypeId = Symbol.for(TPubSubSymbolKey) as TPubSub.TPubSubTypeId const AbsentValue = Symbol.for("effect/TPubSub/AbsentValue") type AbsentValue = typeof AbsentValue /** @internal */ export interface Node<in out A> { readonly head: A | AbsentValue readonly subscribers: number readonly tail: TRef.TRef<Node<A> | undefined> } /** @internal */ export const makeNode = <A>( head: A | AbsentValue, subscribers: number, tail: TRef.TRef<Node<A> | undefined> ): Node<A> => ({ head, subscribers, tail }) /** @internal */ class TPubSubImpl<in out A> implements TPubSub.TPubSub<A> { readonly [TPubSubTypeId] = { _A: (_: any) => _ } readonly [tQueue.TEnqueueTypeId] = tQueue.tEnqueueVariance constructor( readonly pubsubSize: TRef.TRef<number>, readonly publisherHead: TRef.TRef<TRef.TRef<Node<A> | undefined>>, readonly publisherTail: TRef.TRef<TRef.TRef<Node<A> | undefined> | undefined>, readonly requestedCapacity: number, readonly strategy: tQueue.TQueueStrategy, readonly subscriberCount: TRef.TRef<number>, readonly subscribers: TRef.TRef<HashSet.HashSet<TRef.TRef<TRef.TRef<Node<A>> | undefined>>> ) {} isShutdown: STM.STM<boolean> = core.effect<never, boolean>((journal) => { const currentPublisherTail = tRef.unsafeGet(this.publisherTail, journal) return currentPublisherTail === undefined }) awaitShutdown: STM.STM<void> = core.flatMap( this.isShutdown, (isShutdown) => isShutdown ? stm.void : core.retry ) capacity(): number { return this.requestedCapacity } size: STM.STM<number> = core.withSTMRuntime((runtime) => { const currentPublisherTail = tRef.unsafeGet(this.publisherTail, runtime.journal) if (currentPublisherTail === undefined) { return core.interruptAs(runtime.fiberId) } return core.succeed(tRef.unsafeGet(this.pubsubSize, runtime.journal)) }) isEmpty: STM.STM<boolean> = core.map(this.size, (size) => size === 0) isFull: STM.STM<boolean> = core.map(this.size, (size) => size === this.capacity()) offer(value: A): STM.STM<boolean> { return core.withSTMRuntime((runtime) => { const currentPublisherTail = tRef.unsafeGet(this.publisherTail, runtime.journal) if (currentPublisherTail === undefined) { return core.interruptAs(runtime.fiberId) } const currentSubscriberCount = tRef.unsafeGet(this.subscriberCount, runtime.journal) if (currentSubscriberCount === 0) { return core.succeed(true) } const currentPubSubSize = tRef.unsafeGet(this.pubsubSize, runtime.journal) if (currentPubSubSize < this.requestedCapacity) { const updatedPublisherTail: TRef.TRef<Node<A> | undefined> = new tRef.TRefImpl<Node<A> | undefined>(void 0) const updatedNode = makeNode(value, currentSubscriberCount, updatedPublisherTail) tRef.unsafeSet<Node<A> | undefined>(currentPublisherTail, updatedNode, runtime.journal) tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>( this.publisherTail, updatedPublisherTail, runtime.journal ) tRef.unsafeSet(this.pubsubSize, currentPubSubSize + 1, runtime.journal) return core.succeed(true) } switch (this.strategy._tag) { case OpCodes.OP_BACKPRESSURE_STRATEGY: { return core.retry } case OpCodes.OP_DROPPING_STRATEGY: { return core.succeed(false) } case OpCodes.OP_SLIDING_STRATEGY: { if (this.requestedCapacity > 0) { let currentPublisherHead: TRef.TRef<Node<A> | undefined> = tRef.unsafeGet( this.publisherHead, runtime.journal ) let loop = true while (loop) { const node = tRef.unsafeGet(currentPublisherHead, runtime.journal) if (node === undefined) { return core.retry } const head = node.head const tail = node.tail if (head !== AbsentValue) { const updatedNode = makeNode<A>(AbsentValue, node.subscribers, node.tail as any) tRef.unsafeSet<Node<A> | undefined>( currentPublisherHead as any, updatedNode as any, runtime.journal ) tRef.unsafeSet(this.publisherHead, tail, runtime.journal) loop = false } else { currentPublisherHead = tail } } } const updatedPublisherTail: TRef.TRef<Node<A> | undefined> = new tRef.TRefImpl<Node<A> | undefined>(void 0) const updatedNode = makeNode(value, currentSubscriberCount, updatedPublisherTail) tRef.unsafeSet<Node<A> | undefined>(currentPublisherTail, updatedNode, runtime.journal) tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>( this.publisherTail, updatedPublisherTail, runtime.journal ) return core.succeed(true) } } }) } offerAll(iterable: Iterable<A>): STM.STM<boolean> { return core.map( stm.forEach(iterable, (a) => this.offer(a)), RA.every(identity) ) } shutdown: STM.STM<void> = core.effect<never, void>((journal) => { const currentPublisherTail = tRef.unsafeGet(this.publisherTail, journal) if (currentPublisherTail !== undefined) { tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>(this.publisherTail, void 0, journal) const currentSubscribers = tRef.unsafeGet(this.subscribers, journal) HashSet.forEach(currentSubscribers, (subscriber) => { tRef.unsafeSet<TRef.TRef<Node<A>> | undefined>(subscriber, void 0, journal) }) tRef.unsafeSet(this.subscribers, HashSet.empty<TRef.TRef<TRef.TRef<Node<A>> | undefined>>(), journal) } }) } /** @internal */ class TPubSubSubscriptionImpl<in out A> implements TQueue.TDequeue<A> { readonly [TPubSubTypeId]: TPubSub.TPubSubTypeId = TPubSubTypeId readonly [tQueue.TDequeueTypeId] = tQueue.tDequeueVariance constructor( readonly pubsubSize: TRef.TRef<number>, readonly publisherHead: TRef.TRef<TRef.TRef<Node<A> | undefined>>, readonly requestedCapacity: number, readonly subscriberHead: TRef.TRef<TRef.TRef<Node<A> | undefined> | undefined>, readonly subscriberCount: TRef.TRef<number>, readonly subscribers: TRef.TRef<HashSet.HashSet<TRef.TRef<TRef.TRef<Node<A>> | undefined>>> ) {} isShutdown: STM.STM<boolean> = core.effect<never, boolean>((journal) => { const currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, journal) return currentSubscriberHead === undefined }) awaitShutdown: STM.STM<void> = core.flatMap( this.isShutdown, (isShutdown) => isShutdown ? stm.void : core.retry ) capacity(): number { return this.requestedCapacity } size: STM.STM<number> = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { return core.interruptAs(runtime.fiberId) } let loop = true let size = 0 while (loop) { const node = tRef.unsafeGet(currentSubscriberHead, runtime.journal) if (node === undefined) { loop = false } else { const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { size = size + 1 if (size >= Number.MAX_SAFE_INTEGER) { loop = false } } currentSubscriberHead = tail } } return core.succeed(size) }) isEmpty: STM.STM<boolean> = core.map(this.size, (size) => size === 0) isFull: STM.STM<boolean> = core.map(this.size, (size) => size === this.capacity()) peek: STM.STM<A> = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { return core.interruptAs(runtime.fiberId) } let value: A | AbsentValue = AbsentValue let loop = true while (loop) { const node = tRef.unsafeGet(currentSubscriberHead, runtime.journal) if (node === undefined) { return core.retry } const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { value = head loop = false } else { currentSubscriberHead = tail } } return core.succeed(value as A) }) peekOption: STM.STM<Option.Option<A>> = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { return core.interruptAs(runtime.fiberId) } let value: Option.Option<A> = Option.none() let loop = true while (loop) { const node = tRef.unsafeGet(currentSubscriberHead, runtime.journal) if (node === undefined) { value = Option.none() loop = false } else { const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { value = Option.some(head) loop = false } else { currentSubscriberHead = tail } } } return core.succeed(value) }) shutdown: STM.STM<void> = core.effect<never, void>((journal) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, journal) if (currentSubscriberHead !== undefined) { tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>(this.subscriberHead, void 0, journal) let loop = true while (loop) { const node = tRef.unsafeGet(currentSubscriberHead, journal) if (node === undefined) { loop = false } else { const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { const subscribers = node.subscribers if (subscribers === 1) { const size = tRef.unsafeGet(this.pubsubSize, journal) const updatedNode = makeNode<A>(AbsentValue, 0, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, journal) tRef.unsafeSet(this.publisherHead, tail as any, journal) tRef.unsafeSet(this.pubsubSize, size - 1, journal) } else { const updatedNode = makeNode(head, subscribers - 1, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, journal) } } currentSubscriberHead = tail } } const currentSubscriberCount = tRef.unsafeGet(this.subscriberCount, journal) tRef.unsafeSet(this.subscriberCount, currentSubscriberCount - 1, journal) tRef.unsafeSet( this.subscribers, HashSet.remove( tRef.unsafeGet(this.subscribers, journal), this.subscriberHead as any ), journal ) } }) take: STM.STM<A> = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { return core.interruptAs(runtime.fiberId) } let value: A | AbsentValue = AbsentValue let loop = true while (loop) { const node = tRef.unsafeGet(currentSubscriberHead, runtime.journal) if (node === undefined) { return core.retry } const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { const subscribers = node.subscribers if (subscribers === 1) { const size = tRef.unsafeGet(this.pubsubSize, runtime.journal) const updatedNode = makeNode<A>(AbsentValue, 0, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, runtime.journal) tRef.unsafeSet(this.publisherHead, tail as any, runtime.journal) tRef.unsafeSet(this.pubsubSize, size - 1, runtime.journal) } else { const updatedNode = makeNode(head, subscribers - 1, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, runtime.journal) } tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>( this.subscriberHead, tail, runtime.journal ) value = head loop = false } else { currentSubscriberHead = tail } } return core.succeed(value as A) }) takeAll: STM.STM<Array<A>> = this.takeUpTo(Number.POSITIVE_INFINITY) takeUpTo(max: number): STM.STM<Array<A>> { return core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { return core.interruptAs(runtime.fiberId) } const builder: Array<A> = [] let n = 0 while (n !== max) { const node = tRef.unsafeGet(currentSubscriberHead, runtime.journal) if (node === undefined) { n = max } else { const head = node.head const tail: TRef.TRef<Node<A> | undefined> = node.tail if (head !== AbsentValue) { const subscribers = node.subscribers if (subscribers === 1) { const size = tRef.unsafeGet(this.pubsubSize, runtime.journal) const updatedNode = makeNode<A>(AbsentValue, 0, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, runtime.journal) tRef.unsafeSet(this.publisherHead, tail as any, runtime.journal) tRef.unsafeSet(this.pubsubSize, size - 1, runtime.journal) } else { const updatedNode = makeNode(head, subscribers - 1, tail) tRef.unsafeSet<Node<A> | undefined>(currentSubscriberHead, updatedNode, runtime.journal) } builder.push(head) n = n + 1 } currentSubscriberHead = tail } } tRef.unsafeSet<TRef.TRef<Node<A> | undefined> | undefined>( this.subscriberHead, currentSubscriberHead, runtime.journal ) return core.succeed(builder) }) } } /** @internal */ const makeTPubSub = <A>( requestedCapacity: number, strategy: tQueue.TQueueStrategy ): STM.STM<TPubSub.TPubSub<A>> => pipe( stm.all([ tRef.make<Node<A> | undefined>(void 0), tRef.make(0) ]), core.flatMap(([empty, pubsubSize]) => pipe( stm.all([ tRef.make(empty), tRef.make(empty), tRef.make(0), tRef.make(HashSet.empty()) ]), core.map(([publisherHead, publisherTail, subscriberCount, subscribers]) => new TPubSubImpl( pubsubSize, publisherHead, publisherTail as any, requestedCapacity, strategy, subscriberCount, subscribers as any ) ) ) ) ) const makeSubscription = <A>( pubsubSize: TRef.TRef<number>, publisherHead: TRef.TRef<TRef.TRef<Node<A> | undefined>>, publisherTail: TRef.TRef<TRef.TRef<Node<A> | undefined> | undefined>, requestedCapacity: number, subscriberCount: TRef.TRef<number>, subscribers: TRef.TRef<HashSet.HashSet<TRef.TRef<TRef.TRef<Node<A>> | undefined>>> ): STM.STM<TQueue.TDequeue<A>> => pipe( tRef.get(publisherTail), core.flatMap((currentPublisherTail) => pipe( stm.all([ tRef.make(currentPublisherTail), tRef.get(subscriberCount), tRef.get(subscribers) ]), stm.tap(([_, currentSubscriberCount]) => pipe( subscriberCount, tRef.set(currentSubscriberCount + 1) ) ), stm.tap(([subscriberHead, _, currentSubscribers]) => pipe( subscribers as any, tRef.set(pipe(currentSubscribers as any, HashSet.add(subscriberHead))) ) ), core.map(([subscriberHead]) => new TPubSubSubscriptionImpl( pubsubSize, publisherHead, requestedCapacity, subscriberHead as any, subscriberCount, subscribers ) ) ) ) ) /** @internal */ export const awaitShutdown = <A>(self: TPubSub.TPubSub<A>): STM.STM<void> => self.awaitShutdown /** @internal */ export const bounded = <A>(requestedCapacity: number): STM.STM<TPubSub.TPubSub<A>> => makeTPubSub<A>(requestedCapacity, tQueue.BackPressure) /** @internal */ export const capacity = <A>(self: TPubSub.TPubSub<A>): number => self.capacity() /** @internal */ export const dropping = <A>(requestedCapacity: number): STM.STM<TPubSub.TPubSub<A>> => makeTPubSub<A>(requestedCapacity, tQueue.Dropping) /** @internal */ export const isEmpty = <A>(self: TPubSub.TPubSub<A>): STM.STM<boolean> => self.isEmpty /** @internal */ export const isFull = <A>(self: TPubSub.TPubSub<A>): STM.STM<boolean> => self.isFull /** @internal */ export const isShutdown = <A>(self: TPubSub.TPubSub<A>): STM.STM<boolean> => self.isShutdown /** @internal */ export const publish = dual< <A>(value: A) => (self: TPubSub.TPubSub<A>) => STM.STM<boolean>, <A>(self: TPubSub.TPubSub<A>, value: A) => STM.STM<boolean> >(2, (self, value) => self.offer(value)) /** @internal */ export const publishAll = dual< <A>(iterable: Iterable<A>) => (self: TPubSub.TPubSub<A>) => STM.STM<boolean>, <A>(self: TPubSub.TPubSub<A>, iterable: Iterable<A>) => STM.STM<boolean> >(2, (self, iterable) => self.offerAll(iterable)) /** @internal */ export const size = <A>(self: TPubSub.TPubSub<A>): STM.STM<number> => self.size /** @internal */ export const shutdown = <A>(self: TPubSub.TPubSub<A>): STM.STM<void> => self.shutdown /** @internal */ export const sliding = <A>(requestedCapacity: number): STM.STM<TPubSub.TPubSub<A>> => makeTPubSub<A>(requestedCapacity, tQueue.Sliding) /** @internal */ export const subscribe = <A>(self: TPubSub.TPubSub<A>): STM.STM<TQueue.TDequeue<A>> => makeSubscription( self.pubsubSize, self.publisherHead, self.publisherTail, self.requestedCapacity, self.subscriberCount, self.subscribers ) /** @internal */ export const subscribeScoped = <A>(self: TPubSub.TPubSub<A>): Effect.Effect<TQueue.TDequeue<A>, never, Scope.Scope> => Effect.acquireRelease( subscribe(self), (dequeue) => tQueue.shutdown(dequeue) ) /** @internal */ export const unbounded = <A>(): STM.STM<TPubSub.TPubSub<A>> => makeTPubSub<A>(Number.MAX_SAFE_INTEGER, tQueue.Dropping)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssv445/lorem-ipsum-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server