Skip to main content
Glama
ssv445

Lorem Ipsum MCP Server

by ssv445
pool.ts (14.6 kB)
import type { Cause } from "../Cause.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type { Effect, Semaphore } from "../Effect.js"
import * as Effectable from "../Effectable.js"
import type { Exit } from "../Exit.js"
import { dual, identity } from "../Function.js"
import * as Iterable from "../Iterable.js"
import * as Option from "../Option.js"
import { pipeArguments } from "../Pipeable.js"
import type { Pool, PoolTypeId as PoolTypeId_ } from "../Pool.js"
import { hasProperty } from "../Predicate.js"
import type { Scope } from "../Scope.js"
import * as coreEffect from "./core-effect.js"
import * as core from "./core.js"
import * as defaultServices from "./defaultServices.js"
import * as circular from "./effect/circular.js"
import * as fiberRuntime from "./fiberRuntime.js"
import * as internalQueue from "./queue.js"

/**
 * Symbol (registered under the key "effect/Pool") used to brand pool
 * instances; `isPool` tests for its presence.
 *
 * @internal
 */
export const PoolTypeId: PoolTypeId_ = Symbol.for("effect/Pool") as PoolTypeId_

// Value stored under `[PoolTypeId]` on every `PoolImpl`. Both members are
// identity functions; the `c8 ignore` markers exclude them from coverage.
const poolVariance = {
  /* c8 ignore next */
  _E: (_: never) => _,
  /* c8 ignore next */
  _A: (_: any) => _
}

/**
 * Type guard: reports whether `u` carries the `PoolTypeId` property.
 *
 * @internal
 */
export const isPool = (u: unknown): u is Pool<unknown, unknown> => hasProperty(u, PoolTypeId)

/**
 * Builds a pool driven by an explicit `Strategy`.
 *
 * The returned effect:
 * - reads the ambient `Scope` from the context and captures the whole context
 *   so that `options.acquire` can later run needing only a `Scope`;
 * - constructs a `PoolImpl` with `concurrency` defaulting to 1 and
 *   `targetUtilization` defaulting to 1 and clamped to the range [0.1, 1];
 * - registers `pool.shutdown` as a finalizer on the scope;
 * - forks the initial `pool.resize` and `strategy.run(pool)` as daemon fibers
 *   and registers finalizers that interrupt them.
 *
 * The whole construction runs inside `uninterruptibleMask`; only the two forked
 * effects are wrapped in `restore`.
 *
 * @internal
 */
export const makeWith = <A, E, R>(options: {
  readonly acquire: Effect<A, E, R>
  readonly min: number
  readonly max: number
  readonly concurrency?: number | undefined
  readonly targetUtilization?: number | undefined
  readonly strategy: Strategy<A, E>
}): Effect<Pool<A, E>, never, Scope | R> =>
  core.uninterruptibleMask((restore) =>
    core.flatMap(core.context<R | Scope>(), (context) => {
      const scope = Context.get(context, fiberRuntime.scopeTag)
      // Merge the captured context into whatever context `acquire` is later run
      // with, so the pool only has to supply a `Scope`.
      const acquire = core.mapInputContext(
        options.acquire,
        (input) => Context.merge(context, input)
      ) as Effect<
        A,
        E,
        Scope
      >
      const pool = new PoolImpl<A, E>(
        scope,
        acquire,
        options.concurrency ?? 1,
        options.min,
        options.max,
        options.strategy,
        // Default 1, clamped to [0.1, 1].
        Math.min(Math.max(options.targetUtilization ?? 1, 0.1), 1)
      )
      // Initial fill of the pool, forked as a daemon; interrupted when the scope closes.
      const initialize = core.tap(fiberRuntime.forkDaemon(restore(pool.resize)), (fiber) =>
        scope.addFinalizer(() => core.interruptFiber(fiber)))
      // Background strategy loop, forked as a daemon; interrupted when the scope closes.
      const runStrategy = core.tap(fiberRuntime.forkDaemon(restore(options.strategy.run(pool))), (fiber) =>
        scope.addFinalizer(() => core.interruptFiber(fiber)))
      return core.succeed(pool).pipe(
        core.zipLeft(scope.addFinalizer(() => pool.shutdown)),
        core.zipLeft(initialize),
        core.zipLeft(runStrategy)
      )
    })
  )

