Skip to main content
Glama

Bun SSE Transport for MCP

by tigranbs
index.ts4.04 kB
import { randomUUID } from "node:crypto"; import { JSONRPCMessageSchema } from "@modelcontextprotocol/sdk/types.js"; /** * Server transport for SSE using Bun's Response type. * Adapts the SSEServerTransport functionality to work with Bun. */ export class BunSSEServerTransport { private _sessionId: string; private _sseResponse?: Response; private _responseObj?: Response; private _writer?: WritableStreamDefaultWriter<Uint8Array>; onmessage?: (message: any) => void; onclose?: () => void; onerror?: (error: any) => void; /** * Creates a new SSE server transport for Bun. * @param _endpoint The endpoint where clients should POST messages */ constructor(private _endpoint: string) { this._sessionId = randomUUID(); } /** * Creates a Response object suitable for Bun.serve to return. * This method should be called to get the Response for the initial SSE request. */ createResponse(): Promise<Response> { if (this._responseObj) { return Promise.resolve(this._responseObj); } // Create a readable stream that we'll write SSE events to const { readable, writable } = new TransformStream(); this._writer = writable.getWriter(); // Write the initial headers const encoder = new TextEncoder(); this._writer.write( encoder.encode( `event: endpoint\ndata: ${encodeURI(this._endpoint)}?sessionId=${ this._sessionId }\n\n` ) ); // Create the response this._responseObj = new Response(readable, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", }, }); return Promise.resolve(this._responseObj); } /** * Start the SSE connection - required by McpServer * Note: Does not return a Response, unlike createResponse */ async start(): Promise<void> { if (!this._responseObj) { await this.createResponse(); } this._sseResponse = this._responseObj; } /** * Handles incoming POST messages. */ async handlePostMessage(req: Request): Promise<Response> { if (!this._sseResponse) { const message = "SSE connection not established"; return new Response(message, { status: 500 }); } try { const contentTypeHeader = req.headers.get("content-type"); if ( !contentTypeHeader || !contentTypeHeader.includes("application/json") ) { throw new Error(`Unsupported content-type: ${contentTypeHeader}`); } const body = await req.json(); await this.handleMessage(body); return new Response("Accepted", { status: 202 }); } catch (error) { this.onerror?.(error); return new Response(String(error), { status: 400 }); } } /** * Handle a client message, regardless of how it arrived. */ async handleMessage(message: any) { try { const parseResult = JSONRPCMessageSchema.safeParse(message); if (parseResult.success) { this.onmessage?.(parseResult.data); } else { throw new Error( `Invalid JSON-RPC message: ${parseResult.error.message}` ); } } catch (error) { this.onerror?.(error); throw error; } } /** * Close the SSE connection. */ async close() { if (this._writer && typeof this._writer.close === "function") { await this._writer.close(); this._writer = undefined; } this._sseResponse = undefined; this._responseObj = undefined; this.onclose?.(); } /** * Send a message over the SSE connection. */ async send(message: any) { if (!this._writer) { throw new Error("Not connected"); } const encoder = new TextEncoder(); this._writer.write( encoder.encode(`event: message\ndata: ${JSON.stringify(message)}\n\n`) ); } /** * Returns the session ID for this transport. */ get sessionId(): string { return this._sessionId; } } export default BunSSEServerTransport;

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tigranbs/bun-mcp-sse-transport'

If you have feedback or need assistance with the MCP directory API, please join our Discord server