import * as Chunk from "../Chunk.js";
import * as Effectable from "../Effectable.js";
import { dual, pipe } from "../Function.js";
import * as MutableQueue from "../MutableQueue.js";
import * as MutableRef from "../MutableRef.js";
import { nextPow2 } from "../Number.js";
import * as Option from "../Option.js";
import { pipeArguments } from "../Pipeable.js";
import * as core from "./core.js";
import * as executionStrategy from "./executionStrategy.js";
import * as fiberRuntime from "./fiberRuntime.js";
import * as queue from "./queue.js";
// Sentinel symbol written into ring-buffer slots, linked-list nodes and the
// single-slot value whenever they currently hold no published element.
const AbsentValue = /*#__PURE__*/Symbol.for("effect/PubSub/AbsentValue");
/**
 * Builds a function that records `pollers` under `subscription` in the given
 * `subscribers` map, creating the per-subscription `Set` on first use.
 */
const addSubscribers = (subscription, pollers) => subscribers => {
  const alreadyKnown = subscribers.has(subscription);
  if (alreadyKnown === false) {
    subscribers.set(subscription, new Set());
  }
  subscribers.get(subscription).add(pollers);
};
/**
 * Builds a function that removes `pollers` from the set stored under
 * `subscription` in the given `subscribers` map. The map entry itself is
 * deleted once its set becomes empty; an unknown subscription is a no-op.
 */
const removeSubscribers = (subscription, pollers) => subscribers => {
  if (subscribers.has(subscription)) {
    const registered = subscribers.get(subscription);
    registered.delete(pollers);
    if (registered.size === 0) {
      subscribers.delete(subscription);
    }
  }
};
/**
 * Lazily builds a bounded pub/sub (via `makeBoundedPubSub`) wrapped with a
 * fresh `BackPressureStrategy`. Construction is deferred with `core.suspend`.
 *
 * @internal
 */
export const bounded = capacity => core.suspend(() => makePubSub(makeBoundedPubSub(capacity), new BackPressureStrategy()));
/**
 * Lazily builds a bounded pub/sub (via `makeBoundedPubSub`) wrapped with a
 * fresh `DroppingStrategy`. Construction is deferred with `core.suspend`.
 *
 * @internal
 */
export const dropping = capacity => core.suspend(() => makePubSub(makeBoundedPubSub(capacity), new DroppingStrategy()));
/**
 * Lazily builds a bounded pub/sub (via `makeBoundedPubSub`) wrapped with a
 * fresh `SlidingStrategy`. Construction is deferred with `core.suspend`.
 *
 * @internal
 */
export const sliding = capacity => core.suspend(() => makePubSub(makeBoundedPubSub(capacity), new SlidingStrategy()));
/**
 * Lazily builds an unbounded pub/sub (via `makeUnboundedPubSub`) wrapped with
 * a fresh `DroppingStrategy`. Construction is deferred with `core.suspend`.
 *
 * @internal
 */
export const unbounded = options => core.suspend(() => makePubSub(makeUnboundedPubSub(options), new DroppingStrategy()));
/**
 * Returns the result of calling `self.capacity()`.
 *
 * @internal
 */
export const capacity = self => {
  return self.capacity();
};
/**
 * Returns the `size` property of `self`.
 *
 * @internal
 */
export const size = self => {
  return self.size;
};
/**
 * Returns the `isFull` property of `self`.
 *
 * @internal
 */
export const isFull = self => {
  return self.isFull;
};
/**
 * Returns the `isEmpty` property of `self`.
 *
 * @internal
 */
export const isEmpty = self => {
  return self.isEmpty;
};
/**
 * Returns the `shutdown` property of `self`.
 *
 * @internal
 */
export const shutdown = self => {
  return self.shutdown;
};
/**
 * Returns the `isShutdown` property of `self`.
 *
 * @internal
 */
export const isShutdown = self => {
  return self.isShutdown;
};
/**
 * Returns the `awaitShutdown` property of `self`.
 *
 * @internal
 */
export const awaitShutdown = self => {
  return self.awaitShutdown;
};
/**
 * Two-argument function wrapped with `dual`: forwards `value` to
 * `self.publish` and returns whatever that call returns.
 *
 * @internal
 */
export const publish = /*#__PURE__*/dual(2, (self, value) => {
  return self.publish(value);
});
/**
 * Two-argument function wrapped with `dual`: forwards `elements` to
 * `self.publishAll` and returns whatever that call returns.
 *
 * @internal
 */
export const publishAll = /*#__PURE__*/dual(2, (self, elements) => {
  return self.publishAll(elements);
});
/**
 * Returns the `subscribe` property of `self`.
 *
 * @internal
 */
export const subscribe = self => {
  return self.subscribe;
};
/**
 * Picks the bounded pub/sub implementation for the requested capacity.
 *
 * `capacity` is either a number or an options object carrying `capacity` and
 * an optional `replay` count. The capacity is validated with
 * `ensureCapacity`; a positive `replay` yields a `ReplayBuffer` sized to
 * `Math.ceil(replay)`. Capacity 1 uses `BoundedPubSubSingle`, a capacity that
 * `nextPow2` maps to itself uses `BoundedPubSubPow2`, anything else uses
 * `BoundedPubSubArb`.
 *
 * @internal
 */
const makeBoundedPubSub = capacity => {
  const settings = typeof capacity === "number" ? { capacity } : capacity;
  ensureCapacity(settings.capacity);
  const wantsReplay = settings.replay && settings.replay > 0;
  const replayBuffer = wantsReplay ? new ReplayBuffer(Math.ceil(settings.replay)) : undefined;
  if (settings.capacity === 1) {
    return new BoundedPubSubSingle(replayBuffer);
  }
  const isPowerOfTwo = nextPow2(settings.capacity) === settings.capacity;
  return isPowerOfTwo
    ? new BoundedPubSubPow2(settings.capacity, replayBuffer)
    : new BoundedPubSubArb(settings.capacity, replayBuffer);
};
/**
 * Creates an `UnboundedPubSub`, optionally backed by a `ReplayBuffer`.
 *
 * A replay buffer is only allocated when `options.replay` is greater than
 * zero, and its capacity is rounded up to a whole number. This mirrors
 * `makeBoundedPubSub`. `ReplayBuffer` evicts by comparing its integer `size`
 * to `capacity` with `===`, so a negative or fractional capacity would never
 * match and the buffer would grow without bound.
 *
 * @param options - optional settings; only `replay` is read.
 * @internal
 */
const makeUnboundedPubSub = options => {
  const replay = options?.replay;
  const replayBuffer = replay && replay > 0 ? new ReplayBuffer(Math.ceil(replay)) : undefined;
  return new UnboundedPubSub(replayBuffer);
};
/**
 * Effect that creates a subscription to `pubsub`: it makes a deferred to act
 * as the shutdown hook, then subscribes to the pub/sub, allocates an
 * unbounded poller queue and a `false` shutdown flag, and hands everything to
 * `unsafeMakeSubscription`.
 *
 * @internal
 */
const makeSubscription = (pubsub, subscribers, strategy) => core.map(core.deferredMake(), shutdownHook => {
  const subscription = pubsub.subscribe();
  const pollers = MutableQueue.unbounded();
  const shutdownFlag = MutableRef.make(false);
  return unsafeMakeSubscription(pubsub, subscribers, subscription, pollers, shutdownHook, shutdownFlag, strategy);
});
/**
 * Synchronously constructs a `SubscriptionImpl` from already-allocated parts,
 * adding a replay window obtained from `pubsub.replayWindow()`.
 *
 * @internal
 */
