import { Router, type Request, type Response } from "express";
import { randomUUID } from "node:crypto";
import { IncomingMessage, ServerResponse } from "node:http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import type { ServerManager } from "../client/manager.js";
import { Aggregator } from "../server/aggregator.js";
import { createProxyServer } from "../server/proxy.js";
import type { Server } from "@modelcontextprotocol/sdk/server/index.js";
import logger from "../utils/logger.js";
/**
* Extract a route param value as a string.
* Express 5 types allow string | string[] for param values.
*/
function paramAsString(value: string | string[] | undefined): string {
if (Array.isArray(value)) return value[0] ?? "";
return value ?? "";
}
interface SessionInfo {
transport: StreamableHTTPServerTransport;
server: Server;
aggregator: Aggregator;
endpoint: string;
}
/** Map from sessionId to SessionInfo */
const sessions = new Map<string, SessionInfo>();
/**
* Creates a new transport + proxy server session for a given set of connections.
* Returns the transport, which will generate and track its own session ID.
*/
function createSession(
manager: ServerManager,
endpoint: string,
serverIds?: string[],
): StreamableHTTPServerTransport {
const connections = serverIds
? manager.getConnections(serverIds)
: manager.getConnections();
const aggregator = new Aggregator(connections);
const serverName = serverIds
? `mcp-server-hub-${serverIds.join("-")}`
: "mcp-server-hub-all";
const server = createProxyServer(aggregator, serverName);
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId: string) => {
logger.info(`Session initialized: ${sessionId}`, { endpoint });
sessions.set(sessionId, { transport, server, aggregator, endpoint });
},
onsessionclosed: (sessionId: string) => {
logger.info(`Session closed: ${sessionId}`, { endpoint });
sessions.delete(sessionId);
},
});
// Connect the MCP server to this transport
server.connect(transport).catch((err) => {
logger.error("Failed to connect proxy server to transport", {
error: (err as Error).message,
endpoint,
});
});
return transport;
}
/**
* Look up an existing session by the mcp-session-id header.
*/
function getSessionTransport(
req: Request,
): StreamableHTTPServerTransport | undefined {
const sessionId = req.headers["mcp-session-id"];
if (typeof sessionId !== "string") return undefined;
return sessions.get(sessionId)?.transport;
}
/**
* Handles an incoming request by either routing to an existing session
* or creating a new one for initialization requests.
*/
async function handleMcpRequest(
req: Request,
res: Response,
manager: ServerManager,
endpoint: string,
serverIds?: string[],
): Promise<void> {
let transport = getSessionTransport(req);
if (!transport) {
// No existing session -- this should be an initialization request.
// Create a new transport; the SDK will validate whether it's truly
// an initialize request and return appropriate errors if not.
if (req.method === "GET" || req.method === "DELETE") {
// GET and DELETE require an existing session
res.status(400).json({
jsonrpc: "2.0",
error: { code: -32000, message: "Bad Request: Mcp-Session-Id header is required" },
id: null,
});
return;
}
transport = createSession(manager, endpoint, serverIds);
}
// Delegate to the StreamableHTTPServerTransport
// Express req/res extend IncomingMessage/ServerResponse
await transport.handleRequest(
req as unknown as IncomingMessage,
res as unknown as ServerResponse,
req.body,
);
}
export function createMcpRouter(manager: ServerManager): Router {
const router = Router();
// ── All servers aggregated: /mcp ─────────────────────────────
router.post("/mcp", async (req: Request, res: Response) => {
try {
await handleMcpRequest(req, res, manager, "/mcp");
} catch (err) {
logger.error("Error handling POST /mcp", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.get("/mcp", async (req: Request, res: Response) => {
try {
await handleMcpRequest(req, res, manager, "/mcp");
} catch (err) {
logger.error("Error handling GET /mcp", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.delete("/mcp", async (req: Request, res: Response) => {
try {
await handleMcpRequest(req, res, manager, "/mcp");
} catch (err) {
logger.error("Error handling DELETE /mcp", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
// ── Single server: /mcp/server/:id ──────────────────────────
router.post("/mcp/server/:id", async (req: Request, res: Response) => {
try {
const serverId = paramAsString(req.params.id);
if (!manager.getConnection(serverId)) {
res.status(404).json({ error: `Server "${serverId}" not found` });
return;
}
await handleMcpRequest(req, res, manager, `/mcp/server/${serverId}`, [serverId]);
} catch (err) {
logger.error("Error handling POST /mcp/server/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.get("/mcp/server/:id", async (req: Request, res: Response) => {
try {
const serverId = paramAsString(req.params.id);
if (!manager.getConnection(serverId)) {
res.status(404).json({ error: `Server "${serverId}" not found` });
return;
}
await handleMcpRequest(req, res, manager, `/mcp/server/${serverId}`, [serverId]);
} catch (err) {
logger.error("Error handling GET /mcp/server/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.delete("/mcp/server/:id", async (req: Request, res: Response) => {
try {
const serverId = paramAsString(req.params.id);
if (!manager.getConnection(serverId)) {
res.status(404).json({ error: `Server "${serverId}" not found` });
return;
}
await handleMcpRequest(req, res, manager, `/mcp/server/${serverId}`, [serverId]);
} catch (err) {
logger.error("Error handling DELETE /mcp/server/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
// ── Group of servers: /mcp/group/:id ────────────────────────
router.post("/mcp/group/:id", async (req: Request, res: Response) => {
try {
const groupId = paramAsString(req.params.id);
const groupConnections = manager.getGroupConnections(groupId);
if (!groupConnections) {
res.status(404).json({ error: `Group "${groupId}" not found` });
return;
}
const serverIds = [...groupConnections.keys()];
await handleMcpRequest(req, res, manager, `/mcp/group/${groupId}`, serverIds);
} catch (err) {
logger.error("Error handling POST /mcp/group/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.get("/mcp/group/:id", async (req: Request, res: Response) => {
try {
const groupId = paramAsString(req.params.id);
const groupConnections = manager.getGroupConnections(groupId);
if (!groupConnections) {
res.status(404).json({ error: `Group "${groupId}" not found` });
return;
}
const serverIds = [...groupConnections.keys()];
await handleMcpRequest(req, res, manager, `/mcp/group/${groupId}`, serverIds);
} catch (err) {
logger.error("Error handling GET /mcp/group/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
router.delete("/mcp/group/:id", async (req: Request, res: Response) => {
try {
const groupId = paramAsString(req.params.id);
const groupConnections = manager.getGroupConnections(groupId);
if (!groupConnections) {
res.status(404).json({ error: `Group "${groupId}" not found` });
return;
}
const serverIds = [...groupConnections.keys()];
await handleMcpRequest(req, res, manager, `/mcp/group/${groupId}`, serverIds);
} catch (err) {
logger.error("Error handling DELETE /mcp/group/:id", { error: (err as Error).message });
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
return router;
}
/**
* Close all active sessions. Call this during graceful shutdown.
*/
export async function closeAllSessions(): Promise<void> {
logger.info(`Closing ${sessions.size} active session(s)...`);
const closeTasks = [...sessions.values()].map(async ({ transport, server }) => {
try {
await transport.close();
await server.close();
} catch (err) {
logger.warn("Error closing session", { error: (err as Error).message });
}
});
await Promise.allSettled(closeTasks);
sessions.clear();
logger.info("All sessions closed");
}