web.js•3.91 kB
const { Readable, getStreamError } = require('streamx')
exports.ReadableStreamDefaultReader = class ReadableStreamDefaultReader {
  constructor(stream) {
    this._stream = stream._stream
  }
  read() {
    const stream = this._stream
    return new Promise((resolve, reject) => {
      const err = getStreamError(stream)
      if (err) return reject(err)
      if (stream.destroyed) {
        return resolve({ value: undefined, done: true })
      }
      const value = stream.read()
      if (value !== null) {
        return resolve({ value, done: false })
      }
      stream
        .once('readable', onreadable)
        .once('close', onclose)
        .once('error', onerror)
      function onreadable() {
        const value = stream.read()
        ondone(
          null,
          value === null
            ? { value: undefined, done: true }
            : { value, done: false }
        )
      }
      function onclose() {
        ondone(null, { value: undefined, done: true })
      }
      function onerror(err) {
        ondone(err, null)
      }
      function ondone(err, value) {
        stream
          .off('readable', onreadable)
          .off('close', onclose)
          .off('error', onerror)
        if (err) reject(err)
        else resolve(value)
      }
    })
  }
  cancel(reason) {
    if (this._stream.destroyed) return Promise.resolve()
    return new Promise((resolve) =>
      this._stream.once('close', resolve).destroy(reason)
    )
  }
}
exports.ReadableStreamDefaultController = class ReadableStreamDefaultController {
  constructor(stream) {
    this._stream = stream._stream
  }
  get desiredSize() {
    return (
      this._stream._readableState.highWaterMark -
      this._stream._readableState.buffered
    )
  }
  enqueue(data) {
    this._stream.push(data)
  }
  close() {
    this._stream.push(null)
  }
  error(err) {
    this._stream.destroy(err)
  }
}
exports.ReadableStream = class ReadableStream {
  constructor(
    underlyingSource = {},
    queuingStrategy = new exports.CountQueuingStrategy(),
    stream
  ) {
    const { start, pull } = underlyingSource
    const { highWaterMark = 1, size = defaultSize } = queuingStrategy
    this._stream =
      stream ||
      new Readable({ eagerOpen: true, highWaterMark, byteLength: size })
    this._controller = new exports.ReadableStreamDefaultController(this)
    if (start) {
      this._stream._open = open.bind(this, start.call(this, this._controller))
    }
    if (pull) {
      this._stream._read = read.bind(this, pull)
    }
  }
  getReader() {
    return new exports.ReadableStreamDefaultReader(this)
  }
  cancel(reason) {
    if (this._stream.destroyed) return Promise.resolve()
    return new Promise((resolve) =>
      this._stream.once('close', resolve).destroy(reason)
    )
  }
  pipeTo(destination) {
    return new Promise((resolve, reject) =>
      this._stream.pipe(destination, (err) => {
        err ? reject(err) : resolve()
      })
    )
  }
  [Symbol.asyncIterator]() {
    return this._stream[Symbol.asyncIterator]()
  }
  static from(iterable) {
    return new ReadableStream(undefined, undefined, Readable.from(iterable))
  }
}
async function open(starting, cb) {
  try {
    await starting
    cb(null)
  } catch (err) {
    cb(err)
  }
}
async function read(pull, cb) {
  try {
    await pull(this._controller)
    cb(null)
  } catch (err) {
    cb(err)
  }
}
function defaultSize() {
  return 1
}
exports.CountQueuingStrategy = class CountQueuingStrategy {
  constructor(opts = {}) {
    const { highWaterMark = 1 } = opts
    this.highWaterMark = highWaterMark
  }
  size(chunk) {
    return 1
  }
}
exports.ByteLengthQueuingStrategy = class ByteLengthQueuingStrategy {
  constructor(opts = {}) {
    const { highWaterMark = 16384 } = opts
    this.highWaterMark = highWaterMark
  }
  size(chunk) {
    return chunk.byteLength
  }
}