Codebase MCP

import { randomUUID } from "node:crypto";
import { IncomingMessage, ServerResponse } from "node:http";
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import getRawBody from "raw-body";
import contentType from "content-type";

/** Size limit passed to `getRawBody` when reading a POSTed message body. */
const MAXIMUM_MESSAGE_SIZE = "4mb";

/**
 * Renders a request body for inclusion in an error response.
 *
 * Strings are returned unchanged; anything else is JSON-encoded so that
 * objects do not collapse to "[object Object]". Falls back to `String()`
 * when the value cannot be JSON-encoded.
 */
function describeBody(body: unknown): string {
  if (typeof body === "string") {
    return body;
  }
  try {
    return JSON.stringify(body) ?? String(body);
  } catch {
    return String(body);
  }
}

/**
 * Server transport for SSE: this will send messages over an SSE connection
 * and receive messages from HTTP POST requests.
 *
 * This transport is only available in Node.js environments.
 */
export class SSEServerTransport implements Transport {
  /** The live SSE response; set by `start()`, cleared when the stream closes. */
  private _sseResponse?: ServerResponse;
  /** Random UUID generated in the constructor and advertised in the endpoint event. */
  private _sessionId: string;

  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;

  /**
   * Creates a new SSE server transport, which will direct the client to POST
   * messages to the relative or absolute URL identified by `_endpoint`.
   *
   * @param _endpoint URL the client is told to POST messages to.
   * @param res Response object that will carry the SSE stream.
   */
  constructor(
    private _endpoint: string,
    private res: ServerResponse,
  ) {
    this._sessionId = randomUUID();
  }

  /**
   * Handles the initial SSE connection request.
   *
   * This should be called when a GET request is made to establish the SSE
   * stream. It writes the SSE response headers, sends an `endpoint` event
   * containing the POST URL with this transport's session ID, and registers
   * a listener that fires `onclose` when the response closes.
   *
   * @throws Error if the transport has already been started.
   */
  async start(): Promise<void> {
    if (this._sseResponse) {
      throw new Error(
        "SSEServerTransport already started! If using Server class, note that connect() calls start() automatically.",
      );
    }

    this.res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });

    // Send the endpoint event. Use "&" when the endpoint already carries a
    // query string so the session ID does not produce a second "?".
    const endpoint = encodeURI(this._endpoint);
    const separator = endpoint.includes("?") ? "&" : "?";
    this.res.write(
      `event: endpoint\ndata: ${endpoint}${separator}sessionId=${this._sessionId}\n\n`,
    );

    this._sseResponse = this.res;
    this.res.on("close", () => {
      // close() clears _sseResponse and fires onclose itself; skip here so
      // onclose is not invoked a second time for the same shutdown.
      if (!this._sseResponse) {
        return;
      }
      this._sseResponse = undefined;
      this.onclose?.();
    });
  }

  /**
   * Handles incoming POST messages.
   *
   * This should be called when a POST request is made to send a message to
   * the server. Responds 202 on success, 400 when the content type, body, or
   * message is invalid, and 500 when no SSE stream is established.
   *
   * @param req The incoming POST request.
   * @param res The response for the POST request.
   * @param parsedBody Optional body already parsed by upstream middleware;
   *   when provided (non-nullish), the request stream is not read.
   * @throws Error if the SSE connection has not been established.
   */
  async handlePostMessage(
    req: IncomingMessage,
    res: ServerResponse,
    parsedBody?: unknown,
  ): Promise<void> {
    if (!this._sseResponse) {
      const message = "SSE connection not established";
      res.writeHead(500).end(message);
      throw new Error(message);
    }

    let body: unknown;
    try {
      const ct = contentType.parse(req.headers["content-type"] ?? "");
      if (ct.type !== "application/json") {
        // Interpolate the media type string, not the parsed object.
        throw new Error(`Unsupported content-type: ${ct.type}`);
      }

      body =
        parsedBody ??
        (await getRawBody(req, {
          limit: MAXIMUM_MESSAGE_SIZE,
          encoding: ct.parameters.charset ?? "utf-8",
        }));
    } catch (error) {
      res.writeHead(400).end(String(error));
      this.onerror?.(error as Error);
      return;
    }

    let message: unknown = body;
    if (typeof body === "string") {
      try {
        message = JSON.parse(body);
      } catch (error) {
        // Report malformed JSON the same way schema failures are reported.
        this.onerror?.(error as Error);
        res.writeHead(400).end(`Invalid message: ${body}`);
        return;
      }
    }

    try {
      await this.handleMessage(message);
    } catch {
      // Schema validation failures were already passed to onerror inside
      // handleMessage, so only the HTTP response is produced here.
      res.writeHead(400).end(`Invalid message: ${describeBody(body)}`);
      return;
    }

    res.writeHead(202).end("Accepted");
  }

  /**
   * Handle a client message, regardless of how it arrived. This can be used
   * to inform the server of messages that arrive via a means different than
   * HTTP POST.
   *
   * @param message Candidate message; validated against `JSONRPCMessageSchema`.
   * @throws The validation error, after passing it to `onerror`, when the
   *   message does not match the schema.
   */
  async handleMessage(message: unknown): Promise<void> {
    let parsedMessage: JSONRPCMessage;
    try {
      parsedMessage = JSONRPCMessageSchema.parse(message);
    } catch (error) {
      this.onerror?.(error as Error);
      throw error;
    }

    this.onmessage?.(parsedMessage);
  }

  /** Ends the SSE response (if any), clears it, and invokes `onclose`. */
  async close(): Promise<void> {
    this._sseResponse?.end();
    this._sseResponse = undefined;
    this.onclose?.();
  }

  /**
   * Sends a message to the client as an SSE `message` event.
   *
   * @param message The message to serialize as JSON and write to the stream.
   * @throws Error if the SSE stream is not connected.
   */
  async send(message: JSONRPCMessage): Promise<void> {
    if (!this._sseResponse) {
      throw new Error("Not connected");
    }

    this._sseResponse.write(
      `event: message\ndata: ${JSON.stringify(message)}\n\n`,
    );
  }

  /**
   * Returns the session ID for this transport.
   *
   * This can be used to route incoming POST requests.
   */
  get sessionId(): string {
    return this._sessionId;
  }
}