/**
 * Builds a fixed-size pool: `min` and `max` are both set to `options.size`
 * and the no-op strategy is used.
 *
 * @internal
 */
export const make = <A, E, R>(options: {
  readonly acquire: Effect<A, E, R>
  readonly size: number
  readonly concurrency?: number | undefined
  readonly targetUtilization?: number | undefined
}): Effect<Pool<A, E>, never, R | Scope> =>
  makeWith({ ...options, min: options.size, max: options.size, strategy: strategyNoop() })

/**
 * Builds a pool whose items expire according to `timeToLive`.
 *
 * `timeToLiveStrategy === "creation"` selects `strategyCreationTTL`; any other
 * value (including `undefined` and `"usage"`) selects `strategyUsageTTL`.
 *
 * @internal
 */
export const makeWithTTL = <A, E, R>(options: {
  readonly acquire: Effect<A, E, R>
  readonly min: number
  readonly max: number
  readonly concurrency?: number | undefined
  readonly targetUtilization?: number | undefined
  readonly timeToLive: Duration.DurationInput
  readonly timeToLiveStrategy?: "creation" | "usage" | undefined
}): Effect<Pool<A, E>, never, R | Scope> =>
  core.flatMap(
    options.timeToLiveStrategy === "creation" ?
      strategyCreationTTL<A, E>(options.timeToLive) :
      strategyUsageTTL<A, E>(options.timeToLive),
    (strategy) => makeWith({ ...options, strategy })
  )

/**
 * Returns the pool's own `get` effect.
 *
 * @internal
 */
export const get = <A, E>(self: Pool<A, E>): Effect<A, E, Scope> => self.get

/**
 * Delegates to `self.invalidate(item)`. Declared through `dual(2, ...)` with
 * both a `(self, item)` and an `(item)(self)` call signature.
 *
 * @internal
 */
export const invalidate: {
  <A>(item: A): <E>(self: Pool<A, E>) => Effect<void>
  <A, E>(self: Pool<A, E>, item: A): Effect<void>
} = dual(2, <A, E>(self: Pool<A, E>, item: A): Effect<void> => self.invalidate(item))

/** Bookkeeping record for one acquisition attempt held by the pool. */
interface PoolItem<A, E> {
  /** Outcome of running the pool's `acquire` effect for this item. */
  readonly exit: Exit<A, E>
  /** Effect that closes the item's private scope; `shutdown` may append to it. */
  finalizer: Effect<void>
  /** Incremented in `getPoolItem`, decremented by the finalizer it registers. */
  refCount: number
  /** Set by `PoolImpl.invalidate`; the usage-TTL `reclaim` skips items with this flag. */
  disableReclaim: boolean
}

/** Pluggable policy hooks invoked by `makeWith` and `PoolImpl`. */
interface Strategy<A, E> {
  /** Background effect forked once by `makeWith`. */
  readonly run: (pool: PoolImpl<A, E>) => Effect<void>
  /** Called from `allocate` for every newly created item (successful or failed). */
  readonly onAcquire: (item: PoolItem<A, E>) => Effect<void>
  /** Consulted by `resizeLoop` before allocating; `Some` supplies an existing item instead. */
  readonly reclaim: (pool: PoolImpl<A, E>) => Effect<Option.Option<PoolItem<A, E>>>
}

/**
 * Pool implementation. Tracks every item in `items`, the subset that can be
 * handed out in `available`, and the subset marked for removal in `invalidated`.
 */
class PoolImpl<A, E> extends Effectable.Class<A, E, Scope> implements Pool<A, E> {
  readonly [PoolTypeId]: Pool.Variance<A, E>[PoolTypeId_]

