Skip to main content
Glama

Vercel MCP Adapter

mcp-api-handler.ts (21 kB)
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import {
  type IncomingHttpHeaders,
  IncomingMessage,
  type ServerResponse,
} from "node:http";
import { createClient } from "redis";
import { Socket } from "node:net";
import { Readable } from "node:stream";
import type { ServerOptions } from "@modelcontextprotocol/sdk/server/index.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import type { BodyType } from "./server-response-adapter";
import assert from "node:assert";
import type {
  McpEvent,
  McpErrorEvent,
  McpSessionEvent,
  McpRequestEvent,
} from "../lib/log-helper";
import { createEvent } from "../lib/log-helper";
import { EventEmittingResponse } from "../lib/event-emitter.js";
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types";
import { getAuthContext } from "./auth-context";

/**
 * A request received on the SSE message endpoint, flattened to plain data so
 * it can be JSON-serialized and relayed over Redis to the handler invocation
 * that owns the matching SSE stream.
 */
interface SerializedRequest {
  requestId: string;
  url: string;
  method: string;
  body: BodyType;
  headers: IncomingHttpHeaders;
}

type LogLevel = "log" | "error" | "warn" | "info" | "debug";

type Logger = {
  log: (...args: unknown[]) => void;
  error: (...args: unknown[]) => void;
  warn: (...args: unknown[]) => void;
  info: (...args: unknown[]) => void;
  debug: (...args: unknown[]) => void;
};

/**
 * Builds a logger that forwards to the matching `console` method only when
 * verbose logging is enabled; otherwise every method is a no-op.
 *
 * @param verboseLogs - Whether log calls should reach the console.
 * @returns A logger with one method per {@link LogLevel}.
 */
function createLogger(verboseLogs = false): Logger {
  return {
    log: (...args: unknown[]) => {
      if (verboseLogs) console.log(...args);
    },
    error: (...args: unknown[]) => {
      if (verboseLogs) console.error(...args);
    },
    warn: (...args: unknown[]) => {
      if (verboseLogs) console.warn(...args);
    },
    info: (...args: unknown[]) => {
      if (verboseLogs) console.info(...args);
    },
    debug: (...args: unknown[]) => {
      if (verboseLogs) console.debug(...args);
    },
  };
}

/**
 * Configuration for the MCP handler.
 * @property redisUrl - The URL of the Redis instance to use for the MCP handler.
 * @property streamableHttpEndpoint - The endpoint to use for the streamable HTTP transport.
 * @property sseEndpoint - The endpoint to use for the SSE transport.
 * @property verboseLogs - If true, enables console logging.
 */
export type Config = {
  /**
   * The URL of the Redis instance to use for the MCP handler.
   * @default process.env.REDIS_URL || process.env.KV_URL
   */
  redisUrl?: string;
  /**
   * The endpoint to use for the streamable HTTP transport.
   * @deprecated Use `set basePath` instead.
   * @default "/mcp"
   */
  streamableHttpEndpoint?: string;
  /**
   * The endpoint to use for the SSE transport.
   * @deprecated Use `set basePath` instead.
   * @default "/sse"
   */
  sseEndpoint?: string;
  /**
   * The endpoint to use for the SSE messages transport.
   * @deprecated Use `set basePath` instead.
   * @default "/message"
   */
  sseMessageEndpoint?: string;
  /**
   * The maximum duration of an MCP request in seconds.
   * @default 60
   */
  maxDuration?: number;
  /**
   * If true, enables console logging.
   * @default false
   */
  verboseLogs?: boolean;
  /**
   * The base path to use for deriving endpoints.
   * If provided, endpoints will be derived from this path.
   * For example, if basePath is "/", that means your routing is:
   * /app/[transport]/route.ts and then:
   * - streamableHttpEndpoint will be "/mcp"
   * - sseEndpoint will be "/sse"
   * - sseMessageEndpoint will be "/message"
   * @default ""
   */
  basePath?: string;
  /**
   * Callback function that receives MCP events.
   * This can be used to track analytics, debug issues, or implement custom behaviors.
   */
  onEvent?: (event: McpEvent) => void;
  /**
   * If true, disables the SSE endpoint.
   * As of 2025-03-26, SSE is not supported by the MCP spec.
   * https://modelcontextprotocol.io/specification/2025-03-26/basic/transports
   * @default false
   */
  disableSse?: boolean;
};

