Skip to main content
Glama
sse.ts (4.56 kB)
import { StringDecoder } from "node:string_decoder";

import { StreamLike } from "../../utils/stream";
import { StreamResponseSchema, TextResponseSchema } from "./schemas";

/** Matches a `data:` field at the start of the text or right after a line break. */
const SSE_DATA_LINE = /(\r?\n|^)data:/;

/**
 * Callbacks that consume a response stream.
 *
 * Written for the ask command's SSE handling (could be reusable for other SSE APIs).
 */
export interface SSEProcessor {
  /** Receives one raw chunk (a `Buffer` or anything convertible with `String()`). */
  onData: (chunk: unknown) => void;
  /** Called once when the stream has ended; flushes whatever is still pending. */
  onEnd: () => void;
  /** Called with a stream error; may throw to surface a (possibly transformed) error. */
  onError: (err: unknown) => void;
}

/**
 * Creates a processor that writes the text carried by a response stream to
 * `process.stdout`.
 *
 * The processor starts in pass-through mode and echoes every chunk verbatim.
 * As soon as a chunk contains a `data:` line it switches to SSE parsing:
 * `data:` lines are collected per event, an empty line dispatches the event,
 * and a `[DONE]` payload stops all further processing. Comment lines,
 * `event:` lines and any other fields are ignored.
 *
 * @param onFirstOutput - Invoked at most once, right before the first output
 *   attempt. Errors thrown by this callback are ignored.
 * @param onTextChunk - Invoked with every piece of text just before it is
 *   written to stdout.
 * @returns The `onData` / `onEnd` / `onError` handlers. `onError` rethrows the
 *   error it is given.
 */