  /** Set once by `shutdown`; checked by `get`, `getPoolItem`, `invalidate` and `targetSize`. */
  isShuttingDown = false
  /** Created in the constructor with `concurrency * maxSize` permits; `getPoolItem` takes one per borrow. */
  readonly semaphore: Semaphore
  readonly items = new Set<PoolItem<A, E>>()
  readonly available = new Set<PoolItem<A, E>>()
  /** Opened after each resize batch and on shutdown; closed by waiters in `getPoolItem` before awaiting. */
  readonly availableLatch = circular.unsafeMakeLatch(false)
  readonly invalidated = new Set<PoolItem<A, E>>()
  /** Incremented at the start of the suspended body in `getPoolItem`; decremented by its `ensuring`. */
  waiters = 0

  constructor(
    readonly scope: Scope,
    readonly acquire: Effect<A, E, Scope>,
    readonly concurrency: number,
    readonly minSize: number,
    readonly maxSize: number,
    readonly strategy: Strategy<A, E>,
    readonly targetUtilization: number
  ) {
    super()
    this[PoolTypeId] = poolVariance
    this.semaphore = circular.unsafeMakeSemaphore(concurrency * maxSize)
  }

  /**
   * Creates one pool item: makes a fresh scope, runs `acquire` extended into
   * it, and records the resulting `Exit` (success or failure) in a new
   * `PoolItem` that is added to both `items` and `available`.
   *
   * The item's `finalizer` closes that scope with the acquisition exit and
   * routes any failure cause of the close to `reportUnhandledError`. When
   * acquisition failed, the finalizer is run immediately, before
   * `strategy.onAcquire`.
   */
  readonly allocate: Effect<PoolItem<A, E>> = core.acquireUseRelease(
    fiberRuntime.scopeMake(),
    (scope) =>
      this.acquire.pipe(
        fiberRuntime.scopeExtend(scope),
        core.exit,
        core.flatMap((exit) => {
          const item: PoolItem<A, E> = {
            exit,
            finalizer: core.catchAllCause(scope.close(exit), reportUnhandledError),
            refCount: 0,
            disableReclaim: false
          }
          this.items.add(item)
          this.available.add(item)
          return core.as(
            exit._tag === "Success" ?
              this.strategy.onAcquire(item) :
              core.zipRight(item.finalizer, this.strategy.onAcquire(item)),
            item
          )
        })
      ),
    // Close the scope here only when the exit handed to this release step is a
    // Failure; otherwise the scope stays open until the item's finalizer runs.
    (scope, exit) => exit._tag === "Failure" ? scope.close(exit) : core.void
  )

  /** `waiters` plus the sum of `refCount` over all items. */
  get currentUsage() {
    let count = this.waiters
    for (const item of this.items) {
      count += item.refCount
    }
    return count
  }

  /**
   * Desired number of items: 0 while shutting down, otherwise
   * `ceil((currentUsage / targetUtilization) / concurrency)` clamped to
   * `[minSize, maxSize]`.
   */
  get targetSize() {
    if (this.isShuttingDown) return 0
    const utilization = this.currentUsage / this.targetUtilization
    const target = Math.ceil(utilization / this.concurrency)
    return Math.min(Math.max(this.minSize, target), this.maxSize)
  }

  /** Number of items not marked as invalidated. */
  get activeSize() {
    return this.items.size - this.invalidated.size
  }

  /**
   * Grows the pool toward `targetSize`. Each round obtains
   * `targetSize - activeSize` items, run with that same concurrency, first
   * asking `strategy.reclaim` and falling back to `allocate`. It then opens
   * `availableLatch` and repeats, unless any item obtained in the round has a
   * failed exit, in which case the loop stops.
   */
  readonly resizeLoop: Effect<void> = core.suspend(() => {
    if (this.activeSize >= this.targetSize) {
      return core.void
    }
    const toAcquire = this.targetSize - this.activeSize
    return this.strategy.reclaim(this).pipe(
      core.flatMap(Option.match({
        onNone: () => this.allocate,
        onSome: core.succeed
      })),
      fiberRuntime.replicateEffect(toAcquire, { concurrency: toAcquire }),
      core.zipLeft(this.availableLatch.open),
      core.flatMap((items) => items.some((_) => _.exit._tag === "Failure") ? core.void : this.resizeLoop)
    )
  })
  /** Single-permit semaphore guarding `resizeLoop`. */
  readonly resizeSemaphore = circular.unsafeMakeSemaphore(1)
  /** `resizeLoop` run while holding the single `resizeSemaphore` permit. */
  readonly resize = this.resizeSemaphore.withPermits(1)(this.resizeLoop)

