Payload CMS MCP Server
- lib
import getRawBody from "raw-body";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from "http";
import { createClient } from "redis";
import { Socket } from "net";
import { Readable } from "stream";
import { ServerOptions } from "@modelcontextprotocol/sdk/server/index.js";
import vercelJson from "../vercel.json";
interface SerializedRequest {
requestId: string;
url: string;
method: string;
body: string;
headers: IncomingHttpHeaders;
}
export function initializeMcpApiHandler(
initializeServer: (server: McpServer) => void,
serverOptions: ServerOptions = {}
) {
const maxDuration =
vercelJson?.functions?.["api/server.ts"]?.maxDuration || 800;
// Clean the Redis URL if it contains variable name or quotes
const cleanRedisUrl = (url: string | undefined): string | undefined => {
if (!url) return undefined;
// If the URL contains KV_URL= or REDIS_URL=, extract just the URL part
if (url.includes('KV_URL=') || url.includes('REDIS_URL=')) {
const match = url.match(/(?:KV_URL=|REDIS_URL=)["']?(rediss?:\/\/[^"']+)["']?/);
return match ? match[1] : url;
}
// Remove any surrounding quotes
return url.replace(/^["'](.+)["']$/, '$1');
};
const redisUrl = cleanRedisUrl(process.env.REDIS_URL) || cleanRedisUrl(process.env.KV_URL);
if (!redisUrl) {
throw new Error("REDIS_URL or KV_URL environment variable is not set");
}
console.log("Using Redis URL:", redisUrl);
// Get optional configuration from environment variables
const connectTimeout = process.env.REDIS_CONNECT_TIMEOUT ?
parseInt(process.env.REDIS_CONNECT_TIMEOUT, 10) : 30000;
const keepAlive = process.env.REDIS_KEEP_ALIVE ?
parseInt(process.env.REDIS_KEEP_ALIVE, 10) : 5000;
const pingInterval = process.env.REDIS_PING_INTERVAL ?
parseInt(process.env.REDIS_PING_INTERVAL, 10) : 1000;
const commandTimeout = process.env.REDIS_COMMAND_TIMEOUT ?
parseInt(process.env.REDIS_COMMAND_TIMEOUT, 10) : 5000;
const heartbeatInterval = process.env.REDIS_HEARTBEAT_INTERVAL ?
parseInt(process.env.REDIS_HEARTBEAT_INTERVAL, 10) : 30000;
const persistenceInterval = process.env.REDIS_PERSISTENCE_INTERVAL ?
parseInt(process.env.REDIS_PERSISTENCE_INTERVAL, 10) : 60000;
const maxReconnectAttempts = process.env.REDIS_MAX_RECONNECT_ATTEMPTS ?
parseInt(process.env.REDIS_MAX_RECONNECT_ATTEMPTS, 10) : 5;
const tlsVerify = process.env.REDIS_TLS_VERIFY ?
process.env.REDIS_TLS_VERIFY.toLowerCase() === 'true' : false;
// Global connection state
let isRedisConnected = false;
let isRedisPublisherConnected = false;
let reconnectAttempts = 0;
let heartbeatIntervalId: NodeJS.Timeout | null = null;
let persistenceIntervalId: NodeJS.Timeout | null = null;
// Create Redis clients with maximum persistence settings
const redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
// More aggressive exponential backoff with a maximum delay of 5 seconds
const delay = Math.min(Math.pow(1.5, retries) * 100, 5000);
console.log(`Redis reconnecting in ${delay}ms (attempt ${retries})`);
reconnectAttempts = retries;
return delay;
},
connectTimeout: connectTimeout,
keepAlive: keepAlive,
noDelay: true, // Disable Nagle's algorithm for faster response
tls: redisUrl.startsWith('rediss://') ? { rejectUnauthorized: tlsVerify } : undefined,
},
pingInterval: pingInterval,
disableOfflineQueue: false, // Queue commands when disconnected
commandTimeout: commandTimeout,
retryStrategy: (times) => {
// Very aggressive retry strategy for commands
return Math.min(times * 100, 2000);
},
// Force auto-reconnect
autoResubscribe: true,
autoResendUnfulfilledCommands: true,
enableReadyCheck: true,
enableOfflineQueue: true,
maxRetriesPerRequest: 20, // Retry commands many times
});
const redisPublisher = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
// More aggressive exponential backoff with a maximum delay of 5 seconds
const delay = Math.min(Math.pow(1.5, retries) * 100, 5000);
console.log(`Redis publisher reconnecting in ${delay}ms (attempt ${retries})`);
return delay;
},
connectTimeout: connectTimeout,
keepAlive: keepAlive,
noDelay: true, // Disable Nagle's algorithm for faster response
tls: redisUrl.startsWith('rediss://') ? { rejectUnauthorized: tlsVerify } : undefined,
},
pingInterval: pingInterval,
disableOfflineQueue: false, // Queue commands when disconnected
commandTimeout: commandTimeout,
retryStrategy: (times) => {
// Very aggressive retry strategy for commands
return Math.min(times * 100, 2000);
},
// Force auto-reconnect
autoResubscribe: true,
autoResendUnfulfilledCommands: true,
enableReadyCheck: true,
enableOfflineQueue: true,
maxRetriesPerRequest: 20, // Retry commands many times
});
// Enhanced event listeners
redis.on("error", (err) => {
console.error("Redis error", err);
isRedisConnected = false;
ensureRedisConnection();
});
redis.on("reconnecting", () => {
console.log("Redis reconnecting...");
isRedisConnected = false;
});
redis.on("connect", () => {
console.log("Redis connected");
isRedisConnected = true;
reconnectAttempts = 0;
});
redis.on("end", () => {
console.log("Redis disconnected");
isRedisConnected = false;
ensureRedisConnection();
});
redis.on("ready", () => {
console.log("Redis ready");
isRedisConnected = true;
});
redisPublisher.on("error", (err) => {
console.error("Redis publisher error", err);
isRedisPublisherConnected = false;
ensureRedisConnection();
});
redisPublisher.on("reconnecting", () => {
console.log("Redis publisher reconnecting...");
isRedisPublisherConnected = false;
});
redisPublisher.on("connect", () => {
console.log("Redis publisher connected");
isRedisPublisherConnected = true;
});
redisPublisher.on("end", () => {
console.log("Redis publisher disconnected");
isRedisPublisherConnected = false;
ensureRedisConnection();
});
redisPublisher.on("ready", () => {
console.log("Redis publisher ready");
isRedisPublisherConnected = true;
});
// Initial connection promise
const redisPromise = Promise.all([redis.connect(), redisPublisher.connect()]);
let servers: McpServer[] = [];
// More aggressive function to handle reconnection
const ensureRedisConnection = async () => {
try {
if (!isRedisConnected) {
console.log("Ensuring Redis connection...");
try {
await redis.disconnect();
} catch (e) {
// Ignore disconnect errors
}
await redis.connect();
}
if (!isRedisPublisherConnected) {
console.log("Ensuring Redis publisher connection...");
try {
await redisPublisher.disconnect();
} catch (e) {
// Ignore disconnect errors
}
await redisPublisher.connect();
}
} catch (error) {
console.error("Failed to reconnect to Redis:", error);
// Schedule another attempt
setTimeout(ensureRedisConnection, 2000);
}
};
// Set up a more frequent heartbeat to keep the Redis connection alive
heartbeatIntervalId = setInterval(async () => {
try {
if (isRedisConnected) {
// Send a ping to keep the connection alive
await redis.ping();
console.log("Redis heartbeat: connection alive");
} else {
console.log("Redis heartbeat: reconnecting...");
await ensureRedisConnection();
}
} catch (error) {
console.error("Redis heartbeat error:", error);
isRedisConnected = false;
await ensureRedisConnection();
}
}, heartbeatInterval);
// Additional persistence interval that forces reconnection periodically
persistenceIntervalId = setInterval(async () => {
console.log("Persistence check: ensuring Redis connections are healthy");
// Force a ping to check connection health
try {
await redis.ping();
await redisPublisher.ping();
} catch (error) {
console.error("Persistence check failed:", error);
await ensureRedisConnection();
}
// If we've had too many reconnect attempts, force a clean reconnection
if (reconnectAttempts > maxReconnectAttempts) {
console.log(`Too many reconnect attempts (${reconnectAttempts}/${maxReconnectAttempts}), forcing clean reconnection`);
try {
await redis.disconnect();
await redisPublisher.disconnect();
} catch (e) {
// Ignore disconnect errors
}
// Short delay before reconnecting
await new Promise(resolve => setTimeout(resolve, 1000));
try {
await redis.connect();
await redisPublisher.connect();
reconnectAttempts = 0;
} catch (error) {
console.error("Forced reconnection failed:", error);
}
}
}, persistenceInterval);
// Handle process termination to clean up resources
const handleTermination = async () => {
console.log("Cleaning up resources before termination");
if (heartbeatIntervalId) clearInterval(heartbeatIntervalId);
if (persistenceIntervalId) clearInterval(persistenceIntervalId);
try {
await redis.disconnect();
await redisPublisher.disconnect();
} catch (e) {
// Ignore disconnect errors
}
process.exit(0);
};
// Register termination handlers if they haven't been registered yet
if (process.listenerCount('SIGTERM') === 0) {
process.on('SIGTERM', handleTermination);
}
if (process.listenerCount('SIGINT') === 0) {
process.on('SIGINT', handleTermination);
}
return async function mcpApiHandler(
req: IncomingMessage,
res: ServerResponse
) {
// Ensure Redis connection before processing request
await ensureRedisConnection();
await redisPromise;
const url = new URL(req.url || "", "https://example.com");
if (url.pathname === "/sse") {
console.log("Got new SSE connection");
const transport = new SSEServerTransport("/message", res);
const sessionId = transport.sessionId;
const server = new McpServer(
{
name: "mcp-typescript server on vercel",
version: "0.1.0",
},
serverOptions
);
initializeServer(server);
servers.push(server);
server.server.onclose = () => {
console.log("SSE connection closed");
servers = servers.filter((s) => s !== server);
};
let logs: {
type: "log" | "error";
messages: string[];
}[] = [];
// This ensures that we logs in the context of the right invocation since the subscriber
// is not itself invoked in request context.
function logInContext(severity: "log" | "error", ...messages: string[]) {
logs.push({
type: severity,
messages,
});
}
// Handles messages originally received via /message
const handleMessage = async (message: string) => {
console.log("Received message from Redis", message);
logInContext("log", "Received message from Redis", message);
const request = JSON.parse(message) as SerializedRequest;
// Make in IncomingMessage object because that is what the SDK expects.
const req = createFakeIncomingMessage({
method: request.method,
url: request.url,
headers: request.headers,
body: request.body,
});
const syntheticRes = new ServerResponse(req);
let status = 100;
let body = "";
syntheticRes.writeHead = (statusCode: number) => {
status = statusCode;
return syntheticRes;
};
syntheticRes.end = (b: unknown) => {
body = b as string;
return syntheticRes;
};
await transport.handlePostMessage(req, syntheticRes);
await redisPublisher.publish(
`responses:${sessionId}:${request.requestId}`,
JSON.stringify({
status,
body,
})
);
if (status >= 200 && status < 300) {
logInContext(
"log",
`Request ${sessionId}:${request.requestId} succeeded: ${body}`
);
} else {
logInContext(
"error",
`Message for ${sessionId}:${request.requestId} failed with status ${status}: ${body}`
);
}
};
const interval = setInterval(() => {
for (const log of logs) {
console[log.type].call(console, ...log.messages);
}
logs = [];
}, 100);
await redis.subscribe(`requests:${sessionId}`, handleMessage);
console.log(`Subscribed to requests:${sessionId}`);
let timeout: NodeJS.Timeout;
let resolveTimeout: (value: unknown) => void;
const waitPromise = new Promise((resolve) => {
resolveTimeout = resolve;
timeout = setTimeout(() => {
resolve("max duration reached");
}, (maxDuration - 5) * 1000);
});
async function cleanup() {
clearTimeout(timeout);
clearInterval(interval);
await redis.unsubscribe(`requests:${sessionId}`, handleMessage);
console.log("Done");
res.statusCode = 200;
res.end();
}
req.on("close", () => resolveTimeout("client hang up"));
// Handle process termination to clean up resources
const handleSessionTermination = async () => {
console.log("Cleaning up session resources before termination");
await cleanup();
};
// Register session-specific cleanup
if (process.listenerCount('SIGTERM') === 1) { // Only our global handler
process.on('SIGTERM', handleSessionTermination);
}
if (process.listenerCount('SIGINT') === 1) { // Only our global handler
process.on('SIGINT', handleSessionTermination);
}
await server.connect(transport);
const closeReason = await waitPromise;
console.log(closeReason);
await cleanup();
// Remove session-specific handlers
process.removeListener('SIGTERM', handleSessionTermination);
process.removeListener('SIGINT', handleSessionTermination);
} else if (url.pathname === "/message") {
console.log("Received message");
const body = await getRawBody(req, {
length: req.headers["content-length"],
encoding: "utf-8",
});
const sessionId = url.searchParams.get("sessionId") || "";
if (!sessionId) {
res.statusCode = 400;
res.end("No sessionId provided");
return;
}
const requestId = crypto.randomUUID();
const serializedRequest: SerializedRequest = {
requestId,
url: req.url || "",
method: req.method || "",
body: body,
headers: req.headers,
};
// Handles responses from the /sse endpoint.
await redis.subscribe(
`responses:${sessionId}:${requestId}`,
(message) => {
clearTimeout(timeout);
const response = JSON.parse(message) as {
status: number;
body: string;
};
res.statusCode = response.status;
res.end(response.body);
}
);
// Queue the request in Redis so that a subscriber can pick it up.
// One queue per session.
await redisPublisher.publish(
`requests:${sessionId}`,
JSON.stringify(serializedRequest)
);
console.log(`Published requests:${sessionId}`, serializedRequest);
let timeout = setTimeout(async () => {
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
res.statusCode = 408;
res.end("Request timed out");
}, 10 * 1000);
res.on("close", async () => {
clearTimeout(timeout);
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
});
} else {
res.statusCode = 404;
res.end("Not found");
}
};
}
// Define the options interface
interface FakeIncomingMessageOptions {
method?: string;
url?: string;
headers?: IncomingHttpHeaders;
body?: string | Buffer | Record<string, any> | null;
socket?: Socket;
}
// Create a fake IncomingMessage
function createFakeIncomingMessage(
options: FakeIncomingMessageOptions = {}
): IncomingMessage {
const {
method = "GET",
url = "/",
headers = {},
body = null,
socket = new Socket(),
} = options;
// Create a readable stream that will be used as the base for IncomingMessage
const readable = new Readable();
readable._read = (): void => {}; // Required implementation
// Add the body content if provided
if (body) {
if (typeof body === "string") {
readable.push(body);
} else if (Buffer.isBuffer(body)) {
readable.push(body);
} else {
readable.push(JSON.stringify(body));
}
readable.push(null); // Signal the end of the stream
}
// Create the IncomingMessage instance
const req = new IncomingMessage(socket);
// Set the properties
req.method = method;
req.url = url;
req.headers = headers;
// Copy over the stream methods
req.push = readable.push.bind(readable);
req.read = readable.read.bind(readable);
req.on = readable.on.bind(readable);
req.pipe = readable.pipe.bind(readable);
return req;
}