import type { Request, Response, NextFunction } from "express";
import express from "express";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { ErrorCode, McpError } from "@modelcontextprotocol/sdk/types.js";
import type { AddressInfo } from "node:net";
import type { HttpTransportConfig } from "../config/runtime.js";
import { PROJECT_NAME } from "../config/constants.js";
import { PACKAGE_VERSION } from "../shared/version.js";
import { getServerContext } from "../mcp/server.js";
import { RegisteredTool } from "../mcp/tools/registerTools.js";
import { mcpConfigSchema } from "../schema/mcpConfig.js";
import { getSessionConfig as getRequestSessionConfig } from "../config/session.js";
import { normaliseBaseUrl, type SessionConfig as ServerSessionConfig } from "../shared/config/schema.js";
import type { SessionConfig as RequestSessionConfig } from "../config/session.js";
const JSON_BODY_LIMIT = "1mb";
const MCP_PATHS = ["/mcp", "/"] as const;
const HEALTH_PATH = "/healthz";
const REQUIRED_ACCEPT_TYPES = ["application/json", "text/event-stream"] as const;
function cloneSession(session: ServerSessionConfig): ServerSessionConfig {
return { ...session, defaultHeaders: { ...session.defaultHeaders } };
}
function parseHeadersJson(value: string | undefined): Record<string, string> | undefined {
if (!value) {
return undefined;
}
try {
const parsed = JSON.parse(value) as unknown;
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return undefined;
}
const headers: Record<string, string> = {};
for (const [key, raw] of Object.entries(parsed)) {
if (typeof raw === "string") {
headers[key] = raw;
} else if (raw !== null && raw !== undefined) {
headers[key] = String(raw);
}
}
return headers;
} catch {
return undefined;
}
}
function applySessionOverrides(
base: ServerSessionConfig,
overrides: RequestSessionConfig | undefined
): ServerSessionConfig {
const next = cloneSession(base);
if (!overrides) {
return next;
}
if (overrides.apiToken !== undefined) {
next.apiToken = overrides.apiToken;
}
if (overrides.defaultTeamId !== undefined) {
next.defaultTeamId = overrides.defaultTeamId;
}
if (overrides.baseUrl) {
next.baseUrl = normaliseBaseUrl(overrides.baseUrl);
}
if (overrides.requestTimeoutMs !== undefined && Number.isFinite(overrides.requestTimeoutMs)) {
const timeout = Math.ceil(Math.max(1, overrides.requestTimeoutMs) / 1000);
if (Number.isFinite(timeout) && timeout > 0) {
next.requestTimeout = timeout;
}
}
if (overrides.primaryLanguage) {
next.defaultHeaders["Accept-Language"] = overrides.primaryLanguage;
}
const parsedHeaders = parseHeadersJson(overrides.defaultHeadersJson);
if (parsedHeaders) {
for (const [key, value] of Object.entries(parsedHeaders)) {
next.defaultHeaders[key] = value;
}
}
if (overrides.authScheme) {
next.authScheme = overrides.authScheme;
}
return next;
}
function resolveMissingSessionFields(config: ServerSessionConfig): string[] {
const missing: string[] = [];
if (!config.apiToken?.trim()) missing.push("apiToken");
const teamId = config.defaultTeamId;
if (teamId === undefined || !Number.isFinite(teamId) || teamId <= 0) missing.push("defaultTeamId");
return missing;
}
function formatMissingFields(fields: string[]): string {
if (fields.length === 1) return `'${fields[0]}'`;
if (fields.length === 2) return `'${fields[0]}' and '${fields[1]}'`;
return `${fields.slice(0, -1).map(f => `'${f}'`).join(", ")}, and '${fields.at(-1)}'`;
}
function isInitializeRequest(body: unknown): body is { id?: unknown; method?: unknown } {
return Boolean(
body &&
typeof body === "object" &&
!Array.isArray(body) &&
"method" in body &&
typeof (body as { method?: unknown }).method === "string"
);
}
function isDebugEnabled(): boolean {
const value = process.env.MCP_DEBUG ?? "";
if (!value) {
return false;
}
const normalised = value.trim().toLowerCase();
return normalised !== "0" && normalised !== "false";
}
export function normaliseAcceptHeader(value: string | string[] | undefined): string {
if (!value) {
return REQUIRED_ACCEPT_TYPES.join(", ");
}
const items = Array.isArray(value) ? value : value.split(",");
const seen = new Set<string>();
const result: string[] = [];
for (const item of items) {
const trimmed = item.trim();
if (!trimmed) {
continue;
}
const base = trimmed.split(";")[0]?.trim().toLowerCase();
if (!base || seen.has(base)) {
continue;
}
seen.add(base);
result.push(trimmed);
}
for (const required of REQUIRED_ACCEPT_TYPES) {
if (!seen.has(required)) {
seen.add(required);
result.push(required);
}
}
return result.join(", ");
}
function ensureAcceptHeader(request: Request): void {
request.headers.accept = normaliseAcceptHeader(request.headers.accept);
}
function createCorsMiddleware(config?: HttpTransportConfig) {
const allowOrigin = config?.corsAllowOrigin ?? "*";
const allowHeaders =
config?.corsAllowHeaders ?? "Content-Type, MCP-Session-Id, MCP-Protocol-Version";
const allowMethods = config?.corsAllowMethods ?? "GET,POST,DELETE,OPTIONS";
return (_req: Request, res: Response, next: NextFunction) => {
res.setHeader("Access-Control-Allow-Origin", allowOrigin);
res.setHeader("Access-Control-Allow-Headers", allowHeaders);
res.setHeader("Access-Control-Allow-Methods", allowMethods);
next();
};
}
function createRequestLogger(subsystem: string) {
return (req: Request, res: Response, next: NextFunction) => {
if (!isDebugEnabled()) {
next();
return;
}
const startedAt = Date.now();
res.on("finish", () => {
const elapsed = Date.now() - startedAt;
const payload = {
subsystem,
method: req.method,
path: req.path,
status: res.statusCode,
elapsedMs: elapsed
};
process.stdout.write(`${JSON.stringify(payload)}\n`);
});
next();
};
}
function createJsonRpcLogger(subsystem: string) {
return {
inbound(message: unknown) {
if (!isDebugEnabled()) {
return;
}
const record = typeof message === "object" && message !== null ? (message as Record<string, unknown>) : undefined;
const type = Array.isArray(message) ? "batch" : record?.method ? "request" : record?.result !== undefined || record?.error !== undefined ? "response" : "unknown";
const payload = {
subsystem,
direction: "inbound",
type,
id: record?.id ?? null,
method: record?.method
};
process.stdout.write(`${JSON.stringify(payload)}\n`);
},
outbound(message: unknown) {
if (!isDebugEnabled()) {
return;
}
const record = typeof message === "object" && message !== null ? (message as Record<string, unknown>) : undefined;
const payload = {
subsystem,
direction: "outbound",
id: record?.id ?? null,
method: record?.method ?? null,
hasError: Boolean(record?.error)
};
process.stdout.write(`${JSON.stringify(payload)}\n`);
}
};
}
function applyTransportObservers(
transport: StreamableHTTPServerTransport,
logger: ReturnType<typeof createJsonRpcLogger>
): void {
const existingSend = transport.send.bind(transport);
transport.send = async (message, options) => {
logger.outbound(message);
return existingSend(message, options);
};
const previousOnMessage = transport.onmessage;
transport.onmessage = (message, extra) => {
logger.inbound(message);
previousOnMessage?.(message, extra);
};
}
function respondWithJson(res: Response, status: number, body: unknown): void {
if (!res.headersSent) {
res.status(status).json(body);
return;
}
res.end();
}
export async function startHttpBridge(
server: Server,
options: { port: number; host?: string }
): Promise<{ port: number; close: () => Promise<void> }> {
const { port, host } = options;
const context = getServerContext(server);
const httpConfig = context.runtime.transport.kind === "http" ? context.runtime.transport : undefined;
const app = express();
app.disable("x-powered-by");
app.use(createCorsMiddleware(httpConfig));
app.use(createRequestLogger("http.server"));
app.use(express.json({ limit: JSON_BODY_LIMIT }));
app.get("/.well-known/mcp-config", (_req, res) => {
res.type("application/schema+json").send(mcpConfigSchema);
});
for (const path of MCP_PATHS) {
app.options(path, (_req, res) => {
res.status(204).end();
});
}
app.get(HEALTH_PATH, (_req, res) => {
res.json({
ok: true,
transport: "http",
name: PROJECT_NAME,
version: PACKAGE_VERSION,
tools: context.tools.map((tool: RegisteredTool) => tool.name)
});
});
const supportedMethods = new Set(["POST", "GET", "DELETE"]);
for (const path of MCP_PATHS) {
app.all(path, async (req, res) => {
if (!supportedMethods.has(req.method)) {
respondWithJson(res, 405, {
jsonrpc: "2.0",
error: { code: -32601, message: "Method Not Allowed" },
id: null
});
return;
}
let requestConfig: RequestSessionConfig | undefined;
try {
requestConfig = getRequestSessionConfig(req);
} catch {
requestConfig = undefined;
}
context.session = applySessionOverrides(context.baseSession, requestConfig);
const parsedBody = req.method === "POST" ? req.body : undefined;
if (req.method === "POST" && isInitializeRequest(parsedBody) && parsedBody.method === "initialize") {
const missing = resolveMissingSessionFields(context.session);
if (missing.length > 0) {
const message = `Missing configuration: ${formatMissingFields(missing)}. Tools requiring authentication will fail until credentials are provided.`;
context.logger.warn("initialize_missing_config", { missing, message });
respondWithJson(res, 200, {
jsonrpc: "2.0",
error: { code: ErrorCode.InvalidParams, message, data: { missing } },
id: (parsedBody as { id?: unknown }).id ?? null
});
return;
}
}
const transport = new StreamableHTTPServerTransport(({
sessionIdGenerator: undefined,
enableJsonResponse: httpConfig?.enableJsonResponse ?? true,
allowedHosts: httpConfig?.allowedHosts,
allowedOrigins: httpConfig?.allowedOrigins,
enableDnsRebindingProtection: httpConfig?.enableDnsRebindingProtection ?? false,
initializeTimeoutMs:
httpConfig?.initializeTimeoutMs ?? context.runtime.httpInitializeTimeoutMs
} as unknown) as any); // cast to avoid upstream type mismatch
transport.onerror = error => {
const reason = error instanceof Error ? error.message : String(error);
context.logger.error("http_transport_error", { reason });
};
applyTransportObservers(transport, createJsonRpcLogger("http.rpc"));
let transportClosed = false;
const closeTransport = () => {
if (transportClosed) {
return;
}
transportClosed = true;
void transport.close();
};
res.on("close", closeTransport);
try {
if (req.method === "POST") {
ensureAcceptHeader(req);
}
await server.connect(transport);
await transport.handleRequest(req, res, parsedBody);
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
const baseLog = {
reason,
method: req.method,
path: req.path
} as Record<string, unknown>;
if (error instanceof McpError) {
baseLog.code = error.code;
context.logger.error("http_request_failed", baseLog);
const body = typeof req.body === "object" && req.body !== null ? (req.body as { id?: unknown }) : undefined;
const requestId = body?.id ?? null;
const jsonError: { code: number; message: string; data?: unknown } = {
code: error.code,
message: error.message
};
if (error.data !== undefined) {
jsonError.data = error.data;
}
respondWithJson(res, 200, {
jsonrpc: "2.0",
error: jsonError,
id: requestId
});
closeTransport();
return;
}
context.logger.error("http_request_failed", baseLog);
respondWithJson(res, 500, {
jsonrpc: "2.0",
error: { code: -32603, message: "Internal Server Error" },
id: null
});
closeTransport();
}
});
}
app.use((req, res) => {
respondWithJson(res, 404, { error: "Not Found" });
});
// ensure correct overload for app.listen
const serverInstance = host ? app.listen(port, host) : app.listen(port);
const actualPort = await new Promise<number>((resolve, reject) => {
serverInstance.once("error", reject);
serverInstance.once("listening", () => {
serverInstance.off("error", reject);
const address = serverInstance.address();
if (address && typeof address === "object") {
resolve((address as AddressInfo).port);
return;
}
if (typeof address === "number") {
resolve(address);
return;
}
resolve(port);
});
});
const close = async () =>
new Promise<void>((resolve, reject) => {
serverInstance.close(error => {
if (error) {
reject(error);
return;
}
resolve();
});
});
return { port: actualPort, close };
}