'use strict'
const fastq = require('fastq')
const EE = require('node:events').EventEmitter
const inherits = require('node:util').inherits
const {
AVV_ERR_EXPOSE_ALREADY_DEFINED,
AVV_ERR_CALLBACK_NOT_FN,
AVV_ERR_ROOT_PLG_BOOTED,
AVV_ERR_READY_TIMEOUT,
AVV_ERR_ATTRIBUTE_ALREADY_DEFINED
} = require('./lib/errors')
const {
kAvvio,
kIsOnCloseHandler
} = require('./lib/symbols')
const { TimeTree } = require('./lib/time-tree')
const { Plugin } = require('./lib/plugin')
const { debug } = require('./lib/debug')
const { validatePlugin } = require('./lib/validate-plugin')
const { isBundledOrTypescriptPlugin } = require('./lib/is-bundled-or-typescript-plugin')
const { isPromiseLike } = require('./lib/is-promise-like')
const { thenify } = require('./lib/thenify')
const { executeWithThenable } = require('./lib/execute-with-thenable')
/**
 * Boot constructor: sets up the queues and the root plugin that drive
 * sequential plugin loading.
 *
 * Accepted call shapes: `Boot(done)`, `Boot(server, done)` and
 * `Boot(server, opts, done)`. It may be invoked without `new`.
 *
 * Note that the normalized option values are written back onto the `opts`
 * object that was passed in.
 *
 * @param {object} [server] object to decorate through `_expose()`; when falsy
 *   the Boot instance itself acts as the server
 * @param {object} [opts]
 * @param {boolean} [opts.autostart] any value other than `false` enables autostart
 * @param {number} [opts.timeout] coerced with `Number()`; falls back to 0
 * @param {object} [opts.expose] property-name overrides read by `_expose()`
 * @param {Function} [done] registered as a one-time 'start' listener
 */
function Boot (server, opts, done) {
// Boot(done): a single function argument is the completion callback
if (typeof server === 'function' && arguments.length === 1) {
done = server
opts = {}
server = null
}
// Boot(server, done): a function in second position is the completion callback
if (typeof opts === 'function') {
done = opts
opts = {}
}
opts = opts || {}
opts.autostart = opts.autostart !== false
opts.timeout = Number(opts.timeout) || 0
opts.expose = opts.expose || {}
// support calling Boot(...) without `new`
if (!new.target) {
return new Boot(server, opts, done)
}
this._server = server || this
this._opts = opts
// only an externally supplied server gets decorated
if (server) {
this._expose()
}
/**
* Stack of plugins currently being loaded; _loadPlugin unshifts a plugin on
* entry and shifts it off when it finishes, so index 0 is the innermost one.
* @type {Array<Plugin>}
*/
this._current = []
this._error = null
this._lastUsed = null
this.setMaxListeners(0)
if (done) {
this.once('start', done)
}
this.started = false
this.booted = false
this.pluginTree = new TimeTree()
// ready callbacks run one at a time; the queue stays paused until the
// root plugin callback below resumes it
this._readyQ = fastq(this, callWithCbOrNextTick, 1)
this._readyQ.pause()
this._readyQ.drain = () => {
this.emit('start')
// nooping this, we want to emit start only once
this._readyQ.drain = noop
}
// close handlers run one at a time; the queue is created paused and is
// resumed from close()
this._closeQ = fastq(this, closeWithCbOrNextTick, 1)
this._closeQ.pause()
this._closeQ.drain = () => {
this.emit('close')
// nooping this, we want to emit close only once
this._closeQ.drain = noop
}
this._doStart = null
const instance = this
// the root plugin captures its own `done` callback as _doStart, so that
// start() decides when the root plugin completes
this._root = new Plugin(fastq(this, this._loadPluginNextTick, 1), function root (server, opts, done) {
instance._doStart = done
opts.autostart && instance.start()
}, opts, false, 0)
this._trackPluginLoading(this._root)
this._loadPlugin(this._root, (err) => {
debug('root plugin ready')
try {
this.emit('preReady')
// reached only when no 'preReady' listener threw
this._root = null
} catch (preReadyError) {
// precedence: load error, then a previously stored error, then the listener error
err = err || this._error || preReadyError
}
if (err) {
this._error = err
// with no ready callback queued nobody would receive the error, so rethrow it
if (this._readyQ.length() === 0) {
throw err
}
} else {
this.booted = true
}
this._readyQ.resume()
})
}
inherits(Boot, EE)
/**
 * Marks the instance as started and schedules the root plugin's completion
 * callback (stored in `_doStart`) on the next tick.
 *
 * @returns {Boot} this instance, for chaining
 */