/**
 * Derives MCP endpoints from a base path.
 * @param basePath - The base path to derive endpoints from
 * @returns An object containing the derived endpoints
 */
function deriveEndpointsFromBasePath(basePath: string): {
  streamableHttpEndpoint: string;
  sseEndpoint: string;
  sseMessageEndpoint: string;
} {
  // Remove a single trailing slash so "/api/" and "/api" derive the same paths.
  const normalizedBasePath = basePath.replace(/\/$/, "");

  return {
    streamableHttpEndpoint: `${normalizedBasePath}/mcp`,
    sseEndpoint: `${normalizedBasePath}/sse`,
    sseMessageEndpoint: `${normalizedBasePath}/message`,
  };
}

/**
 * Calculates the endpoints for the MCP handler.
 *
 * When `basePath` is set (including the empty string) the three endpoints are
 * derived from it and the explicit endpoint options are ignored; otherwise the
 * explicit options (or their defaults) are returned unchanged.
 *
 * @param config - The configuration for the MCP handler.
 * @returns An object containing the endpoints for the MCP handler.
 */
export function calculateEndpoints({
  basePath,
  streamableHttpEndpoint = "/mcp",
  sseEndpoint = "/sse",
  sseMessageEndpoint = "/message",
}: Config) {
  if (basePath != null) {
    return deriveEndpointsFromBasePath(basePath);
  }
  return { streamableHttpEndpoint, sseEndpoint, sseMessageEndpoint };
}

let redisPublisher: ReturnType<typeof createClient>;
let redis: ReturnType<typeof createClient>;
/**
 * The shared connection attempt for both Redis clients. Callers await this
 * instead of checking the client variables, so nobody receives a client whose
 * `connect()` has not resolved yet.
 */
let redisConnection: Promise<void> | undefined;

/**
 * Lazily creates and connects the module-wide Redis clients: one used for
 * subscriptions and one used for publishing.
 *
 * Concurrent callers share a single connection attempt. If that attempt
 * fails it is forgotten, so a later call can try again rather than being
 * handed clients that never connected.
 *
 * @param redisUrl - Connection URL; required only when no connection attempt exists yet.
 * @param logger - Receives client `error` events (the logger of the call that created the clients).
 * @returns The connected subscriber (`redis`) and publisher (`redisPublisher`) clients.
 * @throws Error if no connection exists yet and `redisUrl` is missing, or if connecting fails.
 */
async function initializeRedis({
  redisUrl,
  logger,
}: {
  redisUrl?: string;
  logger: Logger;
}) {
  if (!redisConnection) {
    if (!redisUrl) {
      throw new Error("redisUrl is required");
    }

    const subscriber = createClient({
      url: redisUrl,
    });
    const publisher = createClient({
      url: redisUrl,
    });
    subscriber.on("error", (err) => {
      logger.error("Redis error", err);
    });
    publisher.on("error", (err) => {
      logger.error("Redis error", err);
    });

    const attempt = (async () => {
      await Promise.all([subscriber.connect(), publisher.connect()]);
      // Publish the clients only once both are actually usable.
      redis = subscriber;
      redisPublisher = publisher;
    })();
    redisConnection = attempt;
    // Do not cache a failed attempt; the rejection itself is still delivered
    // to every caller through the `await` below.
    attempt.catch(() => {
      if (redisConnection === attempt) {
        redisConnection = undefined;
      }
    });
  }

  await redisConnection;
  return { redis, redisPublisher };
}

