/**
* Unified SSE Parser — ONE implementation replacing 4 duplicates.
*
* Used by:
* - agent-loop.ts (processStreamWithCallbacks for real-time UI)
* - subagent.ts (collectStreamResult for batch collection)
* - teammate.ts (collectStreamResult for batch collection)
* - server-agent-loop.ts (processStreamWithCallbacks for SSE relay)
*/
import type {
BetaStreamEvent,
ContentBlockStartEvent,
ContentBlockDeltaEvent,
MessageStartEvent,
MessageDeltaEvent,
} from "./anthropic-types.js";
import type { StreamResult, StreamCallbacks, ToolUseBlock, ThinkingBlock } from "./types.js";
// ============================================================================
// RAW SSE PARSER — yields typed events from HTTP response body
// ============================================================================
/**
 * Parse an SSE stream from a proxy HTTP response into typed events.
 *
 * Handles the `data: {...}\n\n` wire format with a `[DONE]` sentinel.
 * Malformed JSON payloads and non-`data:` lines are skipped silently.
 *
 * @param body   Raw HTTP response body to read from.
 * @param signal Optional abort signal; when triggered, reading stops and
 *               any partially buffered line is discarded.
 */
export async function* parseSSEStream(
  body: ReadableStream<Uint8Array>,
  signal?: AbortSignal,
): AsyncGenerator<BetaStreamEvent> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let aborted = false;

  // Parse a single SSE line. Returns the typed event, the literal string
  // "done" for the [DONE] sentinel, or null for blank/non-data/bad-JSON lines.
  const parseLine = (line: string): BetaStreamEvent | "done" | null => {
    const trimmed = line.trim();
    if (!trimmed || !trimmed.startsWith("data: ")) return null;
    const payload = trimmed.slice(6);
    if (payload === "[DONE]") return "done";
    try {
      return JSON.parse(payload) as BetaStreamEvent;
    } catch {
      return null; // skip malformed JSON
    }
  };

  try {
    while (true) {
      if (signal?.aborted) {
        aborted = true;
        break;
      }
      const { done, value } = await reader.read();
      if (done) break;
      // stream:true keeps multi-byte characters split across chunks intact.
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      // Last element may be an incomplete line — keep it buffered.
      buffer = lines.pop() || "";
      for (const line of lines) {
        const parsed = parseLine(line);
        if (parsed === "done") return;
        if (parsed) yield parsed;
      }
    }
    if (!aborted) {
      // FIX: flush the decoder (handles a multi-byte char split at the very
      // end) and parse any final line that arrived without a trailing
      // newline — previously such a last event was silently dropped.
      buffer += decoder.decode();
      if (buffer) {
        const parsed = parseLine(buffer);
        if (parsed !== "done" && parsed !== null) yield parsed;
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// ============================================================================
// STREAM COLLECTOR — batch-collects all events (subagent/teammate)
// ============================================================================
/**
* Collect all events into a StreamResult. No callbacks — used by
* subagent and teammate where real-time text isn't needed.
*/
export async function collectStreamResult(
events: AsyncIterable<BetaStreamEvent>,
): Promise<StreamResult> {
return processStreamWithCallbacks(events, {});
}
// ============================================================================
// STREAM PROCESSOR — full event processing with callbacks (agent-loop)
// ============================================================================
/**
 * Process streaming events with optional real-time callbacks.
 *
 * Used by agent-loop (needs onText for the UI) and as the internal
 * implementation for collectStreamResult. Accumulates text, tool-use
 * blocks, thinking blocks, compaction content, and token usage across
 * the whole stream.
 *
 * @param events    Typed SSE events to consume.
 * @param callbacks Optional hooks: onText (per text delta), onThinking
 *                  (per thinking delta), onToolStart (fired once when a
 *                  tool block opens, then again with the parsed input
 *                  when it closes — two calls per tool, by design).
 * @param signal    Optional abort signal; stops processing between events.
 * @returns Aggregated StreamResult.
 * @throws Error when the stream emits an `error` event.
 */
export async function processStreamWithCallbacks(
  events: AsyncIterable<BetaStreamEvent>,
  callbacks: StreamCallbacks,
  signal?: AbortSignal,
): Promise<StreamResult> {
  let text = "";
  const toolUseBlocks: ToolUseBlock[] = [];
  const thinkingBlocks: ThinkingBlock[] = [];
  // In-flight block state: exactly one of these is non-null/true between a
  // content_block_start and its matching content_block_stop.
  let currentToolUse: { id: string; name: string; input: string; thoughtSignature?: string } | null = null;
  let currentThinking: { text: string; signature: string } | null = null;
  let inputTokens = 0;
  let outputTokens = 0;
  let cacheCreationTokens = 0;
  let cacheReadTokens = 0;
  let apiThinkingTokens = 0; // Actual count from API (Gemini/OpenAI), 0 = use char estimate
  let compactionContent: string | null = null;
  let isCompactionBlock = false;
  let contextManagementApplied = false;
  let stopReason = "end_turn";
  for await (const event of events) {
    if (signal?.aborted) break;
    switch (event.type) {
      case "content_block_start": {
        const e = event as ContentBlockStartEvent;
        const block = e.content_block;
        if (block.type === "tool_use") {
          currentToolUse = {
            id: block.id,
            name: block.name,
            input: "", // JSON accumulates via input_json_delta events
            // thought_signature is a provider extension not in the typed
            // block shape — TODO confirm against anthropic-types.
            thoughtSignature: (block as any).thought_signature,
          };
          callbacks.onToolStart?.(block.name);
        } else if (block.type === "thinking") {
          currentThinking = { text: "", signature: "" };
        } else if (block.type === "compaction") {
          isCompactionBlock = true;
          compactionContent = "";
        }
        break;
      }
      case "content_block_delta": {
        const e = event as ContentBlockDeltaEvent;
        const delta = e.delta;
        if (delta.type === "text_delta") {
          text += delta.text;
          callbacks.onText?.(delta.text);
        } else if (delta.type === "input_json_delta" && currentToolUse) {
          currentToolUse.input += delta.partial_json;
        } else if (delta.type === "thinking_delta" && currentThinking) {
          currentThinking.text += delta.thinking || "";
          callbacks.onThinking?.(delta.thinking || "");
        } else if (delta.type === "signature_delta" && currentThinking) {
          currentThinking.signature += delta.signature || "";
        } else if (delta.type === "compaction_delta" && isCompactionBlock) {
          if (delta.content != null) {
            compactionContent = (compactionContent || "") + delta.content;
          }
        }
        break;
      }
      case "content_block_stop": {
        if (currentToolUse) {
          try {
            const parsed = JSON.parse(currentToolUse.input || "{}");
            toolUseBlocks.push({
              id: currentToolUse.id,
              name: currentToolUse.name,
              input: parsed,
              ...(currentToolUse.thoughtSignature ? { thoughtSignature: currentToolUse.thoughtSignature } : {}),
            });
            // Second callback with parsed input for UI context
            callbacks.onToolStart?.(currentToolUse.name, parsed);
          } catch { /* skip bad JSON — tool block is dropped entirely */ }
          currentToolUse = null;
        }
        if (currentThinking) {
          thinkingBlocks.push({
            type: "thinking",
            thinking: currentThinking.text,
            signature: currentThinking.signature,
          });
          currentThinking = null;
        }
        if (isCompactionBlock) {
          isCompactionBlock = false;
          contextManagementApplied = true;
        }
        break;
      }
      case "message_start": {
        const e = event as MessageStartEvent;
        if (e.message?.usage) {
          // FIX: guard with ?? 0 — some providers omit input_tokens here
          // (see the message_delta guards below); += undefined poisons the
          // accumulator with NaN.
          inputTokens += e.message.usage.input_tokens ?? 0;
          cacheCreationTokens += e.message.usage.cache_creation_input_tokens || 0;
          cacheReadTokens += e.message.usage.cache_read_input_tokens || 0;
        }
        break;
      }
      case "message_delta": {
        const e = event as MessageDeltaEvent;
        if (e.usage) {
          // FIX: ?? 0 guard — same NaN hazard as message_start above.
          outputTokens += e.usage.output_tokens ?? 0;
          // Non-Anthropic providers (Gemini, OpenAI) emit input_tokens and cache info in message_delta
          if (e.usage.input_tokens) {
            inputTokens += e.usage.input_tokens;
          }
          if (e.usage.cache_read_input_tokens) {
            cacheReadTokens += e.usage.cache_read_input_tokens;
          }
          if (e.usage.thinking_tokens) {
            apiThinkingTokens += e.usage.thinking_tokens;
          }
        }
        if (e.delta?.stop_reason) {
          stopReason = e.delta.stop_reason;
        }
        if (e.delta?.context_management?.applied_edits?.length) {
          contextManagementApplied = true;
        }
        break;
      }
      case "error": {
        const errMsg = typeof event.error === "string"
          ? event.error
          : JSON.stringify(event.error);
        throw new Error(errMsg);
      }
    }
  }
  // Use actual thinking token count from API when available (Gemini/OpenAI),
  // otherwise estimate from character count (~4 chars per token)
  const thinkingTokens = apiThinkingTokens > 0
    ? apiThinkingTokens
    : thinkingBlocks.reduce((sum, b) => sum + Math.ceil(b.thinking.length / 4), 0);
  return {
    text,
    toolUseBlocks,
    thinkingBlocks,
    thinkingTokens,
    usage: {
      inputTokens,
      outputTokens,
      cacheCreationTokens,
      cacheReadTokens,
    },
    compactionContent,
    contextManagementApplied,
    stopReason,
  };
}