bolt-protocol-v1.js•26.8 kB
"use strict";
var __assign = (this && this.__assign) || function () {
__assign = Object.assign || function(t) {
for (var s, i = 1, n = arguments.length; i < n; i++) {
s = arguments[i];
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p))
t[p] = s[p];
}
return t;
};
return __assign.apply(this, arguments);
};
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
o["default"] = v;
});
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
__setModuleDefault(result, mod);
return result;
};
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var bolt_protocol_util_1 = require("./bolt-protocol-util");
// eslint-disable-next-line no-unused-vars
var channel_1 = require("../channel");
var packstream_1 = require("../packstream");
var request_message_1 = __importStar(require("./request-message"));
var stream_observers_1 = require("./stream-observers");
var neo4j_driver_core_1 = require("neo4j-driver-core");
var bolt_protocol_v1_transformer_1 = __importDefault(require("./bolt-protocol-v1.transformer"));
var transformer_1 = __importDefault(require("./transformer"));
var Bookmarks = neo4j_driver_core_1.internal.bookmarks.Bookmarks, _a = neo4j_driver_core_1.internal.constants, ACCESS_MODE_WRITE = _a.ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 = _a.BOLT_PROTOCOL_V1, Logger = neo4j_driver_core_1.internal.logger.Logger, TxConfig = neo4j_driver_core_1.internal.txConfig.TxConfig;
var DEFAULT_DIAGNOSTIC_RECORD = Object.freeze({
OPERATION: '',
OPERATION_CODE: '0',
CURRENT_SCHEMA: '/'
});
var BoltProtocol = /** @class */ (function () {
/**
* @callback CreateResponseHandler Creates the response handler
* @param {BoltProtocol} protocol The bolt protocol
* @returns {ResponseHandler} The response handler
*/
/**
* @callback OnProtocolError Handles protocol error
* @param {string} error The description
*/
/**
* @constructor
* @param {Object} server the server informatio.
* @param {Chunker} chunker the chunker.
* @param {Object} packstreamConfig Packstream configuration
* @param {boolean} packstreamConfig.disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
* @param {boolean} packstreamConfig.useBigInt if this connection should convert all received integers to native BigInt numbers.
* @param {CreateResponseHandler} createResponseHandler Function which creates the response handler
* @param {Logger} log the logger
* @param {OnProtocolError} onProtocolError handles protocol errors
*/
function BoltProtocol(server, chunker, _a, createResponseHandler, log, onProtocolError) {
var _b = _a === void 0 ? {} : _a, disableLosslessIntegers = _b.disableLosslessIntegers, useBigInt = _b.useBigInt;
if (createResponseHandler === void 0) { createResponseHandler = function () { return null; }; }
this._server = server || {};
this._chunker = chunker;
this._packer = this._createPacker(chunker);
this._unpacker = this._createUnpacker(disableLosslessIntegers, useBigInt);
this._responseHandler = createResponseHandler(this);
this._log = log;
this._onProtocolError = onProtocolError;
this._fatalError = null;
this._lastMessageSignature = null;
this._config = { disableLosslessIntegers: disableLosslessIntegers, useBigInt: useBigInt };
}
Object.defineProperty(BoltProtocol.prototype, "transformer", {
get: function () {
var _this = this;
if (this._transformer === undefined) {
this._transformer = new transformer_1.default(Object.values(bolt_protocol_v1_transformer_1.default).map(function (create) { return create(_this._config, _this._log); }));
}
return this._transformer;
},
enumerable: false,
configurable: true
});
Object.defineProperty(BoltProtocol.prototype, "version", {
/**
* Returns the numerical version identifier for this protocol
*/
get: function () {
return BOLT_PROTOCOL_V1;
},
enumerable: false,
configurable: true
});
Object.defineProperty(BoltProtocol.prototype, "supportsReAuth", {
/**
* @property {boolean} supportsReAuth Either if the protocol version supports re-auth or not.
*/
get: function () {
return false;
},
enumerable: false,
configurable: true
});
Object.defineProperty(BoltProtocol.prototype, "initialized", {
/**
* @property {boolean} initialized Either if the protocol was initialized or not
*/
get: function () {
return !!this._initialized;
},
enumerable: false,
configurable: true
});
Object.defineProperty(BoltProtocol.prototype, "authToken", {
/**
* @property {object} authToken The token used in the last login
*/
get: function () {
return this._authToken;
},
enumerable: false,
configurable: true
});
/**
* Get the packer.
* @return {Packer} the protocol's packer.
*/
BoltProtocol.prototype.packer = function () {
return this._packer;
};
/**
* Creates a packable function out of the provided value
* @param x the value to pack
* @returns Function
*/
BoltProtocol.prototype.packable = function (x) {
return this._packer.packable(x, this.transformer.toStructure);
};
/**
* Get the unpacker.
* @return {Unpacker} the protocol's unpacker.
*/
BoltProtocol.prototype.unpacker = function () {
return this._unpacker;
};
/**
* Unpack a buffer
* @param {Buffer} buf
* @returns {any|null} The unpacked value
*/
BoltProtocol.prototype.unpack = function (buf) {
return this._unpacker.unpack(buf, this.transformer.fromStructure);
};
/**
* Transform metadata received in SUCCESS message before it is passed to the handler.
* @param {Object} metadata the received metadata.
* @return {Object} transformed metadata.
*/
BoltProtocol.prototype.transformMetadata = function (metadata) {
return metadata;
};
BoltProtocol.prototype.enrichErrorMetadata = function (metadata) {
return __assign(__assign({}, metadata), { diagnostic_record: metadata.diagnostic_record !== null ? __assign(__assign({}, DEFAULT_DIAGNOSTIC_RECORD), metadata.diagnostic_record) : null });
};
/**
* Perform initialization and authentication of the underlying connection.
* @param {Object} param
* @param {string} param.userAgent the user agent.
* @param {Object} param.authToken the authentication token.
* @param {NotificationFilter} param.notificationFilter the notification filter.
* @param {function(err: Error)} param.onError the callback to invoke on error.
* @param {function()} param.onComplete the callback to invoke on completion.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.initialize = function (_a) {
var _this = this;
var _b = _a === void 0 ? {} : _a, userAgent = _b.userAgent, boltAgent = _b.boltAgent, authToken = _b.authToken, notificationFilter = _b.notificationFilter, onError = _b.onError, onComplete = _b.onComplete;
var observer = new stream_observers_1.LoginObserver({
onError: function (error) { return _this._onLoginError(error, onError); },
onCompleted: function (metadata) { return _this._onLoginCompleted(metadata, onComplete); }
});
// passing notification filter on this protocol version throws an error
(0, bolt_protocol_util_1.assertNotificationFilterIsEmpty)(notificationFilter, this._onProtocolError, observer);
this.write(request_message_1.default.init(userAgent, authToken), observer, true);
return observer;
};
/**
* Performs logoff of the underlying connection
*
* @param {Object} param
* @param {function(err: Error)} param.onError the callback to invoke on error.
* @param {function()} param.onComplete the callback to invoke on completion.
* @param {boolean} param.flush whether to flush the buffered messages.
*
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.logoff = function (_a) {
var _b = _a === void 0 ? {} : _a, onComplete = _b.onComplete, onError = _b.onError, flush = _b.flush;
var observer = new stream_observers_1.LogoffObserver({
onCompleted: onComplete,
onError: onError
});
// TODO: Verify the Neo4j version in the message
var error = (0, neo4j_driver_core_1.newError)('Driver is connected to a database that does not support logoff. ' +
'Please upgrade to Neo4j 5.5.0 or later in order to use this functionality.');
// unsupported API was used, consider this a fatal error for the current connection
this._onProtocolError(error.message);
observer.onError(error);
throw error;
};
/**
* Performs login of the underlying connection
*
* @param {Object} args
* @param {Object} args.authToken the authentication token.
* @param {function(err: Error)} args.onError the callback to invoke on error.
* @param {function()} args.onComplete the callback to invoke on completion.
* @param {boolean} args.flush whether to flush the buffered messages.
*
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.logon = function (_a) {
var _this = this;
var _b = _a === void 0 ? {} : _a, authToken = _b.authToken, onComplete = _b.onComplete, onError = _b.onError, flush = _b.flush;
var observer = new stream_observers_1.LoginObserver({
onCompleted: function () { return _this._onLoginCompleted({}, authToken, onComplete); },
onError: function (error) { return _this._onLoginError(error, onError); }
});
// TODO: Verify the Neo4j version in the message
var error = (0, neo4j_driver_core_1.newError)('Driver is connected to a database that does not support logon. ' +
'Please upgrade to Neo4j 5.5.0 or later in order to use this functionality.');
// unsupported API was used, consider this a fatal error for the current connection
this._onProtocolError(error.message);
observer.onError(error);
throw error;
};
/**
* Perform protocol related operations for closing this connection
*/
BoltProtocol.prototype.prepareToClose = function () {
// no need to notify the database in this protocol version
};
/**
* Begin an explicit transaction.
* @param {Object} param
* @param {Bookmarks} param.bookmarks the bookmarks.
* @param {TxConfig} param.txConfig the configuration.
* @param {string} param.database the target database name.
* @param {string} param.mode the access mode.
* @param {string} param.impersonatedUser the impersonated user
* @param {NotificationFilter} param.notificationFilter the notification filter.
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.beginTransaction = function (_a) {
var _b = _a === void 0 ? {} : _a, bookmarks = _b.bookmarks, txConfig = _b.txConfig, database = _b.database, mode = _b.mode, impersonatedUser = _b.impersonatedUser, notificationFilter = _b.notificationFilter, beforeError = _b.beforeError, afterError = _b.afterError, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete;
return this.run('BEGIN', bookmarks ? bookmarks.asBeginTransactionParameters() : {}, {
bookmarks: bookmarks,
txConfig: txConfig,
database: database,
mode: mode,
impersonatedUser: impersonatedUser,
notificationFilter: notificationFilter,
beforeError: beforeError,
afterError: afterError,
beforeComplete: beforeComplete,
afterComplete: afterComplete,
flush: false
});
};
/**
* Commit the explicit transaction.
* @param {Object} param
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.commitTransaction = function (_a) {
var _b = _a === void 0 ? {} : _a, beforeError = _b.beforeError, afterError = _b.afterError, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete;
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
return this.run('COMMIT', {}, {
bookmarks: Bookmarks.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE,
beforeError: beforeError,
afterError: afterError,
beforeComplete: beforeComplete,
afterComplete: afterComplete
});
};
/**
* Rollback the explicit transaction.
* @param {Object} param
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.rollbackTransaction = function (_a) {
var _b = _a === void 0 ? {} : _a, beforeError = _b.beforeError, afterError = _b.afterError, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete;
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
return this.run('ROLLBACK', {}, {
bookmarks: Bookmarks.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE,
beforeError: beforeError,
afterError: afterError,
beforeComplete: beforeComplete,
afterComplete: afterComplete
});
};
/**
* Send a Cypher query through the underlying connection.
* @param {string} query the cypher query.
* @param {Object} parameters the query parameters.
* @param {Object} param
* @param {Bookmarks} param.bookmarks the bookmarks.
* @param {TxConfig} param.txConfig the transaction configuration.
* @param {string} param.database the target database name.
* @param {string} param.impersonatedUser the impersonated user
* @param {NotificationFilter} param.notificationFilter the notification filter.
* @param {string} param.mode the access mode.
* @param {function(keys: string[])} param.beforeKeys the callback to invoke before handling the keys.
* @param {function(keys: string[])} param.afterKeys the callback to invoke after handling the keys.
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
* @param {boolean} param.flush whether to flush the buffered messages.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.run = function (query, parameters, _a) {
var _b = _a === void 0 ? {} : _a, bookmarks = _b.bookmarks, txConfig = _b.txConfig, database = _b.database, mode = _b.mode, impersonatedUser = _b.impersonatedUser, notificationFilter = _b.notificationFilter, beforeKeys = _b.beforeKeys, afterKeys = _b.afterKeys, beforeError = _b.beforeError, afterError = _b.afterError, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete, _c = _b.flush, flush = _c === void 0 ? true : _c, _d = _b.highRecordWatermark, highRecordWatermark = _d === void 0 ? Number.MAX_VALUE : _d, _e = _b.lowRecordWatermark, lowRecordWatermark = _e === void 0 ? Number.MAX_VALUE : _e;
var observer = new stream_observers_1.ResultStreamObserver({
server: this._server,
beforeKeys: beforeKeys,
afterKeys: afterKeys,
beforeError: beforeError,
afterError: afterError,
beforeComplete: beforeComplete,
afterComplete: afterComplete,
highRecordWatermark: highRecordWatermark,
lowRecordWatermark: lowRecordWatermark
});
// bookmarks and mode are ignored in this version of the protocol
(0, bolt_protocol_util_1.assertTxConfigIsEmpty)(txConfig, this._onProtocolError, observer);
// passing in a database name on this protocol version throws an error
(0, bolt_protocol_util_1.assertDatabaseIsEmpty)(database, this._onProtocolError, observer);
// passing impersonated user on this protocol version throws an error
(0, bolt_protocol_util_1.assertImpersonatedUserIsEmpty)(impersonatedUser, this._onProtocolError, observer);
// passing notification filter on this protocol version throws an error
(0, bolt_protocol_util_1.assertNotificationFilterIsEmpty)(notificationFilter, this._onProtocolError, observer);
this.write(request_message_1.default.run(query, parameters), observer, false);
this.write(request_message_1.default.pullAll(), observer, flush);
return observer;
};
Object.defineProperty(BoltProtocol.prototype, "currentFailure", {
get: function () {
return this._responseHandler.currentFailure;
},
enumerable: false,
configurable: true
});
/**
* Send a RESET through the underlying connection.
* @param {Object} param
* @param {function(err: Error)} param.onError the callback to invoke on error.
* @param {function()} param.onComplete the callback to invoke on completion.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.reset = function (_a) {
var _b = _a === void 0 ? {} : _a, onError = _b.onError, onComplete = _b.onComplete;
var observer = new stream_observers_1.ResetObserver({
onProtocolError: this._onProtocolError,
onError: onError,
onComplete: onComplete
});
this.write(request_message_1.default.reset(), observer, true);
return observer;
};
/**
* Send a TELEMETRY through the underlying connection.
*
* @param {object} param0 Message params
* @param {number} param0.api The API called
* @param {object} param1 Configuration and callbacks
* @param {function()} param1.onCompleted Called when completed
* @param {function()} param1.onError Called when error
* @return {StreamObserver} the stream observer that monitors the corresponding server response.
*/
BoltProtocol.prototype.telemetry = function (_a, _b) {
var api = _a.api;
var _c = _b === void 0 ? {} : _b, onError = _c.onError, onCompleted = _c.onCompleted;
var observer = new stream_observers_1.CompletedObserver();
if (onCompleted) {
onCompleted();
}
return observer;
};
BoltProtocol.prototype._createPacker = function (chunker) {
return new packstream_1.v1.Packer(chunker);
};
BoltProtocol.prototype._createUnpacker = function (disableLosslessIntegers, useBigInt) {
return new packstream_1.v1.Unpacker(disableLosslessIntegers, useBigInt);
};
/**
* Write a message to the network channel.
* @param {RequestMessage} message the message to write.
* @param {StreamObserver} observer the response observer.
* @param {boolean} flush `true` if flush should happen after the message is written to the buffer.
*/
BoltProtocol.prototype.write = function (message, observer, flush) {
var queued = this.queueObserverIfProtocolIsNotBroken(observer);
if (queued) {
if (this._log.isDebugEnabled()) {
this._log.debug("C: ".concat(message));
}
this._lastMessageSignature = message.signature;
var messageStruct = new packstream_1.structure.Structure(message.signature, message.fields);
this.packable(messageStruct)();
this._chunker.messageBoundary();
if (flush) {
this._chunker.flush();
}
}
};
BoltProtocol.prototype.isLastMessageLogon = function () {
return this._lastMessageSignature === request_message_1.SIGNATURES.HELLO ||
this._lastMessageSignature === request_message_1.SIGNATURES.LOGON;
};
BoltProtocol.prototype.isLastMessageReset = function () {
return this._lastMessageSignature === request_message_1.SIGNATURES.RESET;
};
/**
* Notifies faltal erros to the observers and mark the protocol in the fatal error state.
* @param {Error} error The error
*/
BoltProtocol.prototype.notifyFatalError = function (error) {
this._fatalError = error;
return this._responseHandler._notifyErrorToObservers(error);
};
/**
* Updates the the current observer with the next one on the queue.
*/
BoltProtocol.prototype.updateCurrentObserver = function () {
return this._responseHandler._updateCurrentObserver();
};
/**
* Checks if exist an ongoing observable requests
* @return {boolean}
*/
BoltProtocol.prototype.hasOngoingObservableRequests = function () {
return this._responseHandler.hasOngoingObservableRequests();
};
/**
* Enqueue the observer if the protocol is not broken.
* In case it's broken, the observer will be notified about the error.
*
* @param {StreamObserver} observer The observer
* @returns {boolean} if it was queued
*/
BoltProtocol.prototype.queueObserverIfProtocolIsNotBroken = function (observer) {
if (this.isBroken()) {
this.notifyFatalErrorToObserver(observer);
return false;
}
return this._responseHandler._queueObserver(observer);
};
/**
* Veritfy the protocol is not broken.
* @returns {boolean}
*/
BoltProtocol.prototype.isBroken = function () {
return !!this._fatalError;
};
/**
* Notifies the current fatal error to the observer
*
* @param {StreamObserver} observer The observer
*/
BoltProtocol.prototype.notifyFatalErrorToObserver = function (observer) {
if (observer && observer.onError) {
observer.onError(this._fatalError);
}
};
/**
* Reset current failure on the observable response handler to null.
*/
BoltProtocol.prototype.resetFailure = function () {
this._responseHandler._resetFailure();
};
BoltProtocol.prototype._onLoginCompleted = function (metadata, authToken, onCompleted) {
this._initialized = true;
this._authToken = authToken;
if (metadata) {
var serverVersion = metadata.server;
if (!this._server.version) {
this._server.version = serverVersion;
}
}
if (onCompleted) {
onCompleted(metadata);
}
};
BoltProtocol.prototype._onLoginError = function (error, onError) {
this._onProtocolError(error.message);
if (onError) {
onError(error);
}
};
return BoltProtocol;
}());
exports.default = BoltProtocol;