/**
 * Creates the request handler that serves an MCP server over three routes:
 *
 * - the streamable HTTP endpoint (stateless, POST only),
 * - the SSE endpoint (long-lived GET stream, one `McpServer` per connection),
 * - the SSE message endpoint (POSTs relayed to the SSE stream via Redis pub/sub).
 *
 * @param initializeServer - Called with each new `McpServer` so the caller can register tools/resources.
 * @param serverOptions - Options passed to every `McpServer` constructed by the handler.
 * @param config - Handler configuration; see {@link Config}.
 * @returns An async handler taking a Fetch `Request` and a Node `ServerResponse`.
 */
export function initializeMcpApiHandler(
  initializeServer:
    | ((server: McpServer) => Promise<void>)
    | ((server: McpServer) => void),
  serverOptions: ServerOptions = {},
  config: Config = {
    redisUrl: process.env.REDIS_URL || process.env.KV_URL,
    streamableHttpEndpoint: "/mcp",
    sseEndpoint: "/sse",
    sseMessageEndpoint: "/message",
    basePath: "",
    maxDuration: 60,
    verboseLogs: false,
    disableSse: false,
  }
) {
  const {
    redisUrl,
    basePath,
    streamableHttpEndpoint: explicitStreamableHttpEndpoint,
    sseEndpoint: explicitSseEndpoint,
    sseMessageEndpoint: explicitSseMessageEndpoint,
    maxDuration,
    verboseLogs,
    disableSse,
  } = config;

  // If basePath is provided, derive endpoints from it
  const { streamableHttpEndpoint, sseEndpoint, sseMessageEndpoint } =
    calculateEndpoints({
      basePath,
      streamableHttpEndpoint: explicitStreamableHttpEndpoint,
      sseEndpoint: explicitSseEndpoint,
      sseMessageEndpoint: explicitSseMessageEndpoint,
    });

  const logger = createLogger(verboseLogs);

  // Servers backing currently open SSE connections.
  let servers: McpServer[] = [];

  // One stateless server/transport pair is shared by all streamable HTTP
  // requests. Its setup is tracked as a promise so that requests arriving
  // while setup is still running wait for it instead of using an
  // unconnected transport.
  let statelessServerReady: Promise<void> | undefined;
  const statelessTransport = new StreamableHTTPServerTransport({
    sessionIdGenerator: undefined,
  });

  return async function mcpApiHandler(req: Request, res: ServerResponse) {
    // req.url may be relative; the base is only there to make it parseable.
    const url = new URL(req.url || "", "https://example.com");

    if (url.pathname === streamableHttpEndpoint) {
      if (req.method === "GET" || req.method === "DELETE") {
        logger.log(`Received ${req.method} MCP request`);
        res.writeHead(405).end(
          JSON.stringify({
            jsonrpc: "2.0",
            error: {
              code: -32000,
              message: "Method not allowed.",
            },
            id: null,
          })
        );
        return;
      }

      // NOTE(review): methods other than GET/DELETE/POST fall through this
      // branch without writing any response — confirm callers rely on that.
      if (req.method === "POST") {
        const eventRes = new EventEmittingResponse(
          createFakeIncomingMessage(),
          config.onEvent
        );

        if (!statelessServerReady) {
          const attempt = (async () => {
            const server = new McpServer(
              {
                name: "mcp-typescript server on vercel",
                version: "0.1.0",
              },
              serverOptions
            );
            await initializeServer(server);
            await server.connect(statelessTransport);
          })();
          statelessServerReady = attempt;
          // Forget a failed setup so a later request can retry; the error is
          // still surfaced to this request by the `await` below.
          attempt.catch(() => {
            if (statelessServerReady === attempt) {
              statelessServerReady = undefined;
            }
          });
        }
        await statelessServerReady;

        // Parse the request body
        let bodyContent: BodyType;
        const contentType = req.headers.get("content-type") || "";
        if (contentType.includes("application/json")) {
          bodyContent = await req.json();
        } else {
          bodyContent = await req.text();
        }

        const incomingRequest = createFakeIncomingMessage({
          method: req.method,
          url: req.url,
          headers: Object.fromEntries(req.headers),
          body: bodyContent,
          auth: req.auth, // Use the auth info that should already be set by withMcpAuth
        });

        // Create a response that will emit events
        const wrappedRes = new EventEmittingResponse(
          incomingRequest,
          config.onEvent
        );
        Object.assign(wrappedRes, res);

        // Only bodies that are objects carrying a "method" key are reported
        // as completed requests.
        const hasMethod =
          typeof bodyContent === "object" &&
          bodyContent &&
          "method" in bodyContent;

        try {
          await statelessTransport.handleRequest(incomingRequest, wrappedRes);
          if (
            typeof bodyContent === "object" &&
            bodyContent &&
            "method" in bodyContent
          ) {
            eventRes.requestCompleted(
              bodyContent.method as string,
              bodyContent
            );
          }
        } catch (error) {
          if (
            hasMethod &&
            typeof bodyContent === "object" &&
            bodyContent &&
            "method" in bodyContent
          ) {
            eventRes.requestCompleted(
              bodyContent.method as string,
              undefined,
              error instanceof Error ? error : String(error)
            );
          }
          throw error;
        }
      }
    } else if (url.pathname === sseEndpoint) {
      if (disableSse) {
        res.statusCode = 404;
        res.end("Not found");
        return;
      }

      // Check HTTP method - only allow GET for SSE connections
      if (req.method !== "GET") {
        logger.log(`Rejected SSE connection with method ${req.method}`);
        res
          .writeHead(405, { "Content-Type": "text/plain" })
          .end("Method Not Allowed");
        return;
      }

      // Check that Accept header supports event-stream
      const acceptHeader =
        req.headers.get("accept") || req.headers.get("Accept");
      if (
        acceptHeader &&
        !acceptHeader.includes("text/event-stream") &&
        !acceptHeader.includes("*/*") &&
        !acceptHeader.includes("text/*")
      ) {
        logger.log(
          `Rejected SSE connection with incompatible Accept header: ${acceptHeader}`
        );
        res
          .writeHead(406, { "Content-Type": "text/plain" })
          .end("Not Acceptable");
        return;
      }

      const { redis, redisPublisher } = await initializeRedis({
        redisUrl,
        logger,
      });
      logger.log("Got new SSE connection");
      assert(sseMessageEndpoint, "sseMessageEndpoint is required");
      const transport = new SSEServerTransport(sseMessageEndpoint, res);
      const sessionId = transport.sessionId;

      const eventRes = new EventEmittingResponse(
        createFakeIncomingMessage(),
        config.onEvent,
        sessionId
      );
      eventRes.startSession("SSE", {
        userAgent: req.headers.get("user-agent") ?? undefined,
        ip:
          req.headers.get("x-forwarded-for") ??
          req.headers.get("x-real-ip") ??
          undefined,
      });

      const server = new McpServer(
        {
          name: "mcp-typescript server on vercel",
          version: "0.1.0",
        },
        serverOptions
      );

      await initializeServer(server);

      servers.push(server);

      server.server.onclose = () => {
        logger.log("SSE connection closed");
        eventRes.endSession("SSE");
        servers = servers.filter((s) => s !== server);
      };

      // Log lines produced while handling relayed messages are buffered here
      // and flushed by the interval below.
      let logs: {
        type: LogLevel;
        messages: string[];
      }[] = [];

      // eslint-disable-next-line no-inner-declarations
      function logInContext(severity: LogLevel, ...messages: string[]) {
        logs.push({
          type: severity,
          messages,
        });
      }

      // Handles messages originally received via /message
      const handleMessage = async (message: string) => {
        logger.log("Received message from Redis", message);
        logInContext("log", "Received message from Redis", message);
        const request = JSON.parse(message) as SerializedRequest;

        // Make in IncomingMessage object because that is what the SDK expects.
        const relayedReq = createFakeIncomingMessage({
          method: request.method,
          url: request.url,
          headers: request.headers,
          body: request.body,
        });

        const syntheticRes = new EventEmittingResponse(
          relayedReq,
          config.onEvent,
          sessionId
        );

        // Capture what the transport writes instead of sending it anywhere;
        // the captured status/body are published back over Redis below.
        let status = 100;
        let body = "";
        syntheticRes.writeHead = (statusCode: number) => {
          status = statusCode;
          return syntheticRes;
        };
        syntheticRes.end = (b: unknown) => {
          body = b as string;
          return syntheticRes;
        };

        try {
          await transport.handlePostMessage(relayedReq, syntheticRes);

          // If it was a function call, complete it
          if (
            typeof request.body === "object" &&
            request.body &&
            "method" in request.body
          ) {
            try {
              const result = JSON.parse(body);
              eventRes.requestCompleted(request.body.method as string, result);
            } catch {
              // Body was not JSON; report it verbatim.
              eventRes.requestCompleted(request.body.method as string, body);
            }
          }
        } catch (error) {
          eventRes.error(
            error instanceof Error ? error : String(error),
            "Error handling SSE message",
            "session"
          );
          // NOTE(review): rethrowing here means no response is published for
          // this requestId, so the /message caller only ends via its own
          // timeout — confirm this is intended.
          throw error;
        }

        await redisPublisher.publish(
          `responses:${sessionId}:${request.requestId}`,
          JSON.stringify({
            status,
            body,
          })
        );

        if (status >= 200 && status < 300) {
          logInContext(
            "log",
            `Request ${sessionId}:${request.requestId} succeeded: ${body}`
          );
        } else {
          logInContext(
            "error",
            `Message for ${sessionId}:${request.requestId} failed with status ${status}: ${body}`
          );
          eventRes.error(
            `Request failed with status ${status}`,
            body,
            "session"
          );
        }
      };

      // Flush buffered log lines every 100 ms.
      const interval = setInterval(() => {
        for (const log of logs) {
          logger[log.type](...log.messages);
        }
        logs = [];
      }, 100);

      await redis.subscribe(`requests:${sessionId}`, handleMessage);
      logger.log(`Subscribed to requests:${sessionId}`);

      // The connection is held open until either maxDuration elapses or the
      // client aborts; both paths resolve waitPromise with a reason string.
      let timeout: NodeJS.Timeout;
      let resolveTimeout: (value: unknown) => void;
      const waitPromise = new Promise((resolve) => {
        resolveTimeout = resolve;
        timeout = setTimeout(() => {
          resolve("max duration reached");
        }, (maxDuration ?? 60) * 1000);
      });

      // eslint-disable-next-line no-inner-declarations
      async function cleanup() {
        clearTimeout(timeout);
        clearInterval(interval);
        await redis.unsubscribe(`requests:${sessionId}`, handleMessage);
        logger.log("Done");
        res.statusCode = 200;
        res.end();
      }

      req.signal.addEventListener("abort", () =>
        resolveTimeout("client hang up")
      );

      await server.connect(transport);
      const closeReason = await waitPromise;
      logger.log(closeReason);
      await cleanup();
    } else if (url.pathname === sseMessageEndpoint) {
      if (disableSse) {
        res.statusCode = 404;
        res.end("Not found");
        return;
      }

      const { redis, redisPublisher } = await initializeRedis({
        redisUrl,
        logger,
      });
      logger.log("Received message");

      // Forward JSON bodies as parsed values and anything else as raw text.
      const body = await req.text();
      let parsedBody: BodyType;
      try {
        parsedBody = JSON.parse(body);
      } catch (e) {
        parsedBody = body;
      }

      const sessionId = url.searchParams.get("sessionId") || "";
      if (!sessionId) {
        res.statusCode = 400;
        res.end("No sessionId provided");
        return;
      }
      const requestId = crypto.randomUUID();
      const serializedRequest: SerializedRequest = {
        requestId,
        url: req.url || "",
        method: req.method || "",
        body: parsedBody,
        headers: Object.fromEntries(req.headers.entries()),
      };

      // Declare timeout and response handling state before subscription
      let timeout: NodeJS.Timeout | undefined;
      let hasResponded = false;

      // Safe response handler to prevent double res.end()
      const sendResponse = (status: number, body: string) => {
        if (!hasResponded) {
          hasResponded = true;
          clearTimeout(timeout);
          res.statusCode = status;
          res.end(body);
        }
      };

      // Handles responses from the /sse endpoint.
      await redis.subscribe(
        `responses:${sessionId}:${requestId}`,
        (message) => {
          try {
            const response = JSON.parse(message) as {
              status: number;
              body: string;
            };
            sendResponse(response.status, response.body);
          } catch (error) {
            logger.error("Failed to parse response message:", error);
            sendResponse(500, "Internal server error");
          }
        }
      );

      // Queue the request in Redis so that a subscriber can pick it up.
      // One queue per session.
      await redisPublisher.publish(
        `requests:${sessionId}`,
        JSON.stringify(serializedRequest)
      );
      logger.log(`Published requests:${sessionId}`, serializedRequest);

      // Set timeout after subscription is established. The response may
      // already have arrived while the publish above was awaited; in that
      // case there is nothing left to time out, so no timer is armed.
      if (!hasResponded) {
        timeout = setTimeout(async () => {
          await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
          sendResponse(408, "Request timed out");
        }, 10 * 1000);
      }

      // When the response closes, stop listening for a reply and make sure
      // nothing is written to it afterwards.
      res.on("close", async () => {
        hasResponded = true;
        clearTimeout(timeout);
        await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
      });
    } else {
      res.statusCode = 404;
      res.end("Not found");
    }
  };
}

