index.js•7.06 kB
const stream = require('streamx')
const defaultEncoding = 'utf8'
module.exports = exports = stream.Stream
exports.pipeline = stream.pipeline
exports.isStream = stream.isStream
exports.isEnded = stream.isEnded
exports.isFinished = stream.isFinished
exports.getStreamError = stream.getStreamError
exports.Stream = exports
exports.Readable = class Readable extends stream.Readable {
  constructor(opts = {}) {
    super(opts)
    if (this._construct) this._open = this._construct
    if (this._read !== stream.Readable.prototype._read) {
      this._read = read.bind(this, this._read)
    }
    if (this._destroy !== stream.Stream.prototype._destroy) {
      this._destroy = destroy.bind(this, this._destroy)
    }
  }
  push(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    return super.push(chunk)
  }
  unshift(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    super.unshift(chunk)
  }
}
exports.Writable = class Writable extends stream.Writable {
  constructor(opts = {}) {
    super({ ...opts, byteLengthWritable })
    if (this._construct) this._open = this._construct
    if (this._write !== stream.Writable.prototype._write) {
      this._write = write.bind(this, this._write)
    }
    if (this._destroy !== stream.Stream.prototype._destroy) {
      this._destroy = destroy.bind(this, this._destroy)
    }
  }
  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding)
    } else {
      encoding = 'buffer'
    }
    const result = super.write({ chunk, encoding })
    if (cb) stream.Writable.drained(this).then(cb, cb)
    return result
  }
  end(chunk, encoding, cb) {
    if (typeof chunk === 'function') {
      cb = chunk
      chunk = null
    } else if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    } else {
      encoding = 'buffer'
    }
    const result =
      chunk !== undefined && chunk !== null
        ? super.end({ chunk, encoding })
        : super.end()
    if (cb) this.once('end', cb)
    return result
  }
}
exports.Duplex = class Duplex extends stream.Duplex {
  constructor(opts = {}) {
    super({ ...opts, byteLengthWritable })
    if (this._construct) this._open = this._construct
    if (this._read !== stream.Readable.prototype._read) {
      this._read = read.bind(this, this._read)
    }
    if (this._write !== stream.Duplex.prototype._write) {
      this._write = write.bind(this, this._write)
    }
    if (this._destroy !== stream.Stream.prototype._destroy) {
      this._destroy = destroy.bind(this, this._destroy)
    }
  }
  push(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    return super.push(chunk)
  }
  unshift(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    super.unshift(chunk)
  }
  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding)
    } else {
      encoding = 'buffer'
    }
    const result = super.write({ chunk, encoding })
    if (cb) stream.Writable.drained(this).then(cb, cb)
    return result
  }
  end(chunk, encoding, cb) {
    if (typeof chunk === 'function') {
      cb = chunk
      chunk = null
    } else if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding)
    } else {
      encoding = 'buffer'
    }
    const result =
      chunk !== undefined && chunk !== null
        ? super.end({ chunk, encoding })
        : super.end()
    if (cb) this.once('end', cb)
    return result
  }
}
exports.Transform = class Transform extends stream.Transform {
  constructor(opts = {}) {
    super({ ...opts, byteLengthWritable })
    if (this._transform !== stream.Transform.prototype._transform) {
      this._transform = transform.bind(this, this._transform)
    } else {
      this._transform = passthrough
    }
  }
  push(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    return super.push(chunk)
  }
  unshift(chunk, encoding) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding || defaultEncoding)
    }
    super.unshift(chunk)
  }
  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding)
    } else {
      encoding = 'buffer'
    }
    const result = super.write({ chunk, encoding })
    if (cb) stream.Writable.drained(this).then(cb, cb)
    return result
  }
  end(chunk, encoding, cb) {
    if (typeof chunk === 'function') {
      cb = chunk
      chunk = null
    } else if (typeof encoding === 'function') {
      cb = encoding
      encoding = null
    }
    if (typeof chunk === 'string') {
      encoding = encoding || defaultEncoding
      chunk = Buffer.from(chunk, encoding)
    } else {
      encoding = 'buffer'
    }
    const result =
      chunk !== undefined && chunk !== null
        ? super.end({ chunk, encoding })
        : super.end()
    if (cb) this.once('end', cb)
    return result
  }
}
exports.PassThrough = class PassThrough extends exports.Transform {}
exports.finished = function finished(stream, opts, cb) {
  if (typeof opts === 'function') {
    cb = opts
    opts = {}
  }
  if (!opts) opts = {}
  const { cleanup = false } = opts
  const done = () => {
    cb(exports.getStreamError(stream, { all: true }))
    if (cleanup) detach()
  }
  const detach = () => {
    stream.off('close', done)
    stream.off('error', noop)
  }
  if (stream.destroyed) {
    done()
  } else {
    stream.on('close', done)
    stream.on('error', noop)
  }
  return detach
}
function read(read, cb) {
  read.call(this, 65536)
  cb(null)
}
function write(write, data, cb) {
  write.call(this, data.chunk, data.encoding, cb)
}
function transform(transform, data, cb) {
  transform.call(this, data.chunk, data.encoding, cb)
}
function destroy(destroy, cb) {
  destroy.call(this, exports.getStreamError(this), cb)
}
function passthrough(data, cb) {
  cb(null, data.chunk)
}
function byteLengthWritable(data) {
  return data.chunk.byteLength
}
function noop() {}