Skip to main content
Glama
boot.js (14.6 kB)
'use strict'

const fastq = require('fastq')
const EE = require('node:events').EventEmitter
const inherits = require('node:util').inherits
const {
  AVV_ERR_EXPOSE_ALREADY_DEFINED,
  AVV_ERR_CALLBACK_NOT_FN,
  AVV_ERR_ROOT_PLG_BOOTED,
  AVV_ERR_READY_TIMEOUT,
  AVV_ERR_ATTRIBUTE_ALREADY_DEFINED
} = require('./lib/errors')
const {
  kAvvio,
  kIsOnCloseHandler
} = require('./lib/symbols')
const { TimeTree } = require('./lib/time-tree')
const { Plugin } = require('./lib/plugin')
const { debug } = require('./lib/debug')
const { validatePlugin } = require('./lib/validate-plugin')
const { isBundledOrTypescriptPlugin } = require('./lib/is-bundled-or-typescript-plugin')
const { isPromiseLike } = require('./lib/is-promise-like')
const { thenify } = require('./lib/thenify')
const { executeWithThenable } = require('./lib/execute-with-thenable')

/**
 * Boot orchestrator: queues plugins, runs them, then drains the "ready" queue
 * and, on close, the "close" queue.
 *
 * Accepted call shapes (resolved by the argument shuffling below):
 *   Boot(done)
 *   Boot(server, done)
 *   Boot(server, opts, done)
 * It may be called with or without `new`.
 *
 * NOTE: the `opts` object passed by the caller is normalised in place
 * (`autostart`, `timeout` and `expose` are assigned on it).
 *
 * @param {object} [server] object to decorate with the exposed methods; when
 *   omitted the Boot instance itself acts as the server
 * @param {object} [opts]
 * @param {boolean} [opts.autostart] anything other than `false` means `true`
 * @param {number} [opts.timeout] coerced with `Number()`; falsy/NaN becomes 0
 * @param {object} [opts.expose] alternative property names used by `_expose`
 * @param {Function} [done] registered as a one-shot listener for 'start'
 */
function Boot (server, opts, done) {
  if (typeof server === 'function' && arguments.length === 1) {
    done = server
    opts = {}
    server = null
  }

  if (typeof opts === 'function') {
    done = opts
    opts = {}
  }

  opts = opts || {}
  opts.autostart = opts.autostart !== false
  opts.timeout = Number(opts.timeout) || 0
  opts.expose = opts.expose || {}

  if (!new.target) {
    return new Boot(server, opts, done)
  }

  this._server = server || this
  this._opts = opts

  if (server) {
    this._expose()
  }

  /**
   * Stack of plugins currently being loaded; index 0 is the innermost one.
   * @type {Array<Plugin>}
   */
  this._current = []

  this._error = null

  this._lastUsed = null

  this.setMaxListeners(0)

  if (done) {
    this.once('start', done)
  }

  this.started = false
  this.booted = false
  this.pluginTree = new TimeTree()

  // Both queues are created paused; the ready queue is resumed once the root
  // plugin has finished loading, the close queue from close().
  this._readyQ = fastq(this, callWithCbOrNextTick, 1)
  this._readyQ.pause()
  this._readyQ.drain = () => {
    this.emit('start')
    // nooping this, we want to emit start only once
    this._readyQ.drain = noop
  }

  this._closeQ = fastq(this, closeWithCbOrNextTick, 1)
  this._closeQ.pause()
  this._closeQ.drain = () => {
    this.emit('close')
    // nooping this, we want to emit close only once
    this._closeQ.drain = noop
  }

  this._doStart = null

  const instance = this
  this._root = new Plugin(fastq(this, this._loadPluginNextTick, 1), function root (server, opts, done) {
    // keep the root plugin's completion callback so start() can trigger it later
    instance._doStart = done
    opts.autostart && instance.start()
  }, opts, false, 0)

  this._trackPluginLoading(this._root)

  this._loadPlugin(this._root, (err) => {
    debug('root plugin ready')
    try {
      this.emit('preReady')
      this._root = null
    } catch (preReadyError) {
      err = err || this._error || preReadyError
    }

    if (err) {
      this._error = err
      // nobody queued a ready/after callback to receive the error: rethrow it
      if (this._readyQ.length() === 0) {
        throw err
      }
    } else {
      this.booted = true
    }
    this._readyQ.resume()
  })
}

inherits(Boot, EE)

/**
 * Marks the instance as started and schedules the root plugin's completion
 * callback on the next tick.
 *
 * @returns {Boot} this
 */
