stream-observers.js•28.2 kB
"use strict";
var __extends = (this && this.__extends) || (function () {
var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
if (typeof b !== "function" && b !== null)
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null");
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.TelemetryObserver = exports.ProcedureRouteObserver = exports.RouteObserver = exports.CompletedObserver = exports.FailedObserver = exports.ResetObserver = exports.LogoffObserver = exports.LoginObserver = exports.ResultStreamObserver = exports.StreamObserver = void 0;
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var neo4j_driver_core_1 = require("neo4j-driver-core");
var routing_table_raw_1 = __importDefault(require("./routing-table-raw"));
var lang_1 = require("../lang");
var FETCH_ALL = neo4j_driver_core_1.internal.constants.FETCH_ALL;
var PROTOCOL_ERROR = neo4j_driver_core_1.error.PROTOCOL_ERROR;
var StreamObserver = /** @class */ (function () {
function StreamObserver() {
}
StreamObserver.prototype.onNext = function (rawRecord) { };
StreamObserver.prototype.onError = function (_error) { };
StreamObserver.prototype.onCompleted = function (meta) { };
return StreamObserver;
}());
exports.StreamObserver = StreamObserver;
/**
* Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses
* in a way that a user-provided observer can see these as a clean Stream
* of records.
* This class will queue up incoming messages until a user-provided observer
* for the incoming stream is registered. Thus, we keep fields around
* for tracking head/records/tail. These are only used if there is no
* observer registered.
* @access private
*/
var ResultStreamObserver = /** @class */ (function (_super) {
__extends(ResultStreamObserver, _super);
/**
*
* @param {Object} param
* @param {Object} param.server
* @param {boolean} param.reactive
* @param {function(stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction -
* @param {function(stmtId: number|Integer, observer: StreamObserver)} param.discardFunction -
* @param {number|Integer} param.fetchSize -
* @param {function(err: Error): Promise|void} param.beforeError -
* @param {function(err: Error): Promise|void} param.afterError -
* @param {function(keys: string[]): Promise|void} param.beforeKeys -
* @param {function(keys: string[]): Promise|void} param.afterKeys -
* @param {function(metadata: Object): Promise|void} param.beforeComplete -
* @param {function(metadata: Object): Promise|void} param.afterComplete -
* @param {function(metadata: Object): Promise|void} param.enrichMetadata -
*/
function ResultStreamObserver(_a) {
var _b = _a === void 0 ? {} : _a, _c = _b.reactive, reactive = _c === void 0 ? false : _c, moreFunction = _b.moreFunction, discardFunction = _b.discardFunction, _d = _b.fetchSize, fetchSize = _d === void 0 ? FETCH_ALL : _d, beforeError = _b.beforeError, afterError = _b.afterError, beforeKeys = _b.beforeKeys, afterKeys = _b.afterKeys, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete, server = _b.server, _e = _b.highRecordWatermark, highRecordWatermark = _e === void 0 ? Number.MAX_VALUE : _e, _f = _b.lowRecordWatermark, lowRecordWatermark = _f === void 0 ? Number.MAX_VALUE : _f, enrichMetadata = _b.enrichMetadata, onDb = _b.onDb;
var _this = _super.call(this) || this;
_this._fieldKeys = null;
_this._fieldLookup = null;
_this._head = null;
_this._queuedRecords = [];
_this._tail = null;
_this._error = null;
_this._observers = [];
_this._meta = {};
_this._server = server;
_this._beforeError = beforeError;
_this._afterError = afterError;
_this._beforeKeys = beforeKeys;
_this._afterKeys = afterKeys;
_this._beforeComplete = beforeComplete;
_this._afterComplete = afterComplete;
_this._enrichMetadata = enrichMetadata || lang_1.functional.identity;
_this._queryId = null;
_this._moreFunction = moreFunction;
_this._discardFunction = discardFunction;
_this._discard = false;
_this._fetchSize = fetchSize;
_this._lowRecordWatermark = lowRecordWatermark;
_this._highRecordWatermark = highRecordWatermark;
_this._setState(reactive ? _states.READY : _states.READY_STREAMING);
_this._setupAutoPull();
_this._paused = false;
_this._pulled = !reactive;
_this._haveRecordStreamed = false;
_this._onDb = onDb;
return _this;
}
/**
* Pause the record consuming
*
* This function will supend the record consuming. It will not cancel the stream and the already
* requested records will be sent to the subscriber.
*/
ResultStreamObserver.prototype.pause = function () {
this._paused = true;
};
/**
* Resume the record consuming
*
* This function will resume the record consuming fetching more records from the server.
*/
ResultStreamObserver.prototype.resume = function () {
this._paused = false;
this._setupAutoPull(true);
this._state.pull(this);
};
/**
* Will be called on every record that comes in and transform a raw record
* to a Object. If user-provided observer is present, pass transformed record
* to it's onNext method, otherwise, push to record que.
* @param {Array} rawRecord - An array with the raw record
*/
ResultStreamObserver.prototype.onNext = function (rawRecord) {
this._haveRecordStreamed = true;
var record = new neo4j_driver_core_1.Record(this._fieldKeys, rawRecord, this._fieldLookup);
if (this._observers.some(function (o) { return o.onNext; })) {
this._observers.forEach(function (o) {
if (o.onNext) {
o.onNext(record);
}
});
}
else {
this._queuedRecords.push(record);
if (this._queuedRecords.length > this._highRecordWatermark) {
this._autoPull = false;
}
}
};
ResultStreamObserver.prototype.onCompleted = function (meta) {
this._state.onSuccess(this, meta);
};
/**
* Will be called on errors.
* If user-provided observer is present, pass the error
* to it's onError method, otherwise set instance variable _error.
* @param {Object} error - An error object
*/
ResultStreamObserver.prototype.onError = function (error) {
this._state.onError(this, error);
};
/**
* Cancel pending record stream
*/
ResultStreamObserver.prototype.cancel = function () {
this._discard = true;
};
/**
* Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
* Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
*
* However, some operations can be represented as a single message which receives full metadata in a single response.
* For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3.
* Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3.
*
* This function prepares the observer to only handle a single response message.
*/
ResultStreamObserver.prototype.prepareToHandleSingleResponse = function () {
this._head = [];
this._fieldKeys = [];
this._setState(_states.STREAMING);
};
/**
* Mark this observer as if it has completed with no metadata.
*/
ResultStreamObserver.prototype.markCompleted = function () {
this._head = [];
this._fieldKeys = [];
this._tail = {};
this._setState(_states.SUCCEEDED);
};
/**
* Subscribe to events with provided observer.
* @param {Object} observer - Observer object
* @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys.
* @param {function(record: Object)} observer.onNext - Handle records, one by one.
* @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata.
* @param {function(error: Object)} observer.onError - Handle errors, should always be provided.
*/
ResultStreamObserver.prototype.subscribe = function (observer) {
if (this._head && observer.onKeys) {
observer.onKeys(this._head);
}
if (this._queuedRecords.length > 0 && observer.onNext) {
for (var i = 0; i < this._queuedRecords.length; i++) {
observer.onNext(this._queuedRecords[i]);
if (this._queuedRecords.length - i - 1 <= this._lowRecordWatermark) {
this._autoPull = true;
if (this._state === _states.READY) {
this._handleStreaming();
}
}
}
}
if (this._tail && observer.onCompleted) {
observer.onCompleted(this._tail);
}
if (this._error) {
observer.onError(this._error);
}
this._observers.push(observer);
if (this._state === _states.READY) {
this._handleStreaming();
}
};
ResultStreamObserver.prototype._handleHasMore = function (meta) {
// We've consumed current batch and server notified us that there're more
// records to stream. Let's invoke more or discard function based on whether
// the user wants to discard streaming or not
this._setState(_states.READY); // we've done streaming
this._handleStreaming();
delete meta.has_more;
};
ResultStreamObserver.prototype._handlePullSuccess = function (meta) {
var _this = this;
var completionMetadata = this._enrichMetadata(Object.assign(this._server ? { server: this._server } : {}, this._meta, {
stream_summary: {
have_records_streamed: this._haveRecordStreamed,
pulled: this._pulled,
has_keys: this._fieldKeys.length > 0
}
}, meta));
if (![undefined, null, 'r', 'w', 'rw', 's'].includes(completionMetadata.type)) {
this.onError((0, neo4j_driver_core_1.newError)("Server returned invalid query type. Expected one of [undefined, null, \"r\", \"w\", \"rw\", \"s\"] but got '".concat(completionMetadata.type, "'"), PROTOCOL_ERROR));
return;
}
this._setState(_states.SUCCEEDED);
var beforeHandlerResult = null;
if (this._beforeComplete) {
beforeHandlerResult = this._beforeComplete(completionMetadata);
}
var continuation = function () {
// End of stream
_this._tail = completionMetadata;
if (_this._observers.some(function (o) { return o.onCompleted; })) {
_this._observers.forEach(function (o) {
if (o.onCompleted) {
o.onCompleted(completionMetadata);
}
});
}
if (_this._afterComplete) {
_this._afterComplete(completionMetadata);
}
};
if (beforeHandlerResult) {
Promise.resolve(beforeHandlerResult).then(function () { return continuation(); });
}
else {
continuation();
}
};
ResultStreamObserver.prototype._handleRunSuccess = function (meta, afterSuccess) {
var _this = this;
if (this._fieldKeys === null) {
// Stream header, build a name->index field lookup table
// to be used by records. This is an optimization to make it
// faster to look up fields in a record by name, rather than by index.
// Since the records we get back via Bolt are just arrays of values.
this._fieldKeys = [];
this._fieldLookup = {};
if (meta.fields && meta.fields.length > 0) {
this._fieldKeys = meta.fields;
for (var i = 0; i < meta.fields.length; i++) {
this._fieldLookup[meta.fields[i]] = i;
}
}
if (meta.db !== null && this._onDb !== undefined) {
this._onDb(meta.db);
}
if (meta.fields != null) {
// remove fields key from metadata object
delete meta.fields;
}
// Extract server generated query id for use in requestMore and discard
// functions
if (meta.qid !== null && meta.qid !== undefined) {
this._queryId = meta.qid;
// remove qid from metadata object
delete meta.qid;
}
this._storeMetadataForCompletion(meta);
var beforeHandlerResult = null;
if (this._beforeKeys) {
beforeHandlerResult = this._beforeKeys(this._fieldKeys);
}
var continuation_1 = function () {
_this._head = _this._fieldKeys;
if (_this._observers.some(function (o) { return o.onKeys; })) {
_this._observers.forEach(function (o) {
if (o.onKeys) {
o.onKeys(_this._fieldKeys);
}
});
}
if (_this._afterKeys) {
_this._afterKeys(_this._fieldKeys);
}
afterSuccess();
};
if (beforeHandlerResult) {
Promise.resolve(beforeHandlerResult).then(function () { return continuation_1(); });
}
else {
continuation_1();
}
}
};
ResultStreamObserver.prototype._handleError = function (error) {
var _this = this;
this._setState(_states.FAILED);
this._error = error;
var beforeHandlerResult = null;
if (this._beforeError) {
beforeHandlerResult = this._beforeError(error);
}
var continuation = function () {
if (_this._observers.some(function (o) { return o.onError; })) {
_this._observers.forEach(function (o) {
if (o.onError) {
o.onError(error);
}
});
}
if (_this._afterError) {
_this._afterError(error);
}
};
if (beforeHandlerResult) {
Promise.resolve(beforeHandlerResult).then(function () { return continuation(); });
}
else {
continuation();
}
};
ResultStreamObserver.prototype._handleStreaming = function () {
if (this._head && this._observers.some(function (o) { return o.onNext || o.onCompleted; })) {
if (!this._paused && (this._discard || this._autoPull)) {
this._more();
}
}
};
ResultStreamObserver.prototype._more = function () {
if (this._discard) {
this._discardFunction(this._queryId, this);
}
else {
this._pulled = true;
this._moreFunction(this._queryId, this._fetchSize, this);
}
this._setState(_states.STREAMING);
};
ResultStreamObserver.prototype._storeMetadataForCompletion = function (meta) {
var keys = Object.keys(meta);
var index = keys.length;
var key = '';
while (index--) {
key = keys[index];
this._meta[key] = meta[key];
}
};
ResultStreamObserver.prototype._setState = function (state) {
this._state = state;
};
ResultStreamObserver.prototype._setupAutoPull = function () {
this._autoPull = true;
};
return ResultStreamObserver;
}(StreamObserver));
exports.ResultStreamObserver = ResultStreamObserver;
var LoginObserver = /** @class */ (function (_super) {
__extends(LoginObserver, _super);
/**
*
* @param {Object} param -
* @param {function(err: Error)} param.onError
* @param {function(metadata)} param.onCompleted
*/
function LoginObserver(_a) {
var _b = _a === void 0 ? {} : _a, onError = _b.onError, onCompleted = _b.onCompleted;
var _this = _super.call(this) || this;
_this._onError = onError;
_this._onCompleted = onCompleted;
return _this;
}
LoginObserver.prototype.onNext = function (record) {
this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when initializing ' + neo4j_driver_core_1.json.stringify(record)));
};
LoginObserver.prototype.onError = function (error) {
if (this._onError) {
this._onError(error);
}
};
LoginObserver.prototype.onCompleted = function (metadata) {
if (this._onCompleted) {
this._onCompleted(metadata);
}
};
return LoginObserver;
}(StreamObserver));
exports.LoginObserver = LoginObserver;
var LogoffObserver = /** @class */ (function (_super) {
__extends(LogoffObserver, _super);
/**
*
* @param {Object} param -
* @param {function(err: Error)} param.onError
* @param {function(metadata)} param.onCompleted
*/
function LogoffObserver(_a) {
var _b = _a === void 0 ? {} : _a, onError = _b.onError, onCompleted = _b.onCompleted;
var _this = _super.call(this) || this;
_this._onError = onError;
_this._onCompleted = onCompleted;
return _this;
}
LogoffObserver.prototype.onNext = function (record) {
this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when logging off ' + neo4j_driver_core_1.json.stringify(record)));
};
LogoffObserver.prototype.onError = function (error) {
if (this._onError) {
this._onError(error);
}
};
LogoffObserver.prototype.onCompleted = function (metadata) {
if (this._onCompleted) {
this._onCompleted(metadata);
}
};
return LogoffObserver;
}(StreamObserver));
exports.LogoffObserver = LogoffObserver;
var ResetObserver = /** @class */ (function (_super) {
__extends(ResetObserver, _super);
/**
*
* @param {Object} param -
* @param {function(err: String)} param.onProtocolError
* @param {function(err: Error)} param.onError
* @param {function(metadata)} param.onComplete
*/
function ResetObserver(_a) {
var _b = _a === void 0 ? {} : _a, onProtocolError = _b.onProtocolError, onError = _b.onError, onComplete = _b.onComplete;
var _this = _super.call(this) || this;
_this._onProtocolError = onProtocolError;
_this._onError = onError;
_this._onComplete = onComplete;
return _this;
}
ResetObserver.prototype.onNext = function (record) {
this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when resetting: received record is: ' +
neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR));
};
ResetObserver.prototype.onError = function (error) {
if (error.code === PROTOCOL_ERROR && this._onProtocolError) {
this._onProtocolError(error.message);
}
if (this._onError) {
this._onError(error);
}
};
ResetObserver.prototype.onCompleted = function (metadata) {
if (this._onComplete) {
this._onComplete(metadata);
}
};
return ResetObserver;
}(StreamObserver));
exports.ResetObserver = ResetObserver;
var TelemetryObserver = /** @class */ (function (_super) {
__extends(TelemetryObserver, _super);
/**
*
* @param {Object} param -
* @param {function(err: Error)} param.onError
* @param {function(metadata)} param.onCompleted
*/
function TelemetryObserver(_a) {
var _b = _a === void 0 ? {} : _a, onError = _b.onError, onCompleted = _b.onCompleted;
var _this = _super.call(this) || this;
_this._onError = onError;
_this._onCompleted = onCompleted;
return _this;
}
TelemetryObserver.prototype.onNext = function (record) {
this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when sending telemetry ' + neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR));
};
TelemetryObserver.prototype.onError = function (error) {
if (this._onError) {
this._onError(error);
}
};
TelemetryObserver.prototype.onCompleted = function (metadata) {
if (this._onCompleted) {
this._onCompleted(metadata);
}
};
return TelemetryObserver;
}(ResultStreamObserver));
exports.TelemetryObserver = TelemetryObserver;
var FailedObserver = /** @class */ (function (_super) {
__extends(FailedObserver, _super);
function FailedObserver(_a) {
var error = _a.error, onError = _a.onError;
var _this = _super.call(this, { beforeError: onError }) || this;
_this.onError(error);
return _this;
}
return FailedObserver;
}(ResultStreamObserver));
exports.FailedObserver = FailedObserver;
var CompletedObserver = /** @class */ (function (_super) {
__extends(CompletedObserver, _super);
function CompletedObserver() {
var _this = _super.call(this) || this;
_super.prototype.markCompleted.call(_this);
return _this;
}
return CompletedObserver;
}(ResultStreamObserver));
exports.CompletedObserver = CompletedObserver;
var ProcedureRouteObserver = /** @class */ (function (_super) {
__extends(ProcedureRouteObserver, _super);
function ProcedureRouteObserver(_a) {
var resultObserver = _a.resultObserver, onProtocolError = _a.onProtocolError, onError = _a.onError, onCompleted = _a.onCompleted;
var _this = _super.call(this) || this;
_this._resultObserver = resultObserver;
_this._onError = onError;
_this._onCompleted = onCompleted;
_this._records = [];
_this._onProtocolError = onProtocolError;
resultObserver.subscribe(_this);
return _this;
}
ProcedureRouteObserver.prototype.onNext = function (record) {
this._records.push(record);
};
ProcedureRouteObserver.prototype.onError = function (error) {
if (error.code === PROTOCOL_ERROR && this._onProtocolError) {
this._onProtocolError(error.message);
}
if (this._onError) {
this._onError(error);
}
};
ProcedureRouteObserver.prototype.onCompleted = function () {
if (this._records !== null && this._records.length !== 1) {
this.onError((0, neo4j_driver_core_1.newError)('Illegal response from router. Received ' +
this._records.length +
' records but expected only one.\n' +
neo4j_driver_core_1.json.stringify(this._records), PROTOCOL_ERROR));
return;
}
if (this._onCompleted) {
this._onCompleted(routing_table_raw_1.default.ofRecord(this._records[0]));
}
};
return ProcedureRouteObserver;
}(StreamObserver));
exports.ProcedureRouteObserver = ProcedureRouteObserver;
var RouteObserver = /** @class */ (function (_super) {
__extends(RouteObserver, _super);
/**
*
* @param {Object} param -
* @param {function(err: String)} param.onProtocolError
* @param {function(err: Error)} param.onError
* @param {function(RawRoutingTable)} param.onCompleted
*/
function RouteObserver(_a) {
var _b = _a === void 0 ? {} : _a, onProtocolError = _b.onProtocolError, onError = _b.onError, onCompleted = _b.onCompleted;
var _this = _super.call(this) || this;
_this._onProtocolError = onProtocolError;
_this._onError = onError;
_this._onCompleted = onCompleted;
return _this;
}
RouteObserver.prototype.onNext = function (record) {
this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when resetting: received record is: ' +
neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR));
};
RouteObserver.prototype.onError = function (error) {
if (error.code === PROTOCOL_ERROR && this._onProtocolError) {
this._onProtocolError(error.message);
}
if (this._onError) {
this._onError(error);
}
};
RouteObserver.prototype.onCompleted = function (metadata) {
if (this._onCompleted) {
this._onCompleted(routing_table_raw_1.default.ofMessageResponse(metadata));
}
};
return RouteObserver;
}(StreamObserver));
exports.RouteObserver = RouteObserver;
var _states = {
READY_STREAMING: {
// async start state
onSuccess: function (streamObserver, meta) {
streamObserver._handleRunSuccess(meta, function () {
streamObserver._setState(_states.STREAMING);
} // after run succeeded, async directly move to streaming
// state
);
},
onError: function (streamObserver, error) {
streamObserver._handleError(error);
},
name: function () {
return 'READY_STREAMING';
},
pull: function () { }
},
READY: {
// reactive start state
onSuccess: function (streamObserver, meta) {
streamObserver._handleRunSuccess(meta, function () { return streamObserver._handleStreaming(); } // after run succeeded received, reactive shall start pulling
);
},
onError: function (streamObserver, error) {
streamObserver._handleError(error);
},
name: function () {
return 'READY';
},
pull: function (streamObserver) { return streamObserver._more(); }
},
STREAMING: {
onSuccess: function (streamObserver, meta) {
if (meta.has_more) {
streamObserver._handleHasMore(meta);
}
else {
streamObserver._handlePullSuccess(meta);
}
},
onError: function (streamObserver, error) {
streamObserver._handleError(error);
},
name: function () {
return 'STREAMING';
},
pull: function () { }
},
FAILED: {
onError: function (_error) {
// more errors are ignored
},
name: function () {
return 'FAILED';
},
pull: function () { }
},
SUCCEEDED: {
name: function () {
return 'SUCCEEDED';
},
pull: function () { }
}
};