/** Options accepted by {@link createFakeIncomingMessage}; every field is optional. */
interface FakeIncomingMessageOptions {
  method?: string;
  url?: string;
  headers?: IncomingHttpHeaders;
  body?: BodyType;
  socket?: Socket;
  auth?: AuthInfo;
}

/**
 * Builds a Node `IncomingMessage` from plain values, because the MCP SDK
 * transports used in this file are called with Node-style request objects.
 *
 * The body is served from an in-memory `Readable` whose stream methods are
 * grafted onto the returned message. Strings and Buffers are pushed as-is and
 * any other truthy body is JSON-stringified; a falsy body yields an
 * immediately ended stream.
 *
 * @param options - Request properties; defaults are `GET`, `/`, no headers, no body and a fresh `Socket`.
 * @returns The fake message, with `auth` set from `options.auth` or else `getAuthContext()` when either is truthy.
 */
function createFakeIncomingMessage(
  options: FakeIncomingMessageOptions = {}
): IncomingMessage & { auth?: AuthInfo } {
  const {
    method = "GET",
    url = "/",
    headers = {},
    body = null,
    socket = new Socket(),
  } = options;

  // Create a readable stream that will be used as the base for IncomingMessage
  const readable = new Readable();
  readable._read = (): void => {}; // Required implementation

  // Add the body content if provided
  if (body) {
    if (typeof body === "string" || Buffer.isBuffer(body)) {
      readable.push(body);
    } else {
      // Ensure proper JSON-RPC format
      readable.push(JSON.stringify(body));
    }
  }
  // Always end the stream, with or without a body
  readable.push(null);

  // Create the IncomingMessage instance
  const req = new IncomingMessage(socket) as IncomingMessage & {
    auth?: AuthInfo;
  };

  // Set the properties
  req.method = method;
  req.url = url;
  req.headers = headers;

  const auth = options.auth || getAuthContext();
  if (auth) {
    // See https://github.com/modelcontextprotocol/typescript-sdk/blob/590d4841373fc4eb86ecc9079834353a98cb84a3/src/server/auth/middleware/bearerAuth.ts#L71 for more info.
    (req as { auth?: AuthInfo }).auth = auth;
  }

  // Copy over the stream methods
  req.push = readable.push.bind(readable);
  req.read = readable.read.bind(readable);
  // @ts-expect-error
  req.on = readable.on.bind(readable);
  req.pipe = readable.pipe.bind(readable);

  return req;
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brunobenavent/mcp-vercel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.