export const createSSEProcessor = (
  onFirstOutput?: () => void,
  onTextChunk?: (text: string) => void,
): SSEProcessor => {
  // Keeps multi-byte UTF-8 sequences intact when they straddle chunk boundaries.
  const decoder = new StringDecoder("utf8");
  let buffer = ""; // trailing partial line, only used in SSE mode
  let sawSseData = false;
  let eventBuf: string[] = []; // data lines of the event being assembled
  let sawDone = false;
  let notifiedFirstOutput = false;

  const notifyFirstOutput = (): void => {
    if (notifiedFirstOutput) return;
    notifiedFirstOutput = true;
    try {
      onFirstOutput?.();
    } catch {
      // Deliberately best-effort: a failing callback must not break streaming.
    }
  };

  /** Reports `text` to the listener and writes it to stdout. */
  const emitText = (text: string): void => {
    notifyFirstOutput();
    onTextChunk?.(text);
    process.stdout.write(text);
  };

  /**
   * Dispatches the event collected in `eventBuf`, if any.
   *
   * @returns `true` when the event was the `[DONE]` sentinel.
   */
  const flushEvent = (): boolean => {
    if (eventBuf.length === 0) return false;
    const payload = eventBuf.join("\n");
    eventBuf = [];
    if (payload === "[DONE]") {
      sawDone = true;
      return true;
    }

    let json: unknown = null;
    try {
      json = JSON.parse(payload);
    } catch {
      json = null; // not JSON; handled by the raw-payload fallback below
    }

    let textOut: string | null = null;
    // Try parsing as text response first
    const textResult = TextResponseSchema.safeParse(json);
    if (textResult.success) {
      textOut = textResult.data.text;
    } else {
      // Try parsing as stream response
      const streamResult = StreamResponseSchema.safeParse(json);
      if (
        streamResult.success &&
        streamResult.data.choices?.[0]?.delta?.content
      ) {
        textOut = streamResult.data.choices[0].delta.content;
      } else if (typeof json === "string") {
        textOut = json;
      }
      // NOTE(review): a schema-valid stream chunk whose delta content is empty
      // or missing falls through to the raw-payload echo below — confirm that
      // printing the raw JSON in that case is intended.
    }

    notifyFirstOutput();
    if (typeof textOut === "string") {
      emitText(textOut);
    } else if (payload && payload.trim() !== "[object Object]") {
      emitText(payload);
    }
    return false;
  };

  /** Routes decoded text through pass-through or SSE handling. */
  const consumeText = (text: string): void => {
    if (!sawSseData) {
      if (!SSE_DATA_LINE.test(text)) {
        // Stream raw text immediately until we detect SSE lines (data: ...)
        emitText(text);
        return;
      }
      // Detected SSE, switch to SSE parsing mode
      sawSseData = true;
    }

    buffer += text;
    const lines = buffer.split(/\r?\n/);
    buffer = lines.pop() ?? ""; // keep the incomplete last line for the next chunk

    for (const line of lines) {
      const trimmed = line.trimStart();
      if (trimmed === "") {
        if (flushEvent()) {
          // Nothing after [DONE] may be emitted, not even from this same chunk.
          buffer = "";
          break;
        }
        continue;
      }
      if (trimmed.startsWith("data:")) {
        eventBuf.push(trimmed.slice(5).trimStart());
      }
      // Comments (":"), "event:" lines and unknown fields are ignored.
    }
  };

  const onData = (chunk: unknown): void => {
    if (sawDone) return;
    const text = Buffer.isBuffer(chunk) ? decoder.write(chunk) : String(chunk);
    // An empty result (e.g. a chunk holding only part of a multi-byte
    // character) carries no output yet, so it must not count as first output.
    if (text === "") return;
    consumeText(text);
  };

  const onEnd = (): void => {
    // Bytes still held by the decoder (an incomplete character at EOF).
    const tail = decoder.end();
    if (tail && !sawDone) consumeText(tail);

    if (!sawDone) {
      const trailing = buffer;
      buffer = "";
      const trimmed = trailing.trimStart();
      const isDataLine = trimmed.startsWith("data:");
      if (isDataLine) {
        const payload = trimmed.slice(5).trimStart();
        if (payload) eventBuf.push(payload);
      }
      // Always flush: the stream may end right after a complete `data:` line
      // without the blank line that normally dispatches the event.
      const done = flushEvent();
      if (trailing && !isDataLine && !done) {
        // Trailing text that is not a data line is passed through verbatim.
        emitText(trailing);
      }
    }
    process.stdout.write("\n");
  };

  const onError = (err: unknown): void => {
    throw err;
  };

  return { onData, onEnd, onError };
};

/**
 * Pipes `stream` events into `processor` and settles when the stream finishes.
 *
 * @param stream - Source emitting `"data"`, `"end"` and `"error"` events.
 * @param processor - Handlers that consume the stream.
 * @returns A promise that resolves after `processor.onEnd()` has completed and
 *   rejects when the stream errors or any handler throws. For a stream error
 *   the rejection reason is whatever `processor.onError` throws, or the
 *   original error if it returns normally.
 */
export const processStream = (
  stream: StreamLike,
  processor: SSEProcessor,
): Promise<void> =>
  new Promise<void>((resolve, reject) => {
    let settled = false;
    const fail = (reason: unknown): void => {
      if (settled) return;
      settled = true;
      reject(reason);
    };

    stream.on("data", (chunk: unknown) => {
      if (settled) return;
      try {
        processor.onData(chunk);
      } catch (dataError) {
        fail(dataError);
      }
    });

    stream.on("end", () => {
      if (settled) return;
      try {
        processor.onEnd();
      } catch (endError) {
        fail(endError);
        return;
      }
      settled = true;
      resolve();
    });

    stream.on("error", (err: unknown) => {
      if (settled) return;
      try {
        processor.onError(err);
      } catch (processedError) {
        fail(processedError);
        return;
      }
      // onError swallowed the error; still settle so callers never hang.
      fail(err);
    });
  });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ExpertVagabond/universal-blockchain'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.