Boot.prototype.start = function () {
  const completeRoot = this._doStart
  this.started = true
  // deferring to the next tick leaves room for synchronous use() calls
  process.nextTick(completeRoot)
  return this
}
/**
 * Hook that allows substituting the server instance handed to a plugin.
 * _loadPlugin calls it for every plugin that is not an `after` plugin and
 * executes the plugin with the returned value. This default implementation
 * returns `server` unchanged.
 *
 * @param {object} server the server the plugin would receive
 * @param {Function} func the plugin function
 * @param {object} opts the plugin options
 * @returns {object} the server to execute the plugin with
 */
Boot.prototype.override = function (server, func, opts) {
return server
}
// marker symbol; _expose() sets the same marker on a decorated server
Boot.prototype[kAvvio] = true
/**
 * Registers a plugin for loading and remembers it as the last one used.
 *
 * @param {Function|object} plugin the plugin to load
 * @param {object} [opts] options forwarded to the plugin
 * @returns {Boot} this instance, for chaining
 */
Boot.prototype.use = function (plugin, opts) {
  const registered = this._addPlugin(plugin, opts, false)
  this._lastUsed = registered
  return this
}
/**
 * Returns a promise for everything registered so far on the plugin at the
 * top of the `_current` stack, or an already-resolved promise when the stack
 * is empty.
 *
 * @returns {Promise} result of `loadedSoFar()` of the current plugin
 */
Boot.prototype._loadRegistered = function () {
  const current = this._current[0]

  // When the root plugin has not been kicked off yet, resume its queue on the
  // next tick so that after() can be awaited before ready() is called.
  if (!(this.started || this.booted)) {
    process.nextTick(() => this._root.queue.resume())
  }

  return current ? current.loadedSoFar() : Promise.resolve()
}
// `then` is exposed on the prototype through a getter backed by thenify
Object.defineProperty(Boot.prototype, 'then', { get: thenify })
/**
 * Wraps `pluginFn` in a Plugin and enqueues it on the plugin that is
 * currently being loaded (the top of the `_current` stack).
 *
 * @param {Function|object} pluginFn plugin to register; a bundled/TypeScript
 *   style module is unwrapped to its `default` export before validation
 * @param {object} [opts] options handed to the Plugin (defaults to `{}`)
 * @param {boolean} isAfter whether the plugin was registered through after()
 * @returns {Plugin} the newly created plugin
 * @throws {AVV_ERR_ROOT_PLG_BOOTED} when the instance has already booted
 * @throws {Error} when the current (parent) plugin has already finished loading
 */
Boot.prototype._addPlugin = function (pluginFn, opts, isAfter) {
  if (isBundledOrTypescriptPlugin(pluginFn)) {
    pluginFn = pluginFn.default
  }
  validatePlugin(pluginFn)
  opts = opts || {}

  if (this.booted) {
    throw new AVV_ERR_ROOT_PLG_BOOTED()
  }

  // we always add plugins to load at the current element
  const current = this._current[0]

  let timeout = this._opts.timeout
  if (!current.loaded && current.timeout > 0) {
    const delta = Date.now() - current.startTime
    // We need to decrease it by 3ms to make sure the internal timeout
    // is triggered earlier than the parent
    timeout = current.timeout - (delta + 3)
  }

  const plugin = new Plugin(fastq(this, this._loadPluginNextTick, 1), pluginFn, opts, isAfter, timeout)
  this._trackPluginLoading(plugin)

  if (current.loaded) {
    // The second argument of Error() is an options bag, not a message part,
    // so both names have to be part of the message itself.
    throw new Error(`Impossible to load "${plugin.name}" plugin because the parent "${current.name}" was already loaded`)
  }

  // we add the plugin to be loaded at the end of the current queue
  current.enqueue(plugin, (err) => { err && (this._error = err) })

  return plugin
}
/**
 * Decorates `this._server` with the boot API (`use`, `after`, `ready`,
 * `onClose`, `close`) plus a `then` getter and the kAvvio marker.
 *
 * The property names may be remapped through `this._opts.expose`. Each name
 * is checked right before it is assigned, so a failing check leaves the
 * previously assigned methods in place.
 *
 * @throws {AVV_ERR_EXPOSE_ALREADY_DEFINED} when a target property name is already truthy on the server
 * @throws {AVV_ERR_ATTRIBUTE_ALREADY_DEFINED} when the server already has a truthy `then`
 */
