Neo4j MCP Server

stream-observers.js (28.2 kB)
"use strict"; var __extends = (this && this.__extends) || (function () { var extendStatics = function (d, b) { extendStatics = Object.setPrototypeOf || ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; }; return extendStatics(d, b); }; return function (d, b) { if (typeof b !== "function" && b !== null) throw new TypeError("Class extends value " + String(b) + " is not a constructor or null"); extendStatics(d, b); function __() { this.constructor = d; } d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); }; })(); var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); exports.TelemetryObserver = exports.ProcedureRouteObserver = exports.RouteObserver = exports.CompletedObserver = exports.FailedObserver = exports.ResetObserver = exports.LogoffObserver = exports.LoginObserver = exports.ResultStreamObserver = exports.StreamObserver = void 0; /** * Copyright (c) "Neo4j" * Neo4j Sweden AB [https://neo4j.com] * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ var neo4j_driver_core_1 = require("neo4j-driver-core"); var routing_table_raw_1 = __importDefault(require("./routing-table-raw")); var lang_1 = require("../lang"); var FETCH_ALL = neo4j_driver_core_1.internal.constants.FETCH_ALL; var PROTOCOL_ERROR = neo4j_driver_core_1.error.PROTOCOL_ERROR; var StreamObserver = /** @class */ (function () { function StreamObserver() { } StreamObserver.prototype.onNext = function (rawRecord) { }; StreamObserver.prototype.onError = function (_error) { }; StreamObserver.prototype.onCompleted = function (meta) { }; return StreamObserver; }()); exports.StreamObserver = StreamObserver; /** * Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses * in a way that a user-provided observer can see these as a clean Stream * of records. * This class will queue up incoming messages until a user-provided observer * for the incoming stream is registered. Thus, we keep fields around * for tracking head/records/tail. These are only used if there is no * observer registered. 
* @access private */ var ResultStreamObserver = /** @class */ (function (_super) { __extends(ResultStreamObserver, _super); /** * * @param {Object} param * @param {Object} param.server * @param {boolean} param.reactive * @param {function(stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - * @param {function(stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - * @param {number|Integer} param.fetchSize - * @param {function(err: Error): Promise|void} param.beforeError - * @param {function(err: Error): Promise|void} param.afterError - * @param {function(keys: string[]): Promise|void} param.beforeKeys - * @param {function(keys: string[]): Promise|void} param.afterKeys - * @param {function(metadata: Object): Promise|void} param.beforeComplete - * @param {function(metadata: Object): Promise|void} param.afterComplete - * @param {function(metadata: Object): Promise|void} param.enrichMetadata - */ function ResultStreamObserver(_a) { var _b = _a === void 0 ? {} : _a, _c = _b.reactive, reactive = _c === void 0 ? false : _c, moreFunction = _b.moreFunction, discardFunction = _b.discardFunction, _d = _b.fetchSize, fetchSize = _d === void 0 ? FETCH_ALL : _d, beforeError = _b.beforeError, afterError = _b.afterError, beforeKeys = _b.beforeKeys, afterKeys = _b.afterKeys, beforeComplete = _b.beforeComplete, afterComplete = _b.afterComplete, server = _b.server, _e = _b.highRecordWatermark, highRecordWatermark = _e === void 0 ? Number.MAX_VALUE : _e, _f = _b.lowRecordWatermark, lowRecordWatermark = _f === void 0 ? Number.MAX_VALUE : _f, enrichMetadata = _b.enrichMetadata, onDb = _b.onDb; var _this = _super.call(this) || this; _this._fieldKeys = null; _this._fieldLookup = null; _this._head = null; _this._queuedRecords = []; _this._tail = null; _this._error = null; _this._observers = []; _this._meta = {}; _this._server = server; _this._beforeError = beforeError; _this._afterError = afterError; _this._beforeKeys = beforeKeys; _this._afterKeys = afterKeys; _this._beforeComplete = beforeComplete; _this._afterComplete = afterComplete; _this._enrichMetadata = enrichMetadata || lang_1.functional.identity; _this._queryId = null; _this._moreFunction = moreFunction; _this._discardFunction = discardFunction; _this._discard = false; _this._fetchSize = fetchSize; _this._lowRecordWatermark = lowRecordWatermark; _this._highRecordWatermark = highRecordWatermark; _this._setState(reactive ? _states.READY : _states.READY_STREAMING); _this._setupAutoPull(); _this._paused = false; _this._pulled = !reactive; _this._haveRecordStreamed = false; _this._onDb = onDb; return _this; } /** * Pause the record consuming * * This function will supend the record consuming. It will not cancel the stream and the already * requested records will be sent to the subscriber. */ ResultStreamObserver.prototype.pause = function () { this._paused = true; }; /** * Resume the record consuming * * This function will resume the record consuming fetching more records from the server. */ ResultStreamObserver.prototype.resume = function () { this._paused = false; this._setupAutoPull(true); this._state.pull(this); }; /** * Will be called on every record that comes in and transform a raw record * to a Object. If user-provided observer is present, pass transformed record * to it's onNext method, otherwise, push to record que. 
* @param {Array} rawRecord - An array with the raw record */ ResultStreamObserver.prototype.onNext = function (rawRecord) { this._haveRecordStreamed = true; var record = new neo4j_driver_core_1.Record(this._fieldKeys, rawRecord, this._fieldLookup); if (this._observers.some(function (o) { return o.onNext; })) { this._observers.forEach(function (o) { if (o.onNext) { o.onNext(record); } }); } else { this._queuedRecords.push(record); if (this._queuedRecords.length > this._highRecordWatermark) { this._autoPull = false; } } }; ResultStreamObserver.prototype.onCompleted = function (meta) { this._state.onSuccess(this, meta); }; /** * Will be called on errors. * If user-provided observer is present, pass the error * to it's onError method, otherwise set instance variable _error. * @param {Object} error - An error object */ ResultStreamObserver.prototype.onError = function (error) { this._state.onError(this, error); }; /** * Cancel pending record stream */ ResultStreamObserver.prototype.cancel = function () { this._discard = true; }; /** * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. * * However, some operations can be represented as a single message which receives full metadata in a single response. * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3. * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3. * * This function prepares the observer to only handle a single response message. */ ResultStreamObserver.prototype.prepareToHandleSingleResponse = function () { this._head = []; this._fieldKeys = []; this._setState(_states.STREAMING); }; /** * Mark this observer as if it has completed with no metadata. */ ResultStreamObserver.prototype.markCompleted = function () { this._head = []; this._fieldKeys = []; this._tail = {}; this._setState(_states.SUCCEEDED); }; /** * Subscribe to events with provided observer. * @param {Object} observer - Observer object * @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys. * @param {function(record: Object)} observer.onNext - Handle records, one by one. * @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata. * @param {function(error: Object)} observer.onError - Handle errors, should always be provided. */ ResultStreamObserver.prototype.subscribe = function (observer) { if (this._head && observer.onKeys) { observer.onKeys(this._head); } if (this._queuedRecords.length > 0 && observer.onNext) { for (var i = 0; i < this._queuedRecords.length; i++) { observer.onNext(this._queuedRecords[i]); if (this._queuedRecords.length - i - 1 <= this._lowRecordWatermark) { this._autoPull = true; if (this._state === _states.READY) { this._handleStreaming(); } } } } if (this._tail && observer.onCompleted) { observer.onCompleted(this._tail); } if (this._error) { observer.onError(this._error); } this._observers.push(observer); if (this._state === _states.READY) { this._handleStreaming(); } }; ResultStreamObserver.prototype._handleHasMore = function (meta) { // We've consumed current batch and server notified us that there're more // records to stream. 
// Let's invoke more or discard function based on whether // the user wants to discard streaming or not
this._setState(_states.READY); // we've done streaming
this._handleStreaming(); delete meta.has_more; }; ResultStreamObserver.prototype._handlePullSuccess = function (meta) { var _this = this; var completionMetadata = this._enrichMetadata(Object.assign(this._server ? { server: this._server } : {}, this._meta, { stream_summary: { have_records_streamed: this._haveRecordStreamed, pulled: this._pulled, has_keys: this._fieldKeys.length > 0 } }, meta)); if (![undefined, null, 'r', 'w', 'rw', 's'].includes(completionMetadata.type)) { this.onError((0, neo4j_driver_core_1.newError)("Server returned invalid query type. Expected one of [undefined, null, \"r\", \"w\", \"rw\", \"s\"] but got '".concat(completionMetadata.type, "'"), PROTOCOL_ERROR)); return; } this._setState(_states.SUCCEEDED); var beforeHandlerResult = null; if (this._beforeComplete) { beforeHandlerResult = this._beforeComplete(completionMetadata); } var continuation = function () { // End of stream
_this._tail = completionMetadata; if (_this._observers.some(function (o) { return o.onCompleted; })) { _this._observers.forEach(function (o) { if (o.onCompleted) { o.onCompleted(completionMetadata); } }); } if (_this._afterComplete) { _this._afterComplete(completionMetadata); } }; if (beforeHandlerResult) { Promise.resolve(beforeHandlerResult).then(function () { return continuation(); }); } else { continuation(); } }; ResultStreamObserver.prototype._handleRunSuccess = function (meta, afterSuccess) { var _this = this; if (this._fieldKeys === null) { // Stream header, build a name->index field lookup table // to be used by records. This is an optimization to make it // faster to look up fields in a record by name, rather than by index. // Since the records we get back via Bolt are just arrays of values.
this._fieldKeys = []; this._fieldLookup = {}; if (meta.fields && meta.fields.length > 0) { this._fieldKeys = meta.fields; for (var i = 0; i < meta.fields.length; i++) { this._fieldLookup[meta.fields[i]] = i; } } if (meta.db !== null && this._onDb !== undefined) { this._onDb(meta.db); } if (meta.fields != null) { // remove fields key from metadata object
delete meta.fields; }
// Extract server generated query id for use in requestMore and discard // functions
if (meta.qid !== null && meta.qid !== undefined) { this._queryId = meta.qid; // remove qid from metadata object
delete meta.qid; } this._storeMetadataForCompletion(meta); var beforeHandlerResult = null; if (this._beforeKeys) { beforeHandlerResult = this._beforeKeys(this._fieldKeys); } var continuation_1 = function () { _this._head = _this._fieldKeys; if (_this._observers.some(function (o) { return o.onKeys; })) { _this._observers.forEach(function (o) { if (o.onKeys) { o.onKeys(_this._fieldKeys); } }); } if (_this._afterKeys) { _this._afterKeys(_this._fieldKeys); } afterSuccess(); }; if (beforeHandlerResult) { Promise.resolve(beforeHandlerResult).then(function () { return continuation_1(); }); } else { continuation_1(); } } }; ResultStreamObserver.prototype._handleError = function (error) { var _this = this; this._setState(_states.FAILED); this._error = error; var beforeHandlerResult = null; if (this._beforeError) { beforeHandlerResult = this._beforeError(error); } var continuation = function () { if (_this._observers.some(function (o) { return o.onError; })) { _this._observers.forEach(function (o) { if (o.onError) { o.onError(error); } }); } if (_this._afterError) { _this._afterError(error); } }; if (beforeHandlerResult) { Promise.resolve(beforeHandlerResult).then(function () { return continuation(); }); } else { continuation(); } }; ResultStreamObserver.prototype._handleStreaming = function () { if (this._head && this._observers.some(function (o) { return o.onNext || o.onCompleted; })) { if (!this._paused && (this._discard || this._autoPull)) { this._more(); } } }; ResultStreamObserver.prototype._more = function () { if (this._discard) { this._discardFunction(this._queryId, this); } else { this._pulled = true; this._moreFunction(this._queryId, this._fetchSize, this); } this._setState(_states.STREAMING); }; ResultStreamObserver.prototype._storeMetadataForCompletion = function (meta) { var keys = Object.keys(meta); var index = keys.length; var key = ''; while (index--) { key = keys[index]; this._meta[key] = meta[key]; } }; ResultStreamObserver.prototype._setState = function (state) { this._state = state; }; ResultStreamObserver.prototype._setupAutoPull = function () { this._autoPull = true; }; return ResultStreamObserver; }(StreamObserver)); exports.ResultStreamObserver = ResultStreamObserver; var LoginObserver = /** @class */ (function (_super) { __extends(LoginObserver, _super); /** * * @param {Object} param - * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onCompleted */ function LoginObserver(_a) { var _b = _a === void 0 ?
{} : _a, onError = _b.onError, onCompleted = _b.onCompleted; var _this = _super.call(this) || this; _this._onError = onError; _this._onCompleted = onCompleted; return _this; } LoginObserver.prototype.onNext = function (record) { this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when initializing ' + neo4j_driver_core_1.json.stringify(record))); }; LoginObserver.prototype.onError = function (error) { if (this._onError) { this._onError(error); } }; LoginObserver.prototype.onCompleted = function (metadata) { if (this._onCompleted) { this._onCompleted(metadata); } }; return LoginObserver; }(StreamObserver)); exports.LoginObserver = LoginObserver; var LogoffObserver = /** @class */ (function (_super) { __extends(LogoffObserver, _super); /** * * @param {Object} param - * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onCompleted */ function LogoffObserver(_a) { var _b = _a === void 0 ? {} : _a, onError = _b.onError, onCompleted = _b.onCompleted; var _this = _super.call(this) || this; _this._onError = onError; _this._onCompleted = onCompleted; return _this; } LogoffObserver.prototype.onNext = function (record) { this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when logging off ' + neo4j_driver_core_1.json.stringify(record))); }; LogoffObserver.prototype.onError = function (error) { if (this._onError) { this._onError(error); } }; LogoffObserver.prototype.onCompleted = function (metadata) { if (this._onCompleted) { this._onCompleted(metadata); } }; return LogoffObserver; }(StreamObserver)); exports.LogoffObserver = LogoffObserver; var ResetObserver = /** @class */ (function (_super) { __extends(ResetObserver, _super); /** * * @param {Object} param - * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onComplete */ function ResetObserver(_a) { var _b = _a === void 0 ? {} : _a, onProtocolError = _b.onProtocolError, onError = _b.onError, onComplete = _b.onComplete; var _this = _super.call(this) || this; _this._onProtocolError = onProtocolError; _this._onError = onError; _this._onComplete = onComplete; return _this; } ResetObserver.prototype.onNext = function (record) { this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when resetting: received record is: ' + neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR)); }; ResetObserver.prototype.onError = function (error) { if (error.code === PROTOCOL_ERROR && this._onProtocolError) { this._onProtocolError(error.message); } if (this._onError) { this._onError(error); } }; ResetObserver.prototype.onCompleted = function (metadata) { if (this._onComplete) { this._onComplete(metadata); } }; return ResetObserver; }(StreamObserver)); exports.ResetObserver = ResetObserver; var TelemetryObserver = /** @class */ (function (_super) { __extends(TelemetryObserver, _super); /** * * @param {Object} param - * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onCompleted */ function TelemetryObserver(_a) { var _b = _a === void 0 ? 
{} : _a, onError = _b.onError, onCompleted = _b.onCompleted; var _this = _super.call(this) || this; _this._onError = onError; _this._onCompleted = onCompleted; return _this; } TelemetryObserver.prototype.onNext = function (record) { this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when sending telemetry ' + neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR)); }; TelemetryObserver.prototype.onError = function (error) { if (this._onError) { this._onError(error); } }; TelemetryObserver.prototype.onCompleted = function (metadata) { if (this._onCompleted) { this._onCompleted(metadata); } }; return TelemetryObserver; }(ResultStreamObserver)); exports.TelemetryObserver = TelemetryObserver; var FailedObserver = /** @class */ (function (_super) { __extends(FailedObserver, _super); function FailedObserver(_a) { var error = _a.error, onError = _a.onError; var _this = _super.call(this, { beforeError: onError }) || this; _this.onError(error); return _this; } return FailedObserver; }(ResultStreamObserver)); exports.FailedObserver = FailedObserver; var CompletedObserver = /** @class */ (function (_super) { __extends(CompletedObserver, _super); function CompletedObserver() { var _this = _super.call(this) || this; _super.prototype.markCompleted.call(_this); return _this; } return CompletedObserver; }(ResultStreamObserver)); exports.CompletedObserver = CompletedObserver; var ProcedureRouteObserver = /** @class */ (function (_super) { __extends(ProcedureRouteObserver, _super); function ProcedureRouteObserver(_a) { var resultObserver = _a.resultObserver, onProtocolError = _a.onProtocolError, onError = _a.onError, onCompleted = _a.onCompleted; var _this = _super.call(this) || this; _this._resultObserver = resultObserver; _this._onError = onError; _this._onCompleted = onCompleted; _this._records = []; _this._onProtocolError = onProtocolError; resultObserver.subscribe(_this); return _this; } ProcedureRouteObserver.prototype.onNext = function (record) { this._records.push(record); }; ProcedureRouteObserver.prototype.onError = function (error) { if (error.code === PROTOCOL_ERROR && this._onProtocolError) { this._onProtocolError(error.message); } if (this._onError) { this._onError(error); } }; ProcedureRouteObserver.prototype.onCompleted = function () { if (this._records !== null && this._records.length !== 1) { this.onError((0, neo4j_driver_core_1.newError)('Illegal response from router. Received ' + this._records.length + ' records but expected only one.\n' + neo4j_driver_core_1.json.stringify(this._records), PROTOCOL_ERROR)); return; } if (this._onCompleted) { this._onCompleted(routing_table_raw_1.default.ofRecord(this._records[0])); } }; return ProcedureRouteObserver; }(StreamObserver)); exports.ProcedureRouteObserver = ProcedureRouteObserver; var RouteObserver = /** @class */ (function (_super) { __extends(RouteObserver, _super); /** * * @param {Object} param - * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(RawRoutingTable)} param.onCompleted */ function RouteObserver(_a) { var _b = _a === void 0 ? 
{} : _a, onProtocolError = _b.onProtocolError, onError = _b.onError, onCompleted = _b.onCompleted; var _this = _super.call(this) || this; _this._onProtocolError = onProtocolError; _this._onError = onError; _this._onCompleted = onCompleted; return _this; } RouteObserver.prototype.onNext = function (record) { this.onError((0, neo4j_driver_core_1.newError)('Received RECORD when resetting: received record is: ' + neo4j_driver_core_1.json.stringify(record), PROTOCOL_ERROR)); }; RouteObserver.prototype.onError = function (error) { if (error.code === PROTOCOL_ERROR && this._onProtocolError) { this._onProtocolError(error.message); } if (this._onError) { this._onError(error); } }; RouteObserver.prototype.onCompleted = function (metadata) { if (this._onCompleted) { this._onCompleted(routing_table_raw_1.default.ofMessageResponse(metadata)); } }; return RouteObserver; }(StreamObserver)); exports.RouteObserver = RouteObserver; var _states = { READY_STREAMING: { // async start state
onSuccess: function (streamObserver, meta) { streamObserver._handleRunSuccess(meta, function () { streamObserver._setState(_states.STREAMING); } // after run succeeded, async directly move to streaming // state
); }, onError: function (streamObserver, error) { streamObserver._handleError(error); }, name: function () { return 'READY_STREAMING'; }, pull: function () { } }, READY: { // reactive start state
onSuccess: function (streamObserver, meta) { streamObserver._handleRunSuccess(meta, function () { return streamObserver._handleStreaming(); } // after run succeeded received, reactive shall start pulling
); }, onError: function (streamObserver, error) { streamObserver._handleError(error); }, name: function () { return 'READY'; }, pull: function (streamObserver) { return streamObserver._more(); } }, STREAMING: { onSuccess: function (streamObserver, meta) { if (meta.has_more) { streamObserver._handleHasMore(meta); } else { streamObserver._handlePullSuccess(meta); } }, onError: function (streamObserver, error) { streamObserver._handleError(error); }, name: function () { return 'STREAMING'; }, pull: function () { } }, FAILED: { onError: function (_error) { // more errors are ignored
}, name: function () { return 'FAILED'; }, pull: function () { } }, SUCCEEDED: { name: function () { return 'SUCCEEDED'; }, pull: function () { } } };
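
For orientation, the following is a minimal, illustrative sketch (not part of the file above) of how a ResultStreamObserver is driven, based only on the JSDoc in this file: the protocol layer feeds it Bolt responses via onCompleted/onNext, while a consumer registers handlers through subscribe. The require path, the server object, and the metadata literals are assumptions made for the example, not values taken from this file or the driver's wire protocol.

// Hypothetical usage sketch; assumes this compiled module resolves as './stream-observers'.
var ResultStreamObserver = require('./stream-observers').ResultStreamObserver;

// 'server' here is an illustrative placeholder object; the real driver passes its own server info.
var observer = new ResultStreamObserver({ server: { address: 'localhost:7687' } });

// Consumer side: the observer shape documented on ResultStreamObserver.prototype.subscribe.
observer.subscribe({
  onKeys: function (keys) { console.log('keys:', keys); },
  onNext: function (record) { console.log('record:', record.get('n')); },
  onCompleted: function (metadata) { console.log('summary:', metadata); },
  onError: function (error) { console.error('error:', error); }
});

// Protocol side (simulated): the RUN success response initializes the field keys ...
observer.onCompleted({ fields: ['n'], qid: 0 });
// ... each RECORD arrives as a raw array of values, mapped to a Record in onNext ...
observer.onNext([42]);
// ... and the final PULL success carries the summary metadata, ending the stream.
observer.onCompleted({ type: 'r' });

In this non-reactive flow the observer starts in READY_STREAMING, moves to STREAMING when the RUN success is handled, and reaches SUCCEEDED on the final PULL success, matching the _states table at the end of the file; pause() and resume() only affect whether further batches are requested from the server.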
