Skip to main content
Glama
mjolnir_queue.ts6.7 kB
import PQueue from "p-queue"; import { context, trace } from "@opentelemetry/api"; import { ChangeSetId } from "@/api/sdf/dal/change_set"; import { BustCacheFn, Id } from "./types/dbinterface"; import { EntityKind } from "./types/entity_kind_types"; const _DEBUG = import.meta.env.VITE_SI_ENV === "local"; // eslint-disable-next-line @typescript-eslint/no-explicit-any function debug(...args: any | any[]) { // eslint-disable-next-line no-console if (_DEBUG) console.debug(args); } /** * ASSUMPTIONS & HOW WE SEE THE SYSTEM BEHAVING * AND WHAT THAT MEANS FOR QUEUEING DATA * * Assumption: now that cold start does hammers in bulk, we expect very few hammers * in the general behavior of the system. * Follow up: We can further reduce these by taking more care what order we process * individual patches in * * We have observed that when patching, and we throw a hammer, the hammer returns with * a index ahead of patches we are currently applying. This breaks the updating scheme. * Therefore, we will hold hammers for processing until we have processed patches. (Above: * these hammers are mostly due to processing things in the wrong order, subsequent patches * in the batch contain the data) * * We have observed that a batch of patches does not always complete before the next batch * of patches, and it breaks the updating scheme. * * As a result of these operations we will: * - process 1 batch of patches at a time (processPatchQueue) * - while patches are processing, hold hammers * - once patches are done processing, process hammers */ // only throw 10 hammers at a time. Increase this when the throwing is over WS export const mjolnirQueue = new PQueue({ concurrency: 10, autoStart: true }); // holding tank for cold start const bulkQueue = new PQueue({ concurrency: 10, autoStart: false }); // processing queues for the general lifecycle export const processPatchQueue = new PQueue({ concurrency: 1, autoStart: true, intervalCap: 1, carryoverConcurrencyCount: true, }); export const processMjolnirQueue = new PQueue({ concurrency: 1, autoStart: true, intervalCap: 1, carryoverConcurrencyCount: true, }); const bustQueue = new PQueue({ concurrency: 10, autoStart: true }); // de-dupe queue busting! // except... we're never waiting to bust, so it doesnt really de-dupe const _bustQueue = new Set<string>(); export const bustQueueAdd = ( workspaceId: string, changeSetId: string, kind: EntityKind, id: string, fn: BustCacheFn, ) => { const key = `${workspaceId}-${changeSetId}-${kind}-${id}`; if (!_bustQueue.has(key)) { _bustQueue.add(key); bustQueue.add(() => { fn(workspaceId, changeSetId, kind, id); _bustQueue.delete(key); }); } }; processPatchQueue.on("add", () => { debug("⚙️ patches size", processPatchQueue.size); }); processPatchQueue.on("active", () => { debug("⚙️ patches size", processPatchQueue.size); }); processMjolnirQueue.on("add", () => { debug("⚙️ mjolnir size", processMjolnirQueue.size); }); processMjolnirQueue.on("active", () => { debug("⚙️ mjolnir size", processMjolnirQueue.size); }); let msgFlag = 0; bustQueue.on("add", () => { if (msgFlag === 0 || msgFlag === -1) debug("🧹 busts queued", bustQueue.size); msgFlag = 1; }); bustQueue.on("active", () => { if (msgFlag === 0 || msgFlag === -1) debug("🧹 busts queued", bustQueue.size); msgFlag = 1; }); processPatchQueue.on("empty", () => { debug("⚙️ patches processed"); const ctx = context.active(); const span = trace.getSpan(ctx); if (span) span.setAttribute("processPatchQueueEmpty", true); // the queue may either be paused or running bustQueue.start(); }); processMjolnirQueue.on("empty", () => { debug("⚙️ mjolnir processed"); const ctx = context.active(); const span = trace.getSpan(ctx); if (span) span.setAttribute("processMjolnirQueueEmpty", true); // the queue may either be paused or running bustQueue.start(); }); bustQueue.on("empty", () => { if (msgFlag === 0 || msgFlag === 1) { debug("🧹 busts processed"); const ctx = context.active(); const span = trace.getSpan(ctx); if (span) span.setAttribute("bustQueueEmpty", true); } msgFlag = -1; }); // ensure that when we get patches we process them fully processPatchQueue.on("active", () => { processMjolnirQueue.pause(); }); // process any hammers after patches processPatchQueue.on("idle", () => { processMjolnirQueue.start(); }); const inflight = new Set<string>(); // When we are running a bulk mjolnir, dont let other hammers fly... // Hold them in a queue, and if the bulk fails, let them fly... // if the bulk succeeds, don't fire them... const _bulkInflight: Record<string, Set<string>> = {}; export const bulkInflight = (args: { workspaceId: string; changeSetId: string; }) => { debug("BULK IN FLIGHT", args); bulkQueue.pause(); processPatchQueue.pause(); bustQueue.pause(); let inflight = _bulkInflight[args.workspaceId]; if (!inflight) { inflight = new Set<string>(); _bulkInflight[args.workspaceId] = inflight; } inflight.add(args.changeSetId); }; export const bulkDone = ( args: { workspaceId: string; changeSetId: string }, runQueue = false, ) => { debug("BULK DONE", args); let inflight = _bulkInflight[args.workspaceId]; if (!inflight) { inflight = new Set<string>(); _bulkInflight[args.workspaceId] = inflight; } inflight.delete(args.changeSetId); if (inflight.size === 0) { if (runQueue) { bulkQueue.start(); } else bulkQueue.clear(); if (processPatchQueue.size === 0) bustQueue.start(); processPatchQueue.start(); } }; type Description = { workspaceId: string; changeSetId: ChangeSetId; kind: string; id: Id; }; const descToString = (desc: Description) => `${desc.workspaceId}-${desc.changeSetId}-${desc.kind}-${desc.id}`; const canThrow = (desc: Description) => { const d = descToString(desc); if (inflight.has(d)) return false; else { inflight.add(d); debug("INFLIGHT", inflight.size); return true; } }; export const hasReturned = (desc: Description) => { const d = descToString(desc); debug("RETURNED", d, "INFLIGHT", inflight.size); inflight.delete(d); }; export const maybeMjolnir = async (desc: Description, fn: () => void) => { const bulk = _bulkInflight[desc.workspaceId]; if (bulk && bulk.size === 0) { await bulkQueue.add(fn); return; } if (canThrow(desc)) { try { await mjolnirQueue.add(fn); } catch (e) { // eslint-disable-next-line no-console console.error(`mjolnir job failed: ${e}`, desc, e); const d = descToString(desc); inflight.delete(d); } } };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server