export const unsafeMakeSubscription = (pubsub, subscribers, subscription, pollers, shutdownHook, shutdownFlag, strategy) => {
  return new SubscriptionImpl(
    pubsub,
    subscribers,
    subscription,
    pollers,
    shutdownHook,
    shutdownFlag,
    strategy,
    pubsub.replayWindow()
  );
};
/**
 * Bounded pub/sub over a fixed-size circular array, indexed with
 * `index % capacity`. `makeBoundedPubSub` selects it for capacities that are
 * neither 1 nor a power of two.
 *
 * `publisherIndex` counts elements written; `subscribersIndex` counts slots
 * that have been released. `subscribers[i]` holds how many subscribers still
 * have to read slot `i`.
 *
 * @internal
 */
class BoundedPubSubArb {
  capacity;
  replayBuffer;
  array;
  publisherIndex = 0;
  subscribers;
  subscriberCount = 0;
  subscribersIndex = 0;
  constructor(capacity, replayBuffer) {
    this.capacity = capacity;
    this.replayBuffer = replayBuffer;
    this.array = Array.from({ length: capacity });
    this.subscribers = Array.from({ length: capacity });
  }
  /** Replay window over the replay buffer, or the shared empty window. */
  replayWindow() {
    if (this.replayBuffer) {
      return new ReplayWindowImpl(this.replayBuffer);
    }
    return emptyReplayWindow;
  }
  isEmpty() {
    return this.subscribersIndex === this.publisherIndex;
  }
  isFull() {
    return this.subscribersIndex + this.capacity === this.publisherIndex;
  }
  size() {
    return this.publisherIndex - this.subscribersIndex;
  }
  /**
   * Writes `value` into the next slot. Returns `false` only when full. With no
   * subscribers nothing is stored in the ring, but the replay buffer (if any)
   * still receives the value.
   */
  publish(value) {
    if (this.isFull()) {
      return false;
    }
    if (this.subscriberCount !== 0) {
      const slot = this.publisherIndex % this.capacity;
      this.array[slot] = value;
      this.subscribers[slot] = this.subscriberCount;
      this.publisherIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.offer(value);
    }
    return true;
  }
  /**
   * Publishes as many of `elements` as fit and returns the leftover chunk.
   */
  publishAll(elements) {
    const replay = this.replayBuffer;
    if (this.subscriberCount === 0) {
      if (replay) {
        replay.offerAll(elements);
      }
      return Chunk.empty();
    }
    const pending = Chunk.fromIterable(elements);
    const occupied = this.publisherIndex - this.subscribersIndex;
    const accepted = Math.min(pending.length, this.capacity - occupied);
    if (accepted === 0) {
      return pending;
    }
    const stopAt = this.publisherIndex + accepted;
    let consumed = 0;
    while (this.publisherIndex !== stopAt) {
      const element = Chunk.unsafeGet(pending, consumed);
      consumed += 1;
      const slot = this.publisherIndex % this.capacity;
      this.array[slot] = element;
      this.subscribers[slot] = this.subscriberCount;
      this.publisherIndex += 1;
      if (this.replayBuffer) {
        this.replayBuffer.offer(element);
      }
    }
    return Chunk.drop(pending, consumed);
  }
  /** Drops the oldest stored element (if any) and slides the replay buffer. */
  slide() {
    if (this.publisherIndex !== this.subscribersIndex) {
      const oldest = this.subscribersIndex % this.capacity;
      this.array[oldest] = AbsentValue;
      this.subscribers[oldest] = 0;
      this.subscribersIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.slide();
    }
  }
  /** Registers a subscriber starting at the current publisher position. */
  subscribe() {
    this.subscriberCount += 1;
    return new BoundedPubSubArbSubscription(this, this.publisherIndex, false);
  }
}
/**
 * Read cursor of one subscriber over a `BoundedPubSubArb` ring.
 *
 * `subscriberIndex` is this subscriber's position. Before reading it is
 * clamped up to the pub/sub's `subscribersIndex`, so slots that were already
 * released are skipped.
 */
class BoundedPubSubArbSubscription {
  self;
  subscriberIndex;
  unsubscribed;
  constructor(self, subscriberIndex, unsubscribed) {
    this.self = self;
    this.subscriberIndex = subscriberIndex;
    this.unsubscribed = unsubscribed;
  }
  isEmpty() {
    if (this.unsubscribed) {
      return true;
    }
    const hub = this.self;
    return hub.publisherIndex === this.subscriberIndex || hub.publisherIndex === hub.subscribersIndex;
  }
  size() {
    if (this.unsubscribed) {
      return 0;
    }
    const position = Math.max(this.subscriberIndex, this.self.subscribersIndex);
    return this.self.publisherIndex - position;
  }
  /** Reads the next element, or returns `default_` when nothing is available. */
  poll(default_) {
    if (this.unsubscribed) {
      return default_;
    }
    const hub = this.self;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    if (this.subscriberIndex === hub.publisherIndex) {
      return default_;
    }
    const slot = this.subscriberIndex % hub.capacity;
    const value = hub.array[slot];
    hub.subscribers[slot] -= 1;
    if (hub.subscribers[slot] === 0) {
      // Last reader of this slot: clear it and let the pub/sub reuse it.
      hub.array[slot] = AbsentValue;
      hub.subscribersIndex += 1;
    }
    this.subscriberIndex += 1;
    return value;
  }
  /** Reads up to `n` elements into a chunk. */
  pollUpTo(n) {
    if (this.unsubscribed) {
      return Chunk.empty();
    }
    const hub = this.self;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    const wanted = Math.min(n, hub.publisherIndex - this.subscriberIndex);
    if (wanted <= 0) {
      return Chunk.empty();
    }
    const collected = [];
    const stopAt = this.subscriberIndex + wanted;
    while (this.subscriberIndex !== stopAt) {
      const slot = this.subscriberIndex % hub.capacity;
      const value = hub.array[slot];
      hub.subscribers[slot] -= 1;
      if (hub.subscribers[slot] === 0) {
        hub.array[slot] = AbsentValue;
        hub.subscribersIndex += 1;
      }
      collected.push(value);
      this.subscriberIndex += 1;
    }
    return Chunk.fromIterable(collected);
  }
  /**
   * Detaches this subscriber once: decrements the pub/sub's subscriber count
   * and releases this subscriber's claim on every slot it has not read yet.
   */
  unsubscribe() {
    if (this.unsubscribed) {
      return;
    }
    const hub = this.self;
    this.unsubscribed = true;
    hub.subscriberCount -= 1;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    while (this.subscriberIndex !== hub.publisherIndex) {
      const slot = this.subscriberIndex % hub.capacity;
      hub.subscribers[slot] -= 1;
      if (hub.subscribers[slot] === 0) {
        hub.array[slot] = AbsentValue;
        hub.subscribersIndex += 1;
      }
      this.subscriberIndex += 1;
    }
  }
}
/**
 * Bounded pub/sub over a fixed-size circular array whose capacity is a power
 * of two, so slot positions are computed with `index & mask`
 * (`mask = capacity - 1`) instead of a modulo.
 *
 * `publisherIndex` counts elements written; `subscribersIndex` counts slots
 * that have been released. `subscribers[i]` holds how many subscribers still
 * have to read slot `i`.
 *
 * @internal
 */