Boot.prototype._expose = function _expose () {
  const boot = this
  const target = boot._server
  const {
    use: useKey = 'use',
    after: afterKey = 'after',
    ready: readyKey = 'ready',
    onClose: onCloseKey = 'onClose',
    close: closeKey = 'close'
  } = boot._opts.expose

  // refuses to overwrite something that already lives on the server
  const assertFree = (key, method) => {
    if (target[key]) {
      throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(key, method)
    }
  }

  assertFree(useKey, 'use')
  target[useKey] = function (fn, opts) {
    boot.use(fn, opts)
    return this
  }

  assertFree(afterKey, 'after')
  target[afterKey] = function (func) {
    // without a callback, hand back the promise of what has been loaded so far
    if (typeof func !== 'function') {
      return boot._loadRegistered()
    }
    boot.after(encapsulateThreeParam(func, this))
    return this
  }

  assertFree(readyKey, 'ready')
  target[readyKey] = function (func) {
    if (func && typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(readyKey, typeof func)
    }
    const wrapped = func ? encapsulateThreeParam(func, this) : undefined
    return boot.ready(wrapped)
  }

  assertFree(onCloseKey, 'onClose')
  target[onCloseKey] = function (func) {
    if (typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(onCloseKey, typeof func)
    }
    boot.onClose(encapsulateTwoParam(func, this))
    return this
  }

  assertFree(closeKey, 'close')
  target[closeKey] = function (func) {
    if (!func) {
      // this is a Promise
      return boot.close()
    }
    if (typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(closeKey, typeof func)
    }
    boot.close(encapsulateThreeParam(func, this))
    return this
  }

  if (target.then) {
    throw new AVV_ERR_ATTRIBUTE_ALREADY_DEFINED('then')
  }
  Object.defineProperty(target, 'then', { get: thenify.bind(boot) })

  target[kAvvio] = true
}
/**
 * Registers `func` to run once the plugins queued so far have been processed.
 * Without an argument it returns the promise produced by `_loadRegistered()`.
 *
 * @param {Function} [func] callback, invoked through callWithCbOrNextTick
 * @returns {Boot|Promise} this instance when `func` is given, a promise otherwise
 */
Boot.prototype.after = function (func) {
  if (!func) {
    return this._loadRegistered()
  }

  // registered as a plugin flagged `isAfter`; its completion callback is the
  // plugin's `done`
  const runAfter = function _after (s, opts, done) {
    callWithCbOrNextTick.call(this, func, done)
  }
  this._addPlugin(runAfter.bind(this), {}, true)

  return this
}
/**
 * Registers a handler on the close queue. Handlers are added to the front
 * of the queue.
 *
 * @param {Function} func the onClose handler
 * @returns {Boot} this instance, for chaining
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is not a function
 */
Boot.prototype.onClose = function (func) {
  if (typeof func !== 'function') {
    throw new AVV_ERR_CALLBACK_NOT_FN('onClose', typeof func)
  }

  // onClose and close handlers share one queue but are invoked with
  // different signatures; this flag tells them apart
  func[kIsOnCloseHandler] = true

  const rememberError = (err) => {
    if (err) {
      this._error = err
    }
  }
  this._closeQ.unshift(func, rememberError)

  return this
}
/**
 * Starts the shutdown sequence once the instance is ready: the stored error
 * is cleared, the close callback is queued and the close queue is resumed on
 * the next tick.
 *
 * @param {Function} [func] callback run from the close queue
 * @returns {Promise|undefined} a promise when no callback is given; it
 *   rejects when the generated callback receives an error and resolves
 *   otherwise
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is truthy but not a function
 */
