Skip to main content
Glama
inMemoryDispatcher.ts (6.69 kB)
import { PassThrough, Readable } from "node:stream";
import type { Application, RequestHandler } from "express";

/** Timeout used when `options.timeout` is missing or falsy. */
const DEFAULT_TIMEOUT_MS = 30000;

/** `content-type` values that mark a response as streaming. */
const STREAMING_CONTENT_TYPES: ReadonlySet<string> = new Set([
  "text/event-stream", // Server-Sent Events
  "text/plain", // Plain text streaming
  "application/octet-stream", // Binary streaming
  "application/x-ndjson", // NDJSON streaming
  "application/jsonlines",
]);

/** Options accepted by {@link InMemoryDispatcher.dispatch}. */
export interface DispatchOptions {
  /** Force every written chunk into the response stream, whatever the headers say. */
  streaming?: boolean;
  /** Milliseconds before the dispatch rejects; falsy values fall back to 30000. */
  timeout?: number;
}

/** Outcome of {@link InMemoryDispatcher.dispatch}. */
export interface DispatchResult {
  status: number;
  /** Response headers, keyed by lower-cased name. */
  headers: Record<string, string>;
  /** Parsed JSON body, the raw string when it is not JSON, `null` for streams. */
  body: unknown;
  /** Present only for streaming responses. */
  stream?: NodeJS.ReadableStream;
  isStreaming?: boolean;
}

/** Shape of the stand-in response object handed to the Express app. */
interface InMemoryResponse {
  statusCode: number;
  headers: Record<string, string>;
  bodyChunks: Buffer[];
  setHeader: (k: string, v: string) => void;
  getHeader: (k: string) => string | undefined;
  writeHead: (
    code: number,
    reasonOrHeaders?: string | Record<string, string>,
    headers?: Record<string, string>,
  ) => void;
  write: (chunk: unknown) => boolean;
  end: (chunk?: unknown) => void;
}

/**
 * Reports whether the response headers mark the response as streaming.
 *
 * Values are compared with exact equality, so a header value that carries
 * parameters (for example `text/event-stream; charset=utf-8`) is NOT matched.
 * NOTE(review): confirm whether parameterised content types should match too.
 */
function hasStreamingHeaders(headers: Record<string, string>): boolean {
  const contentType = headers["content-type"];
  return (
    (contentType !== undefined && STREAMING_CONTENT_TYPES.has(contentType)) ||
    // Chunked transfer encoding
    headers["transfer-encoding"] === "chunked" ||
    // WebSocket upgrade
    headers.upgrade === "websocket" ||
    // Custom streaming indicators
    headers["x-streaming"] === "true" ||
    headers["x-content-stream"] === "true"
  );
}

/** Converts a written chunk to a Buffer without mangling binary data. */
function toBuffer(chunk: unknown): Buffer {
  if (Buffer.isBuffer(chunk)) return chunk;
  // A plain Uint8Array would stringify to "1,2,3"; copy its bytes instead.
  if (chunk instanceof Uint8Array) return Buffer.from(chunk);
  return Buffer.from(String(chunk));
}

/**
 * Calls an Express application directly as a request handler, using an
 * in-memory request/response pair instead of a network socket.
 */
export class InMemoryDispatcher {
  constructor(private readonly app: Application) {}

