Skip to main content
Glama
InMemoryEventStore.test.ts (9.06 kB)
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import { describe, expect, it, vi } from "vitest";

import { InMemoryEventStore } from "./InMemoryEventStore.js";

/** One event as it is handed to the `send` callback during a replay. */
type ReplayedEvent = { eventId: string; message: JSONRPCMessage };

/** Waits roughly 1 ms so consecutive `storeEvent` calls get different timestamps. */
const pause = (): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, 1));

/**
 * Builds a mocked `send` callback together with the list it fills.
 *
 * @returns `replayed` — every event passed to `send`, in call order — and the
 *   `send` mock itself so call counts can be asserted.
 */
function recordReplay() {
  const replayed: ReplayedEvent[] = [];
  const send = vi.fn(async (eventId: string, message: JSONRPCMessage) => {
    replayed.push({ eventId, message });
  });
  return { replayed, send };
}

/**
 * Runs `action` while `Date.now()` is mocked to return `timestamp`.
 * The spy is restored even when `action` throws.
 *
 * @param timestamp - Value every `Date.now()` call returns during `action`.
 * @param action - Async work to perform under the frozen clock.
 * @returns Whatever `action` resolves to.
 */
async function withFrozenClock<T>(
  timestamp: number,
  action: () => Promise<T>,
): Promise<T> {
  const clock = vi.spyOn(Date, "now").mockReturnValue(timestamp);
  try {
    return await action();
  } finally {
    clock.mockRestore();
  }
}