Boot.prototype.start = function () {
  this.started = true

  // we need to wait any call to use() to happen
  process.nextTick(this._doStart)
  return this
}

/**
 * Hook that allows to override the instance of a server, given a plugin.
 * The default implementation returns `server` unchanged.
 *
 * @param {object} server
 * @param {Function} func the plugin function
 * @param {object} opts the plugin options
 * @returns {object} the server instance handed to the plugin
 */
Boot.prototype.override = function (server, func, opts) {
  return server
}

Boot.prototype[kAvvio] = true

/**
 * Load a plugin.
 *
 * @param {Function|object} plugin
 * @param {object} [opts]
 * @returns {Boot} this
 */
Boot.prototype.use = function (plugin, opts) {
  this._lastUsed = this._addPlugin(plugin, opts, false)
  return this
}

/**
 * Returns a promise for the plugins registered so far on the current plugin
 * (`loadedSoFar()`), or an already resolved promise when there is no current
 * plugin.
 *
 * @returns {Promise}
 */
Boot.prototype._loadRegistered = function () {
  const plugin = this._current[0]
  const weNeedToStart = !this.started && !this.booted

  // if the root plugin is not loaded, let's resume that
  // so one can use after() before calling ready
  if (weNeedToStart) {
    process.nextTick(() => this._root.queue.resume())
  }

  if (!plugin) {
    return Promise.resolve()
  }

  return plugin.loadedSoFar()
}

// `then` is a getter backed by the `thenify` helper imported from ./lib/thenify
Object.defineProperty(Boot.prototype, 'then', { get: thenify })

/**
 * Validates a plugin function, wraps it in a `Plugin` and enqueues it on the
 * plugin currently being loaded.
 *
 * @param {Function|object} pluginFn plugin function; when
 *   `isBundledOrTypescriptPlugin` accepts it, its `.default` is used instead
 * @param {object} [opts]
 * @param {boolean} isAfter whether the plugin wraps an `after()` callback
 * @returns {Plugin} the newly created plugin
 * @throws {AVV_ERR_ROOT_PLG_BOOTED} when the instance has already booted
 * @throws {Error} when the current (parent) plugin has already been loaded
 */
Boot.prototype._addPlugin = function (pluginFn, opts, isAfter) {
  if (isBundledOrTypescriptPlugin(pluginFn)) {
    pluginFn = pluginFn.default
  }
  validatePlugin(pluginFn)
  opts = opts || {}

  if (this.booted) {
    throw new AVV_ERR_ROOT_PLG_BOOTED()
  }

  // we always add plugins to load at the current element
  const current = this._current[0]

  let timeout = this._opts.timeout

  if (!current.loaded && current.timeout > 0) {
    const delta = Date.now() - current.startTime
    // We need to decrease it by 3ms to make sure the internal timeout
    // is triggered earlier than the parent
    timeout = current.timeout - (delta + 3)
  }

  const plugin = new Plugin(fastq(this, this._loadPluginNextTick, 1), pluginFn, opts, isAfter, timeout)
  this._trackPluginLoading(plugin)

  if (current.loaded) {
    // `Error(message, options)` ignores a string passed as second argument, so
    // the parent name used to be dropped; put both names in the message.
    throw new Error(`Impossible to load "${plugin.name}" plugin because the parent "${current.name}" was already loaded`)
  }

  // we add the plugin to be loaded at the end of the current queue
  current.enqueue(plugin, (err) => { err && (this._error = err) })

  return plugin
}

/**
 * Decorates `this._server` with `use`, `after`, `ready`, `onClose` and `close`
 * (property names can be changed through `opts.expose`), plus a `then` getter
 * and the `kAvvio` marker.
 *
 * @throws {AVV_ERR_EXPOSE_ALREADY_DEFINED} when the server already has a truthy
 *   property under one of the target names
 * @throws {AVV_ERR_ATTRIBUTE_ALREADY_DEFINED} when the server already has a
 *   truthy `then`
 */
