Skip to main content
Glama

Convex MCP server

Official
by get-convex
election.ts11.2 kB
import { Channel } from "async-channel"; import { ClientId, followerMessage, FollowerMessage, leaderMessage, LeaderMessage, mutationToStoredMutation, pageToStoredPage, storedPageToPage, } from "./types"; import Dexie from "dexie"; import { LocalPersistence } from "../localPersistence"; import { CorePersistenceRequest, Page } from "../core/protocol"; import { MutationInfo, PersistId } from "../../shared/types"; const DB_VERSION = 1; type WorkerMessage = | { type: "follower"; message: FollowerMessage } | { type: "leader"; message: LeaderMessage }; export type ElectionOptions = { joinTimeoutMs?: number; debug?: boolean; }; /** * Leader election for IndexedDB persistence. * * This class relies on the Lock APIs to ensure that only one * instance of it across an origin is elected the *leader*. LocalPersistence calls * then send messages to the leader, which performs the desired action and retries. * * Each instance of the class has a "worker" thread that receives messages, connects * to the leader if it can, and otherwise decides to become the leader. */ export class Election implements LocalPersistence { clientId: ClientId; private currentState: State; /** * Broadcast channels for communicating with other threads. Both callers into * this class and the worker send follower messages, and the single leader * across all threads sends leader messages. */ followerBroadcast: BroadcastChannel; leaderBroadcast: BroadcastChannel; /** * Channel for communicating with the worker thread. Both broadcast channels * push messages onto this channel. */ workerChannel: Channel<WorkerMessage>; joinTimeoutMs: number; debug: boolean; constructor( private name: string, private address: string, options?: ElectionOptions, ) { this.joinTimeoutMs = options?.joinTimeoutMs ?? 1000; this.debug = options?.debug ?? false; this.clientId = crypto.randomUUID(); this.currentState = new State(); this.workerChannel = new Channel(16); this.followerBroadcast = new BroadcastChannel(`${this.name}-client`); this.followerBroadcast.onmessage = (e) => { this.log(`Received follower message`, e.data); const message = followerMessage.parse(e.data); this.workerChannel.push({ type: "follower", message }); }; this.followerBroadcast.onmessageerror = (e) => { this.log("Client broadcast message error:", e); }; this.leaderBroadcast = new BroadcastChannel(`${this.name}-server`); this.leaderBroadcast.onmessage = (e) => { this.log("Received leader message", e.data); const message = leaderMessage.parse(e.data); this.workerChannel.push({ type: "leader", message }); }; this.leaderBroadcast.onmessageerror = (e) => { this.log("Server broadcast message error:", e); }; void this.go(); } addListener(listener: (request: CorePersistenceRequest) => void) { this.currentState.addListener(listener); } persistMutation(persistId: PersistId, mutationInfo: MutationInfo) { this.broadcastFollowerMessage({ type: "persist", clientId: this.clientId, persistId, pages: [], mutationInfos: [mutationToStoredMutation(mutationInfo)], }); } persistPages(persistId: PersistId, pages: Page[]) { const storedPages = []; for (const page of pages) { const storedPage = pageToStoredPage(page); if (storedPage !== null) { storedPages.push(storedPage); } } this.broadcastFollowerMessage({ type: "persist", clientId: this.clientId, persistId, pages: storedPages, mutationInfos: [], }); } destroy() { this.followerBroadcast.close(); this.leaderBroadcast.close(); this.workerChannel.close(); } private log(message: string, ..._args: any[]) { if (this.debug) { console.log(`[election] ${message}`); } } private async leaderExists(): Promise<boolean> { const locks = await navigator.locks.query(); return locks.held?.find((l) => l.name === this.name) !== undefined; } private broadcastFollowerMessage(message: FollowerMessage) { this.log("Broadcasting follower message", message); this.followerBroadcast.postMessage(message); // NB: BroadcastChannels do not send messages to themselves, // so we push this message onto the worker channel in case we're the leader. this.workerChannel.push({ type: "follower", message }); } private broadcastLeaderMessage(message: LeaderMessage) { this.log("Broadcasting leader message", message); this.leaderBroadcast.postMessage(message); this.workerChannel.push({ type: "leader", message }); } private async go() { while (!this.workerChannel.done) { const tryLeadership = await this.follow(); if (tryLeadership) { await navigator.locks.request( this.name, { mode: "exclusive", ifAvailable: true }, (lock) => this.lead(lock), ); } } } // Cleanly exits after the leader goes away, returning whether if we should // try to become the leader. private async follow(): Promise<boolean> { const leaderExists = await this.leaderExists(); if (!leaderExists) { return true; } // Try to join the current leader. console.log("Trying to join as a follower..."); this.broadcastFollowerMessage({ type: "join", clientId: this.clientId, name: this.name, address: this.address, }); let leaderClientId: string | null = null; const deadline = Date.now() + this.joinTimeoutMs; while (Date.now() < deadline) { const workerMessage = await this.workerChannel.get(); if (workerMessage.type !== "leader") { continue; } if (workerMessage.message.requestingClientId === this.clientId) { leaderClientId = this.handleJoinResult(workerMessage.message); break; } } if (leaderClientId === null) { throw new Error("Leader not found"); } while (!this.workerChannel.done) { const leaderExists = await this.leaderExists(); if (!leaderExists) { return true; } const workerMessage = await this.workerChannel.get(); if (workerMessage.type === "follower") { continue; } // TODO: Handle retrying a message if a leader crashes // while servicing a request or times out. const { message } = workerMessage; if (message.requestingClientId !== this.clientId) { continue; } this.handleLeaderMessage(message); } return false; } private async lead(lock: Lock | null) { // If someone else grabbed the lock, return early and try to follow again. if (lock === null) { return; } console.log("Starting as leader..."); const db = new Dexie(this.name); db.version(DB_VERSION).stores({ // TODO: Normalize objects across pages. pages: "[table+index+serializedLowerBound]", }); // Subscribe to ourselves if we're now the leader. this.broadcastFollowerMessage({ type: "join", clientId: this.clientId, name: this.name, address: this.address, }); while (!this.workerChannel.done) { const workerMessage = await this.workerChannel.get(); this.log("Received worker message", workerMessage); if (workerMessage.type === "leader") { const { message } = workerMessage; if (message.leaderClientId !== this.clientId) { throw new Error( `Someone else sending a leader message while we have the lock?`, ); } if (message.requestingClientId === this.clientId) { this.handleLeaderMessage(message); } continue; } const { message } = workerMessage; switch (message.type) { case "join": { const storedPages = await db.table("pages").toArray(); this.broadcastLeaderMessage({ type: "joinResult", leaderClientId: this.clientId, requestingClientId: message.clientId, result: { type: "success", pages: storedPages, mutations: [], }, }); break; } case "persist": { await db.transaction("rw", "pages", async () => { await db.table("pages").bulkPut(message.pages); }); this.broadcastLeaderMessage({ type: "persistResult", leaderClientId: this.clientId, requestingClientId: message.clientId, persistId: message.persistId, result: { type: "success", }, }); break; } default: { throw new Error(`Invalid message: ${JSON.stringify(message)}`); } } } } private handleLeaderMessage(message: LeaderMessage) { switch (message.type) { case "joinResult": { this.handleJoinResult(message); break; } case "persistResult": { this.handlePersistResult(message); break; } default: { throw new Error(`Unexpected message ${message}.`); } } } private handleJoinResult(message: LeaderMessage) { if (message.type !== "joinResult") { throw new Error(`Unexpected message type ${message.type}.`); } if (message.result.type === "failure") { throw new Error(`Can't join leader: ${message.result.error}`); } console.log(`Connected to ${message.leaderClientId}!`); const { pages: storedPages, mutations: _storedMutations } = message.result; const pages = storedPages.map(storedPageToPage); // TODO: Handle reloading mutations. this.currentState.setCurrentPages(pages); return message.leaderClientId; } private handlePersistResult(message: LeaderMessage) { if (message.type !== "persistResult") { throw new Error(`Unexpected message type ${message.type}.`); } this.currentState.emitToListeners({ requestor: "LocalPersistence", kind: "localPersistComplete", persistId: message.persistId as PersistId, }); } } class State { listeners: Set<(request: CorePersistenceRequest) => void> = new Set(); currentPages?: Page[]; addListener(listener: (request: CorePersistenceRequest) => void) { this.listeners.add(listener); if (this.currentPages) { listener({ requestor: "LocalPersistence", kind: "ingestFromLocalPersistence", pages: this.currentPages, serverTs: 0, }); } } setCurrentPages(pages: Page[]) { // TODO: Handle reloading from persistence when we switch from // one leader to another. const firstTime = this.currentPages === undefined; this.currentPages = pages; if (firstTime) { this.emitToListeners({ requestor: "LocalPersistence", kind: "ingestFromLocalPersistence", pages, serverTs: 0, }); } } emitToListeners(request: CorePersistenceRequest) { for (const listener of this.listeners) { listener(request); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server