class BoundedPubSubPow2 {
  capacity;
  replayBuffer;
  array;
  mask;
  publisherIndex = 0;
  subscribers;
  subscriberCount = 0;
  subscribersIndex = 0;
  constructor(capacity, replayBuffer) {
    this.capacity = capacity;
    this.replayBuffer = replayBuffer;
    this.array = Array.from({ length: capacity });
    this.mask = capacity - 1;
    this.subscribers = Array.from({ length: capacity });
  }
  /** Replay window over the replay buffer, or the shared empty window. */
  replayWindow() {
    if (this.replayBuffer) {
      return new ReplayWindowImpl(this.replayBuffer);
    }
    return emptyReplayWindow;
  }
  isEmpty() {
    return this.subscribersIndex === this.publisherIndex;
  }
  isFull() {
    return this.subscribersIndex + this.capacity === this.publisherIndex;
  }
  size() {
    return this.publisherIndex - this.subscribersIndex;
  }
  /**
   * Writes `value` into the next slot. Returns `false` only when full. With no
   * subscribers nothing is stored in the ring, but the replay buffer (if any)
   * still receives the value.
   */
  publish(value) {
    if (this.isFull()) {
      return false;
    }
    if (this.subscriberCount !== 0) {
      const slot = this.publisherIndex & this.mask;
      this.array[slot] = value;
      this.subscribers[slot] = this.subscriberCount;
      this.publisherIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.offer(value);
    }
    return true;
  }
  /**
   * Publishes as many of `elements` as fit and returns the leftover chunk.
   */
  publishAll(elements) {
    const replay = this.replayBuffer;
    if (this.subscriberCount === 0) {
      if (replay) {
        replay.offerAll(elements);
      }
      return Chunk.empty();
    }
    const pending = Chunk.fromIterable(elements);
    const occupied = this.publisherIndex - this.subscribersIndex;
    const accepted = Math.min(pending.length, this.capacity - occupied);
    if (accepted === 0) {
      return pending;
    }
    const stopAt = this.publisherIndex + accepted;
    let consumed = 0;
    while (this.publisherIndex !== stopAt) {
      const element = Chunk.unsafeGet(pending, consumed);
      consumed += 1;
      const slot = this.publisherIndex & this.mask;
      this.array[slot] = element;
      this.subscribers[slot] = this.subscriberCount;
      this.publisherIndex += 1;
      if (this.replayBuffer) {
        this.replayBuffer.offer(element);
      }
    }
    return Chunk.drop(pending, consumed);
  }
  /** Drops the oldest stored element (if any) and slides the replay buffer. */
  slide() {
    if (this.publisherIndex !== this.subscribersIndex) {
      const oldest = this.subscribersIndex & this.mask;
      this.array[oldest] = AbsentValue;
      this.subscribers[oldest] = 0;
      this.subscribersIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.slide();
    }
  }
  /** Registers a subscriber starting at the current publisher position. */
  subscribe() {
    this.subscriberCount += 1;
    return new BoundedPubSubPow2Subscription(this, this.publisherIndex, false);
  }
}
/**
 * Read cursor of one subscriber over a `BoundedPubSubPow2` ring; slot
 * positions are computed with `index & self.mask`.
 *
 * `subscriberIndex` is this subscriber's position. Before reading it is
 * clamped up to the pub/sub's `subscribersIndex`, so slots that were already
 * released are skipped.
 *
 * @internal
 */
class BoundedPubSubPow2Subscription {
  self;
  subscriberIndex;
  unsubscribed;
  constructor(self, subscriberIndex, unsubscribed) {
    this.self = self;
    this.subscriberIndex = subscriberIndex;
    this.unsubscribed = unsubscribed;
  }
  isEmpty() {
    if (this.unsubscribed) {
      return true;
    }
    const hub = this.self;
    return hub.publisherIndex === this.subscriberIndex || hub.publisherIndex === hub.subscribersIndex;
  }
  size() {
    if (this.unsubscribed) {
      return 0;
    }
    const position = Math.max(this.subscriberIndex, this.self.subscribersIndex);
    return this.self.publisherIndex - position;
  }
  /** Reads the next element, or returns `default_` when nothing is available. */
  poll(default_) {
    if (this.unsubscribed) {
      return default_;
    }
    const hub = this.self;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    if (this.subscriberIndex === hub.publisherIndex) {
      return default_;
    }
    const slot = this.subscriberIndex & hub.mask;
    const value = hub.array[slot];
    hub.subscribers[slot] -= 1;
    if (hub.subscribers[slot] === 0) {
      // Last reader of this slot: clear it and let the pub/sub reuse it.
      hub.array[slot] = AbsentValue;
      hub.subscribersIndex += 1;
    }
    this.subscriberIndex += 1;
    return value;
  }
  /** Reads up to `n` elements into a chunk. */
  pollUpTo(n) {
    if (this.unsubscribed) {
      return Chunk.empty();
    }
    const hub = this.self;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    const wanted = Math.min(n, hub.publisherIndex - this.subscriberIndex);
    if (wanted <= 0) {
      return Chunk.empty();
    }
    const collected = [];
    const stopAt = this.subscriberIndex + wanted;
    while (this.subscriberIndex !== stopAt) {
      const slot = this.subscriberIndex & hub.mask;
      const value = hub.array[slot];
      hub.subscribers[slot] -= 1;
      if (hub.subscribers[slot] === 0) {
        hub.array[slot] = AbsentValue;
        hub.subscribersIndex += 1;
      }
      collected.push(value);
      this.subscriberIndex += 1;
    }
    return Chunk.fromIterable(collected);
  }
  /**
   * Detaches this subscriber once: decrements the pub/sub's subscriber count
   * and releases this subscriber's claim on every slot it has not read yet.
   */
  unsubscribe() {
    if (this.unsubscribed) {
      return;
    }
    const hub = this.self;
    this.unsubscribed = true;
    hub.subscriberCount -= 1;
    this.subscriberIndex = Math.max(this.subscriberIndex, hub.subscribersIndex);
    while (this.subscriberIndex !== hub.publisherIndex) {
      const slot = this.subscriberIndex & hub.mask;
      hub.subscribers[slot] -= 1;
      if (hub.subscribers[slot] === 0) {
        hub.array[slot] = AbsentValue;
        hub.subscribersIndex += 1;
      }
      this.subscriberIndex += 1;
    }
  }
}
/**
 * Bounded pub/sub with exactly one slot (`capacity = 1`).
 *
 * `value` holds the single stored element (or `AbsentValue`), and
 * `subscribers` counts how many subscribers still have to read it. The slot
 * counts as full whenever that counter is non-zero.
 *
 * @internal
 */
class BoundedPubSubSingle {
  replayBuffer;
  publisherIndex = 0;
  subscriberCount = 0;
  subscribers = 0;
  value = AbsentValue;
  capacity = 1;
  constructor(replayBuffer) {
    this.replayBuffer = replayBuffer;
  }
  /** Replay window over the replay buffer, or the shared empty window. */
  replayWindow() {
    if (this.replayBuffer) {
      return new ReplayWindowImpl(this.replayBuffer);
    }
    return emptyReplayWindow;
  }
  pipe() {
    return pipeArguments(this, arguments);
  }
  isEmpty() {
    return this.subscribers === 0;
  }
  isFull() {
    return !this.isEmpty();
  }
  size() {
    if (this.isEmpty()) {
      return 0;
    }
    return 1;
  }
  /**
   * Stores `value` in the slot. Returns `false` only when the slot is still
   * occupied. With no subscribers the slot is left untouched, but the replay
   * buffer (if any) still receives the value.
   */
  publish(value) {
    if (this.isFull()) {
      return false;
    }
    if (this.subscriberCount !== 0) {
      this.value = value;
      this.subscribers = this.subscriberCount;
      this.publisherIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.offer(value);
    }
    return true;
  }
  /** Publishes at most the first of `elements`; returns the leftover chunk. */
  publishAll(elements) {
    if (this.subscriberCount === 0) {
      const replay = this.replayBuffer;
      if (replay) {
        replay.offerAll(elements);
      }
      return Chunk.empty();
    }
    const pending = Chunk.fromIterable(elements);
    if (Chunk.isEmpty(pending)) {
      return pending;
    }
    const accepted = this.publish(Chunk.unsafeHead(pending));
    return accepted ? Chunk.drop(pending, 1) : pending;
  }
  /** Discards the stored element (if any) and slides the replay buffer. */
  slide() {
    if (this.isFull()) {
      this.subscribers = 0;
      this.value = AbsentValue;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.slide();
    }
  }
  /** Registers a subscriber starting at the current publisher position. */
  subscribe() {
    this.subscriberCount += 1;
    return new BoundedPubSubSingleSubscription(this, this.publisherIndex, false);
  }
}
/**
 * Read cursor of one subscriber over a `BoundedPubSubSingle`.
 *
 * The subscriber has something to read only while it is still subscribed, the
 * slot is occupied, and its own `subscriberIndex` differs from the pub/sub's
 * `publisherIndex`.
 *
 * @internal
 */
