QuantMCP
by dougdotcon
- QuantMCP
- lib
import getRawBody from "raw-body";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from "http";
import { createClient } from "redis";
import { Socket } from "net";
import { Readable } from "stream";
import { ServerOptions } from "@modelcontextprotocol/sdk/server/index.js";
import vercelJson from "../vercel.json";
interface SerializedRequest {
requestId: string;
url: string;
method: string;
body: string;
headers: IncomingHttpHeaders;
}
export function initializeMcpApiHandler(
initializeServer: (server: McpServer) => void,
serverOptions: ServerOptions = {}
) {
const maxDuration =
vercelJson?.functions?.["api/server.ts"]?.maxDuration || 800;
const redisUrl = process.env.REDIS_URL || process.env.KV_URL;
if (!redisUrl) {
throw new Error("REDIS_URL environment variable is not set");
}
const redis = createClient({
url: redisUrl,
});
const redisPublisher = createClient({
url: redisUrl,
});
redis.on("error", (err) => {
console.error("Redis error", err);
});
redisPublisher.on("error", (err) => {
console.error("Redis error", err);
});
const redisPromise = Promise.all([redis.connect(), redisPublisher.connect()]);
let servers: McpServer[] = [];
return async function mcpApiHandler(
req: IncomingMessage,
res: ServerResponse
) {
await redisPromise;
const url = new URL(req.url || "", "https://example.com");
if (url.pathname === "/sse") {
console.log("Got new SSE connection");
const transport = new SSEServerTransport("/message", res);
const sessionId = transport.sessionId;
const server = new McpServer(
{
name: "mcp-typescript server on vercel",
version: "0.1.0",
},
serverOptions
);
initializeServer(server);
servers.push(server);
server.server.onclose = () => {
console.log("SSE connection closed");
servers = servers.filter((s) => s !== server);
};
let logs: {
type: "log" | "error";
messages: string[];
}[] = [];
// This ensures that we logs in the context of the right invocation since the subscriber
// is not itself invoked in request context.
function logInContext(severity: "log" | "error", ...messages: string[]) {
logs.push({
type: severity,
messages,
});
}
// Handles messages originally received via /message
const handleMessage = async (message: string) => {
console.log("Received message from Redis", message);
logInContext("log", "Received message from Redis", message);
const request = JSON.parse(message) as SerializedRequest;
// Make in IncomingMessage object because that is what the SDK expects.
const req = createFakeIncomingMessage({
method: request.method,
url: request.url,
headers: request.headers,
body: request.body,
});
const syntheticRes = new ServerResponse(req);
let status = 100;
let body = "";
syntheticRes.writeHead = (statusCode: number) => {
status = statusCode;
return syntheticRes;
};
syntheticRes.end = (b: unknown) => {
body = b as string;
return syntheticRes;
};
await transport.handlePostMessage(req, syntheticRes);
await redisPublisher.publish(
`responses:${sessionId}:${request.requestId}`,
JSON.stringify({
status,
body,
})
);
if (status >= 200 && status < 300) {
logInContext(
"log",
`Request ${sessionId}:${request.requestId} succeeded: ${body}`
);
} else {
logInContext(
"error",
`Message for ${sessionId}:${request.requestId} failed with status ${status}: ${body}`
);
}
};
const interval = setInterval(() => {
for (const log of logs) {
console[log.type].call(console, ...log.messages);
}
logs = [];
}, 100);
await redis.subscribe(`requests:${sessionId}`, handleMessage);
console.log(`Subscribed to requests:${sessionId}`);
let timeout: NodeJS.Timeout;
let resolveTimeout: (value: unknown) => void;
const waitPromise = new Promise((resolve) => {
resolveTimeout = resolve;
timeout = setTimeout(() => {
resolve("max duration reached");
}, (maxDuration - 5) * 1000);
});
async function cleanup() {
clearTimeout(timeout);
clearInterval(interval);
await redis.unsubscribe(`requests:${sessionId}`, handleMessage);
console.log("Done");
res.statusCode = 200;
res.end();
}
req.on("close", () => resolveTimeout("client hang up"));
await server.connect(transport);
const closeReason = await waitPromise;
console.log(closeReason);
await cleanup();
} else if (url.pathname === "/message") {
console.log("Received message");
const body = await getRawBody(req, {
length: req.headers["content-length"],
encoding: "utf-8",
});
const sessionId = url.searchParams.get("sessionId") || "";
if (!sessionId) {
res.statusCode = 400;
res.end("No sessionId provided");
return;
}
const requestId = crypto.randomUUID();
const serializedRequest: SerializedRequest = {
requestId,
url: req.url || "",
method: req.method || "",
body: body,
headers: req.headers,
};
// Handles responses from the /sse endpoint.
await redis.subscribe(
`responses:${sessionId}:${requestId}`,
(message) => {
clearTimeout(timeout);
const response = JSON.parse(message) as {
status: number;
body: string;
};
res.statusCode = response.status;
res.end(response.body);
}
);
// Queue the request in Redis so that a subscriber can pick it up.
// One queue per session.
await redisPublisher.publish(
`requests:${sessionId}`,
JSON.stringify(serializedRequest)
);
console.log(`Published requests:${sessionId}`, serializedRequest);
let timeout = setTimeout(async () => {
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
res.statusCode = 408;
res.end("Request timed out");
}, 10 * 1000);
res.on("close", async () => {
clearTimeout(timeout);
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
});
} else {
res.statusCode = 404;
res.end("Not found");
}
};
}
// Define the options interface
interface FakeIncomingMessageOptions {
method?: string;
url?: string;
headers?: IncomingHttpHeaders;
body?: string | Buffer | Record<string, any> | null;
socket?: Socket;
}
// Create a fake IncomingMessage
function createFakeIncomingMessage(
options: FakeIncomingMessageOptions = {}
): IncomingMessage {
const {
method = "GET",
url = "/",
headers = {},
body = null,
socket = new Socket(),
} = options;
// Create a readable stream that will be used as the base for IncomingMessage
const readable = new Readable();
readable._read = (): void => {}; // Required implementation
// Add the body content if provided
if (body) {
if (typeof body === "string") {
readable.push(body);
} else if (Buffer.isBuffer(body)) {
readable.push(body);
} else {
readable.push(JSON.stringify(body));
}
readable.push(null); // Signal the end of the stream
}
// Create the IncomingMessage instance
const req = new IncomingMessage(socket);
// Set the properties
req.method = method;
req.url = url;
req.headers = headers;
// Copy over the stream methods
req.push = readable.push.bind(readable);
req.read = readable.read.bind(readable);
req.on = readable.on.bind(readable);
req.pipe = readable.pipe.bind(readable);
return req;
}