Boot.prototype.close = function (func) {
  let pending

  if (!func) {
    // no callback supplied: create one that settles the returned promise
    pending = new Promise((resolve, reject) => {
      func = (err) => {
        if (err) {
          reject(err)
          return
        }
        resolve()
      }
    })
  } else if (typeof func !== 'function') {
    throw new AVV_ERR_CALLBACK_NOT_FN('close', typeof func)
  }

  this.ready(() => {
    const closeQueue = this._closeQ
    this._error = null
    closeQueue.push(func)
    process.nextTick(() => closeQueue.resume())
  })

  return pending
}
/**
 * Queues a ready callback, or returns a promise when called without one.
 *
 * With a callback, `start()` is deferred to a microtask and nothing is
 * returned. Without one, `start()` is called right away and the promise
 * resolves with the server of the plugin at the top of the `_current` stack
 * at call time, or rejects with the error handed to the queued callback.
 *
 * @param {Function} [func] ready callback
 * @returns {Promise|undefined}
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is truthy but not a function
 */
Boot.prototype.ready = function (func) {
  if (!func) {
    return new Promise((resolve, reject) => {
      this._readyQ.push(readyPromiseCB)
      this.start()

      // Callbacks wrapped by `encapsulateThreeParam` are bound to the right
      // server instance. A promise has no such binding, so capture the server
      // of the most recently loaded plugin: the first entry of `_current`.
      const relativeContext = this._current[0].server

      function readyPromiseCB (err, context, done) {
        // `context` is always the root server, hence relativeContext is used instead
        err ? reject(err) : resolve(relativeContext)
        process.nextTick(done)
      }
    })
  }

  if (typeof func !== 'function') {
    throw new AVV_ERR_CALLBACK_NOT_FN('ready', typeof func)
  }

  this._readyQ.push(func)
  queueMicrotask(this.start.bind(this))
}
/**
 * Records the plugin's load interval in `pluginTree`: a node is opened on the
 * plugin's 'start' event and closed on the following 'loaded' event. The
 * parent is the plugin at the top of `_current` at registration time.
 *
 * @param {Plugin} plugin
 * @returns {void}
 */
Boot.prototype._trackPluginLoading = function (plugin) {
  const parentName = this._current[0]?.name || null

  const onStart = (_serverName, funcName, startedAt) => {
    const nodeId = this.pluginTree.start(parentName, funcName, startedAt)

    const onLoaded = (_loadedServerName, _loadedFuncName, loadedAt) => {
      this.pluginTree.stop(nodeId, loadedAt)
    }
    plugin.once('loaded', onLoaded)
  }

  plugin.once('start', onStart)
}
/**
 * Delegates to `pluginTree.prettyPrint()`.
 *
 * @returns {*} whatever the plugin tree's `prettyPrint()` returns
 */
Boot.prototype.prettyPrint = function () {
  const tree = this.pluginTree
  return tree.prettyPrint()
}
/**
 * Delegates to `pluginTree.toJSON()`.
 *
 * @returns {*} whatever the plugin tree's `toJSON()` returns
 */
Boot.prototype.toJSON = function () {
  const tree = this.pluginTree
  return tree.toJSON()
}
/**
 * @callback LoadPluginCallback
 * @param {Error} [err]
 */
/**
 * Load a plugin.
 *
 * The plugin is pushed on top of the `_current` stack while it executes and
 * removed again once `plugin.finish` has completed, right before `callback`
 * is invoked.
 *
 * @param {Plugin} plugin
 * @param {LoadPluginCallback} callback
 */