Boot.prototype._expose = function _expose () {
  const instance = this
  const server = instance._server
  const {
    use: useKey = 'use',
    after: afterKey = 'after',
    ready: readyKey = 'ready',
    onClose: onCloseKey = 'onClose',
    close: closeKey = 'close'
  } = this._opts.expose

  if (server[useKey]) {
    throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(useKey, 'use')
  }
  server[useKey] = function (fn, opts) {
    instance.use(fn, opts)
    return this
  }

  if (server[afterKey]) {
    throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(afterKey, 'after')
  }
  server[afterKey] = function (func) {
    // without a callback, after() returns the "loaded so far" promise
    if (typeof func !== 'function') {
      return instance._loadRegistered()
    }
    instance.after(encapsulateThreeParam(func, this))
    return this
  }

  if (server[readyKey]) {
    throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(readyKey, 'ready')
  }
  server[readyKey] = function (func) {
    if (func && typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(readyKey, typeof func)
    }
    return instance.ready(func ? encapsulateThreeParam(func, this) : undefined)
  }

  if (server[onCloseKey]) {
    throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(onCloseKey, 'onClose')
  }
  server[onCloseKey] = function (func) {
    if (typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(onCloseKey, typeof func)
    }
    instance.onClose(encapsulateTwoParam(func, this))
    return this
  }

  if (server[closeKey]) {
    throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(closeKey, 'close')
  }
  server[closeKey] = function (func) {
    if (func && typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN(closeKey, typeof func)
    }

    if (func) {
      instance.close(encapsulateThreeParam(func, this))
      return this
    }

    // this is a Promise
    return instance.close()
  }

  if (server.then) {
    throw new AVV_ERR_ATTRIBUTE_ALREADY_DEFINED('then')
  }
  Object.defineProperty(server, 'then', { get: thenify.bind(instance) })

  server[kAvvio] = true
}

/**
 * Registers a callback to run after the plugins added so far. It is queued as
 * an "after" plugin whose body runs `func` through `callWithCbOrNextTick`.
 * Without `func`, returns the promise from `_loadRegistered()`.
 *
 * @param {Function} [func]
 * @returns {Boot|Promise} `this` when `func` is given, otherwise a promise
 */
Boot.prototype.after = function (func) {
  if (!func) {
    return this._loadRegistered()
  }

  this._addPlugin(_after.bind(this), {}, true)

  function _after (s, opts, done) {
    callWithCbOrNextTick.call(this, func, done)
  }

  return this
}

/**
 * Registers a handler on the close queue. Handlers are added with `unshift`,
 * i.e. at the front of the queue.
 *
 * @param {Function} func
 * @returns {Boot} this
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is not a function
 */
Boot.prototype.onClose = function (func) {
  // this is used to distinguish between onClose and close handlers
  // because they share the same queue but must be called with different signatures

  if (typeof func !== 'function') {
    throw new AVV_ERR_CALLBACK_NOT_FN('onClose', typeof func)
  }

  func[kIsOnCloseHandler] = true
  this._closeQ.unshift(func, (err) => { err && (this._error = err) })

  return this
}

/**
 * Starts the close sequence once the instance is ready: clears the stored
 * error, pushes `func` at the end of the close queue and resumes that queue on
 * the next tick.
 *
 * @param {Function} [func] close callback
 * @returns {Promise|undefined} a promise when no callback is given (rejected
 *   with the error passed to the internal callback, resolved otherwise);
 *   `undefined` when `func` is provided
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is truthy but not a function
 */
Boot.prototype.close = function (func) {
  let promise

  if (func) {
    if (typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN('close', typeof func)
    }
  } else {
    promise = new Promise(function (resolve, reject) {
      func = function (err) {
        if (err) {
          return reject(err)
        }
        resolve()
      }
    })
  }

  this.ready(() => {
    this._error = null
    this._closeQ.push(func)
    process.nextTick(this._closeQ.resume.bind(this._closeQ))
  })

  return promise
}

/**
 * Queues a callback on the ready queue and makes sure the instance starts.
 *
 * With a callback, start() is deferred with `queueMicrotask` and nothing is
 * returned. Without one, start() is called immediately and a promise is
 * returned that resolves with the server of the plugin at the top of
 * `_current` at call time, or rejects with the error handed to the callback.
 *
 * @param {Function} [func]
 * @returns {Promise|undefined}
 * @throws {AVV_ERR_CALLBACK_NOT_FN} when `func` is truthy but not a function
 */
Boot.prototype.ready = function (func) {
  if (func) {
    if (typeof func !== 'function') {
      throw new AVV_ERR_CALLBACK_NOT_FN('ready', typeof func)
    }
    this._readyQ.push(func)
    queueMicrotask(this.start.bind(this))
    return
  }

  return new Promise((resolve, reject) => {
    this._readyQ.push(readyPromiseCB)
    this.start()

    /**
     * The `encapsulateThreeParam` lets the callback function
     * bind to the right server instance.
     * In promises we need to track the last server
     * instance loaded, the first one in the _current queue.
     */
    const relativeContext = this._current[0].server

    function readyPromiseCB (err, context, done) {
      // the context is always bound to the root server
      if (err) {
        reject(err)
      } else {
        resolve(relativeContext)
      }
      process.nextTick(done)
    }
  })
}

