Skip to main content
Glama
shared_webworker.ts13.7 kB
import * as Comlink from "comlink"; import { ExecBaseOptions, ExecReturnResultRowsOptions, ExecRowModeArrayOptions, FlexibleString, SqlValue, } from "@sqlite.org/sqlite-wasm"; import { ChangeSetId } from "@/api/sdf/dal/change_set"; import { ComponentId } from "@/api/sdf/dal/component"; import { WorkspacePk } from "@/api/sdf/dal/workspace"; import { EntityKind, GlobalEntity } from "./types/entity_kind_types"; import { SharedDBInterface, TabDBInterface, AtomDocument, BroadcastMessage, SHARED_BROADCAST_CHANNEL_NAME, DB_NOT_INIT_ERR, Gettable, Listable, QueryAttributesTerm, } from "./types/dbinterface"; // Wait 5 seconds after we no longer have any remotes before terminating ourselves const SHUTDOWN_DELAY_MS = 5000; declare global { interface Window { onconnect?: (event: MessageEvent) => void; } } const _DEBUG = true; // import.meta.env.VITE_SI_ENV === "local"; // eslint-disable-next-line @typescript-eslint/no-explicit-any function debug(...args: any | any[]) { // eslint-disable-next-line no-console if (_DEBUG) console.debug(args); } let currentLeader: Comlink.Remote<TabDBInterface> | undefined; let currentLeaderId: string | undefined; const remotes: { [key: string]: Comlink.Remote<TabDBInterface> } = {}; const bearerTokens: { [key: string]: string } = {}; const hasLeaderChannel = new MessageChannel(); const leaderChangedChannel = new MessageChannel(); const LEADER_CHANGED = "LEADER_CHANGED"; const failOnLeaderChange = (): Promise<never> => { return new Promise((_, reject) => { const onMessage = () => { reject(LEADER_CHANGED); }; leaderChangedChannel.port1.addEventListener("message", onMessage, { capture: false, passive: true, once: true, }); leaderChangedChannel.port1.start(); }); }; const getLeader = (): Promise<Comlink.Remote<TabDBInterface>> => { return new Promise((resolve, reject) => { if (currentLeader) { resolve(currentLeader); } hasLeaderChannel.port1.onmessage = () => { if (currentLeader) { resolve(currentLeader); } else { reject(new Error("Got remote message but no remote set")); } }; }); }; const MAX_RETRIES = 250; const sleep = (ms: number) => new Promise((resolve) => { setTimeout(resolve, ms); }); async function withLeader<R>( cb: (remote: Comlink.Remote<TabDBInterface>) => Promise<R>, retry?: number, ): Promise<R> { const remote = await getLeader(); const retries = retry ?? 0; if (retries >= MAX_RETRIES) { throw new Error( "Retries exceeded attempting to perform query against database leader. Please refresh this tab.", ); } // If the leader with the remote changes while a call is in progress, we need // to retry the call, now with the new remote. Otherwise, we will likely hang // forever. try { const result = await Promise.race([cb(remote), failOnLeaderChange()]); return result; } catch (err) { if (typeof err === "string" && err === LEADER_CHANGED) { debug("LEADER CHANGED MID REQUEST, rerunning callback", retries); return withLeader(cb, retries + 1); } if (err instanceof Error && err.message === DB_NOT_INIT_ERR) { debug("DB NOT INITIALIZED?", retries); await sleep(100); return withLeader(cb, retries + 1); } throw err; } } const clientInterest: Record<string, number> = {}; const connectionStatus: Record<string, boolean> = {}; const dbInterface: SharedDBInterface = { async broadcastMessage(message: BroadcastMessage) { Object.keys(remotes).forEach((remoteId) => { if (remoteId !== currentLeaderId) { try { remotes[remoteId]?.receiveBroadcast(message); } catch (err) { debug("failed to send to remote", remoteId, err); } } }); }, unregisterRemote(id: string) { debug("unregister remote in shared", id); if (currentLeaderId === id) { debug("tab with lock unregistered. no remote set"); currentLeader = undefined; } delete remotes[id]; if (Object.keys(remotes).length === 0) { // Just in case there is a race between closing a tab and a new one // loading, and this worker gets a new remote, don't shut down right away. // Double check after a few seconds. setTimeout(async () => { if (Object.keys(remotes).length === 0) { shutDownWebWorker(); } }, SHUTDOWN_DELAY_MS); } }, async registerRemote(id: string, remote: Comlink.Remote<TabDBInterface>) { if (!remotes[id]) { debug("register remote in shared", id); remotes[id] = remote; } if (await remote.hasDbLock()) { await this.setLeader(id); } }, async hasLeader() { return !!currentLeader; }, async currentLeaderId() { return currentLeaderId; }, async setLeader(remoteId: string) { debug("setting remote in shared web worker to", remoteId); currentLeader = remotes[remoteId]; if (!currentLeader) { throw new Error(`remote ${remoteId} not registered`); } const leaderChanged = currentLeaderId !== remoteId; currentLeaderId = remoteId; for (const [workspaceId, workspaceToken] of Object.entries(bearerTokens)) { await currentLeader.setBearer(workspaceId, workspaceToken); await currentLeader.initSocket(workspaceId); } currentLeader.bifrostReconnect(); hasLeaderChannel.port2.postMessage("got leader"); if (leaderChanged) { debug("follow the leader"); leaderChangedChannel.port2.postMessage("leader changed"); } }, async initDB(_testing: boolean) { debug("init db called in shared webworker"); }, async migrate(testing: boolean) { return withLeader(async (remote) => await remote.migrate(testing)); }, async setBearer(workspaceId, token): Promise<void> { bearerTokens[workspaceId] = token; const updateRemote = async () => { await currentLeader?.setBearer(workspaceId, token); currentLeader?.initSocket(workspaceId); }; updateRemote(); }, async getBearers(): Promise<{ [key: string]: string }> { return bearerTokens; }, async addBearers(bearers) { for (const [workspaceId, bearerToken] of Object.entries(bearers)) { bearerTokens[workspaceId] = bearerToken; await currentLeader?.setBearer(workspaceId, bearerToken); currentLeader?.initSocket(workspaceId); } }, async showInterest(workspaceId: string, changeSetId: ChangeSetId) { const key = `${workspaceId}-${changeSetId}`; let priority = clientInterest[key] ?? 0; priority += 1; clientInterest[key] = priority; currentLeader?.receiveInterest(clientInterest); this.broadcastMessage({ messageKind: "interest", arguments: clientInterest, }); }, async setConnections(connections: Record<string, boolean>) { Object.entries(connections).forEach(([k, v]) => { connectionStatus[k] = v; }); }, async getConnections() { return connectionStatus; }, async initSocket(workspaceId: string): Promise<void> { await withLeader(async (remote) => await remote.initSocket(workspaceId)); }, async initBifrost(_gotlockPort: MessagePort) { debug("init bifrost in shared worker called"); }, async bifrostClose() { await withLeader(async (remote) => await remote.bifrostClose()); }, async bifrostReconnect() { await withLeader(async (remote) => await remote.bifrostReconnect()); }, async linkNewChangeset( workspaceId: string, headChangeSetId: string, changeSetId: string, ): Promise<void> { await withLeader( async (remote) => await remote.linkNewChangeset( workspaceId, headChangeSetId, changeSetId, ), ); }, async getOutgoingConnectionsByComponentId( workspaceId: string, changeSetId: string, ) { return await withLeader( async (remote) => await remote.getOutgoingConnectionsByComponentId( workspaceId, changeSetId, ), ); }, async getIncomingManagementByComponentId( workspaceId: string, changeSetId: string, ) { return await withLeader( async (remote) => await remote.getIncomingManagementByComponentId( workspaceId, changeSetId, ), ); }, async getGlobal( workspaceId: string, kind: GlobalEntity, id: string, ): Promise<-1 | AtomDocument> { return await withLeader( async (remote) => await remote.getGlobal(workspaceId, kind, id), ); }, async get( workspaceId: string, changeSetId: string, kind: Gettable, id: string, ): Promise<-1 | AtomDocument> { return await withLeader( async (remote) => await remote.get(workspaceId, changeSetId, kind, id), ); }, async getExists( workspaceId: string, changeSetId: string, kind: Gettable, id: string, ): Promise<boolean> { return await withLeader( async (remote) => await remote.getExists(workspaceId, changeSetId, kind, id), ); }, async getList( workspaceId: string, changeSetId: string, kind: Listable, id: string, ): Promise<string> { return await withLeader( async (remote) => await remote.getList(workspaceId, changeSetId, kind, id), ); }, async getKind(workspaceId, changeSetId, kind) { return await withLeader( async (remote) => await remote.getKind(workspaceId, changeSetId, kind), ); }, async queryAttributes( workspaceId: WorkspacePk, changeSetId: ChangeSetId, terms: QueryAttributesTerm[], ): Promise<ComponentId[]> { return await withLeader( async (remote) => await remote.queryAttributes(workspaceId, changeSetId, terms), ); }, async getPossibleConnections(workspaceId, changeSetId) { return await withLeader( async (remote) => await remote.getPossibleConnections(workspaceId, changeSetId), ); }, async getOutgoingConnectionsCounts(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getOutgoingConnectionsCounts(workspaceId, changeSetId), ); }, async getComponentDetails(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getComponentDetails(workspaceId, changeSetId), ); }, async getComponentsInViews(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getComponentsInViews(workspaceId, changeSetId), ); }, async getComponentsInOnlyOneView(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getComponentsInOnlyOneView(workspaceId, changeSetId), ); }, async getSchemaMembers(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getSchemaMembers(workspaceId, changeSetId), ); }, async getDefaultSubscriptions(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.getDefaultSubscriptions(workspaceId, changeSetId), ); }, async mjolnir( workspaceId: string, changeSetId: string, kind: EntityKind, id: string, checksum?: string, ) { await withLeader( async (remote) => await remote.mjolnir(workspaceId, changeSetId, kind, id, checksum), ); }, async changeSetExists( workspaceId: string, changeSetId: string, ): Promise<boolean> { return await withLeader( async (remote) => await remote.changeSetExists(workspaceId, changeSetId), ); }, async niflheim( workspaceId: string, changeSetId: string, ): Promise<-1 | 0 | 1> { return await withLeader( async (remote) => await remote.niflheim(workspaceId, changeSetId), ); }, async syncAtoms(workspaceId: string, changeSetId: string): Promise<void> { return await withLeader( async (remote) => await remote.syncAtoms(workspaceId, changeSetId), ); }, async vanaheim(workspaceId: string): Promise<boolean> { return await withLeader( async (remote) => await remote.vanaheim(workspaceId), ); }, async pruneAtomsForClosedChangeSet(workspaceId: string, changeSetId: string) { return await withLeader( async (remote) => await remote.pruneAtomsForClosedChangeSet(workspaceId, changeSetId), ); }, async exec( opts: ExecBaseOptions & ExecRowModeArrayOptions & ExecReturnResultRowsOptions & { sql: FlexibleString; }, ): Promise<SqlValue[][]> { return await withLeader(async (remote) => await remote.exec(opts)); }, async bobby(): Promise<void> { await withLeader(async (remote) => await remote.bobby()); }, async ragnarok( workspaceId: string, changeSetId: string, noColdStart?: boolean, ): Promise<void> { await withLeader( async (remote) => await remote.ragnarok(workspaceId, changeSetId, noColdStart), ); }, async odin(changeSetId: string): Promise<object> { return await withLeader(async (remote) => await remote.odin(changeSetId)); }, }; const onConnectBroadcast = new BroadcastChannel(SHARED_BROADCAST_CHANNEL_NAME); // eslint-disable-next-line no-restricted-globals const name = self.name; // eslint-disable-next-line no-restricted-globals const shutDownWebWorker = () => self.close(); // eslint-disable-next-line no-restricted-globals self.onconnect = (event: MessageEvent) => { Comlink.expose(dbInterface, event.ports[0]); onConnectBroadcast.postMessage(name); };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server