  /**
   * Sends one request through the app and resolves with the captured response.
   *
   * The promise settles on the first of: the response's `end()` being called,
   * the app invoking its `next` callback, the app throwing synchronously, or
   * the timeout elapsing. Streaming responses are collected in a `PassThrough`
   * and the promise still resolves only at `end()`, so the returned stream has
   * already been ended by the time the caller receives it.
   *
   * @param method - HTTP method assigned to the fake request.
   * @param path - Request URL assigned to the fake request.
   * @param payload - When not `undefined`, set as `req.body` and also pushed
   *   into the request stream as JSON.
   * @param headers - Request headers; merged over a default JSON content type.
   * @param options - Streaming override and timeout.
   * @returns Status, headers and either a parsed body or a stream.
   * @throws Rejects with the error passed to `next`, a synchronous error
   *   thrown by the app, or `Error("Request timeout after <n>ms")`.
   */
  async dispatch(
    method: string,
    path: string,
    payload: unknown,
    headers: Record<string, string> = {},
    options: DispatchOptions = {},
  ): Promise<DispatchResult> {
    return new Promise<DispatchResult>((resolve, reject) => {
      const req = new Readable({ read() {} }) as unknown as {
        method: string;
        url: string;
        headers: Record<string, string>;
        push: (chunk: string | null) => void;
        body?: unknown;
      };
      req.method = method;
      req.url = path;
      req.headers = { "content-type": "application/json", ...headers };

      // For JSON payloads, both set the body and push the serialized data so
      // that middleware reading either one finds it.
      if (payload !== undefined) {
        req.body = payload;
        req.push(JSON.stringify(payload));
      }
      req.push(null);

      const streamBuffer = new PassThrough();
      let isStreamingResponse = false;
      let hasEnded = false;

      const timeout = options.timeout || DEFAULT_TIMEOUT_MS;
      const timeoutId = setTimeout(() => {
        if (!hasEnded) {
          hasEnded = true;
          reject(new Error(`Request timeout after ${timeout}ms`));
        }
      }, timeout);

      // Every settle path clears the timer; otherwise a finished dispatch
      // would keep the event loop alive until the timeout elapsed.
      const finish = (result: DispatchResult): void => {
        clearTimeout(timeoutId);
        resolve(result);
      };
      const fail = (error: unknown): void => {
        hasEnded = true; // drop anything written after the failure
        clearTimeout(timeoutId);
        reject(error);
      };

      // Routes one chunk to the stream or to the buffered body.
      const record = (
        target: Pick<InMemoryResponse, "headers" | "bodyChunks">,
        chunk: unknown,
      ): void => {
        const buffer = toBuffer(chunk);
        if (options.streaming || hasStreamingHeaders(target.headers)) {
          isStreamingResponse = true;
          streamBuffer.write(buffer);
        } else {
          target.bodyChunks.push(buffer);
        }
      };

      const res: InMemoryResponse = {
        statusCode: 200,
        headers: {},
        bodyChunks: [],
        setHeader(k: string, v: string) {
          this.headers[k.toLowerCase()] = String(v);
        },
        getHeader(k: string) {
          return this.headers[k.toLowerCase()];
        },
        writeHead(
          code: number,
          reasonOrHeaders?: string | Record<string, string>,
          maybeHeaders?: Record<string, string>,
        ) {
          this.statusCode = code;
          // Node's writeHead also accepts (code, statusMessage, headers); a
          // reason string must not be iterated as if it were a header map.
          const extra =
            typeof reasonOrHeaders === "string" ? maybeHeaders : reasonOrHeaders;
          if (extra) {
            for (const [k, v] of Object.entries(extra)) {
              this.setHeader(k, String(v));
            }
          }
        },
        write(chunk: unknown) {
          if (hasEnded) return false;
          record(this, chunk);
          return true;
        },
        end(chunk?: unknown) {
          if (hasEnded) return;
          hasEnded = true;

          // A chunk given to end() counts as a final write. A function in
          // this position is a completion callback, never body data.
          if (chunk && typeof chunk !== "function") {
            record(this, chunk);
          }

          if (isStreamingResponse) {
            streamBuffer.end();
            finish({
              status: this.statusCode,
              headers: this.headers,
              body: null,
              stream: streamBuffer,
              isStreaming: true,
            });
            return;
          }

          const bodyStr = Buffer.concat(this.bodyChunks).toString("utf8");
          let body: unknown = bodyStr;
          try {
            body = JSON.parse(bodyStr);
          } catch {
            // Not JSON: hand back the raw string.
          }
          finish({
            status: this.statusCode,
            headers: this.headers,
            body,
            isStreaming: false,
          });
        },
      };

      try {
        (this.app as unknown as RequestHandler)(
          req as never,
          res as never,
          (err: unknown) => {
            clearTimeout(timeoutId);
            if (err) {
              fail(err);
            } else if (!hasEnded) {
              // The app finished without sending a response.
              hasEnded = true;
              finish({
                status: res.statusCode,
                headers: res.headers,
                body: undefined,
                isStreaming: false,
              });
            }
          },
        );
      } catch (e) {
        fail(e);
      }
    });
  }

  /**
   * Dispatches with `streaming: true` and always returns a readable stream.
   *
   * When the app produced no streamed chunks, the buffered body is wrapped in
   * a one-shot stream: strings are emitted as-is, other values as JSON, and an
   * `undefined` body yields an empty stream.
   *
   * @param timeout - Forwarded to `dispatch` as `options.timeout`.
   */
  async dispatchStream(
    method: string,
    path: string,
    payload: unknown,
    headers: Record<string, string> = {},
    timeout?: number,
  ): Promise<NodeJS.ReadableStream> {
    const result = await this.dispatch(method, path, payload, headers, {
      streaming: true,
      timeout,
    });
    if (result.stream) {
      return result.stream;
    }

    // JSON.stringify returns undefined at runtime for an undefined body.
    const text: string | undefined =
      typeof result.body === "string"
        ? result.body
        : JSON.stringify(result.body);
    return new Readable({
      read() {
        if (text) {
          this.push(text);
        }
        this.push(null);
      },
    });
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bowen31337/expressjs_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.