/**
 * Records the plugin's 'start' and 'loaded' events in `pluginTree`, as a child
 * of the plugin that is current when tracking is set up.
 *
 * @param {Plugin} plugin
 * @returns {void}
 */
Boot.prototype._trackPluginLoading = function (plugin) {
  const parentName = this._current[0]?.name || null
  plugin.once('start', (serverName, funcName, time) => {
    const nodeId = this.pluginTree.start(parentName || null, funcName, time)
    plugin.once('loaded', (serverName, funcName, time) => {
      this.pluginTree.stop(nodeId, time)
    })
  })
}

/**
 * @returns {*} whatever `pluginTree.prettyPrint()` returns
 */
Boot.prototype.prettyPrint = function () {
  return this.pluginTree.prettyPrint()
}

/**
 * @returns {*} whatever `pluginTree.toJSON()` returns
 */
Boot.prototype.toJSON = function () {
  return this.pluginTree.toJSON()
}

/**
 * @callback LoadPluginCallback
 * @param {Error} [err]
 */

/**
 * Load a plugin
 *
 * If `plugin.func` is promise-like it is awaited first (using its `default`
 * export when that is a function) and loading restarts with the resolved
 * function. Otherwise the plugin is pushed on top of `_current`, executed
 * against the (possibly overridden) server and popped when it finishes.
 *
 * @param {Plugin} plugin
 * @param {LoadPluginCallback} callback
 */
Boot.prototype._loadPlugin = function (plugin, callback) {
  const instance = this
  if (isPromiseLike(plugin.func)) {
    plugin.func.then((fn) => {
      if (typeof fn.default === 'function') {
        fn = fn.default
      }
      plugin.func = fn
      this._loadPlugin(plugin, callback)
    }, callback)
    return
  }

  const last = instance._current[0]

  // place the plugin at the top of _current
  instance._current.unshift(plugin)

  if (instance._error && !plugin.isAfter) {
    debug('skipping loading of plugin as instance errored and it is not an after', plugin.name)
    process.nextTick(execCallback)
    return
  }

  let server = (last && last.server) || instance._server

  if (!plugin.isAfter) {
    // Skip override for after
    try {
      server = instance.override(server, plugin.func, plugin.options)
    } catch (overrideErr) {
      debug('override errored', plugin.name)
      return execCallback(overrideErr)
    }
  }

  plugin.exec(server, execCallback)

  function execCallback (err) {
    plugin.finish(err, (err) => {
      instance._current.shift()
      callback(err)
    })
  }
}

/**
 * Delays plugin loading until the next tick to ensure any bound `_after` callbacks have a chance
 * to run prior to executing the next plugin
 *
 * @param {Plugin} plugin
 * @param {LoadPluginCallback} callback
 */
Boot.prototype._loadPluginNextTick = function (plugin, callback) {
  process.nextTick(this._loadPlugin.bind(this), plugin, callback)
}

/** Does nothing; used to disarm the one-shot `drain` handlers. */
function noop () { }

/**
 * Worker of the ready queue (also used by `after`). Must be invoked with a
 * Boot instance as `this`. Dispatches on `func.length`:
 *   0 -> func(), the pending error is kept for the next callback
 *   1 -> func(err)
 *   2 -> func(err, cb)
 *   other -> func(err, context, cb)
 * Arity 0 and 1 go through `executeWithThenable`; the callback-style forms are
 * wrapped by `timeoutCall` when `opts.timeout` is not 0.
 *
 * @param {Function} func
 * @param {Function} cb queue completion callback
 */
function callWithCbOrNextTick (func, cb) {
  const context = this._server
  const err = this._error

  // with this the error will appear just in the next after/ready callback
  this._error = null
  if (func.length === 0) {
    this._error = err
    executeWithThenable(func, [], cb)
  } else if (func.length === 1) {
    executeWithThenable(func, [err], cb)
  } else {
    if (this._opts.timeout === 0) {
      const wrapCb = (err) => {
        this._error = err
        cb(this._error)
      }

      if (func.length === 2) {
        func(err, wrapCb)
      } else {
        func(err, context, wrapCb)
      }
    } else {
      timeoutCall.call(this, func, err, context, cb)
    }
  }
}