  /**
   * Borrows one item and ties its release to the caller's `Scope`.
   *
   * Steps:
   * 1. Take one permit from `semaphore` (the only part wrapped in `restore`).
   * 2. Increment `waiters`; interrupt if the pool is shutting down.
   * 3. If `targetSize > activeSize`, fork `resize` into the pool scope and
   *    loop until `available` is non-empty, closing and awaiting
   *    `availableLatch` between checks. Otherwise take the head of `available`.
   * 4. Decrement `waiters` (via `ensuring`).
   * 5. For an item whose exit is a Failure: drop it from all sets and release
   *    the permit. Otherwise bump `refCount`, keep the item in `available`
   *    only while `refCount < concurrency`, and register a finalizer on the
   *    caller's scope that undoes the borrow and releases the permit.
   * 6. On interruption, release the permit.
   */
  readonly getPoolItem: Effect<PoolItem<A, E>, never, Scope> = core.uninterruptibleMask((restore) =>
    restore(this.semaphore.take(1)).pipe(
      core.zipRight(fiberRuntime.scopeTag),
      core.flatMap((scope) =>
        core.suspend(() => {
          this.waiters++
          if (this.isShuttingDown) {
            return core.interrupt
          } else if (this.targetSize > this.activeSize) {
            // eslint-disable-next-line @typescript-eslint/no-this-alias
            const self = this
            return core.flatMap(
              // NOTE(review): `withPermitsIfAvailable` presumably skips the fork
              // when the resize permit is not free — confirm against its docs.
              this.resizeSemaphore.withPermitsIfAvailable(1)(
                circular.forkIn(core.interruptible(this.resize), this.scope)
              ),
              function loop(): Effect<PoolItem<A, E>> {
                if (self.isShuttingDown) {
                  return core.interrupt
                } else if (self.available.size > 0) {
                  return core.succeed(Iterable.unsafeHead(self.available))
                }
                // Nothing available yet: close the latch, then wait for a
                // resize round or shutdown to open it and re-check.
                self.availableLatch.unsafeClose()
                return core.flatMap(self.availableLatch.await, loop)
              }
            )
          }
          return core.succeed(Iterable.unsafeHead(this.available))
        }).pipe(
          fiberRuntime.ensuring(core.sync(() => this.waiters--)),
          core.tap((item) => {
            if (item.exit._tag === "Failure") {
              // Failed acquisitions are handed out once and then forgotten.
              this.items.delete(item)
              this.invalidated.delete(item)
              this.available.delete(item)
              return this.semaphore.release(1)
            }
            item.refCount++
            this.available.delete(item)
            if (item.refCount < this.concurrency) {
              this.available.add(item)
            }
            return scope.addFinalizer(() =>
              core.zipRight(
                core.suspend(() => {
                  item.refCount--
                  // An item invalidated while borrowed goes through
                  // `invalidatePoolItem` on release instead of back to `available`.
                  if (this.invalidated.has(item)) {
                    return this.invalidatePoolItem(item)
                  }
                  this.available.add(item)
                  return core.exitVoid
                }),
                this.semaphore.release(1)
              )
            )
          }),
          core.onInterrupt(() => this.semaphore.release(1))
        )
      )
    )
  )

  /**
   * Returns `this.get`.
   * NOTE(review): presumably the hook `Effectable.Class` calls when the pool
   * itself is run as an Effect — confirm against Effectable.
   */
  commit() {
    return this.get
  }

  /**
   * Interrupts when the pool is shutting down; otherwise borrows an item via
   * `getPoolItem` and continues with that item's stored `exit`.
   */
  readonly get: Effect<A, E, Scope> = core.flatMap(
    core.suspend(() => this.isShuttingDown ? core.interrupt : this.getPoolItem),
    (_) => _.exit
  )