Boot.prototype._loadPlugin = function (plugin, callback) {
const instance = this
// the plugin function may be a promise-like (it is unwrapped to `.default`
// when that is a function): resolve it first, then load again with the
// resolved function. A rejection goes straight to `callback`.
if (isPromiseLike(plugin.func)) {
plugin.func.then((fn) => {
if (typeof fn.default === 'function') {
fn = fn.default
}
plugin.func = fn
this._loadPlugin(plugin, callback)
}, callback)
return
}
// the plugin that was on top before this one, if any
const last = instance._current[0]
// place the plugin at the top of _current
instance._current.unshift(plugin)
// once the instance has errored only `after` plugins still execute
if (instance._error && !plugin.isAfter) {
debug('skipping loading of plugin as instance errored and it is not an after', plugin.name)
process.nextTick(execCallback)
return
}
// use the server of the enclosing plugin, falling back to the root server
let server = (last && last.server) || instance._server
if (!plugin.isAfter) {
// Skip override for after
try {
server = instance.override(server, plugin.func, plugin.options)
} catch (overrideErr) {
debug('override errored', plugin.name)
return execCallback(overrideErr)
}
}
plugin.exec(server, execCallback)
// completes the plugin, pops it from the stack and reports the outcome
function execCallback (err) {
plugin.finish(err, (err) => {
instance._current.shift()
callback(err)
})
}
}
/**
 * Queue worker that postpones `_loadPlugin` to the next tick, giving any
 * bound `_after` callbacks the opportunity to run before the next plugin
 * starts executing.
 *
 * @param {Plugin} plugin
 * @param {Function} callback completion callback forwarded to `_loadPlugin`
 */
Boot.prototype._loadPluginNextTick = function (plugin, callback) {
  const load = this._loadPlugin.bind(this)
  process.nextTick(load, plugin, callback)
}
/**
 * Does nothing; used to replace the queue `drain` handlers after they ran once.
 */
function noop () {}
/**
 * Queue worker for ready/after callbacks. Must be invoked with the Boot
 * instance as `this`. The call shape is chosen from `func.length`:
 *
 *  - 0: `func()` through executeWithThenable; the stored error is kept
 *  - 1: `func(err)` through executeWithThenable
 *  - 2: `func(err, cb)`
 *  - otherwise: `func(err, server, cb)`
 *
 * For the callback shapes, a non-zero `timeout` option delegates to
 * timeoutCall. The value passed to the completion callback becomes the new
 * stored error.
 *
 * @param {Function} func user callback
 * @param {Function} cb completion callback
 */
function callWithCbOrNextTick (func, cb) {
  const server = this._server
  const pendingError = this._error
  const arity = func.length

  // with this the error will appear just in the next after/ready callback
  this._error = null

  if (arity === 0) {
    // the callback cannot receive the error, so it stays pending
    this._error = pendingError
    executeWithThenable(func, [], cb)
    return
  }

  if (arity === 1) {
    executeWithThenable(func, [pendingError], cb)
    return
  }

  if (this._opts.timeout !== 0) {
    timeoutCall.call(this, func, pendingError, server, cb)
    return
  }

  const forward = (err) => {
    this._error = err
    cb(this._error)
  }

  if (arity === 2) {
    func(pendingError, forward)
  } else {
    func(pendingError, server, forward)
  }
}
/**
 * Runs a callback-style ready/after function under the configured timeout.
 * Must be invoked with the Boot instance as `this`.
 *
 * When the function does not call back within `this._opts.timeout`
 * milliseconds, `cb` receives an AVV_ERR_READY_TIMEOUT whose `fn` property
 * is the function, and a later call back from the function is ignored.
 *
 * @param {Function} func called as `func(rootErr, done)` when its length is
 *   2, as `func(rootErr, context, done)` otherwise
 * @param {*} rootErr error passed to `func` as its first argument
 * @param {object} context server passed to three-argument functions
 * @param {Function} cb completion callback
 */
function timeoutCall (func, rootErr, context, cb) {
  const name = func.unwrappedName ?? func.name
  const delay = this._opts.timeout
  debug('setting up ready timeout', name, delay)

  let timer = setTimeout(() => {
    debug('timed out', name)
    timer = null
    const timeoutError = new AVV_ERR_READY_TIMEOUT(name)
    timeoutError.fn = func
    this._error = timeoutError
    cb(timeoutError)
  }, delay)

  const settle = (err) => {
    if (!timer) {
      // timeout has been triggered
      // can not call cb twice
      return
    }
    clearTimeout(timer)
    this._error = err
    cb(this._error)
  }

  if (func.length === 2) {
    func(rootErr, settle)
  } else {
    func(rootErr, context, settle)
  }
}
/**
 * Queue worker for the close queue. Must be invoked with the Boot instance
 * as `this`. Handlers flagged with kIsOnCloseHandler receive the server;
 * other (close) callbacks receive the stored error.
 *
 *  - length 0 or 1: called with a single argument; a returned thenable is
 *    awaited, and `cb` always runs on a later tick
 *  - onClose handler with length >= 2: `func(server, cb)`
 *  - close callback with length 2: `func(error, cb)`
 *  - close callback with any other length: `func(error, server, cb)`
 *
 * @param {Function} func onClose handler or close callback
 * @param {Function} cb completion callback
 */
