import { createServer, IncomingMessage, ServerResponse } from "http";
import { randomUUID } from "crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
// Map to store transports by session ID
const transports: Record<string, StreamableHTTPServerTransport> = {};
/**
* Detect client type from initialization request
*/
function detectClientType(requestBody: any, userAgent?: string): string {
const clientInfo = requestBody?.params?.clientInfo;
const name = clientInfo?.name?.toLowerCase() || "";
const ua = userAgent?.toLowerCase() || "";
return name.includes("openai") || ua.includes("openai")
? "openai-mcp"
: "unknown";
}
/**
* Parse request body as JSON
*/
async function parseRequestBody(req: IncomingMessage): Promise<any> {
return new Promise((resolve, reject) => {
let body = "";
req.on("data", (chunk) => {
body += chunk.toString();
});
req.on("end", () => {
try {
resolve(body ? JSON.parse(body) : {});
} catch (error) {
reject(new Error("Invalid JSON in request body"));
}
});
req.on("error", reject);
});
}
/**
* Get information about active sessions (for debugging/monitoring)
*/
function getSessionInfo() {
return {
activeSessions: Object.keys(transports).length,
sessionIds: Object.keys(transports),
};
}
/**
* Send JSON-RPC error response
*/
function sendJsonError(
res: ServerResponse,
code: number,
errorCode: number,
message: string
): void {
res.writeHead(code, { "Content-Type": "application/json" });
res.end(
JSON.stringify({
jsonrpc: "2.0",
error: { code: errorCode, message },
id: null,
})
);
}
/**
* Handle MCP POST requests
*/
async function handleMcpPost(
req: IncomingMessage,
res: ServerResponse,
standardServerFactory: () => McpServer,
openaiServerFactory: () => McpServer
): Promise<void> {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
// Ensure Accept header includes required content types for MCP
if (!req.headers.accept?.includes("text/event-stream")) {
req.headers.accept = "application/json, text/event-stream";
}
// Parse request body
let requestBody: any;
try {
requestBody = await parseRequestBody(req);
} catch {
sendJsonError(res, 400, -32700, "Parse error");
return;
}
try {
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// reuse existing transport
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(requestBody)) {
// new initialization request - create new session
const newSessionId = randomUUID();
// Detect client type and select appropriate server factory
const clientType = detectClientType(
requestBody,
req.headers["user-agent"]
);
console.log(`Detected client type: ${clientType}`);
const serverFactory =
clientType === "openai-mcp"
? openaiServerFactory
: standardServerFactory;
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => newSessionId,
onsessioninitialized: (sid) => {
console.log(`Session initialized with ID: ${sid}`);
transports[sid] = transport;
},
});
// set up onclose handler to clean up transport when closed
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(
`Transport closed for session ${sid}, removing from transports map`
);
delete transports[sid];
}
};
// connect the transport to the MCP server BEFORE handling the request
// so responses can flow back through the same transport
const server = serverFactory();
await server.connect(transport);
await transport.handleRequest(req, res, requestBody);
return;
} else {
// invalid request - no session ID and not an initialize request
sendJsonError(
res,
400,
-32000,
sessionId
? "Session not found"
: "Bad Request: No valid session ID provided"
);
return;
}
// Handle request with existing transport - no need to reconnect
// the existing transport is already connected to the server
await transport.handleRequest(req, res, requestBody);
} catch (error) {
console.error("Error handling MCP request:", error);
if (!res.headersSent) {
sendJsonError(res, 500, -32603, "Internal server error");
}
}
}
/**
* Handle MCP GET requests for SSE streams
*/
async function handleMcpGet(
req: IncomingMessage,
res: ServerResponse
): Promise<void> {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.writeHead(400, { "Content-Type": "text/plain" });
res.end("Invalid or missing session ID");
return;
}
// check for Last-Event-ID header for resumability
const lastEventId = req.headers["last-event-id"] as string | undefined;
if (lastEventId) {
console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
} else {
console.log(`Establishing new SSE stream for session ${sessionId}`);
}
await transports[sessionId].handleRequest(req, res);
}
/**
* Handle MCP DELETE requests for session termination
*/
async function handleMcpDelete(
req: IncomingMessage,
res: ServerResponse
): Promise<void> {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.writeHead(400, { "Content-Type": "text/plain" });
res.end("Invalid or missing session ID");
return;
}
console.log(`Received session termination request for session ${sessionId}`);
try {
await transports[sessionId].handleRequest(req, res);
} catch (error) {
console.error("Error handling session termination:", error);
if (!res.headersSent) {
res.writeHead(500, { "Content-Type": "text/plain" });
res.end("Error processing session termination");
}
}
}
/**
* Start the HTTP server with the given server factories
*/
export async function startHttpServer(
port: number,
standardServerFactory: () => McpServer,
openaiServerFactory: () => McpServer
): Promise<void> {
const httpServer = createServer(async (req, res) => {
const url = (req.url || "/").split("?")[0];
// Set CORS headers for all responses
res.setHeader("Access-Control-Allow-Origin", "*");
res.setHeader("Access-Control-Allow-Methods", "GET,POST,OPTIONS,DELETE");
res.setHeader(
"Access-Control-Allow-Headers",
"Content-Type, MCP-Session-Id, mcp-session-id, MCP-Protocol-Version"
);
res.setHeader("Access-Control-Expose-Headers", "MCP-Session-Id");
// Handle preflight OPTIONS requests
if (req.method === "OPTIONS") {
res.writeHead(200);
res.end();
return;
}
try {
if (url === "/mcp") {
if (req.method === "POST") {
await handleMcpPost(
req,
res,
standardServerFactory,
openaiServerFactory
);
} else if (req.method === "GET") {
await handleMcpGet(req, res);
} else if (req.method === "DELETE") {
await handleMcpDelete(req, res);
} else {
res.writeHead(405, { "Content-Type": "text/plain" });
res.end("Method not allowed");
}
} else if (url === "/ping") {
res.writeHead(200, { "Content-Type": "text/plain" });
res.end("pong");
} else if (url === "/sessions" && req.method === "GET") {
// Return session information for monitoring
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(getSessionInfo(), null, 2));
} else if (url === "/.well-known/mcp-config" && req.method === "GET") {
// Return MCP configuration schema for server discovery
const configSchema = {
type: "object",
description: "No configuration required",
properties: {},
};
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ configSchema }, null, 2));
} else {
res.writeHead(404);
res.end("Not found");
}
} catch (error) {
console.error("Error handling request:", error);
if (!res.headersSent) {
res.writeHead(500);
res.end("Internal Server Error");
}
}
});
await new Promise<void>((resolve, reject) => {
httpServer.once("error", reject);
httpServer.listen(port, () => {
console.error(`Docfork MCP Server running on HTTP:`);
console.error(` • HTTP endpoint: http://localhost:${port}/mcp`);
console.error(` • Health check: http://localhost:${port}/ping`);
console.error(` • Session info: http://localhost:${port}/sessions`);
resolve();
});
});
// Handle graceful shutdown
const shutdown = async () => {
console.log("Shutting down server...");
for (const sid in transports) {
try {
const transport = transports[sid];
if (transport?.close) {
await transport.close();
}
delete transports[sid];
} catch (error) {
console.error(`Error closing transport for session ${sid}:`, error);
}
}
console.log("Server shutdown complete");
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
}
/**
* Start stdio transport with provided server factory
*/
export async function startStdioServer(
serverFactory: () => McpServer
): Promise<void> {
const server = serverFactory();
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Docfork MCP Server running on stdio");
}