  /**
   * Invalidates the first pool item whose exit is a Success with a value
   * strictly equal (`===`) to `item`: sets `disableReclaim` and runs
   * `invalidatePoolItem` uninterruptibly. Does nothing when the pool is
   * shutting down or no such item exists.
   */
  invalidate(item: A): Effect<void> {
    return core.suspend(() => {
      if (this.isShuttingDown) return core.void
      for (const poolItem of this.items) {
        if (poolItem.exit._tag === "Success" && poolItem.exit.value === item) {
          poolItem.disableReclaim = true
          return core.uninterruptible(this.invalidatePoolItem(poolItem))
        }
      }
      return core.void
    })
  }

  /**
   * Removes `poolItem` from the pool.
   *
   * - Not in `items`: no-op.
   * - `refCount === 0`: remove it from every set, run its finalizer, then fork
   *   `resize` into the pool scope.
   * - Otherwise: add it to `invalidated` and remove it from `available`; the
   *   release finalizer registered by `getPoolItem` calls this method again
   *   once a borrower lets go.
   */
  invalidatePoolItem(poolItem: PoolItem<A, E>): Effect<void> {
    return core.suspend(() => {
      if (!this.items.has(poolItem)) {
        return core.void
      } else if (poolItem.refCount === 0) {
        this.items.delete(poolItem)
        this.available.delete(poolItem)
        this.invalidated.delete(poolItem)
        return core.zipRight(
          poolItem.finalizer,
          circular.forkIn(core.interruptible(this.resize), this.scope)
        )
      }
      this.invalidated.add(poolItem)
      this.available.delete(poolItem)
      return core.void
    })
  }

  /**
   * Shuts the pool down; a second call is a no-op because `isShuttingDown` is
   * already set.
   *
   * A local semaphore is created with one permit per current item. For each
   * item still borrowed (`refCount > 0`), a `release(1)` on that semaphore is
   * appended to the item's finalizer, the item is marked invalidated, and one
   * permit is taken. Idle items are removed and finalized immediately.
   * Afterwards all permits of the pool semaphore are released, the latch is
   * opened, and the effect takes `size` permits from the local semaphore.
   * NOTE(review): that final take presumably waits until every borrowed item's
   * finalizer has returned its permit — confirm against Semaphore semantics.
   */
  get shutdown(): Effect<void> {
    return core.suspend(() => {
      if (this.isShuttingDown) return core.void
      this.isShuttingDown = true
      const size = this.items.size
      const semaphore = circular.unsafeMakeSemaphore(size)
      return core.forEachSequentialDiscard(this.items, (item) => {
        if (item.refCount > 0) {
          item.finalizer = core.zipLeft(item.finalizer, semaphore.release(1))
          this.invalidated.add(item)
          return semaphore.take(1)
        }
        this.items.delete(item)
        this.available.delete(item)
        this.invalidated.delete(item)
        return item.finalizer
      }).pipe(
        core.zipRight(this.semaphore.releaseAll),
        core.zipRight(this.availableLatch.open),
        core.zipRight(semaphore.take(size))
      )
    })
  }

  /** Forwards `this` and the call's arguments to `pipeArguments`. */
  pipe() {
    return pipeArguments(this, arguments)
  }
}

/** Strategy whose hooks do nothing and whose `reclaim` always yields `None`. */
const strategyNoop = <A, E>(): Strategy<A, E> => ({
  run: (_) => core.void,
  onAcquire: (_) => core.void,
  reclaim: (_) => coreEffect.succeedNone
})

/**
 * Strategy that invalidates items once `ttl` has elapsed since `onAcquire`
 * was called for them.
 *
 * `onAcquire` records the current clock time for the item in a `WeakMap` and
 * offers the item to an unbounded queue. `run` takes items from that queue
 * forever; for each one it computes the remaining lifetime and either delays
 * and re-checks, or calls `pool.invalidatePoolItem`. Items no longer in the
 * pool, or already invalidated, are skipped. `reclaim` always yields `None`.
 */
