Skip to main content
Glama

Karakeep MCP server

by karakeep-app
queue.test.ts5.95 kB
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, inject, it, } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin.js"; import { RestateQueueProvider } from "../index.js"; import { waitUntil } from "./utils.js"; type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; let queue: Queue<TestAction>; let adminClient: AdminClient; const testState = { results: [] as number[], errors: [] as string[], inFlight: 0, maxInFlight: 0, }; async function waitUntilQueueEmpty() { await waitUntil( async () => { const stats = await queue.stats(); return stats.pending + stats.pending_retry + stats.running === 0; }, "Queue to be empty", 60000, ); } beforeEach(async () => { testState.results = []; testState.errors = []; testState.inFlight = 0; testState.maxInFlight = 0; }); afterEach(async () => { await waitUntilQueueEmpty(); }); beforeAll(async () => { const ingressPort = inject("restateIngressPort"); const adminPort = inject("restateAdminPort"); process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; process.env.RESTATE_LISTEN_PORT = "9080"; const provider = new RestateQueueProvider(); const client = await provider.getClient(); if (!client) { throw new Error("Failed to create queue client"); } queueClient = client; adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); queue = queueClient.createQueue<TestAction>("test-queue", { defaultJobArgs: { numRetries: 3, }, keepFailedJobs: false, }); queueClient.createRunner( queue, { run: async (job) => { testState.inFlight++; testState.maxInFlight = Math.max( testState.maxInFlight, testState.inFlight, ); const jobData = job.data; switch (jobData.type) { case "val": testState.results.push(jobData.val); break; case "err": throw new Error(jobData.err); case "stall": await new Promise((resolve) => setTimeout(resolve, jobData.durSec * 1000), ); break; } }, onError: async (job) => { testState.inFlight--; const jobData = job.data; if (jobData && jobData.type === "err") { testState.errors.push(jobData.err); } }, onComplete: async () => { testState.inFlight--; }, }, { concurrency: 3, timeoutSecs: 2, pollIntervalMs: 0 /* Doesn't matter */, }, ); await queueClient.prepare(); await queueClient.start(); await adminClient.upsertDeployment("http://host.docker.internal:9080"); }, 90000); afterAll(async () => { if (queueClient?.shutdown) { await queueClient.shutdown(); } }); it("should enqueue and process a job", async () => { const jobId = await queue.enqueue({ type: "val", val: 42 }); expect(jobId).toBeDefined(); expect(typeof jobId).toBe("string"); await waitUntilQueueEmpty(); expect(testState.results).toEqual([42]); }, 60000); it("should process multiple jobs", async () => { await queue.enqueue({ type: "val", val: 1 }); await queue.enqueue({ type: "val", val: 2 }); await queue.enqueue({ type: "val", val: 3 }); await waitUntilQueueEmpty(); expect(testState.results.length).toEqual(3); expect(testState.results).toContain(1); expect(testState.results).toContain(2); expect(testState.results).toContain(3); }, 60000); it("should retry failed jobs", async () => { await queue.enqueue({ type: "err", err: "Test error" }); await waitUntilQueueEmpty(); // Initial attempt + 3 retries expect(testState.errors).toEqual([ "Test error", "Test error", "Test error", "Test error", ]); }, 90000); it("should use idempotency key", async () => { const idempotencyKey = `test-${Date.now()}`; await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); await waitUntilQueueEmpty(); expect(testState.results).toEqual([200]); }, 60000); it("should handle concurrent jobs", async () => { const promises = []; for (let i = 300; i < 320; i++) { promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); } await Promise.all(promises); await waitUntilQueueEmpty(); expect(testState.maxInFlight).toEqual(3); }, 60000); it("should handle priorities", async () => { // Hog the queue first await Promise.all([ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), ]); // Then those will get reprioritized await Promise.all([ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), ]); await waitUntilQueueEmpty(); expect(testState.results).toEqual([ // Lower numeric priority value should run first 202, 201, 200, 300, 301, 302, ]); }, 60000); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/karakeep-app/karakeep'

If you have feedback or need assistance with the MCP directory API, please join our Discord server