Skip to main content
Glama
reconnecting-websocket.ts17.8 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import { TypedEventTarget } from '../eventtarget'; /*! * Reconnecting WebSocket * by Pedro Ladaria <pedro.ladaria@gmail.com> * https://github.com/pladaria/reconnecting-websocket * License MIT * * Copy of "partysocket" from Partykit team, a fork of the original "Reconnecting WebSocket" * https://github.com/partykit/partykit/blob/main/packages/partysocket */ export interface IReconnectingWebSocket extends TypedEventTarget<WebSocketEventMap> { readyState: number; close(code?: number, reason?: string): void; send(message: string): void; reconnect(code?: number, reason?: string): void; } export interface IReconnectingWebSocketCtor { new (url: string, protocols?: ProtocolsProvider, options?: Options): IReconnectingWebSocket; } export interface ErrorEvent extends globalThis.Event { message: string; error: Error; } export interface CloseEvent extends globalThis.Event { code: number; reason: string; wasClean: boolean; } export type WebSocketEventMap = { close: CloseEvent; error: ErrorEvent; message: MessageEvent; open: Event; }; /** * This map exists separately from `WebSocketEventMap`, which is the actual event map used for the `ReconnectingWebSocket` class itself, * due to slight difference in the type between the events as we use them, and the events as they exist as global interfaces. We need the global interfaces * to be generic enough to satisfy conformant implementations that don't exactly match the events we export and use in `ReconnectingWebSocket` itself. */ export type IWebSocketEventMap = { close: globalThis.CloseEvent; error: globalThis.ErrorEvent; message: globalThis.MessageEvent; open: Event; }; /** * Generic interface that an implementation of `WebSocket` must satisfy to be used with `ReconnectingWebSocket`. * This is a slightly modified fork of the `WebSocket` global type used in Node. * * The main key difference is making all the `onclose`, `onerror`, etc. functions have `any[]` args, making `data` in `send()` of type `any`, and making `binaryType` of type string, * though the particular implementation should narrow each of these implementation-specific types. */ export interface IWebSocket { binaryType: string; readonly bufferedAmount: number; readonly extensions: string; onclose: ((...args: any[]) => any) | null; onerror: ((...args: any[]) => any) | null; onmessage: ((...args: any[]) => any) | null; onopen: ((...args: any[]) => any) | null; readonly protocol: string; readonly readyState: number; readonly url: string; close(code?: number, reason?: string): void; send(data: any): void; readonly CLOSED: number; readonly CLOSING: number; readonly CONNECTING: number; readonly OPEN: number; addEventListener<K extends keyof WebSocketEventMap>( type: K, listener: (ev: WebSocketEventMap[K]) => any, options?: boolean | AddEventListenerOptions ): void; addEventListener( type: string, listener: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions ): void; removeEventListener<K extends keyof WebSocketEventMap>( type: K, listener: (ev: WebSocketEventMap[K]) => any, options?: boolean | EventListenerOptions ): void; removeEventListener( type: string, listener: EventListenerOrEventListenerObject, options?: boolean | EventListenerOptions ): void; } const Events = { Event: (typeof globalThis.Event !== 'undefined' ? globalThis.Event : undefined) as | typeof globalThis.Event | undefined, ErrorEvent: undefined as any, CloseEvent: undefined as any, }; let eventsInitialized = false; function lazyInitEvents(): void { if (typeof globalThis.Event === 'undefined') { throw new Error('Unable to lazy init events for ReconnectingWebSocket. globalThis.Event is not defined yet'); } Events.Event = globalThis.Event; Events.ErrorEvent = class ErrorEvent extends Event implements ErrorEvent { public message: string; public error: Error; constructor(error: Error, target: any) { super('error', target); this.message = error.message; this.error = error; } } as unknown as typeof globalThis.ErrorEvent; Events.CloseEvent = class CloseEvent extends Event implements CloseEvent { public code: number; public reason: string; public wasClean = true; // eslint-disable-next-line default-param-last constructor(code = 1000, reason = '', target: any) { super('close', target); this.code = code; this.reason = reason; } } as unknown as typeof globalThis.CloseEvent; } export function assert(condition: unknown, msg?: string): asserts condition { if (!condition) { throw new Error(msg); } } function cloneEvent(e: Event): Event { return new (e as any).constructor(e.type, e); } export type Options<WS extends IWebSocket = WebSocket> = { WebSocket?: any; binaryType?: WS['binaryType']; maxReconnectionDelay?: number; minReconnectionDelay?: number; reconnectionDelayGrowFactor?: number; minUptime?: number; connectionTimeout?: number; maxRetries?: number; maxEnqueuedMessages?: number; startClosed?: boolean; debug?: boolean; debugLogger?: (...args: any[]) => void; }; const DEFAULT = { maxReconnectionDelay: 10000, minReconnectionDelay: 1000 + Math.random() * 4000, minUptime: 5000, reconnectionDelayGrowFactor: 1.3, connectionTimeout: 4000, maxRetries: Infinity, maxEnqueuedMessages: Infinity, startClosed: false, debug: false, }; export type ProtocolsProvider = null | string | string[]; export type Message = string | ArrayBuffer | Blob | ArrayBufferView; let didWarnAboutMissingWebSocket = false; export class ReconnectingWebSocket<WS extends IWebSocket = WebSocket> extends TypedEventTarget<WebSocketEventMap> implements IReconnectingWebSocket { private _ws: IWebSocket | undefined; private _retryCount = -1; private _uptimeTimeout: ReturnType<typeof setTimeout> | undefined; private _connectTimeout: ReturnType<typeof setTimeout> | undefined; private _shouldReconnect = true; private _connectLock = false; private _binaryType: WS['binaryType']; private _closeCalled = false; private _messageQueue: Message[] = []; private readonly _debugLogger = console.log.bind(console); protected _url: string; protected _protocols?: ProtocolsProvider; protected _options: Options<WS>; constructor(url: string, protocols?: ProtocolsProvider, options: Options<WS> = {}) { // Initialize all events if they haven't been created yet if (!eventsInitialized) { lazyInitEvents(); eventsInitialized = true; } super(); this._url = url; this._protocols = protocols; this._options = options; if (this._options.startClosed) { this._shouldReconnect = false; } if (this._options.binaryType) { this._binaryType = this._options.binaryType; } else { this._binaryType = 'blob'; } if (this._options.debugLogger) { this._debugLogger = this._options.debugLogger; } this._connect(); } static get CONNECTING(): number { return 0; } static get OPEN(): number { return 1; } static get CLOSING(): number { return 2; } static get CLOSED(): number { return 3; } get CONNECTING(): number { return ReconnectingWebSocket.CONNECTING; } get OPEN(): number { return ReconnectingWebSocket.OPEN; } get CLOSING(): number { return ReconnectingWebSocket.CLOSING; } get CLOSED(): number { return ReconnectingWebSocket.CLOSED; } get binaryType(): WS['binaryType'] { return this._ws ? this._ws.binaryType : this._binaryType; } set binaryType(value: WS['binaryType']) { this._binaryType = value; if (this._ws) { this._ws.binaryType = value; } } /** * @returns The number or connection retries. */ get retryCount(): number { return Math.max(this._retryCount, 0); } /** * @returns The number of bytes of data that have been queued using calls to send() but not yet * transmitted to the network. This value resets to zero once all queued data has been sent. * This value does not reset to zero when the connection is closed; if you keep calling send(), * this will continue to climb. Read only * */ get bufferedAmount(): number { const bytes = this._messageQueue.reduce((acc, message) => { if (typeof message === 'string') { acc += message.length; // not byte size } else if (message instanceof Blob) { acc += message.size; } else { acc += message.byteLength; } return acc; }, 0); return bytes + (this._ws?.bufferedAmount ?? 0); } /** * @returns The extensions selected by the server. This is currently only the empty string or a list of * extensions as negotiated by the connection */ get extensions(): string { return this._ws?.extensions ?? ''; } /** * @returns A string indicating the name of the sub-protocol the server selected; * this will be one of the strings specified in the protocols parameter when creating the * WebSocket object. */ get protocol(): string { return this._ws?.protocol ?? ''; } /** * @returns The current state of the connection; this is one of the Ready state constants. */ get readyState(): number { if (this._ws) { return this._ws.readyState; } return this._options.startClosed ? ReconnectingWebSocket.CLOSED : ReconnectingWebSocket.CONNECTING; } /** * @returns The URL as resolved by the constructor. */ get url(): string { return this._ws ? this._ws.url : ''; } /** * @returns Whether the websocket object is now in reconnectable state. */ get shouldReconnect(): boolean { return this._shouldReconnect; } /** * An event listener to be called when the WebSocket connection's readyState changes to CLOSED */ public onclose: ((event: CloseEvent) => void) | null = null; /** * An event listener to be called when an error occurs */ public onerror: ((event: ErrorEvent) => void) | null = null; /** * An event listener to be called when a message is received from the server */ public onmessage: ((event: MessageEvent) => void) | null = null; /** * An event listener to be called when the WebSocket connection's readyState changes to OPEN; * this indicates that the connection is ready to send and receive data */ public onopen: ((event: Event) => void) | null = null; /** * Closes the WebSocket connection or connection attempt, if any. If the connection is already * CLOSED, this method does nothing * @param code - The code to close with. Default is 1000. * @param reason - An optional reason for closing the connection. */ public close(code = 1000, reason?: string): void { this._closeCalled = true; this._shouldReconnect = false; this._clearTimeouts(); if (!this._ws) { this._debug('close enqueued: no ws instance'); return; } if (this._ws.readyState === this.CLOSED) { this._debug('close: already closed'); return; } this._ws.close(code, reason); } /** * Closes the WebSocket connection or connection attempt and connects again. * Resets retry counter; * @param code - The code to disconnect with. Default is 1000. * @param reason - An optional reason for disconnecting the connection. */ public reconnect(code?: number, reason?: string): void { this._shouldReconnect = true; this._closeCalled = false; this._retryCount = -1; if (!this._ws || this._ws.readyState === this.CLOSED) { this._connect(); } else { this._disconnect(code, reason); this._connect(); } } /** * Enqueue specified data to be transmitted to the server over the WebSocket connection * @param data - The data to enqueue. */ public send(data: Message): void { if (this._ws && this._ws.readyState === this.OPEN) { this._debug('send', data); this._ws.send(data); } else { const { maxEnqueuedMessages = DEFAULT.maxEnqueuedMessages } = this._options; if (this._messageQueue.length < maxEnqueuedMessages) { this._debug('enqueue', data); this._messageQueue.push(data); } } } private _debug(...args: unknown[]): void { if (this._options.debug) { this._debugLogger('RWS>', ...args); } } private _getNextDelay(): number { const { reconnectionDelayGrowFactor = DEFAULT.reconnectionDelayGrowFactor, minReconnectionDelay = DEFAULT.minReconnectionDelay, maxReconnectionDelay = DEFAULT.maxReconnectionDelay, } = this._options; let delay = 0; if (this._retryCount > 0) { delay = minReconnectionDelay * Math.pow(reconnectionDelayGrowFactor, this._retryCount - 1); if (delay > maxReconnectionDelay) { delay = maxReconnectionDelay; } } this._debug('next delay', delay); return delay; } private _wait(): Promise<void> { return new Promise((resolve) => { setTimeout(resolve, this._getNextDelay()); }); } private _connect(): void { if (this._connectLock || !this._shouldReconnect) { return; } this._connectLock = true; const { maxRetries = DEFAULT.maxRetries, connectionTimeout = DEFAULT.connectionTimeout } = this._options; if (this._retryCount >= maxRetries) { this._debug('max retries reached', this._retryCount, '>=', maxRetries); return; } this._retryCount++; this._debug('connect', this._retryCount); this._removeListeners(); this._wait() .then(() => { // close could be called before creating the ws if (this._closeCalled) { this._connectLock = false; return; } if (!this._options.WebSocket && typeof WebSocket === 'undefined' && !didWarnAboutMissingWebSocket) { console.error('‼️ No WebSocket implementation available. You should define options.WebSocket.'); didWarnAboutMissingWebSocket = true; } const WS: typeof WebSocket = this._options.WebSocket || WebSocket; this._debug('connect', { url: this._url, protocols: this._protocols }); this._ws = this._protocols ? new WS(this._url, this._protocols) : new WS(this._url); this._ws.binaryType = this._binaryType; this._connectLock = false; this._addListeners(); this._connectTimeout = setTimeout(() => this._handleTimeout(), connectionTimeout); }) // via https://github.com/pladaria/reconnecting-websocket/pull/166 .catch((err) => { this._connectLock = false; this._handleError(new Events.ErrorEvent(Error(err.message), this)); }); } private _handleTimeout(): void { this._debug('timeout event'); this._handleError(new Events.ErrorEvent(Error('TIMEOUT'), this)); } private _disconnect(code = 1000, reason?: string): void { this._clearTimeouts(); if (!this._ws) { return; } this._removeListeners(); try { this._ws.close(code, reason); this._handleClose(new Events.CloseEvent(code, reason, this)); } catch (_error) { // ignore } } private _acceptOpen(): void { this._debug('accept open'); this._retryCount = 0; } private readonly _handleOpen = (event: Event): void => { this._debug('open event'); const { minUptime = DEFAULT.minUptime } = this._options; clearTimeout(this._connectTimeout); this._uptimeTimeout = setTimeout(() => this._acceptOpen(), minUptime); assert(this._ws, 'WebSocket is not defined'); this._ws.binaryType = this._binaryType; // send enqueued messages (messages sent before websocket open event) this._messageQueue.forEach((message) => this._ws?.send(message)); this._messageQueue = []; if (this.onopen) { this.onopen(event); } this.dispatchEvent(cloneEvent(event)); }; private readonly _handleMessage = (event: MessageEvent): void => { this._debug('message event'); if (this.onmessage) { this.onmessage(event); } this.dispatchEvent(cloneEvent(event)); }; private readonly _handleError = (event: ErrorEvent): void => { this._debug('error event', event.message); this._disconnect(undefined, event.message === 'TIMEOUT' ? 'timeout' : undefined); if (this.onerror) { this.onerror(event); } this._debug('exec error listeners'); this.dispatchEvent(cloneEvent(event)); this._connect(); }; private readonly _handleClose = (event: CloseEvent): void => { this._debug('close event'); this._clearTimeouts(); if (this._shouldReconnect) { this._connect(); } if (this.onclose) { this.onclose(event); } this.dispatchEvent(cloneEvent(event)); }; private _removeListeners(): void { if (!this._ws) { return; } this._debug('removeListeners'); this._ws.removeEventListener('open', this._handleOpen); this._ws.removeEventListener('close', this._handleClose); this._ws.removeEventListener('message', this._handleMessage); this._ws.removeEventListener('error', this._handleError); } private _addListeners(): void { if (!this._ws) { return; } this._debug('addListeners'); this._ws.addEventListener('open', this._handleOpen); this._ws.addEventListener('close', this._handleClose); this._ws.addEventListener('message', this._handleMessage); this._ws.addEventListener('error', this._handleError); } private _clearTimeouts(): void { clearTimeout(this._connectTimeout); clearTimeout(this._uptimeTimeout); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server