/**
* Web Standards Streamable HTTP Server Transport
*
* This is an experimental transport that implements the MCP Streamable HTTP specification
* using Web Standard APIs (Request, Response, TransformStream) instead of Node.js HTTP.
*
* Based on: https://github.com/modelcontextprotocol/typescript-sdk/pull/1209
*
* @experimental
*/
import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import {
type JSONRPCMessage,
type MessageExtraInfo,
type RequestId,
type RequestInfo,
DEFAULT_NEGOTIATED_PROTOCOL_VERSION,
isInitializeRequest,
isJSONRPCError,
isJSONRPCRequest,
isJSONRPCResponse,
JSONRPCMessageSchema,
SUPPORTED_PROTOCOL_VERSIONS,
} from "@modelcontextprotocol/sdk/types.js";
export type StreamId = string;
export type EventId = string;
/**
* Interface for resumability support via event storage
*/
export interface EventStore {
/**
* Stores an event for later retrieval
* @param streamId ID of the stream the event belongs to
* @param message The JSON-RPC message to store
* @returns The generated event ID for the stored event
*/
storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
/**
* Get the stream ID associated with a given event ID.
* @param eventId The event ID to look up
* @returns The stream ID, or undefined if not found
*/
getStreamIdForEventId?(eventId: EventId): Promise<StreamId | undefined>;
/**
* Replay events after a given event ID
*/
replayEventsAfter(
lastEventId: EventId,
options: {
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>;
},
): Promise<StreamId>;
}
/**
* Session state that can be persisted externally for serverless deployments.
*/
export interface SessionState {
/** Whether the session has completed initialization */
initialized: boolean;
/** The negotiated protocol version */
protocolVersion: string;
/** Timestamp when the session was created */
createdAt: number;
}
/**
* Interface for session storage in distributed/serverless deployments.
*/
export interface SessionStore {
/**
* Retrieve session state by ID.
*/
get(sessionId: string): Promise<SessionState | undefined>;
/**
* Save session state.
*/
save(sessionId: string, state: SessionState): Promise<void>;
/**
* Delete session state.
*/
delete(sessionId: string): Promise<void>;
}
/**
* Internal stream mapping for managing SSE connections
*/
interface StreamMapping {
/** Stream controller for pushing SSE data */
controller?: ReadableStreamDefaultController<Uint8Array>;
/** Text encoder for SSE formatting */
encoder?: TextEncoder;
/** Promise resolver for JSON response mode */
resolveJson?: (response: Response) => void;
/** Cleanup function to close stream and remove mapping */
cleanup: () => void;
}
/**
* Configuration options for FetchStreamableHTTPServerTransport
*/
export interface FetchStreamableHTTPServerTransportOptions {
/**
* Function that generates a session ID for the transport.
* The session ID SHOULD be globally unique and cryptographically secure.
* Return undefined to disable session management.
*/
sessionIdGenerator: (() => string) | undefined;
/**
* A callback for session initialization events.
*/
onsessioninitialized?: (sessionId: string) => void | Promise<void>;
/**
* A callback for session close events.
*/
onsessionclosed?: (sessionId: string) => void | Promise<void>;
/**
* If true, the server will return JSON responses instead of starting an SSE stream.
* Default is false (SSE streams are preferred).
*/
enableJsonResponse?: boolean;
/**
* Event store for resumability support.
*/
eventStore?: EventStore;
/**
* List of allowed host header values for DNS rebinding protection.
*/
allowedHosts?: string[];
/**
* List of allowed origin header values for DNS rebinding protection.
*/
allowedOrigins?: string[];
/**
* Enable DNS rebinding protection.
* Default is false for backwards compatibility.
*/
enableDnsRebindingProtection?: boolean;
/**
* Retry interval in milliseconds to suggest to clients in SSE retry field.
*/
retryInterval?: number;
/**
* Session store for distributed/serverless deployments.
*/
sessionStore?: SessionStore;
}
/**
* Server transport for Web Standards Streamable HTTP
*
* Implements the MCP Streamable HTTP transport specification using Web Standard APIs
* (Request, Response, TransformStream).
*
* @experimental
*/
export class FetchStreamableHTTPServerTransport implements Transport {
private sessionIdGenerator: (() => string) | undefined;
private _started: boolean = false;
private _streamMapping: Map<string, StreamMapping> = new Map();
private _requestToStreamMapping: Map<RequestId, string> = new Map();
private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map();
private _initialized: boolean = false;
private _enableJsonResponse: boolean = false;
private _standaloneSseStreamId: string = "_GET_stream";
private _eventStore?: EventStore;
private _onsessioninitialized?: (sessionId: string) => void | Promise<void>;
private _onsessionclosed?: (sessionId: string) => void | Promise<void>;
private _allowedHosts?: string[];
private _allowedOrigins?: string[];
private _enableDnsRebindingProtection: boolean;
private _retryInterval?: number;
private _sessionStore?: SessionStore;
sessionId?: string;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;
constructor(options: FetchStreamableHTTPServerTransportOptions) {
this.sessionIdGenerator = options.sessionIdGenerator;
this._enableJsonResponse = options.enableJsonResponse ?? false;
this._eventStore = options.eventStore;
this._onsessioninitialized = options.onsessioninitialized;
this._onsessionclosed = options.onsessionclosed;
this._allowedHosts = options.allowedHosts;
this._allowedOrigins = options.allowedOrigins;
this._enableDnsRebindingProtection =
options.enableDnsRebindingProtection ?? false;
this._retryInterval = options.retryInterval;
this._sessionStore = options.sessionStore;
}
/**
* Starts the transport. Required by the Transport interface but is a no-op
* for the Streamable HTTP transport as connections are managed per-request.
*/
async start(): Promise<void> {
if (this._started) {
throw new Error("Transport already started");
}
this._started = true;
}
/**
* Helper to create a JSON error response
*/
private createJsonErrorResponse(
status: number,
code: number,
message: string,
headers?: Record<string, string>,
): Response {
return new Response(
JSON.stringify({
jsonrpc: "2.0",
error: { code, message },
id: null,
}),
{
status,
headers: {
"Content-Type": "application/json",
...headers,
},
},
);
}
/**
* Validates request headers for DNS rebinding protection.
*/
private validateRequestHeaders(req: Request): Response | undefined {
if (!this._enableDnsRebindingProtection) {
return undefined;
}
if (this._allowedHosts && this._allowedHosts.length > 0) {
const hostHeader = req.headers.get("host");
if (!hostHeader || !this._allowedHosts.includes(hostHeader)) {
const error = `Invalid Host header: ${hostHeader}`;
this.onerror?.(new Error(error));
return this.createJsonErrorResponse(403, -32000, error);
}
}
if (this._allowedOrigins && this._allowedOrigins.length > 0) {
const originHeader = req.headers.get("origin");
if (!originHeader || !this._allowedOrigins.includes(originHeader)) {
const error = `Invalid Origin header: ${originHeader}`;
this.onerror?.(new Error(error));
return this.createJsonErrorResponse(403, -32000, error);
}
}
return undefined;
}
/**
* Handles an incoming HTTP request (GET, POST, or DELETE)
*/
async handleRequest(req: Request & { auth?: AuthInfo }): Promise<Response> {
const validationError = this.validateRequestHeaders(req);
if (validationError) {
return validationError;
}
switch (req.method) {
case "POST":
return this.handlePostRequest(req);
case "GET":
return this.handleGetRequest(req);
case "DELETE":
return this.handleDeleteRequest(req);
default:
return this.handleUnsupportedRequest();
}
}
/**
* Writes a priming event to establish resumption capability.
*/
private async writePrimingEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: TextEncoder,
streamId: string,
): Promise<void> {
if (!this._eventStore) {
return;
}
const primingEventId = await this._eventStore.storeEvent(
streamId,
{} as JSONRPCMessage,
);
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
if (this._retryInterval !== undefined) {
primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
}
controller.enqueue(encoder.encode(primingEvent));
}
/**
* Handles GET requests for SSE stream
*/
private async handleGetRequest(req: Request): Promise<Response> {
const acceptHeader = req.headers.get("accept");
if (!acceptHeader?.includes("text/event-stream")) {
return this.createJsonErrorResponse(
406,
-32000,
"Not Acceptable: Client must accept text/event-stream",
);
}
const sessionError = await this.validateSession(req);
if (sessionError) {
return sessionError;
}
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
// Handle resumability: check for Last-Event-ID header
if (this._eventStore) {
const lastEventId = req.headers.get("last-event-id");
if (lastEventId) {
return this.replayEvents(lastEventId);
}
}
// Check if there's already an active standalone SSE stream for this session
if (
this._streamMapping.get(this._standaloneSseStreamId) !== undefined
) {
return this.createJsonErrorResponse(
409,
-32000,
"Conflict: Only one SSE stream is allowed per session",
);
}
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
const readable = new ReadableStream<Uint8Array>({
start: (controller) => {
streamController = controller;
},
cancel: () => {
this._streamMapping.delete(this._standaloneSseStreamId);
},
});
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
};
if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}
this._streamMapping.set(this._standaloneSseStreamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(this._standaloneSseStreamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
},
});
return new Response(readable, { headers });
}
/**
* Replays events that would have been sent after the specified event ID
*/
private async replayEvents(lastEventId: string): Promise<Response> {
if (!this._eventStore) {
return this.createJsonErrorResponse(
400,
-32000,
"Event store not configured",
);
}
try {
let streamId: string | undefined;
if (this._eventStore.getStreamIdForEventId) {
streamId =
await this._eventStore.getStreamIdForEventId(lastEventId);
if (!streamId) {
return this.createJsonErrorResponse(
400,
-32000,
"Invalid event ID format",
);
}
if (this._streamMapping.get(streamId) !== undefined) {
return this.createJsonErrorResponse(
409,
-32000,
"Conflict: Stream already has an active connection",
);
}
}
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
};
if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
const readable = new ReadableStream<Uint8Array>({
start: (controller) => {
streamController = controller;
},
cancel: () => {
// Cleanup will be handled by the mapping
},
});
const replayedStreamId = await this._eventStore.replayEventsAfter(
lastEventId,
{
send: async (eventId: string, message: JSONRPCMessage) => {
const success = this.writeSSEEvent(
streamController!,
encoder,
message,
eventId,
);
if (!success) {
this.onerror?.(new Error("Failed replay events"));
}
},
},
);
this._streamMapping.set(replayedStreamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(replayedStreamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
},
});
return new Response(readable, { headers });
} catch (error) {
this.onerror?.(error as Error);
return this.createJsonErrorResponse(
500,
-32000,
"Error replaying events",
);
}
}
/**
* Writes an event to an SSE stream via controller with proper formatting
*/
private writeSSEEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: TextEncoder,
message: JSONRPCMessage,
eventId?: string,
): boolean {
try {
let eventData = `event: message\n`;
if (eventId) {
eventData += `id: ${eventId}\n`;
}
eventData += `data: ${JSON.stringify(message)}\n\n`;
controller.enqueue(encoder.encode(eventData));
return true;
} catch {
return false;
}
}
/**
* Handles unsupported requests (PUT, PATCH, etc.)
*/
private handleUnsupportedRequest(): Response {
return new Response(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed.",
},
id: null,
}),
{
status: 405,
headers: {
Allow: "GET, POST, DELETE",
"Content-Type": "application/json",
},
},
);
}
/**
* Handles POST requests containing JSON-RPC messages
*/
private async handlePostRequest(
req: Request & { auth?: AuthInfo },
): Promise<Response> {
try {
const acceptHeader = req.headers.get("accept");
if (
!acceptHeader?.includes("application/json") ||
!acceptHeader.includes("text/event-stream")
) {
return this.createJsonErrorResponse(
406,
-32000,
"Not Acceptable: Client must accept both application/json and text/event-stream",
);
}
const ct = req.headers.get("content-type");
if (!ct || !ct.includes("application/json")) {
return this.createJsonErrorResponse(
415,
-32000,
"Unsupported Media Type: Content-Type must be application/json",
);
}
const authInfo: AuthInfo | undefined = req.auth;
const requestInfo: RequestInfo = {
headers: Object.fromEntries(req.headers.entries()),
};
let rawMessage: unknown;
try {
rawMessage = await req.json();
} catch {
return this.createJsonErrorResponse(
400,
-32700,
"Parse error: Invalid JSON",
);
}
let messages: JSONRPCMessage[];
try {
if (Array.isArray(rawMessage)) {
messages = rawMessage.map((msg) =>
JSONRPCMessageSchema.parse(msg),
);
} else {
messages = [JSONRPCMessageSchema.parse(rawMessage)];
}
} catch {
return this.createJsonErrorResponse(
400,
-32700,
"Parse error: Invalid JSON-RPC message",
);
}
const isInitializationRequest = messages.some(isInitializeRequest);
if (isInitializationRequest) {
if (this._initialized && this.sessionId !== undefined) {
return this.createJsonErrorResponse(
400,
-32600,
"Invalid Request: Server already initialized",
);
}
if (messages.length > 1) {
return this.createJsonErrorResponse(
400,
-32600,
"Invalid Request: Only one initialization request is allowed",
);
}
this.sessionId = this.sessionIdGenerator?.();
this._initialized = true;
if (this.sessionId && this._sessionStore) {
const protocolVersion =
req.headers.get("mcp-protocol-version") ??
DEFAULT_NEGOTIATED_PROTOCOL_VERSION;
await this._sessionStore.save(this.sessionId, {
initialized: true,
protocolVersion,
createdAt: Date.now(),
});
}
if (this.sessionId && this._onsessioninitialized) {
await Promise.resolve(
this._onsessioninitialized(this.sessionId),
);
}
}
if (!isInitializationRequest) {
const sessionError = await this.validateSession(req);
if (sessionError) {
return sessionError;
}
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
}
const hasRequests = messages.some(isJSONRPCRequest);
if (!hasRequests) {
for (const message of messages) {
this.onmessage?.(message, { authInfo, requestInfo });
}
return new Response(null, { status: 202 });
}
const streamId = crypto.randomUUID();
if (this._enableJsonResponse) {
return new Promise<Response>((resolve) => {
this._streamMapping.set(streamId, {
resolveJson: resolve,
cleanup: () => {
this._streamMapping.delete(streamId);
},
});
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._requestToStreamMapping.set(
message.id,
streamId,
);
}
}
for (const message of messages) {
this.onmessage?.(message, { authInfo, requestInfo });
}
});
}
// SSE streaming mode
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
const readable = new ReadableStream<Uint8Array>({
start: (controller) => {
streamController = controller;
},
cancel: () => {
this._streamMapping.delete(streamId);
},
});
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._streamMapping.set(streamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(streamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
},
});
this._requestToStreamMapping.set(message.id, streamId);
}
}
await this.writePrimingEvent(streamController!, encoder, streamId);
for (const message of messages) {
let closeSSEStream: (() => void) | undefined;
let closeStandaloneSSEStream: (() => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore) {
closeSSEStream = () => {
this.closeSSEStream(message.id);
};
closeStandaloneSSEStream = () => {
this.closeStandaloneSSEStream();
};
}
this.onmessage?.(message, {
authInfo,
requestInfo,
closeSSEStream,
closeStandaloneSSEStream,
});
}
return new Response(readable, { status: 200, headers });
} catch (error) {
this.onerror?.(error as Error);
return this.createJsonErrorResponse(400, -32700, "Parse error");
}
}
/**
* Handles DELETE requests to terminate sessions
*/
private async handleDeleteRequest(req: Request): Promise<Response> {
const sessionError = await this.validateSession(req);
if (sessionError) {
return sessionError;
}
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
if (this.sessionId && this._sessionStore) {
await this._sessionStore.delete(this.sessionId);
}
await Promise.resolve(this._onsessionclosed?.(this.sessionId!));
await this.close();
return new Response(null, { status: 200 });
}
/**
* Validates session ID for non-initialization requests.
*/
private async validateSession(req: Request): Promise<Response | undefined> {
if (this.sessionIdGenerator === undefined) {
return undefined;
}
const sessionId = req.headers.get("mcp-session-id");
if (!sessionId) {
return this.createJsonErrorResponse(
400,
-32000,
"Bad Request: Mcp-Session-Id header is required",
);
}
if (this._sessionStore) {
const sessionState = await this._sessionStore.get(sessionId);
if (sessionState && sessionState.initialized) {
this.sessionId = sessionId;
this._initialized = true;
return undefined;
}
return this.createJsonErrorResponse(
404,
-32001,
"Session not found",
);
}
if (!this._initialized) {
return this.createJsonErrorResponse(
400,
-32000,
"Bad Request: Server not initialized",
);
}
if (sessionId !== this.sessionId) {
return this.createJsonErrorResponse(
404,
-32001,
"Session not found",
);
}
return undefined;
}
private validateProtocolVersion(req: Request): Response | undefined {
const protocolVersion =
req.headers.get("mcp-protocol-version") ??
DEFAULT_NEGOTIATED_PROTOCOL_VERSION;
if (!SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) {
return this.createJsonErrorResponse(
400,
-32000,
`Bad Request: Unsupported protocol version (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(", ")})`,
);
}
return undefined;
}
async close(): Promise<void> {
this._streamMapping.forEach(({ cleanup }) => {
cleanup();
});
this._streamMapping.clear();
this._requestResponseMap.clear();
this.onclose?.();
}
/**
* Close an SSE stream for a specific request, triggering client reconnection.
*/
closeSSEStream(requestId: RequestId): void {
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) return;
const stream = this._streamMapping.get(streamId);
if (stream) {
stream.cleanup();
}
}
/**
* Close the standalone GET SSE stream, triggering client reconnection.
*/
closeStandaloneSSEStream(): void {
const stream = this._streamMapping.get(this._standaloneSseStreamId);
if (stream) {
stream.cleanup();
}
}
async send(
message: JSONRPCMessage,
options?: { relatedRequestId?: RequestId },
): Promise<void> {
let requestId = options?.relatedRequestId;
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
requestId = message.id;
}
// Check if this message should be sent on the standalone SSE stream
if (requestId === undefined) {
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
throw new Error(
"Cannot send a response on a standalone SSE stream unless resuming a previous client request",
);
}
let eventId: string | undefined;
if (this._eventStore) {
eventId = await this._eventStore.storeEvent(
this._standaloneSseStreamId,
message,
);
}
const standaloneSse = this._streamMapping.get(
this._standaloneSseStreamId,
);
if (standaloneSse === undefined) {
return;
}
if (standaloneSse.controller && standaloneSse.encoder) {
this.writeSSEEvent(
standaloneSse.controller,
standaloneSse.encoder,
message,
eventId,
);
}
return;
}
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) {
throw new Error(
`No connection established for request ID: ${String(requestId)}`,
);
}
const stream = this._streamMapping.get(streamId);
if (
!this._enableJsonResponse &&
stream?.controller &&
stream?.encoder
) {
let eventId: string | undefined;
if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
this.writeSSEEvent(
stream.controller,
stream.encoder,
message,
eventId,
);
}
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
this._requestResponseMap.set(requestId, message);
const relatedIds = Array.from(
this._requestToStreamMapping.entries(),
)
.filter(([_, sid]) => sid === streamId)
.map(([id]) => id);
const allResponsesReady = relatedIds.every((id) =>
this._requestResponseMap.has(id),
);
if (allResponsesReady) {
if (!stream) {
throw new Error(
`No connection established for request ID: ${String(requestId)}`,
);
}
if (this._enableJsonResponse && stream.resolveJson) {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}
const responses = relatedIds.map(
(id) => this._requestResponseMap.get(id)!,
);
if (responses.length === 1) {
stream.resolveJson(
new Response(JSON.stringify(responses[0]), {
status: 200,
headers,
}),
);
} else {
stream.resolveJson(
new Response(JSON.stringify(responses), {
status: 200,
headers,
}),
);
}
} else {
stream.cleanup();
}
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
}
}
}
}