concurrency.ts•2.35 kB
import { createAbortError } from "./abort.js";
interface PendingRequest {
  resolve: () => void;
  reject: (error: Error) => void;
}
interface ProfileState {
  active: number;
  limit: number;
  queue: PendingRequest[];
}
export class ConcurrencyLimiter {
  private readonly states = new Map<string, ProfileState>();
  async acquire(key: string, limit: number, signal?: AbortSignal): Promise<() => void> {
    const normalizedLimit = Math.max(1, limit);
    let state = this.states.get(key);
    if (!state) {
      state = { active: 0, limit: normalizedLimit, queue: [] };
      this.states.set(key, state);
    } else {
      state.limit = normalizedLimit;
    }
    let released = false;
    const release = () => {
      if (released) {
        return;
      }
      released = true;
      this.release(key);
    };
    if (state.active < state.limit) {
      state.active += 1;
      return release;
    }
    return new Promise((resolve, reject) => {
      const abortError = createAbortError("Operation cancelled before starting");
      let abortListener: (() => void) | undefined;
      const pending: PendingRequest = {
        resolve: () => {
          if (abortListener && signal) {
            signal.removeEventListener("abort", abortListener);
          }
          state!.active += 1;
          resolve(release);
        },
        reject: (error: Error) => {
          if (abortListener && signal) {
            signal.removeEventListener("abort", abortListener);
          }
          reject(error);
        }
      };
      state!.queue.push(pending);
      if (signal) {
        abortListener = () => {
          const index = state!.queue.indexOf(pending);
          if (index >= 0) {
            state!.queue.splice(index, 1);
          }
          pending.reject(abortError);
        };
        if (signal.aborted) {
          abortListener();
          return;
        }
        signal.addEventListener("abort", abortListener, { once: true });
      }
    });
  }
  private release(key: string): void {
    const state = this.states.get(key);
    if (!state) {
      return;
    }
    if (state.active > 0) {
      state.active -= 1;
    }
    const next = state.queue.shift();
    if (next) {
      next.resolve();
      return;
    }
    if (state.active === 0) {
      this.states.delete(key);
    }
  }
}