stream-transport.ts•2.15 kB
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import type { DuplexStream } from './types.js';
/**
 * An MCP transport built on top of a duplex stream.
 * It uses a `ReadableStream` to receive messages and a `WritableStream` to send messages.
 *
 * Useful if you wish to pipe messages over your own stream-based transport or directly between two streams.
 */
export class StreamTransport
  implements Transport, DuplexStream<JSONRPCMessage>
{
  #readableStreamController?: ReadableStreamDefaultController<JSONRPCMessage>;
  #writeableStreamController?: WritableStreamDefaultController;
  ready: Promise<void>;
  readable: ReadableStream<JSONRPCMessage>;
  writable: WritableStream<JSONRPCMessage>;
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
  constructor() {
    let resolveReadReady: () => void;
    let resolveWriteReady: () => void;
    const readReady = new Promise<void>((resolve) => {
      resolveReadReady = resolve;
    });
    const writeReady = new Promise<void>((resolve) => {
      resolveWriteReady = resolve;
    });
    this.ready = Promise.all([readReady, writeReady]).then(() => {});
    this.readable = new ReadableStream({
      start: (controller) => {
        this.#readableStreamController = controller;
        resolveReadReady();
      },
    });
    this.writable = new WritableStream({
      start: (controller) => {
        this.#writeableStreamController = controller;
        resolveWriteReady();
      },
      write: (message) => {
        this.onmessage?.(message);
      },
    });
  }
  async start() {
    await this.ready;
  }
  async send(message: JSONRPCMessage) {
    if (!this.#readableStreamController) {
      throw new Error('readable stream not initialized');
    }
    this.#readableStreamController.enqueue(message);
  }
  async close() {
    this.#readableStreamController?.error(new Error('connection closed'));
    this.#writeableStreamController?.error(new Error('connection closed'));
    this.onclose?.();
  }
}