class BoundedPubSubSingleSubscription {
  self;
  subscriberIndex;
  unsubscribed;
  constructor(self, subscriberIndex, unsubscribed) {
    this.self = self;
    this.subscriberIndex = subscriberIndex;
    this.unsubscribed = unsubscribed;
  }
  isEmpty() {
    if (this.unsubscribed) {
      return true;
    }
    return this.self.subscribers === 0 || this.subscriberIndex === this.self.publisherIndex;
  }
  size() {
    if (this.isEmpty()) {
      return 0;
    }
    return 1;
  }
  /** Reads the stored element, or returns `default_` when there is none. */
  poll(default_) {
    if (this.isEmpty()) {
      return default_;
    }
    const slot = this.self;
    const value = slot.value;
    slot.subscribers -= 1;
    if (slot.subscribers === 0) {
      // Last reader: clear the slot.
      slot.value = AbsentValue;
    }
    this.subscriberIndex += 1;
    return value;
  }
  /** Reads the stored element into a one-element chunk when `n >= 1`. */
  pollUpTo(n) {
    if (this.isEmpty() || n < 1) {
      return Chunk.empty();
    }
    const slot = this.self;
    const value = slot.value;
    slot.subscribers -= 1;
    if (slot.subscribers === 0) {
      slot.value = AbsentValue;
    }
    this.subscriberIndex += 1;
    return Chunk.of(value);
  }
  /**
   * Detaches this subscriber once, releasing its claim on the stored element
   * if it has not read it yet.
   */
  unsubscribe() {
    if (this.unsubscribed) {
      return;
    }
    const slot = this.self;
    this.unsubscribed = true;
    slot.subscriberCount -= 1;
    if (this.subscriberIndex === slot.publisherIndex) {
      return;
    }
    slot.subscribers -= 1;
    if (slot.subscribers === 0) {
      slot.value = AbsentValue;
    }
  }
}
/**
 * Unbounded pub/sub stored as a singly linked list of
 * `{ value, subscribers, next }` nodes.
 *
 * `publisherHead` is the oldest node and `publisherTail` the newest; the list
 * is empty when both are the same node. The tail node's `subscribers` field
 * is used as the current subscriber count and is copied into every newly
 * published node.
 *
 * @internal
 */
class UnboundedPubSub {
  replayBuffer;
  publisherHead = {
    value: AbsentValue,
    subscribers: 0,
    next: null
  };
  publisherTail = this.publisherHead;
  publisherIndex = 0;
  subscribersIndex = 0;
  capacity = Number.MAX_SAFE_INTEGER;
  constructor(replayBuffer) {
    this.replayBuffer = replayBuffer;
  }
  /** Replay window over the replay buffer, or the shared empty window. */
  replayWindow() {
    if (this.replayBuffer) {
      return new ReplayWindowImpl(this.replayBuffer);
    }
    return emptyReplayWindow;
  }
  isEmpty() {
    return this.publisherTail === this.publisherHead;
  }
  isFull() {
    return false;
  }
  size() {
    return this.publisherIndex - this.subscribersIndex;
  }
  /**
   * Appends `value` as a new tail node when there is at least one subscriber,
   * always feeds the replay buffer (if any), and always returns `true`.
   */
  publish(value) {
    const tail = this.publisherTail;
    const subscribers = tail.subscribers;
    if (subscribers !== 0) {
      const node = { value, subscribers, next: null };
      tail.next = node;
      this.publisherTail = node;
      this.publisherIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.offer(value);
    }
    return true;
  }
  /** Publishes every element; there is never a leftover chunk. */
  publishAll(elements) {
    if (this.publisherTail.subscribers === 0) {
      if (this.replayBuffer) {
        this.replayBuffer.offerAll(elements);
      }
    } else {
      for (const element of elements) {
        this.publish(element);
      }
    }
    return Chunk.empty();
  }
  /** Drops the oldest stored element (if any) and slides the replay buffer. */
  slide() {
    if (this.publisherTail !== this.publisherHead) {
      const advanced = this.publisherHead.next;
      this.publisherHead = advanced;
      advanced.value = AbsentValue;
      this.subscribersIndex += 1;
    }
    const replay = this.replayBuffer;
    if (replay) {
      replay.slide();
    }
  }
  /** Registers a subscriber positioned at the current tail node. */
  subscribe() {
    this.publisherTail.subscribers += 1;
    return new UnboundedPubSubSubscription(this, this.publisherTail, this.publisherIndex, false);
  }
}
/**
 * Read cursor of one subscriber over the linked list owned by an
 * `UnboundedPubSub`.
 *
 * `subscriberHead` is the node this subscriber has already moved past; the
 * next candidate element lives in `subscriberHead.next`. Nodes whose value is
 * `AbsentValue` are skipped.
 *
 * @internal
 */