function closeWithCbOrNextTick (func, cb) {
  const server = this._server
  const onCloseHandler = func[kIsOnCloseHandler]
  const arity = func.length

  if (arity === 0 || arity === 1) {
    const result = onCloseHandler ? func(server) : func(this._error)

    if (result && typeof result.then === 'function') {
      debug('resolving close/onClose promise')
      result.then(
        () => process.nextTick(cb),
        (reason) => process.nextTick(cb, reason)
      )
      return
    }

    process.nextTick(cb)
    return
  }

  // callback-style onClose handlers always get (server, cb)
  if (onCloseHandler) {
    func(server, cb)
    return
  }

  if (arity === 2) {
    func(this._error, cb)
  } else {
    func(this._error, server, cb)
  }
}
/**
 * Adapts a user onClose handler to the `(context, cb)` signature, with
 * `that` standing in for the context the user function receives.
 *
 *  - length 0: `func()`
 *  - length 1: `func(that)`
 *  - otherwise: `func(that, cb)`
 *
 * For the first two shapes a returned thenable is awaited: fulfilment calls
 * `cb` on the next tick, rejection calls `cb` with the reason. A
 * non-thenable result calls `cb` on the next tick.
 *
 * @param {Function} func user handler
 * @param {object} that value bound as `this` of the wrapper
 * @returns {Function} bound two-argument wrapper
 */
function encapsulateTwoParam (func, that) {
  const settle = (result, cb) => {
    if (result && result.then) {
      result.then(function () {
        process.nextTick(cb)
      }, cb)
    } else {
      process.nextTick(cb)
    }
  }

  function _encapsulateTwoParam (context, cb) {
    switch (func.length) {
      case 0:
        settle(func(), cb)
        break
      case 1:
        settle(func(this), cb)
        break
      default:
        func(this, cb)
    }
  }

  return _encapsulateTwoParam.bind(that)
}
/**
 * Adapts a user ready/after/close callback to the `(err, cb)` signature,
 * with `that` standing in for the context the user function receives. The
 * user function's name is kept on the wrapper as `unwrappedName`.
 *
 *  - length 0: `func()`; `err` is forwarded to `cb` afterwards
 *  - length 1: `func(err)`; `cb` is then called without arguments
 *  - length 2: `func(err, cb)`
 *  - otherwise: `func(err, that, cb)`
 *
 * For the first two shapes a returned thenable is awaited and a rejection
 * calls `cb` with the reason.
 *
 * @param {Function} func user callback
 * @param {object} that value bound as `this` of the wrapper
 * @returns {Function} bound two-argument wrapper
 */
function encapsulateThreeParam (func, that) {
  // runs `finish` once `result` is done, or immediately when it is not a thenable
  const settle = (result, cb, finish) => {
    if (result && result.then) {
      result.then(function () {
        finish()
      }, cb)
    } else {
      finish()
    }
  }

  function _encapsulateThreeParam (err, cb) {
    if (!func) {
      process.nextTick(cb)
      return
    }

    switch (func.length) {
      case 0:
        // the function cannot see the error, so pass it along
        settle(func(), cb, () => process.nextTick(cb, err))
        break
      case 1:
        settle(func(err), cb, () => process.nextTick(cb))
        break
      case 2:
        func(err, cb)
        break
      default:
        func(err, this, cb)
    }
  }

  const wrapped = _encapsulateThreeParam.bind(that)
  wrapped.unwrappedName = func.name
  return wrapped
}
// the Boot constructor is the module's export
module.exports = Boot