/**
 * Runs a callback-style `func` guarded by a timer of `this._opts.timeout` ms.
 * If the timer fires first, `cb` is called with an `AVV_ERR_READY_TIMEOUT`
 * (carrying `func` in its `fn` property) and a later completion of `func` is
 * ignored, so `cb` is called at most once.
 *
 * @param {Function} func
 * @param {Error|null} rootErr error forwarded to `func` as first argument
 * @param {object} context server passed to three-argument callbacks
 * @param {Function} cb
 */
function timeoutCall (func, rootErr, context, cb) {
  const name = func.unwrappedName ?? func.name
  debug('setting up ready timeout', name, this._opts.timeout)
  let timer = setTimeout(() => {
    debug('timed out', name)
    timer = null
    const toutErr = new AVV_ERR_READY_TIMEOUT(name)
    toutErr.fn = func
    this._error = toutErr
    cb(toutErr)
  }, this._opts.timeout)

  if (func.length === 2) {
    func(rootErr, timeoutCb.bind(this))
  } else {
    func(rootErr, context, timeoutCb.bind(this))
  }

  function timeoutCb (err) {
    if (timer) {
      clearTimeout(timer)
      this._error = err
      cb(this._error)
    } else {
      // timeout has been triggered
      // can not call cb twice
    }
  }
}

/**
 * Worker of the close queue. Must be invoked with a Boot instance as `this`.
 * Handlers tagged with `kIsOnCloseHandler` receive the server as first
 * argument; close callbacks receive the stored error instead. Handlers of
 * arity 0 or 1 may return a thenable, which is awaited before `cb` runs.
 *
 * @param {Function} func
 * @param {Function} cb queue completion callback
 */
function closeWithCbOrNextTick (func, cb) {
  const context = this._server
  const isOnCloseHandler = func[kIsOnCloseHandler]
  if (func.length === 0 || func.length === 1) {
    let promise
    if (isOnCloseHandler) {
      promise = func(context)
    } else {
      promise = func(this._error)
    }
    if (promise && typeof promise.then === 'function') {
      debug('resolving close/onClose promise')
      promise.then(
        () => process.nextTick(cb),
        (e) => process.nextTick(cb, e))
    } else {
      process.nextTick(cb)
    }
  } else if (func.length === 2) {
    if (isOnCloseHandler) {
      func(context, cb)
    } else {
      func(this._error, cb)
    }
  } else {
    if (isOnCloseHandler) {
      func(context, cb)
    } else {
      func(this._error, context, cb)
    }
  }
}

/**
 * Wraps an onClose handler so that it is called with `that` as its server,
 * whatever arity the user function declares (0, 1 or callback style).
 *
 * @param {Function} func user handler
 * @param {object} that value bound as `this`, passed to `func` as the server
 * @returns {Function} wrapper with the `(context, cb)` signature
 */
function encapsulateTwoParam (func, that) {
  return _encapsulateTwoParam.bind(that)
  function _encapsulateTwoParam (context, cb) {
    let res
    if (func.length === 0) {
      res = func()
      if (res && res.then) {
        res.then(function () {
          process.nextTick(cb)
        }, cb)
      } else {
        process.nextTick(cb)
      }
    } else if (func.length === 1) {
      res = func(this)

      if (res && res.then) {
        res.then(function () {
          process.nextTick(cb)
        }, cb)
      } else {
        process.nextTick(cb)
      }
    } else {
      func(this, cb)
    }
  }
}

/**
 * Wraps a ready/after/close callback so that it is called with `that` as its
 * server, whatever arity the user function declares. The wrapper keeps the
 * user function's name in `unwrappedName`, which `timeoutCall` reads.
 *
 * @param {Function} func user callback
 * @param {object} that value bound as `this`, passed to `func` as the server
 * @returns {Function} wrapper with the `(err, cb)` signature
 */
function encapsulateThreeParam (func, that) {
  const wrapped = _encapsulateThreeParam.bind(that)
  // the wrapper below tolerates a missing func, so do not dereference it blindly
  wrapped.unwrappedName = func?.name
  return wrapped
  function _encapsulateThreeParam (err, cb) {
    let res
    if (!func) {
      process.nextTick(cb)
    } else if (func.length === 0) {
      res = func()
      if (res && res.then) {
        res.then(function () {
          process.nextTick(cb, err)
        }, cb)
      } else {
        process.nextTick(cb, err)
      }
    } else if (func.length === 1) {
      res = func(err)
      if (res && res.then) {
        res.then(function () {
          process.nextTick(cb)
        }, cb)
      } else {
        process.nextTick(cb)
      }
    } else if (func.length === 2) {
      func(err, cb)
    } else {
      func(err, this, cb)
    }
  }
}

module.exports = Boot

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krtw00/search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.