class UnboundedPubSubSubscription {
self;
subscriberHead;
subscriberIndex;
unsubscribed;
constructor(self, subscriberHead, subscriberIndex, unsubscribed) {
this.self = self;
this.subscriberHead = subscriberHead;
this.subscriberIndex = subscriberIndex;
this.unsubscribed = unsubscribed;
}
// Reports whether there is nothing left to read. As a side effect the cursor
// is advanced past leading nodes whose value is `AbsentValue`.
isEmpty() {
if (this.unsubscribed) {
return true;
}
let empty = true;
let loop = true;
while (loop) {
if (this.subscriberHead === this.self.publisherTail) {
// Reached the newest node: nothing to read.
loop = false;
} else {
if (this.subscriberHead.next.value !== AbsentValue) {
// Found a readable element; leave the cursor in front of it.
empty = false;
loop = false;
} else {
this.subscriberHead = this.subscriberHead.next;
this.subscriberIndex += 1;
}
}
}
return empty;
}
// Number of elements between this cursor (clamped to the pub/sub's
// `subscribersIndex`) and the pub/sub's `publisherIndex`.
size() {
if (this.unsubscribed) {
return 0;
}
return this.self.publisherIndex - Math.max(this.subscriberIndex, this.self.subscribersIndex);
}
// Reads the next non-absent element and advances the cursor; returns
// `default_` when unsubscribed or when the cursor reaches the tail.
poll(default_) {
if (this.unsubscribed) {
return default_;
}
let loop = true;
let polled = default_;
while (loop) {
if (this.subscriberHead === this.self.publisherTail) {
loop = false;
} else {
const elem = this.subscriberHead.next.value;
if (elem !== AbsentValue) {
polled = elem;
// The counter decremented here lives on the node *before* the element
// that was just read (the cursor node), not on the element's own node.
this.subscriberHead.subscribers -= 1;
if (this.subscriberHead.subscribers === 0) {
// Counter reached zero: advance the pub/sub's head and clear the value
// of the node it now points at.
this.self.publisherHead = this.self.publisherHead.next;
this.self.publisherHead.value = AbsentValue;
this.self.subscribersIndex += 1;
}
loop = false;
}
// The cursor advances whether or not an element was read.
this.subscriberHead = this.subscriberHead.next;
this.subscriberIndex += 1;
}
}
return polled;
}
// Repeatedly calls `poll` until `n` elements were read or `poll` returns the
// `AbsentValue` default, and returns what was collected as a chunk.
pollUpTo(n) {
const builder = [];
const default_ = AbsentValue;
let i = 0;
while (i !== n) {
const a = this.poll(default_);
if (a === default_) {
// Nothing more to read: force the loop condition to fail.
i = n;
} else {
builder.push(a);
i += 1;
}
}
return Chunk.fromIterable(builder);
}
// Detaches this subscriber once: decrements the subscriber count kept on the
// tail node, then walks to the tail applying the same release step as `poll`
// for every non-absent element it had not read.
unsubscribe() {
if (!this.unsubscribed) {
this.unsubscribed = true;
this.self.publisherTail.subscribers -= 1;
while (this.subscriberHead !== this.self.publisherTail) {
if (this.subscriberHead.next.value !== AbsentValue) {
this.subscriberHead.subscribers -= 1;
if (this.subscriberHead.subscribers === 0) {
this.self.publisherHead = this.self.publisherHead.next;
this.self.publisherHead.value = AbsentValue;
this.self.subscribersIndex += 1;
}
}
this.subscriberHead = this.subscriberHead.next;
}
}
}
}
/**
 * Dequeue-side handle for one subscriber of a pub/sub.
 *
 * Wraps the low-level `subscription` cursor together with the queue of
 * waiting `pollers` (deferreds of suspended takers), the shared `subscribers`
 * map, the surplus `strategy`, a `replayWindow`, and the shutdown flag/hook.
 * Every operation first checks `shutdownFlag` and yields `core.interrupt` when
 * the subscription has been shut down.
 *
 * @internal
 */
class SubscriptionImpl extends Effectable.Class {
pubsub;
subscribers;
subscription;
pollers;
shutdownHook;
shutdownFlag;
strategy;
replayWindow;
[queue.DequeueTypeId] = queue.dequeueVariance;
constructor(pubsub, subscribers, subscription, pollers, shutdownHook, shutdownFlag, strategy, replayWindow) {
super();
this.pubsub = pubsub;
this.subscribers = subscribers;
this.subscription = subscription;
this.pollers = pollers;
this.shutdownHook = shutdownHook;
this.shutdownFlag = shutdownFlag;
this.strategy = strategy;
this.replayWindow = replayWindow;
}
// Using the subscription itself as an effect is the same as `take`.
commit() {
return this.take;
}
pipe() {
return pipeArguments(this, arguments);
}
capacity() {
return this.pubsub.capacity;
}
isActive() {
return !MutableRef.get(this.shutdownFlag);
}
// Pending elements: those in the subscription cursor plus those still left
// in the replay window.
get size() {
return core.suspend(() => MutableRef.get(this.shutdownFlag) ? core.interrupt : core.succeed(this.subscription.size() + this.replayWindow.remaining));
}
// Synchronous variant of `size`: `Option.none()` once shut down.
unsafeSize() {
if (MutableRef.get(this.shutdownFlag)) {
return Option.none();
}
return Option.some(this.subscription.size() + this.replayWindow.remaining);
}
// Compares only the subscription cursor's size (the replay window is not
// counted here) against the pub/sub capacity.
get isFull() {
return core.suspend(() => MutableRef.get(this.shutdownFlag) ? core.interrupt : core.succeed(this.subscription.size() === this.capacity()));
}
get isEmpty() {
return core.map(this.size, size => size === 0);
}
// Uninterruptible shutdown: sets the flag, interrupts every waiting poller
// with the calling fiber's id, removes this subscription from the
// `subscribers` map, unsubscribes the cursor, and lets the strategy react to
// the space that may have been freed.
// NOTE(review): the `core.whenEffect(core.deferredSucceed(...))` wrapper
// presumably runs the steps above only for the call that completes
// `shutdownHook` — confirm against `core.whenEffect` semantics.
get shutdown() {
return core.uninterruptible(core.withFiberRuntime(state => {
MutableRef.set(this.shutdownFlag, true);
return pipe(fiberRuntime.forEachParUnbounded(unsafePollAllQueue(this.pollers), d => core.deferredInterruptWith(d, state.id()), false), core.zipRight(core.sync(() => {
this.subscribers.delete(this.subscription);
this.subscription.unsubscribe();
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers);
})), core.whenEffect(core.deferredSucceed(this.shutdownHook, void 0)), core.asVoid);
}));
}
get isShutdown() {
return core.sync(() => MutableRef.get(this.shutdownFlag));
}
get awaitShutdown() {
return core.deferredAwait(this.shutdownHook);
}
// Takes one element: first from the replay window, then from the
// subscription cursor; otherwise suspends on a new deferred.
get take() {
return core.withFiberRuntime(state => {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt;
}
if (this.replayWindow.remaining > 0) {
const message = this.replayWindow.take();
return core.succeed(message);
}
// Only poll directly when no other taker is already waiting; otherwise this
// taker is queued behind the existing pollers.
const message = MutableQueue.isEmpty(this.pollers) ? this.subscription.poll(MutableQueue.EmptyMutableQueue) : MutableQueue.EmptyMutableQueue;
if (message === MutableQueue.EmptyMutableQueue) {
const deferred = core.deferredUnsafeMake(state.id());
return pipe(core.suspend(() => {
// Register as a waiting poller, make the poller queue visible in the
// `subscribers` map, then give the strategy a chance to complete pollers
// right away.
pipe(this.pollers, MutableQueue.offer(deferred));
pipe(this.subscribers, addSubscribers(this.subscription, this.pollers));
this.strategy.unsafeCompletePollers(this.pubsub, this.subscribers, this.subscription, this.pollers);
return MutableRef.get(this.shutdownFlag) ? core.interrupt : core.deferredAwait(deferred);
// On interruption the deferred is removed from the poller queue again.
}), core.onInterrupt(() => core.sync(() => unsafeRemove(this.pollers, deferred))));
} else {
// An element was consumed, so the pub/sub may have free space now.
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers);
return core.succeed(message);
}
});
}
// Takes everything currently available: remaining replay elements first,
// followed by whatever the subscription cursor holds (nothing is polled from
// the cursor while other takers are waiting).
get takeAll() {
return core.suspend(() => {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt;
}
const as = MutableQueue.isEmpty(this.pollers) ? unsafePollAllSubscription(this.subscription) : Chunk.empty();
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers);
if (this.replayWindow.remaining > 0) {
return core.succeed(Chunk.appendAll(this.replayWindow.takeAll(), as));
}
return core.succeed(as);
});
}
// Takes at most `max` elements without suspending, drawing from the replay
// window before the subscription cursor.
takeUpTo(max) {
return core.suspend(() => {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt;
}
let replay = undefined;
if (this.replayWindow.remaining >= max) {
// The replay window alone satisfies the request.
const as = this.replayWindow.takeN(max);
return core.succeed(as);
} else if (this.replayWindow.remaining > 0) {
// Drain the replay window and reduce `max` by what it provided.
replay = this.replayWindow.takeAll();
max = max - replay.length;
}
const as = MutableQueue.isEmpty(this.pollers) ? unsafePollN(this.subscription, max) : Chunk.empty();
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers);
return replay ? core.succeed(Chunk.appendAll(replay, as)) : core.succeed(as);
});
}
// Delegates to `takeRemainderLoop`, starting from an empty accumulator.
takeBetween(min, max) {
return core.suspend(() => takeRemainderLoop(this, min, max, Chunk.empty()));
}
}
/**
 * Accumulates between `min` and `max` elements from `self` into `acc`.
 *
 * Each round grabs up to `max` elements with `takeUpTo`. If fewer than `min`
 * were obtained, one more element is awaited with `take`, and the loop
 * recurses only while more than one element is still missing. When
 * `max < min` the accumulator is returned unchanged.
 *
 * @internal
 */
