response-handler.js•9.69 kB
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var neo4j_driver_core_1 = require("neo4j-driver-core");
// Signature bytes for each response message type
var SUCCESS = 0x70; // 0111 0000 // SUCCESS <metadata>
var RECORD = 0x71; // 0111 0001 // RECORD <value>
var IGNORED = 0x7e; // 0111 1110 // IGNORED <metadata>
var FAILURE = 0x7f; // 0111 1111 // FAILURE <metadata>
function NO_OP() { }
function NO_OP_IDENTITY(subject) {
return subject;
}
var NO_OP_OBSERVER = {
onNext: NO_OP,
onCompleted: NO_OP,
onError: NO_OP
};
/**
* Treat the protocol responses and notify the observers
*/
var ResponseHandler = /** @class */ (function () {
/**
* Called when something went wrong with the connectio
* @callback ResponseHandler~Observer~OnErrorApplyTransformation
* @param {any} error The error
* @returns {any} The new error
*/
/**
* Called when something went wrong with the connectio
* @callback ResponseHandler~Observer~OnError
* @param {any} error The error
*/
/**
* Called when something went wrong with the connectio
* @callback ResponseHandler~MetadataTransformer
* @param {any} metadata The metadata got onSuccess
* @returns {any} The transformed metadata
*/
/**
* @typedef {Object} ResponseHandler~Observer
* @property {ResponseHandler~Observer~OnError} onError Invoke when a connection error occurs
* @property {ResponseHandler~Observer~OnError} onFailure Invoke when a protocol failure occurs
* @property {ResponseHandler~Observer~OnErrorApplyTransformation} onErrorApplyTransformation Invoke just after the failure occurs,
* before notify to respective observer. This method should transform the failure reason to the approprited one.
*/
/**
* Constructor
* @param {Object} param The params
* @param {ResponseHandler~MetadataTransformer} transformMetadata Transform metadata when the SUCCESS is received.
* @param {Channel} channel The channel used to exchange messages
* @param {Logger} log The logger
* @param {ResponseHandler~Observer} observer Object which will be notified about errors
*/
function ResponseHandler(_a) {
var _b = _a === void 0 ? {} : _a, transformMetadata = _b.transformMetadata, enrichErrorMetadata = _b.enrichErrorMetadata, log = _b.log, observer = _b.observer;
this._pendingObservers = [];
this._log = log;
this._transformMetadata = transformMetadata || NO_OP_IDENTITY;
this._enrichErrorMetadata = enrichErrorMetadata || NO_OP_IDENTITY;
this._observer = Object.assign({
onObserversCountChange: NO_OP,
onError: NO_OP,
onFailure: NO_OP,
onErrorApplyTransformation: NO_OP_IDENTITY
}, observer);
}
Object.defineProperty(ResponseHandler.prototype, "currentFailure", {
get: function () {
return this._currentFailure;
},
enumerable: false,
configurable: true
});
ResponseHandler.prototype.handleResponse = function (msg) {
var payload = msg.fields[0];
switch (msg.signature) {
case RECORD:
if (this._log.isDebugEnabled()) {
this._log.debug("S: RECORD ".concat(neo4j_driver_core_1.json.stringify(msg)));
}
this._currentObserver.onNext(payload);
break;
case SUCCESS:
if (this._log.isDebugEnabled()) {
this._log.debug("S: SUCCESS ".concat(neo4j_driver_core_1.json.stringify(msg)));
}
try {
var metadata = this._transformMetadata(payload);
this._currentObserver.onCompleted(metadata);
}
finally {
this._updateCurrentObserver();
}
break;
case FAILURE:
if (this._log.isDebugEnabled()) {
this._log.debug("S: FAILURE ".concat(neo4j_driver_core_1.json.stringify(msg)));
}
try {
this._currentFailure = this._handleErrorPayload(this._enrichErrorMetadata(payload));
this._currentObserver.onError(this._currentFailure);
}
finally {
this._updateCurrentObserver();
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
this._observer.onFailure(this._currentFailure);
}
break;
case IGNORED:
if (this._log.isDebugEnabled()) {
this._log.debug("S: IGNORED ".concat(neo4j_driver_core_1.json.stringify(msg)));
}
try {
if (this._currentFailure && this._currentObserver.onError) {
this._currentObserver.onError(this._currentFailure);
}
else if (this._currentObserver.onError) {
this._currentObserver.onError((0, neo4j_driver_core_1.newError)('Ignored either because of an error or RESET'));
}
}
finally {
this._updateCurrentObserver();
}
break;
default:
this._observer.onError((0, neo4j_driver_core_1.newError)('Unknown Bolt protocol message: ' + msg));
}
};
/*
* Pop next pending observer form the list of observers and make it current observer.
* @protected
*/
ResponseHandler.prototype._updateCurrentObserver = function () {
this._currentObserver = this._pendingObservers.shift();
this._observer.onObserversCountChange(this._observersCount);
};
Object.defineProperty(ResponseHandler.prototype, "_observersCount", {
get: function () {
return this._currentObserver == null ? this._pendingObservers.length : this._pendingObservers.length + 1;
},
enumerable: false,
configurable: true
});
ResponseHandler.prototype._queueObserver = function (observer) {
observer = observer || NO_OP_OBSERVER;
observer.onCompleted = observer.onCompleted || NO_OP;
observer.onError = observer.onError || NO_OP;
observer.onNext = observer.onNext || NO_OP;
if (this._currentObserver === undefined) {
this._currentObserver = observer;
}
else {
this._pendingObservers.push(observer);
}
this._observer.onObserversCountChange(this._observersCount);
return true;
};
ResponseHandler.prototype._notifyErrorToObservers = function (error) {
if (this._currentObserver && this._currentObserver.onError) {
this._currentObserver.onError(error);
}
while (this._pendingObservers.length > 0) {
var observer = this._pendingObservers.shift();
if (observer && observer.onError) {
observer.onError(error);
}
}
};
ResponseHandler.prototype.hasOngoingObservableRequests = function () {
return this._currentObserver != null || this._pendingObservers.length > 0;
};
ResponseHandler.prototype._resetFailure = function () {
this._currentFailure = null;
};
ResponseHandler.prototype._handleErrorPayload = function (payload) {
var standardizedCode = _standardizeCode(payload.code);
var cause = payload.cause != null ? this._handleErrorCause(payload.cause) : undefined;
var error = (0, neo4j_driver_core_1.newError)(payload.message, standardizedCode, cause, payload.gql_status, payload.description, payload.diagnostic_record);
return this._observer.onErrorApplyTransformation(error);
};
ResponseHandler.prototype._handleErrorCause = function (payload) {
var cause = payload.cause != null ? this._handleErrorCause(payload.cause) : undefined;
var error = (0, neo4j_driver_core_1.newGQLError)(payload.message, cause, payload.gql_status, payload.description, payload.diagnostic_record);
return this._observer.onErrorApplyTransformation(error);
};
return ResponseHandler;
}());
exports.default = ResponseHandler;
/**
* Standardize error classification that are different between 5.x and previous versions.
*
* The transient error were clean-up for being retrieable and because of this
* `Terminated` and `LockClientStopped` were reclassified as `ClientError`.
*
* @param {string} code
* @returns {string} the standardized error code
*/
function _standardizeCode(code) {
if (code === 'Neo.TransientError.Transaction.Terminated') {
return 'Neo.ClientError.Transaction.Terminated';
}
else if (code === 'Neo.TransientError.Transaction.LockClientStopped') {
return 'Neo.ClientError.Transaction.LockClientStopped';
}
return code;
}