const strategyCreationTTL = <A, E>(ttl: Duration.DurationInput) =>
  defaultServices.clockWith((clock) =>
    core.map(internalQueue.unbounded<PoolItem<A, E>>(), (queue) => {
      const ttlMillis = Duration.toMillis(ttl)
      const creationTimes = new WeakMap<PoolItem<A, E>, number>()
      return identity<Strategy<A, E>>({
        run: (pool) => {
          const process = (item: PoolItem<A, E>): Effect<void> =>
            core.suspend(() => {
              if (!pool.items.has(item) || pool.invalidated.has(item)) {
                return core.void
              }
              const now = clock.unsafeCurrentTimeMillis()
              // The entry is written by `onAcquire` before the item is offered to the queue.
              const created = creationTimes.get(item)!
              const remaining = ttlMillis - (now - created)
              return remaining > 0
                ? coreEffect.delay(process(item), remaining)
                : pool.invalidatePoolItem(item)
            })
          return queue.take.pipe(
            core.tap(process),
            coreEffect.forever
          )
        },
        onAcquire: (item) =>
          core.suspend(() => {
            creationTimes.set(item, clock.unsafeCurrentTimeMillis())
            return queue.offer(item)
          }),
        reclaim: (_) => coreEffect.succeedNone
      })
    })
  )

/**
 * Strategy that trims the pool back toward `targetSize` on a `ttl` cadence.
 *
 * `run` repeats forever: delay by `ttl`, then, while `activeSize` exceeds
 * `targetSize`, take an item from the queue and invalidate it. `onAcquire`
 * offers each new item to that queue. `reclaim` revives the first invalidated
 * item whose `disableReclaim` flag is not set: it is removed from
 * `invalidated`, re-added to `available` when `refCount < concurrency`,
 * offered to the queue again, and returned as `Some`.
 */
const strategyUsageTTL = <A, E>(ttl: Duration.DurationInput) =>
  core.map(internalQueue.unbounded<PoolItem<A, E>>(), (queue) => {
    return identity<Strategy<A, E>>({
      run: (pool) => {
        const process: Effect<void> = core.suspend(() => {
          const excess = pool.activeSize - pool.targetSize
          if (excess <= 0) return core.void
          return queue.take.pipe(
            core.tap((item) => pool.invalidatePoolItem(item)),
            core.zipRight(process)
          )
        })
        return process.pipe(
          coreEffect.delay(ttl),
          coreEffect.forever
        )
      },
      onAcquire: (item) => queue.offer(item),
      reclaim(pool) {
        return core.suspend((): Effect<Option.Option<PoolItem<A, E>>> => {
          if (pool.invalidated.size === 0) {
            return coreEffect.succeedNone
          }
          // Items invalidated explicitly through `PoolImpl.invalidate` carry
          // `disableReclaim` and are never revived.
          const item = Iterable.head(
            Iterable.filter(pool.invalidated, (item) => !item.disableReclaim)
          )
          if (item._tag === "None") {
            return coreEffect.succeedNone
          }
          pool.invalidated.delete(item.value)
          if (item.value.refCount < pool.concurrency) {
            pool.available.add(item.value)
          }
          return core.as(queue.offer(item.value), item)
        })
      }
    })
  })

/**
 * Handler for a failure cause raised while closing an item's scope. Reads
 * `core.currentUnhandledErrorLogLevel` from the running fiber and, when it is
 * `Some`, logs "Unhandled error in pool finalizer" together with the cause.
 * The returned effect always completes with `core.void`.
 */
const reportUnhandledError = <E>(cause: Cause<E>) =>
  core.withFiberRuntime<void>((fiber) => {
    const unhandledLogLevel = fiber.getFiberRef(core.currentUnhandledErrorLogLevel)
    if (unhandledLogLevel._tag === "Some") {
      fiber.log("Unhandled error in pool finalizer", cause, unhandledLogLevel)
    }
    return core.void
  })

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssv445/lorem-ipsum-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.