const takeRemainderLoop = (self, min, max, acc) => {
  if (max < min) {
    return core.succeed(acc);
  }
  return core.flatMap(self.takeUpTo(max), taken => {
    const missing = min - taken.length;
    // Builds `acc ++ taken ++ [extra]` lazily, once `extra` is known.
    const withExtra = extra => pipe(Chunk.appendAll(acc, taken), Chunk.append(extra));
    if (missing === 1) {
      return core.map(self.take, extra => withExtra(extra));
    }
    if (missing > 1) {
      return core.flatMap(self.take, extra => takeRemainderLoop(self, missing - 1, max - taken.length - 1, withExtra(extra)));
    }
    return core.succeed(Chunk.appendAll(acc, taken));
  });
};
/**
 * Publisher-side handle of a pub/sub.
 *
 * Combines the low-level `pubsub` storage, the shared `subscribers` map
 * (subscription -> set of poller queues), a `scope` used for subscriptions,
 * the surplus `strategy`, and the shutdown flag/hook. Publishing operations
 * yield `core.interrupt` once `shutdownFlag` is set.
 *
 * @internal
 */
class PubSubImpl {
pubsub;
subscribers;
scope;
shutdownHook;
shutdownFlag;
strategy;
[queue.EnqueueTypeId] = queue.enqueueVariance;
[queue.DequeueTypeId] = queue.dequeueVariance;
constructor(pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy) {
this.pubsub = pubsub;
this.subscribers = subscribers;
this.scope = scope;
this.shutdownHook = shutdownHook;
this.shutdownFlag = shutdownFlag;
this.strategy = strategy;
}
capacity() {
return this.pubsub.capacity;
}
get size() {
return core.suspend(() => MutableRef.get(this.shutdownFlag) ? core.interrupt : core.sync(() => this.pubsub.size()));
}
// Synchronous variant of `size`: `Option.none()` once shut down.
unsafeSize() {
if (MutableRef.get(this.shutdownFlag)) {
return Option.none();
}
return Option.some(this.pubsub.size());
}
get isFull() {
return core.map(this.size, size => size === this.capacity());
}
get isEmpty() {
return core.map(this.size, size => size === 0);
}
get awaitShutdown() {
return core.deferredAwait(this.shutdownHook);
}
get isShutdown() {
return core.sync(() => MutableRef.get(this.shutdownFlag));
}
// Uninterruptible shutdown: sets the flag, closes the scope with an
// interrupt exit carrying the calling fiber's id, then runs the strategy's
// own shutdown.
// NOTE(review): the `core.whenEffect(core.deferredSucceed(...))` wrapper
// presumably runs those steps only for the call that completes
// `shutdownHook` — confirm against `core.whenEffect` semantics.
get shutdown() {
return core.uninterruptible(core.withFiberRuntime(state => {
pipe(this.shutdownFlag, MutableRef.set(true));
return pipe(this.scope.close(core.exitInterrupt(state.id())), core.zipRight(this.strategy.shutdown), core.whenEffect(core.deferredSucceed(this.shutdownHook, void 0)), core.asVoid);
}));
}
// Publishes one value. On success waiting subscribers are completed and the
// effect succeeds with `true`; when the storage rejects the value the
// strategy decides what happens to the surplus.
publish(value) {
return core.suspend(() => {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt;
}
if (this.pubsub.publish(value)) {
this.strategy.unsafeCompleteSubscribers(this.pubsub, this.subscribers);
return core.succeed(true);
}
return this.strategy.handleSurplus(this.pubsub, this.subscribers, Chunk.of(value), this.shutdownFlag);
});
}
isActive() {
return !MutableRef.get(this.shutdownFlag);
}
// Synchronous publish that never involves the strategy's surplus handling:
// returns `false` when shut down or when the storage rejects the value.
unsafeOffer(value) {
if (MutableRef.get(this.shutdownFlag)) {
return false;
}
if (this.pubsub.publish(value)) {
this.strategy.unsafeCompleteSubscribers(this.pubsub, this.subscribers);
return true;
}
return false;
}
// Publishes many values; whatever the storage could not accept is handed to
// the strategy as surplus.
publishAll(elements) {
return core.suspend(() => {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt;
}
const surplus = unsafePublishAll(this.pubsub, elements);
this.strategy.unsafeCompleteSubscribers(this.pubsub, this.subscribers);
if (Chunk.isEmpty(surplus)) {
return core.succeed(true);
}
return this.strategy.handleSurplus(this.pubsub, this.subscribers, surplus, this.shutdownFlag);
});
}
// Acquires a subscription together with a child scope forked from this
// pub/sub's scope. The subscription's `shutdown` is registered as a finalizer
// on that child scope, and releasing the acquired resource closes the child
// scope.
get subscribe() {
const acquire = core.tap(fiberRuntime.all([this.scope.fork(executionStrategy.sequential), makeSubscription(this.pubsub, this.subscribers, this.strategy)]), tuple => tuple[0].addFinalizer(() => tuple[1].shutdown));
return core.map(fiberRuntime.acquireRelease(acquire, (tuple, exit) => tuple[0].close(exit)), tuple => tuple[1]);
}
// `offer` / `offerAll` are aliases of `publish` / `publishAll`.
offer(value) {
return this.publish(value);
}
offerAll(elements) {
return this.publishAll(elements);
}
pipe() {
return pipeArguments(this, arguments);
}
}
/**
 * Effect that wraps a low-level `pubsub` and a `strategy` into a `PubSubImpl`:
 * it makes a scope and a deferred shutdown hook, then allocates an empty
 * `subscribers` map and a `false` shutdown flag.
 *
 * @internal
 */
export const makePubSub = (pubsub, strategy) => core.flatMap(fiberRuntime.scopeMake(), scope => core.map(core.deferredMake(), shutdownHook => {
  const subscribers = new Map();
  const shutdownFlag = MutableRef.make(false);
  return unsafeMakePubSub(pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy);
}));
/**
 * Synchronously constructs a `PubSubImpl` from already-allocated parts.
 *
 * @internal
 */
export const unsafeMakePubSub = (pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy) => {
  return new PubSubImpl(
    pubsub,
    subscribers,
    scope,
    shutdownHook,
    shutdownFlag,
    strategy
  );
};
/**
 * Validates a requested pub/sub capacity.
 *
 * @param capacity - the requested capacity.
 * @throws core.InvalidPubSubCapacityException when `capacity` is not greater
 *   than zero. This includes `NaN` and `undefined`, e.g. an options object
 *   passed without a `capacity` field.
 * @internal
 */
const ensureCapacity = capacity => {
  // Written as `!(capacity > 0)` rather than `capacity <= 0`: every comparison
  // with NaN is false, so the latter would let NaN (and `undefined`, which
  // converts to NaN) through and produce a pub/sub with a broken ring buffer.
  if (!(capacity > 0)) {
    throw new core.InvalidPubSubCapacityException(`Cannot construct PubSub with capacity of ${capacity}`);
  }
};
/**
 * Completes `deferred` with a success carrying `a`, via
 * `core.deferredUnsafeDone`.
 *
 * @internal
 */
