import { describe, it, expect, vi } from "vitest";
import {
parseSSEStream,
collectStreamResult,
processStreamWithCallbacks,
} from "./sse-parser.js";
import type { BetaStreamEvent } from "./anthropic-types.js";
import type { StreamCallbacks } from "./types.js";
// ============================================================================
// HELPERS — create ReadableStreams and AsyncIterables from test data
// ============================================================================
/** Encode a string into a ReadableStream<Uint8Array> simulating an HTTP body */
/**
 * Build a single-chunk ReadableStream<Uint8Array> from a string,
 * simulating an HTTP response body that arrives in one piece.
 */
function makeStream(text: string): ReadableStream<Uint8Array> {
  // Encode once up front; the stream simply hands out the bytes and closes.
  const bytes = new TextEncoder().encode(text);
  return new ReadableStream({
    start(ctrl) {
      ctrl.enqueue(bytes);
      ctrl.close();
    },
  });
}
/** Encode strings as separate chunks (simulates chunked transfer) */
/**
 * Build a ReadableStream<Uint8Array> that emits each input string as its own
 * chunk, simulating chunked transfer encoding.
 */
function makeChunkedStream(chunks: string[]): ReadableStream<Uint8Array> {
  const enc = new TextEncoder();
  // Pre-encode every piece; enqueue them in order, then close.
  const encoded = chunks.map((piece) => enc.encode(piece));
  return new ReadableStream({
    start(ctrl) {
      encoded.forEach((bytes) => ctrl.enqueue(bytes));
      ctrl.close();
    },
  });
}
/** Create an async iterable from an array of events */
/** Wrap an array of events in an async iterable, yielding them in order. */
async function* eventsFromArray(
  events: BetaStreamEvent[],
): AsyncGenerator<BetaStreamEvent> {
  // Delegating to the array's own (sync) iterator yields each element.
  yield* events;
}
// ============================================================================
// parseSSEStream
// ============================================================================
// Suite: parseSSEStream — turning a raw SSE byte stream into parsed events.
describe("parseSSEStream", () => {
it("parses a single SSE event", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 10, output_tokens: 0 } } };
const body = makeStream(`data: ${JSON.stringify(event)}\n\n`);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
expect(results[0]).toEqual(event);
});
it("parses multiple SSE events", async () => {
const events = [
{ type: "message_start", message: { usage: { input_tokens: 5, output_tokens: 0 } } },
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hello" } },
];
const sseText = events.map((e) => `data: ${JSON.stringify(e)}\n\n`).join("");
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(3);
expect(results[0].type).toBe("message_start");
expect(results[1].type).toBe("content_block_start");
expect(results[2].type).toBe("content_block_delta");
});
it("handles [DONE] sentinel and stops", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 1, output_tokens: 0 } } };
const trailing = { type: "content_block_start", index: 0, content_block: { type: "text", text: "" } };
// A valid event appears AFTER [DONE]; it must not be yielded.
const sseText = `data: ${JSON.stringify(event)}\n\ndata: [DONE]\n\ndata: ${JSON.stringify(trailing)}\n\n`;
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
expect(results[0].type).toBe("message_start");
});
it("skips empty lines and comment-like lines", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
// Lines starting with ":" are SSE comments.
const sseText = `\n\n: this is a comment\n\ndata: ${JSON.stringify(event)}\n\n`;
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
});
it("skips malformed JSON payloads", async () => {
const good = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
// Bad payload first — parser should skip it and still yield the good event.
const sseText = `data: {broken json\n\ndata: ${JSON.stringify(good)}\n\n`;
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
expect(results[0].type).toBe("message_start");
});
it("returns nothing for empty stream", async () => {
const body = makeStream("");
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(0);
});
it("handles data split across chunks", async () => {
const event = { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "hi" } };
const json = JSON.stringify(event);
// Split in the middle of the JSON
const half = Math.floor(json.length / 2);
const body = makeChunkedStream([
`data: ${json.slice(0, half)}`,
`${json.slice(half)}\n\n`,
]);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
expect(results[0]).toEqual(event);
});
it("handles data: line split across chunks (newline in second chunk)", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
const body = makeChunkedStream([
`data: ${JSON.stringify(event)}`,
"\n\n",
]);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
});
it("respects abort signal", async () => {
const controller = new AbortController();
controller.abort(); // Already aborted
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
const body = makeStream(`data: ${JSON.stringify(event)}\n\n`);
const results: BetaStreamEvent[] = [];
// A pre-aborted signal should yield no events and not throw.
for await (const e of parseSSEStream(body, controller.signal)) {
results.push(e);
}
expect(results).toHaveLength(0);
});
it("ignores lines without data: prefix", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
// event:/id:/retry: are valid SSE fields this parser does not consume.
const sseText = `event: ping\nid: 123\nretry: 5000\ndata: ${JSON.stringify(event)}\n\n`;
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
});
it("handles whitespace around data lines", async () => {
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
// Leading/trailing spaces around the data line — parser is expected to trim.
const sseText = ` data: ${JSON.stringify(event)} \n\n`;
const body = makeStream(sseText);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(1);
});
});
// ============================================================================
// collectStreamResult
// ============================================================================
// Suite: collectStreamResult — aggregating a parsed event stream into a result
// object (text, tool blocks, usage counters, stop reason).
describe("collectStreamResult", () => {
it("returns default empty result for no events", async () => {
const result = await collectStreamResult(eventsFromArray([]));
expect(result.text).toBe("");
expect(result.toolUseBlocks).toEqual([]);
expect(result.thinkingBlocks).toEqual([]);
expect(result.thinkingTokens).toBe(0);
expect(result.usage.inputTokens).toBe(0);
expect(result.usage.outputTokens).toBe(0);
expect(result.usage.cacheCreationTokens).toBe(0);
expect(result.usage.cacheReadTokens).toBe(0);
expect(result.compactionContent).toBeNull();
expect(result.contextManagementApplied).toBe(false);
expect(result.stopReason).toBe("end_turn");
});
it("collects text from text_delta events", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hello " } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "world" } },
{ type: "content_block_stop", index: 0 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.text).toBe("Hello world");
});
it("collects tool use blocks from streaming events", async () => {
// Tool input arrives as partial JSON fragments that are concatenated
// and parsed on content_block_stop.
const events: BetaStreamEvent[] = [
{
type: "content_block_start",
index: 0,
content_block: { type: "tool_use", id: "tool_1", name: "read_file" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: '{"path":' },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: '"/tmp/test.txt"}' },
},
{ type: "content_block_stop", index: 0 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.toolUseBlocks).toHaveLength(1);
expect(result.toolUseBlocks[0].id).toBe("tool_1");
expect(result.toolUseBlocks[0].name).toBe("read_file");
expect(result.toolUseBlocks[0].input).toEqual({ path: "/tmp/test.txt" });
});
it("collects multiple tool use blocks", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_start",
index: 0,
content_block: { type: "tool_use", id: "t1", name: "read_file" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: '{"a": 1}' },
},
{ type: "content_block_stop", index: 0 },
{
type: "content_block_start",
index: 1,
content_block: { type: "tool_use", id: "t2", name: "write_file" },
},
{
type: "content_block_delta",
index: 1,
delta: { type: "input_json_delta", partial_json: '{"b": 2}' },
},
{ type: "content_block_stop", index: 1 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.toolUseBlocks).toHaveLength(2);
expect(result.toolUseBlocks[0].name).toBe("read_file");
expect(result.toolUseBlocks[1].name).toBe("write_file");
});
it("handles tool use with empty JSON input", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_start",
index: 0,
content_block: { type: "tool_use", id: "t1", name: "list_files" },
},
// No input_json_delta events — empty input
{ type: "content_block_stop", index: 0 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.toolUseBlocks).toHaveLength(1);
expect(result.toolUseBlocks[0].input).toEqual({});
});
it("skips tool use with malformed JSON input", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_start",
index: 0,
content_block: { type: "tool_use", id: "t1", name: "bad_tool" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: "{not valid json" },
},
{ type: "content_block_stop", index: 0 },
];
const result = await collectStreamResult(eventsFromArray(events));
// Unparseable tool input drops the whole block rather than crashing.
expect(result.toolUseBlocks).toHaveLength(0);
});
it("collects usage from message_start", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_start",
message: {
usage: {
input_tokens: 100,
output_tokens: 0,
cache_creation_input_tokens: 50,
cache_read_input_tokens: 25,
},
},
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.inputTokens).toBe(100);
expect(result.usage.cacheCreationTokens).toBe(50);
expect(result.usage.cacheReadTokens).toBe(25);
});
it("collects output tokens from message_delta", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 42 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.outputTokens).toBe(42);
});
it("handles message_start with no cache tokens", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_start",
message: {
usage: {
input_tokens: 200,
output_tokens: 0,
},
},
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.inputTokens).toBe(200);
// Missing cache fields default to 0, not undefined/NaN.
expect(result.usage.cacheCreationTokens).toBe(0);
expect(result.usage.cacheReadTokens).toBe(0);
});
it("captures stop_reason from message_delta", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: { stop_reason: "tool_use" },
usage: { output_tokens: 10 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.stopReason).toBe("tool_use");
});
it("defaults stopReason to end_turn when not provided", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: {},
usage: { output_tokens: 0 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.stopReason).toBe("end_turn");
});
});
// ============================================================================
// processStreamWithCallbacks — thinking blocks
// ============================================================================
// Suite: thinking-block accumulation (thinking_delta + signature_delta) and
// the chars/4 (rounded up) token estimate.
describe("processStreamWithCallbacks — thinking", () => {
it("collects thinking blocks", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "Let me think..." },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: " about this." },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: "sig123" },
},
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.thinkingBlocks).toHaveLength(1);
expect(result.thinkingBlocks[0].thinking).toBe("Let me think... about this.");
expect(result.thinkingBlocks[0].signature).toBe("sig123");
expect(result.thinkingBlocks[0].type).toBe("thinking");
});
it("estimates thinking tokens from character count", async () => {
// 40 chars / 4 = 10 tokens
const thinkingText = "A".repeat(40);
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: thinkingText },
},
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.thinkingTokens).toBe(10);
});
it("rounds up thinking token estimate", async () => {
// 5 chars / 4 = 1.25 => ceil = 2
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "Think" },
},
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.thinkingTokens).toBe(2);
});
it("handles thinking_delta with empty string", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "" },
},
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
// An empty thinking block is still recorded, with zero estimated tokens.
expect(result.thinkingBlocks).toHaveLength(1);
expect(result.thinkingBlocks[0].thinking).toBe("");
expect(result.thinkingTokens).toBe(0);
});
});
// ============================================================================
// processStreamWithCallbacks — compaction
// ============================================================================
// Suite: compaction-block content accumulation and the two ways
// contextManagementApplied can be set (compaction block, or applied_edits).
describe("processStreamWithCallbacks — compaction", () => {
it("collects compaction content", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "compaction" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "compaction_delta", content: "Summary of " },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "compaction_delta", content: "conversation." },
},
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.compactionContent).toBe("Summary of conversation.");
// Receiving a compaction block implies context management was applied.
expect(result.contextManagementApplied).toBe(true);
});
it("leaves compactionContent null when no compaction block", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hi" } },
{ type: "content_block_stop", index: 0 },
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.compactionContent).toBeNull();
expect(result.contextManagementApplied).toBe(false);
});
it("sets contextManagementApplied from message_delta applied_edits", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: {
stop_reason: "end_turn",
context_management: {
applied_edits: [{ type: "clear_tool_results" }],
},
},
usage: { output_tokens: 0 },
},
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.contextManagementApplied).toBe(true);
});
it("does not set contextManagementApplied for empty applied_edits", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: {
stop_reason: "end_turn",
context_management: {
applied_edits: [],
},
},
usage: { output_tokens: 0 },
},
];
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.contextManagementApplied).toBe(false);
});
});
// ============================================================================
// processStreamWithCallbacks — callbacks
// ============================================================================
// Suite: callback invocation — order, argument values, and the
// no-callbacks-provided path.
describe("processStreamWithCallbacks — callbacks", () => {
it("fires onText callback for each text delta", async () => {
const onText = vi.fn();
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hello" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: " world" } },
{ type: "content_block_stop", index: 0 },
];
await processStreamWithCallbacks(eventsFromArray(events), { onText });
expect(onText).toHaveBeenCalledTimes(2);
expect(onText).toHaveBeenNthCalledWith(1, "Hello");
expect(onText).toHaveBeenNthCalledWith(2, " world");
});
it("fires onToolStart on block start and block stop with parsed input", async () => {
const onToolStart = vi.fn();
const events: BetaStreamEvent[] = [
{
type: "content_block_start",
index: 0,
content_block: { type: "tool_use", id: "t1", name: "bash" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: '{"command": "ls"}' },
},
{ type: "content_block_stop", index: 0 },
];
await processStreamWithCallbacks(eventsFromArray(events), { onToolStart });
// Called twice: once on content_block_start (name only), once on content_block_stop (name + input)
expect(onToolStart).toHaveBeenCalledTimes(2);
expect(onToolStart).toHaveBeenNthCalledWith(1, "bash");
expect(onToolStart).toHaveBeenNthCalledWith(2, "bash", { command: "ls" });
});
it("fires onThinking callback for thinking deltas", async () => {
const onThinking = vi.fn();
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "Hmm" },
},
{ type: "content_block_stop", index: 0 },
];
await processStreamWithCallbacks(eventsFromArray(events), { onThinking });
expect(onThinking).toHaveBeenCalledTimes(1);
expect(onThinking).toHaveBeenCalledWith("Hmm");
});
it("works with no callbacks provided (collectStreamResult path)", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "ok" } },
{ type: "content_block_stop", index: 0 },
];
// Should not throw
const result = await processStreamWithCallbacks(eventsFromArray(events), {});
expect(result.text).toBe("ok");
});
});
// ============================================================================
// processStreamWithCallbacks — error handling
// ============================================================================
// Suite: error events — string errors surface directly; object errors are
// JSON-serialized into the thrown message.
describe("processStreamWithCallbacks — error events", () => {
it("throws on string error event", async () => {
const events: BetaStreamEvent[] = [
// Cast needed: the error payload shape isn't part of BetaStreamEvent's literal types.
{ type: "error", error: "rate_limit_exceeded" } as BetaStreamEvent,
];
await expect(
processStreamWithCallbacks(eventsFromArray(events), {}),
).rejects.toThrow("rate_limit_exceeded");
});
it("throws on object error event (serialized as JSON)", async () => {
const errorObj = { type: "overloaded_error", message: "Server is overloaded" };
const events: BetaStreamEvent[] = [
{ type: "error", error: errorObj } as BetaStreamEvent,
];
await expect(
processStreamWithCallbacks(eventsFromArray(events), {}),
).rejects.toThrow(JSON.stringify(errorObj));
});
});
// ============================================================================
// processStreamWithCallbacks — abort signal
// ============================================================================
// Suite: abort signal — processing stops at the abort point and returns the
// partial result accumulated so far (it does not throw).
describe("processStreamWithCallbacks — abort signal", () => {
it("stops processing when signal is aborted", async () => {
const controller = new AbortController();
// Generator aborts the controller mid-stream, between two text deltas.
async function* slowEvents(): AsyncGenerator<BetaStreamEvent> {
yield {
type: "content_block_start",
index: 0,
content_block: { type: "text", text: "" },
};
yield {
type: "content_block_delta",
index: 0,
delta: { type: "text_delta", text: "Before" },
};
// Abort after first delta
controller.abort();
yield {
type: "content_block_delta",
index: 0,
delta: { type: "text_delta", text: " After" },
};
}
const result = await processStreamWithCallbacks(
slowEvents(),
{},
controller.signal,
);
// Should have "Before" but not " After"
expect(result.text).toBe("Before");
});
it("returns partial result when aborted before any events", async () => {
const controller = new AbortController();
controller.abort();
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Should not appear" } },
];
const result = await processStreamWithCallbacks(
eventsFromArray(events),
{},
controller.signal,
);
expect(result.text).toBe("");
});
});
// ============================================================================
// Full end-to-end — parseSSEStream -> collectStreamResult
// ============================================================================
// Suite: full pipeline — raw SSE bytes through parseSSEStream into
// collectStreamResult, checking every aggregated field at once.
describe("end-to-end: parseSSEStream -> collectStreamResult", () => {
it("processes a complete stream with text and tool use", async () => {
const events: object[] = [
{ type: "message_start", message: { usage: { input_tokens: 100, output_tokens: 0, cache_creation_input_tokens: 20, cache_read_input_tokens: 30 } } },
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "I'll read that file." } },
{ type: "content_block_stop", index: 0 },
{ type: "content_block_start", index: 1, content_block: { type: "tool_use", id: "tu_1", name: "read_file" } },
{ type: "content_block_delta", index: 1, delta: { type: "input_json_delta", partial_json: '{"file_path": "/test.txt"}' } },
{ type: "content_block_stop", index: 1 },
{ type: "message_delta", delta: { stop_reason: "tool_use" }, usage: { output_tokens: 55 } },
];
const sseText = events.map((e) => `data: ${JSON.stringify(e)}\n\n`).join("") + "data: [DONE]\n\n";
const body = makeStream(sseText);
const parsed = parseSSEStream(body);
const result = await collectStreamResult(parsed);
expect(result.text).toBe("I'll read that file.");
expect(result.toolUseBlocks).toHaveLength(1);
expect(result.toolUseBlocks[0].name).toBe("read_file");
expect(result.toolUseBlocks[0].input).toEqual({ file_path: "/test.txt" });
expect(result.usage.inputTokens).toBe(100);
expect(result.usage.outputTokens).toBe(55);
expect(result.usage.cacheCreationTokens).toBe(20);
expect(result.usage.cacheReadTokens).toBe(30);
expect(result.stopReason).toBe("tool_use");
expect(result.compactionContent).toBeNull();
expect(result.contextManagementApplied).toBe(false);
});
it("processes a stream with thinking and compaction", async () => {
const events: object[] = [
{ type: "message_start", message: { usage: { input_tokens: 50, output_tokens: 0 } } },
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{ type: "content_block_delta", index: 0, delta: { type: "thinking_delta", thinking: "Deep thought" } },
{ type: "content_block_delta", index: 0, delta: { type: "signature_delta", signature: "abc" } },
{ type: "content_block_stop", index: 0 },
{ type: "content_block_start", index: 1, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 1, delta: { type: "text_delta", text: "Answer." } },
{ type: "content_block_stop", index: 1 },
{ type: "content_block_start", index: 2, content_block: { type: "compaction" } },
{ type: "content_block_delta", index: 2, delta: { type: "compaction_delta", content: "Compacted summary" } },
{ type: "content_block_stop", index: 2 },
{ type: "message_delta", delta: { stop_reason: "end_turn" }, usage: { output_tokens: 20 } },
];
const sseText = events.map((e) => `data: ${JSON.stringify(e)}\n\n`).join("");
const body = makeStream(sseText);
const parsed = parseSSEStream(body);
const result = await collectStreamResult(parsed);
expect(result.text).toBe("Answer.");
expect(result.thinkingBlocks).toHaveLength(1);
expect(result.thinkingBlocks[0].thinking).toBe("Deep thought");
expect(result.thinkingBlocks[0].signature).toBe("abc");
// "Deep thought" = 12 chars, ceil(12/4) = 3
expect(result.thinkingTokens).toBe(3);
expect(result.compactionContent).toBe("Compacted summary");
expect(result.contextManagementApplied).toBe(true);
expect(result.stopReason).toBe("end_turn");
});
});
// ============================================================================
// Edge cases
// ============================================================================
// Suite: edge cases — usage accumulation across multiple events, orphan
// deltas without an active block, and buffer/sentinel corner cases.
describe("edge cases", () => {
it("accumulates usage across multiple message_start events", async () => {
// Theoretically possible in some proxy scenarios
const events: BetaStreamEvent[] = [
{
type: "message_start",
message: { usage: { input_tokens: 50, output_tokens: 0, cache_creation_input_tokens: 10, cache_read_input_tokens: 5 } },
},
{
type: "message_start",
message: { usage: { input_tokens: 30, output_tokens: 0, cache_creation_input_tokens: 20, cache_read_input_tokens: 15 } },
},
];
const result = await collectStreamResult(eventsFromArray(events));
// Sums, not last-write-wins: 50+30, 10+20, 5+15.
expect(result.usage.inputTokens).toBe(80);
expect(result.usage.cacheCreationTokens).toBe(30);
expect(result.usage.cacheReadTokens).toBe(20);
});
it("accumulates output tokens across multiple message_delta events", async () => {
const events: BetaStreamEvent[] = [
{ type: "message_delta", delta: {}, usage: { output_tokens: 10 } },
{ type: "message_delta", delta: { stop_reason: "end_turn" }, usage: { output_tokens: 25 } },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.outputTokens).toBe(35);
});
it("collects input_tokens from message_delta (Gemini/OpenAI path)", async () => {
// Gemini/OpenAI emit input_tokens in message_delta, not message_start
const events: BetaStreamEvent[] = [
{
type: "message_start",
message: { usage: { input_tokens: 0, output_tokens: 0 } },
},
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 50, input_tokens: 1200 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.inputTokens).toBe(1200);
expect(result.usage.outputTokens).toBe(50);
});
it("collects cache_read_input_tokens from message_delta", async () => {
const events: BetaStreamEvent[] = [
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 10, cache_read_input_tokens: 500 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.cacheReadTokens).toBe(500);
});
it("collects thinking_tokens from message_delta (actual API count)", async () => {
// When API provides actual thinking_tokens, use those instead of char estimate
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "A".repeat(100) },
},
{ type: "content_block_stop", index: 0 },
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 10, thinking_tokens: 42 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
// Should use API thinking_tokens (42), not char estimate (100/4 = 25)
expect(result.thinkingTokens).toBe(42);
});
it("falls back to char estimate when no API thinking_tokens", async () => {
// Anthropic doesn't provide thinking_tokens in message_delta
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "A".repeat(100) },
},
{ type: "content_block_stop", index: 0 },
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 10 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
// Should use char estimate: 100/4 = 25
expect(result.thinkingTokens).toBe(25);
});
it("accumulates input_tokens across message_start and message_delta", async () => {
// Anthropic models emit in message_start, so both sources should add up
const events: BetaStreamEvent[] = [
{
type: "message_start",
message: { usage: { input_tokens: 100, output_tokens: 0 } },
},
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { output_tokens: 10, input_tokens: 200 },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.usage.inputTokens).toBe(300);
});
it("handles text and tool use interleaved", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Before tool." } },
{ type: "content_block_stop", index: 0 },
{
type: "content_block_start",
index: 1,
content_block: { type: "tool_use", id: "t1", name: "bash" },
},
{
type: "content_block_delta",
index: 1,
delta: { type: "input_json_delta", partial_json: '{"cmd":"ls"}' },
},
{ type: "content_block_stop", index: 1 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.text).toBe("Before tool.");
expect(result.toolUseBlocks).toHaveLength(1);
expect(result.toolUseBlocks[0].input).toEqual({ cmd: "ls" });
});
it("handles content_block_stop without active tool or thinking", async () => {
// content_block_stop for a text block — should not crash
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "text" } },
{ type: "content_block_stop", index: 0 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.text).toBe("text");
expect(result.toolUseBlocks).toHaveLength(0);
expect(result.thinkingBlocks).toHaveLength(0);
});
it("ignores input_json_delta when no tool use is active", async () => {
// This shouldn't happen in practice, but the code should handle it gracefully
const events: BetaStreamEvent[] = [
{
type: "content_block_delta",
index: 0,
delta: { type: "input_json_delta", partial_json: '{"orphan": true}' },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.toolUseBlocks).toHaveLength(0);
});
it("ignores thinking_delta when no thinking block is active", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: "orphan thought" },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.thinkingBlocks).toHaveLength(0);
expect(result.thinkingTokens).toBe(0);
});
it("ignores signature_delta when no thinking block is active", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: "orphan sig" },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.thinkingBlocks).toHaveLength(0);
});
it("ignores compaction_delta when no compaction block is active", async () => {
const events: BetaStreamEvent[] = [
{
type: "content_block_delta",
index: 0,
delta: { type: "compaction_delta", content: "orphan compaction" },
},
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.compactionContent).toBeNull();
});
it("handles multiple thinking blocks", async () => {
const events: BetaStreamEvent[] = [
{ type: "content_block_start", index: 0, content_block: { type: "thinking" } },
{ type: "content_block_delta", index: 0, delta: { type: "thinking_delta", thinking: "First thought" } },
{ type: "content_block_delta", index: 0, delta: { type: "signature_delta", signature: "sig1" } },
{ type: "content_block_stop", index: 0 },
{ type: "content_block_start", index: 1, content_block: { type: "thinking" } },
{ type: "content_block_delta", index: 1, delta: { type: "thinking_delta", thinking: "Second" } },
{ type: "content_block_delta", index: 1, delta: { type: "signature_delta", signature: "sig2" } },
{ type: "content_block_stop", index: 1 },
];
const result = await collectStreamResult(eventsFromArray(events));
expect(result.thinkingBlocks).toHaveLength(2);
expect(result.thinkingBlocks[0].thinking).toBe("First thought");
expect(result.thinkingBlocks[1].thinking).toBe("Second");
// "First thought" = 13 chars, ceil(13/4) = 4; "Second" = 6 chars, ceil(6/4) = 2; total = 6
expect(result.thinkingTokens).toBe(6);
});
it("parseSSEStream handles stream with only [DONE]", async () => {
const body = makeStream("data: [DONE]\n\n");
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
expect(results).toHaveLength(0);
});
it("parseSSEStream handles stream with trailing data in buffer", async () => {
// Data that doesn't end with \n — stays in buffer and is never processed
const event = { type: "message_start", message: { usage: { input_tokens: 0, output_tokens: 0 } } };
const body = makeStream(`data: ${JSON.stringify(event)}\n\ndata: incomplete`);
const results: BetaStreamEvent[] = [];
for await (const e of parseSSEStream(body)) {
results.push(e);
}
// Only the complete event should be yielded; "data: incomplete" stays in buffer
expect(results).toHaveLength(1);
});
});