import {
afterAll,
afterEach,
beforeAll,
beforeEach,
describe,
expect,
inject,
it,
} from "vitest";
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { AdminClient } from "../admin";
import { RestateQueueProvider } from "../index";
import { waitUntil } from "./utils";
class Baton {
private promise: Promise<void>;
private resolve: () => void;
private waiting = 0;
constructor() {
this.resolve = () => {
/* empty */
};
this.promise = new Promise<void>((resolve) => {
this.resolve = resolve;
});
}
async acquire() {
this.waiting++;
await this.promise;
}
async waitUntilCountWaiting(count: number) {
while (this.waiting < count) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
release() {
this.resolve();
}
}
type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
| { type: "stall"; durSec: number }
| { type: "semaphore-acquire" }
| {
type: "rate-limit";
val: number;
delayMs: number;
attemptsBeforeSuccess: number;
}
| {
type: "timeout-then-succeed";
val: number;
timeoutDurSec: number;
attemptsBeforeSuccess: number;
};
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
let queue: Queue<TestAction>;
let adminClient: AdminClient;
const testState = {
results: [] as number[],
errors: [] as string[],
inFlight: 0,
maxInFlight: 0,
baton: new Baton(),
rateLimitAttempts: new Map<string, number>(),
timeoutAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
await waitUntil(
async () => {
const stats = await queue.stats();
return stats.pending + stats.pending_retry + stats.running === 0;
},
"Queue to be empty",
60000,
);
}
beforeEach(async () => {
testState.results = [];
testState.errors = [];
testState.inFlight = 0;
testState.maxInFlight = 0;
testState.baton = new Baton();
testState.rateLimitAttempts = new Map<string, number>();
testState.timeoutAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
});
beforeAll(async () => {
const ingressPort = inject("restateIngressPort");
const adminPort = inject("restateAdminPort");
process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
process.env.RESTATE_LISTEN_PORT = "9080";
const provider = new RestateQueueProvider();
const client = await provider.getClient();
if (!client) {
throw new Error("Failed to create queue client");
}
queueClient = client;
adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
queue = queueClient.createQueue<TestAction>("test-queue", {
defaultJobArgs: {
numRetries: 3,
},
keepFailedJobs: false,
});
queueClient.createRunner(
queue,
{
run: async (job) => {
testState.inFlight++;
testState.maxInFlight = Math.max(
testState.maxInFlight,
testState.inFlight,
);
const jobData = job.data;
switch (jobData.type) {
case "val":
return jobData.val;
case "err":
throw new Error(jobData.err);
case "stall":
await new Promise((resolve) =>
setTimeout(resolve, jobData.durSec * 1000),
);
break;
case "semaphore-acquire":
await testState.baton.acquire();
break;
case "rate-limit": {
const attemptKey = `${job.id}`;
const currentAttempts =
testState.rateLimitAttempts.get(attemptKey) || 0;
testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
if (currentAttempts < jobData.attemptsBeforeSuccess) {
throw new QueueRetryAfterError(
`Rate limited (attempt ${currentAttempts + 1})`,
jobData.delayMs,
);
}
return jobData.val;
}
case "timeout-then-succeed": {
const attemptKey = `${job.id}`;
const currentAttempts =
testState.timeoutAttempts.get(attemptKey) || 0;
testState.timeoutAttempts.set(attemptKey, currentAttempts + 1);
if (currentAttempts < jobData.attemptsBeforeSuccess) {
// Stall longer than the timeout to trigger a timeout
await new Promise((resolve) =>
setTimeout(resolve, jobData.timeoutDurSec * 1000),
);
// This should not be reached if timeout works correctly
throw new Error("Should have timed out");
}
return jobData.val;
}
}
},
onError: async (job) => {
testState.inFlight--;
const jobData = job.data;
if (jobData && jobData.type === "err") {
testState.errors.push(jobData.err);
}
},
onComplete: async (_j, res) => {
testState.inFlight--;
if (res) {
testState.results.push(res);
}
},
},
{
concurrency: 3,
timeoutSecs: 2,
pollIntervalMs: 0 /* Doesn't matter */,
},
);
await queueClient.prepare();
await queueClient.start();
await adminClient.upsertDeployment("http://host.docker.internal:9080");
}, 90000);
afterAll(async () => {
if (queueClient?.shutdown) {
await queueClient.shutdown();
}
});
it("should enqueue and process a job", async () => {
const jobId = await queue.enqueue({ type: "val", val: 42 });
expect(jobId).toBeDefined();
expect(typeof jobId).toBe("string");
await waitUntilQueueEmpty();
expect(testState.results).toEqual([42]);
}, 60000);
it("should process multiple jobs", async () => {
await queue.enqueue({ type: "val", val: 1 });
await queue.enqueue({ type: "val", val: 2 });
await queue.enqueue({ type: "val", val: 3 });
await waitUntilQueueEmpty();
expect(testState.results.length).toEqual(3);
expect(testState.results).toContain(1);
expect(testState.results).toContain(2);
expect(testState.results).toContain(3);
}, 60000);
it("should retry failed jobs", async () => {
await queue.enqueue({ type: "err", err: "Test error" });
await waitUntilQueueEmpty();
// Initial attempt + 3 retries
expect(testState.errors).toEqual([
"Test error",
"Test error",
"Test error",
"Test error",
]);
}, 90000);
it("should use idempotency key", async () => {
const idempotencyKey = `test-${Date.now()}`;
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
expect(testState.results).toEqual([200]);
}, 60000);
it("should handle concurrent jobs", async () => {
const promises = [];
for (let i = 300; i < 320; i++) {
promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
}
await Promise.all(promises);
await waitUntilQueueEmpty();
expect(testState.maxInFlight).toEqual(3);
}, 60000);
it("should handle priorities", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
// Then those will get reprioritized
await Promise.all([
queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
]);
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
expect(testState.results).toEqual([
// Lower numeric priority value should run first
202, 201, 200, 300, 301, 302,
]);
}, 60000);
describe("Group Fairness", () => {
it("should process jobs from different groups fairly with same priority", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
// Enqueue jobs from two different groups with same priority
// Group A has more jobs
await queue.enqueue(
{ type: "val", val: 200 },
{ priority: 0, groupId: "B" },
);
await queue.enqueue(
{ type: "val", val: 201 },
{ priority: 0, groupId: "B" },
);
await queue.enqueue(
{ type: "val", val: 100 },
{ priority: 0, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 101 },
{ priority: 0, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 102 },
{ priority: 0, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 103 },
{ priority: 0, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 300 },
{ priority: 0, groupId: "C" },
);
await queue.enqueue(
{ type: "val", val: 301 },
{ priority: 0, groupId: "C" },
);
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
expect(testState.results).toEqual([
200, 100, 300, 201, 101, 301, 102, 103,
]);
}, 60000);
it("should respect priority over group fairness", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
await queue.enqueue(
{ type: "val", val: 100 },
{ priority: 1, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 101 },
{ priority: 1, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 200 },
{ priority: 0, groupId: "B" },
);
await queue.enqueue(
{ type: "val", val: 201 },
{ priority: 0, groupId: "B" },
);
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
// Priority 0 (higher) should run before priority 1 (lower)
expect(testState.results).toEqual([200, 201, 100, 101]);
}, 60000);
it("should handle jobs without groupId", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
// Mix of grouped and ungrouped jobs
await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped
await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped
await queue.enqueue(
{ type: "val", val: 200 },
{ priority: 0, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 201 },
{ priority: 0, groupId: "A" },
);
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
// All jobs should complete successfully
expect(testState.results).toEqual([100, 200, 101, 201]);
}, 60000);
it("should work with jobs that don't specify groupId", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
// These should all go to the default "__ungrouped__" group
await queue.enqueue({ type: "val", val: 1 }, { priority: 0 });
await queue.enqueue({ type: "val", val: 2 }, { priority: 1 });
await queue.enqueue({ type: "val", val: 3 }, { priority: -1 });
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
// Should respect priority
expect(testState.results).toEqual([3, 1, 2]);
}, 60000);
it("should handle same job in same group with different priorities", async () => {
// hog the queue
await Promise.all([
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
queue.enqueue(
{ type: "semaphore-acquire" },
{ groupId: "init", priority: -10 },
),
]);
await testState.baton.waitUntilCountWaiting(3);
await queue.enqueue(
{ type: "val", val: 100 },
{ priority: 2, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 101 },
{ priority: 1, groupId: "A" },
);
await queue.enqueue(
{ type: "val", val: 102 },
{ priority: 0, groupId: "A" },
);
// Wait for all jobs to be enqueued
await new Promise((resolve) => setTimeout(resolve, 1000));
testState.baton.release();
await waitUntilQueueEmpty();
// Should respect priority even within the same group
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
describe("QueueRetryAfterError handling", () => {
it("should retry after delay without counting against retry attempts", async () => {
const startTime = Date.now();
// This job will fail with QueueRetryAfterError twice before succeeding
await queue.enqueue({
type: "rate-limit",
val: 42,
delayMs: 500, // 500ms delay
attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
});
await waitUntilQueueEmpty();
const duration = Date.now() - startTime;
// Should have succeeded
expect(testState.results).toEqual([42]);
// Should have been called 3 times (2 rate limit failures + 1 success)
expect(testState.rateLimitAttempts.size).toBe(1);
const attempts = Array.from(testState.rateLimitAttempts.values())[0];
expect(attempts).toBe(3);
// Should have waited at least 1 second total (2 x 500ms delays)
expect(duration).toBeGreaterThanOrEqual(1000);
// onError should NOT have been called for rate limit retries
expect(testState.errors).toEqual([]);
}, 60000);
it("should not exhaust retries when rate limited", async () => {
// This job will be rate limited many more times than the retry limit
// but should still eventually succeed
await queue.enqueue({
type: "rate-limit",
val: 100,
delayMs: 100, // Short delay for faster test
attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
});
await waitUntilQueueEmpty();
// Should have succeeded despite being "retried" more than the limit
expect(testState.results).toEqual([100]);
// Should have been called 11 times (10 rate limit failures + 1 success)
const attempts = Array.from(testState.rateLimitAttempts.values())[0];
expect(attempts).toBe(11);
// No errors should have been recorded
expect(testState.errors).toEqual([]);
}, 90000);
it("should still respect retry limit for non-rate-limit errors", async () => {
// Enqueue a regular error job that should fail permanently
await queue.enqueue({ type: "err", err: "Regular error" });
await waitUntilQueueEmpty();
// Should have failed 4 times (initial + 3 retries) and not succeeded
expect(testState.errors).toEqual([
"Regular error",
"Regular error",
"Regular error",
"Regular error",
]);
expect(testState.results).toEqual([]);
}, 90000);
});
describe("Timeout handling", () => {
it("should retry timed out jobs and not waste semaphore slots", async () => {
// This test verifies that:
// 1. Jobs that timeout get retried correctly
// 2. Semaphore slots are freed when jobs timeout (via lease expiry)
// 3. Other jobs can still run while a job is being retried
// Enqueue a job that will timeout on first attempt, succeed on second
// timeoutSecs is 2, so we stall for 5 seconds to ensure timeout
await queue.enqueue({
type: "timeout-then-succeed",
val: 42,
timeoutDurSec: 5,
attemptsBeforeSuccess: 1, // Timeout once, then succeed
});
// Wait a bit for the first attempt to start
await new Promise((resolve) => setTimeout(resolve, 500));
// Enqueue more jobs to verify semaphore slots are eventually freed
// With concurrency=3, these should be able to run after the timeout
await queue.enqueue({ type: "val", val: 100 });
await queue.enqueue({ type: "val", val: 101 });
await queue.enqueue({ type: "val", val: 102 });
await waitUntilQueueEmpty();
// The timeout job should have succeeded after retry
expect(testState.results).toContain(42);
// All other jobs should have completed
expect(testState.results).toContain(100);
expect(testState.results).toContain(101);
expect(testState.results).toContain(102);
// The timeout job should have been attempted twice
const attempts = Array.from(testState.timeoutAttempts.values())[0];
expect(attempts).toBe(2);
// Concurrency should not have exceeded the limit
expect(testState.maxInFlight).toBeLessThanOrEqual(3);
}, 120000);
it("should handle job that times out multiple times before succeeding", async () => {
// Enqueue a single job that times out twice before succeeding
// This tests that the retry mechanism works correctly for timeouts
await queue.enqueue({
type: "timeout-then-succeed",
val: 99,
timeoutDurSec: 5,
attemptsBeforeSuccess: 2, // Timeout twice, then succeed
});
await waitUntilQueueEmpty();
// Job should eventually succeed
expect(testState.results).toEqual([99]);
// Should have been attempted 3 times (2 timeouts + 1 success)
const attempts = Array.from(testState.timeoutAttempts.values())[0];
expect(attempts).toBe(3);
}, 180000);
});
});