const unsafeCompleteDeferred = (deferred, a) => {
  core.deferredUnsafeDone(
    deferred,
    core.succeed(a)
  );
};
/**
 * Offers every element of `as` to the mutable `queue` and returns the result
 * of `MutableQueue.offerAll`.
 *
 * @internal
 */
const unsafeOfferAll = (queue, as) => {
  const offerElements = MutableQueue.offerAll(as);
  return pipe(queue, offerElements);
};
/**
 * Drains the mutable `queue` by asking `MutableQueue.pollUpTo` for up to
 * `Number.POSITIVE_INFINITY` elements, and returns what was polled.
 *
 * @internal
 */
const unsafePollAllQueue = queue => {
  const drain = MutableQueue.pollUpTo(Number.POSITIVE_INFINITY);
  return pipe(queue, drain);
};
/**
 * Drains `subscription` by calling its `pollUpTo` with
 * `Number.POSITIVE_INFINITY`.
 *
 * @internal
 */
const unsafePollAllSubscription = subscription => {
  const everything = Number.POSITIVE_INFINITY;
  return subscription.pollUpTo(everything);
};
/**
 * Polls at most `max` elements from `subscription` via its `pollUpTo`.
 *
 * @internal
 */
const unsafePollN = (subscription, max) => {
  const polled = subscription.pollUpTo(max);
  return polled;
};
/**
 * Publishes `as` through `pubsub.publishAll` and returns its result.
 *
 * @internal
 */
const unsafePublishAll = (pubsub, as) => {
  const surplus = pubsub.publishAll(as);
  return surplus;
};
/**
 * Removes every occurrence of `value` from the mutable `queue` by draining
 * it, filtering out elements identical (`===`) to `value`, and offering the
 * remaining elements back in their original order.
 *
 * @internal
 */
const unsafeRemove = (queue, value) => {
  const keep = elem => elem !== value;
  const survivors = pipe(unsafePollAllQueue(queue), Chunk.filter(keep));
  unsafeOfferAll(queue, survivors);
};
/**
 * A strategy that applies back pressure to publishers when the `PubSub` is at
 * capacity. This guarantees that all subscribers will receive all messages
 * published to the `PubSub` while they are subscribed. However, it creates the
 * risk that a slow subscriber will slow down the rate at which messages
 * are published and received by other subscribers.
 *
 * Waiting publishers are kept in `publishers` as
 * `[value, deferred, isLastOfBatch]` triples; only the entry flagged as last
 * completes (or interrupts) the publisher's deferred.
 *
 * @internal
 */
class BackPressureStrategy {
publishers = /*#__PURE__*/MutableQueue.unbounded();
// Drains the queue of waiting publishers and interrupts the deferred of each
// batch (once, through its last entry) with the current fiber id.
get shutdown() {
return core.flatMap(core.fiberId, fiberId => core.flatMap(core.sync(() => unsafePollAllQueue(this.publishers)), publishers => fiberRuntime.forEachConcurrentDiscard(publishers, ([_, deferred, last]) => last ? pipe(core.deferredInterruptWith(deferred, fiberId), core.asVoid) : core.void, false, false)));
}
// Parks the surplus `elements` behind a fresh deferred and suspends the
// publisher on it. Before waiting, it retries publishing queued elements and
// completes subscribers; if the pub/sub was shut down it interrupts instead.
// On interruption the queued entries of this deferred are removed again.
handleSurplus(pubsub, subscribers, elements, isShutdown) {
return core.withFiberRuntime(state => {
const deferred = core.deferredUnsafeMake(state.id());
return pipe(core.suspend(() => {
this.unsafeOffer(elements, deferred);
this.unsafeOnPubSubEmptySpace(pubsub, subscribers);
this.unsafeCompleteSubscribers(pubsub, subscribers);
return MutableRef.get(isShutdown) ? core.interrupt : core.deferredAwait(deferred);
}), core.onInterrupt(() => core.sync(() => this.unsafeRemove(deferred))));
});
}
// Moves queued publisher entries into the pub/sub while it has room.
unsafeOnPubSubEmptySpace(pubsub, subscribers) {
let keepPolling = true;
while (keepPolling && !pubsub.isFull()) {
const publisher = pipe(this.publishers, MutableQueue.poll(MutableQueue.EmptyMutableQueue));
if (publisher === MutableQueue.EmptyMutableQueue) {
keepPolling = false;
} else {
const published = pubsub.publish(publisher[0]);
if (published && publisher[2]) {
// Last element of a batch got through: wake the waiting publisher.
unsafeCompleteDeferred(publisher[1], true);
} else if (!published) {
// Could not publish after all: put the entry back at the front of the
// queue by draining and re-offering with it prepended.
unsafeOfferAll(this.publishers, pipe(unsafePollAllQueue(this.publishers), Chunk.prepend(publisher)));
}
this.unsafeCompleteSubscribers(pubsub, subscribers);
}
}
}
unsafeCompletePollers(pubsub, subscribers, subscription, pollers) {
return unsafeStrategyCompletePollers(this, pubsub, subscribers, subscription, pollers);
}
unsafeCompleteSubscribers(pubsub, subscribers) {
return unsafeStrategyCompleteSubscribers(this, pubsub, subscribers);
}
// Enqueues one entry per element, all sharing `deferred`; the loop looks one
// step ahead in the iterator so that only the final element is flagged `true`.
unsafeOffer(elements, deferred) {
const iterator = elements[Symbol.iterator]();
let next = iterator.next();
if (!next.done) {
// eslint-disable-next-line no-constant-condition
while (1) {
const value = next.value;
next = iterator.next();
if (next.done) {
pipe(this.publishers, MutableQueue.offer([value, deferred, true]));
break;
}
pipe(this.publishers, MutableQueue.offer([value, deferred, false]));
}
}
}
// Drops every queued entry that belongs to `deferred`.
unsafeRemove(deferred) {
unsafeOfferAll(this.publishers, pipe(unsafePollAllQueue(this.publishers), Chunk.filter(([_, a]) => a !== deferred)));
}
}
/**
 * Strategy that discards new messages while the `PubSub` is at capacity.
 *
 * Publishers are never slowed down by a slow subscriber. The trade-off is
 * that a slow subscriber can delay delivery to other subscribers, and that
 * subscribers may miss messages published while they were subscribed.
 *
 * @internal
 */
export class DroppingStrategy {
  /** Nothing to clean up: there are no waiting publishers. */
  get shutdown() {
    return core.void;
  }
  /** Surplus elements are dropped; the effect succeeds with `false`. */
  handleSurplus(_pubsub, _subscribers, _elements, _isShutdown) {
    const dropped = false;
    return core.succeed(dropped);
  }
  /** No publishers are ever parked, so freed space needs no handling. */
  unsafeOnPubSubEmptySpace(_pubsub, _subscribers) {
    return undefined;
  }
  unsafeCompletePollers(pubsub, subscribers, subscription, pollers) {
    const strategy = this;
    return unsafeStrategyCompletePollers(strategy, pubsub, subscribers, subscription, pollers);
  }
  unsafeCompleteSubscribers(pubsub, subscribers) {
    const strategy = this;
    return unsafeStrategyCompleteSubscribers(strategy, pubsub, subscribers);
  }
}
/**
 * Strategy that makes room for new messages by dropping the oldest ones
 * while the `PubSub` is at capacity.
 *
 * Publishers and fast subscribers are never slowed down by a slow
 * subscriber. The trade-off is that a slow subscriber may miss messages
 * published while it was subscribed.
 *
 * @internal
 */