describe("InMemoryEventStore", () => {
  it("stores events and replays them after a specific event ID", async () => {
    const store = new InMemoryEventStore();
    const streamId = "test-stream-123";

    const messages: JSONRPCMessage[] = [
      { id: 1, jsonrpc: "2.0", method: "initialize" },
      { id: 2, jsonrpc: "2.0", method: "tools/list" },
      { id: 3, jsonrpc: "2.0", method: "tools/call", params: { name: "test" } },
      { id: 3, jsonrpc: "2.0", result: { success: true } },
      { id: 4, jsonrpc: "2.0", method: "shutdown" },
    ];

    // Persist every message, remembering the ID the store hands back.
    const eventIds: string[] = [];
    for (const message of messages) {
      const storedId = await store.storeEvent(streamId, message);
      expect(storedId).toContain(streamId);
      eventIds.push(storedId);
      await pause();
    }

    // Ask for everything that came after the second stored event.
    const { replayed, send } = recordReplay();
    const returnedStreamId = await store.replayEventsAfter(eventIds[1], {
      send,
    });

    expect(returnedStreamId).toBe(streamId);

    // Exactly the third, fourth and fifth events must come back.
    expect(replayed).toHaveLength(3);
    expect(send).toHaveBeenCalledTimes(3);

    // Payloads and event IDs must match what was stored, in order.
    expect(replayed.map((event) => event.message)).toEqual(messages.slice(2));
    expect(replayed.map((event) => event.eventId)).toEqual(eventIds.slice(2));
  });

  it("isolates events by stream ID and only replays events from the same stream", async () => {
    const store = new InMemoryEventStore();
    const streamId1 = "stream-alpha";
    const streamId2 = "stream-beta";

    const stream1Messages: JSONRPCMessage[] = [
      { id: 1, jsonrpc: "2.0", method: "stream1.init" },
      { id: 2, jsonrpc: "2.0", method: "stream1.process" },
      { id: 3, jsonrpc: "2.0", method: "stream1.complete" },
    ];
    const stream2Messages: JSONRPCMessage[] = [
      { id: 10, jsonrpc: "2.0", method: "stream2.init" },
      { id: 20, jsonrpc: "2.0", method: "stream2.process" },
      { id: 30, jsonrpc: "2.0", method: "stream2.complete" },
    ];

    // Alternate between the two streams: alpha, beta, alpha, beta, ...
    const stream1EventIds: string[] = [];
    const stream2EventIds: string[] = [];
    const rounds = stream1Messages.length;
    for (let round = 0; round < rounds; round += 1) {
      stream1EventIds.push(
        await store.storeEvent(streamId1, stream1Messages[round]),
      );
      await pause();
      stream2EventIds.push(
        await store.storeEvent(streamId2, stream2Messages[round]),
      );
      // No delay is inserted after the very last store.
      if (round < rounds - 1) {
        await pause();
      }
    }

    // Replay each stream after its first event; stream 1 is checked first.
    const scenarios = [
      {
        ownId: streamId1,
        otherId: streamId2,
        ids: stream1EventIds,
        sent: stream1Messages,
      },
      {
        ownId: streamId2,
        otherId: streamId1,
        ids: stream2EventIds,
        sent: stream2Messages,
      },
    ];
    for (const { ownId, otherId, ids, sent } of scenarios) {
      const { replayed, send } = recordReplay();
      const returnedStreamId = await store.replayEventsAfter(ids[0], { send });

      // Only the two later events of the requested stream are expected.
      expect(returnedStreamId).toBe(ownId);
      expect(replayed).toHaveLength(2);
      expect(replayed.map((event) => event.message)).toEqual(sent.slice(1));

      // None of the replayed IDs may belong to the other stream.
      for (const { eventId } of replayed) {
        expect(eventId).toContain(ownId);
        expect(eventId).not.toContain(otherId);
      }
    }

    // Edge cases: an unknown event ID and an empty event ID both yield "".
    for (const bogusId of ["non-existent-event-id", ""]) {
      const result = await store.replayEventsAfter(bogusId, { send: vi.fn() });
      expect(result).toBe("");
    }
  });

  it("keeps deterministic ordering even when events share the same timestamp", async () => {
    const store = new InMemoryEventStore();
    const streamId = "deterministic-stream";
    const messages: JSONRPCMessage[] = [
      { id: 1, jsonrpc: "2.0", method: "step/one" },
      { id: 2, jsonrpc: "2.0", method: "step/two" },
      { id: 3, jsonrpc: "2.0", method: "step/three" },
      { id: 4, jsonrpc: "2.0", method: "step/four" },
    ];
    const fixedTimestamp = 1_730_000_000_000;

    // Store everything while the clock is frozen so all timestamps collide.
    const eventIds = await withFrozenClock(fixedTimestamp, async () => {
      const ids: string[] = [];
      for (const message of messages) {
        ids.push(await store.storeEvent(streamId, message));
      }
      return ids;
    });

    // Sequentially stored IDs must already be in sorted order.
    const sortedIds = eventIds.slice().sort();
    expect(eventIds).toEqual(sortedIds);

    // The ID is split on "_"; segment 1 is checked as the timestamp,
    // segment 2 as the counter and segment 3 as the random suffix.
    const segments = eventIds.map((eventId) => eventId.split("_"));

    const expectedTimestamps = Array.from({ length: messages.length }, () =>
      fixedTimestamp.toString(),
    );
    expect(segments.map((parts) => parts[1])).toEqual(expectedTimestamps);

    // Counters are expected as zero-padded, 4-wide base36: 0000, 0001, ...
    const expectedCounters = messages.map((_, index) =>
      index.toString(36).padStart(4, "0"),
    );
    expect(segments.map((parts) => parts[2])).toEqual(expectedCounters);

    // Every random suffix must be exactly three base36 characters.
    for (const parts of segments) {
      expect(parts[3]).toMatch(/^[0-9a-z]{3}$/);
    }

    // Replaying after the first event must deliver the rest in stored order.
    const replayedMessages: JSONRPCMessage[] = [];
    const collect = async (
      _eventId: string,
      message: JSONRPCMessage,
    ): Promise<void> => {
      replayedMessages.push(message);
    };
    const returnedStreamId = await store.replayEventsAfter(eventIds[0], {
      send: collect,
    });
    expect(returnedStreamId).toBe(streamId);
    expect(replayedMessages).toEqual(messages.slice(1));

    // Once the clock advances, the counter segment must start again at 0000.
    await withFrozenClock(fixedTimestamp + 1, async () => {
      const nextId = await store.storeEvent(streamId, {
        id: 5,
        jsonrpc: "2.0",
        method: "step/five",
      });
      const nextParts = nextId.split("_");
      expect(nextParts[2]).toBe("0000");
      expect(nextParts[3]).toMatch(/^[0-9a-z]{3}$/);
    });
  });
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Valerio357/bet-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.