export class SlidingStrategy {
  /** Nothing to clean up: there are no waiting publishers. */
  get shutdown() {
    return core.void;
  }
  /**
   * Publishes the surplus by sliding, completes waiting subscribers, and
   * yields `true`.
   */
  handleSurplus(pubsub, subscribers, elements, _isShutdown) {
    return core.sync(() => {
      this.unsafeSlidingPublish(pubsub, elements);
      this.unsafeCompleteSubscribers(pubsub, subscribers);
      return true;
    });
  }
  /** No publishers are ever parked, so freed space needs no handling. */
  unsafeOnPubSubEmptySpace(_pubsub, _subscribers) {
    return undefined;
  }
  unsafeCompletePollers(pubsub, subscribers, subscription, pollers) {
    const strategy = this;
    return unsafeStrategyCompletePollers(strategy, pubsub, subscribers, subscription, pollers);
  }
  unsafeCompleteSubscribers(pubsub, subscribers) {
    const strategy = this;
    return unsafeStrategyCompleteSubscribers(strategy, pubsub, subscribers);
  }
  /**
   * Publishes every element in order. Before each attempt the pub/sub is
   * slid once; an element whose publish is rejected is retried (after another
   * slide) until it is accepted. Does nothing for an empty input or when
   * `pubsub.capacity` is not greater than zero.
   */
  unsafeSlidingPublish(pubsub, elements) {
    const iterator = elements[Symbol.iterator]();
    let step = iterator.next();
    if (step.done || !(pubsub.capacity > 0)) {
      return;
    }
    let current = step.value;
    for (;;) {
      pubsub.slide();
      if (!pubsub.publish(current)) {
        // Rejected: slide again and retry the same element.
        continue;
      }
      step = iterator.next();
      if (step.done) {
        break;
      }
      current = step.value;
    }
  }
}
/**
 * Hands elements available on `subscription` to the takers waiting in
 * `pollers`, one element per waiting deferred, until either the subscription
 * has nothing left or no poller is waiting.
 *
 * @internal
 */
const unsafeStrategyCompletePollers = (strategy, pubsub, subscribers, subscription, pollers) => {
let keepPolling = true;
while (keepPolling && !subscription.isEmpty()) {
const poller = pipe(pollers, MutableQueue.poll(MutableQueue.EmptyMutableQueue));
if (poller === MutableQueue.EmptyMutableQueue) {
// No taker is waiting: unregister this poller queue from the subscribers
// map, and stop unless the queue has gained an entry in the meantime, in
// which case it is registered again.
pipe(subscribers, removeSubscribers(subscription, pollers));
if (MutableQueue.isEmpty(pollers)) {
keepPolling = false;
} else {
pipe(subscribers, addSubscribers(subscription, pollers));
}
} else {
const pollResult = subscription.poll(MutableQueue.EmptyMutableQueue);
if (pollResult === MutableQueue.EmptyMutableQueue) {
// Nothing could be read after all: put the poller back at the front of
// the queue by draining and re-offering with it prepended.
unsafeOfferAll(pollers, pipe(unsafePollAllQueue(pollers), Chunk.prepend(poller)));
} else {
// Wake the taker with the element, then let the strategy react to the
// space that consuming it may have freed.
unsafeCompleteDeferred(poller, pollResult);
strategy.unsafeOnPubSubEmptySpace(pubsub, subscribers);
}
}
}
};
/**
 * Asks `strategy` to complete the pollers of every registered poller queue:
 * for each `subscription -> Set<pollers>` entry of `subscribers`, calls
 * `strategy.unsafeCompletePollers` once per poller queue.
 *
 * @internal
 */
const unsafeStrategyCompleteSubscribers = (strategy, pubsub, subscribers) => {
  for (const entry of subscribers) {
    const subscription = entry[0];
    const pollerQueues = entry[1];
    for (const pollers of pollerQueues) {
      strategy.unsafeCompletePollers(pubsub, subscribers, subscription, pollers);
    }
  }
};
/**
 * Linked list that remembers the most recent `capacity` offered values.
 *
 * `tail` is always an empty placeholder node (value `AbsentValue`); offering
 * fills it in and appends a new placeholder. Once `size` has reached
 * `capacity`, each further offer advances `head` instead of growing `size`.
 * `index` counts how many times `slide` was called.
 */
class ReplayBuffer {
  capacity;
  constructor(capacity) {
    this.capacity = capacity;
  }
  head = {
    value: AbsentValue,
    next: null
  };
  tail = this.head;
  size = 0;
  index = 0;
  /** Records one slide. */
  slide() {
    this.index += 1;
  }
  /** Stores `a` in the current placeholder and appends a fresh placeholder. */
  offer(a) {
    const placeholder = { value: AbsentValue, next: null };
    const filled = this.tail;
    filled.value = a;
    filled.next = placeholder;
    this.tail = placeholder;
    if (this.size !== this.capacity) {
      this.size += 1;
    } else {
      // At capacity: forget the oldest value.
      this.head = this.head.next;
    }
  }
  /** Offers every element of `as` in order. */
  offerAll(as) {
    for (const element of as) {
      this.offer(element);
    }
  }
}
/**
 * A subscriber's private view of a `ReplayBuffer`, captured at construction:
 * it starts at the buffer's `head` and may hand out at most `buffer.size`
 * values (`remaining`).
 *
 * `index` mirrors `buffer.index` at capture time. If the buffer has been slid
 * since then, the window first skips one node per missed slide before reading
 * (`fastForward`).
 */
class ReplayWindowImpl {
  buffer;
  head;
  index;
  remaining;
  constructor(buffer) {
    this.buffer = buffer;
    this.index = buffer.index;
    this.remaining = buffer.size;
    this.head = buffer.head;
  }
  /** Skips one node for every buffer slide this window has not seen yet. */
  fastForward() {
    while (this.index < this.buffer.index) {
      this.head = this.head.next;
      this.index++;
    }
  }
  /**
   * Takes the next replayed value.
   *
   * @returns the value, or `undefined` when nothing remains.
   */
  take() {
    if (this.remaining === 0) {
      return undefined;
    } else if (this.index < this.buffer.index) {
      this.fastForward();
    }
    this.remaining--;
    const value = this.head.value;
    this.head = this.head.next;
    return value;
  }
  /**
   * Takes up to `n` replayed values.
   *
   * @param n - maximum number of values to take; a negative `n` takes nothing.
   * @returns a chunk with the taken values (empty when nothing remains).
   */
  takeN(n) {
    if (this.remaining === 0) {
      return Chunk.empty();
    } else if (this.index < this.buffer.index) {
      this.fastForward();
    }
    // Clamp at zero: a negative count would otherwise reach `new Array(len)`
    // and throw a RangeError, whereas the subscriptions' `pollUpTo` simply
    // returns an empty chunk for the same input.
    const len = Math.max(0, Math.min(n, this.remaining));
    const items = new Array(len);
    for (let i = 0; i < len; i++) {
      const value = this.head.value;
      this.head = this.head.next;
      items[i] = value;
    }
    this.remaining -= len;
    return Chunk.unsafeFromArray(items);
  }
  /** Takes every remaining replayed value. */
  takeAll() {
    return this.takeN(this.remaining);
  }
}
// Shared replay window used when a pub/sub has no replay buffer: it never has
// anything remaining, `take` yields `undefined`, and the bulk operations yield
// an empty chunk.
const emptyReplayWindow = {
  remaining: 0,
  take: () => {
    return undefined;
  },
  takeN: () => {
    return Chunk.empty();
  },
  takeAll: () => {
    return Chunk.empty();
  }
};
//# sourceMappingURL=pubsub.js.map