Skip to main content
Glama

Streamable HTTP Bridge

by masx200
Mulan Permissive Software License, Version 2
2
  • Linux
  • Apple
main.ts23.7 kB
import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; import cors from "cors"; import express from "express"; import fs, { type StatWatcher } from "fs"; import morgan from "morgan"; import { randomUUID } from "node:crypto"; import { readFileSync, unwatchFile, watchFile } from "node:fs"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import authenticateToken from "./authenticateToken.js"; import { createMcpServer } from "./createMcpServer.js"; import { mergeConfigs } from "./mergeConfigs.js"; import { parseCommandLineArgs } from "./parseCommandLineArgs.js"; import { createServer, IncomingMessage, ServerResponse } from "http"; import { WebSocketServer } from "ws"; import type { WebSocketClientTransport } from "./WebSocketClientTransport.js"; import { WebSocketServerTransport, type WebSocketServerTransportOptions, } from "./WebSocketServerTransport.js"; const wsservertransports = new Set<WebSocketServerTransport>(); export interface McpServerConfig { protocols?: string | string[]; headers?: Record<string, string>; type?: string; transport?: string; url?: string; httpUrl?: string; sseUrl?: string; wsUrl?: string; command?: string; args?: string[]; cwd?: string; env?: Record<string, string>; } // 配置接口定义 export interface Config { sseServer?: { enabled?: boolean; endpoint?: string; messageEndpoint?: string; }; wsServer?: { enabled?: boolean; pathPrefix?: string; }; enableHttpServer?: boolean; config?: string | undefined; pathPrefix?: string; hotReload?: boolean; version?: boolean; apiKey?: string; port?: number; host?: string; corsAllowOrigins?: string[]; mcpServers?: { [key: string]: McpServerConfig; }; } export interface ServerInstance { server?: McpServer; client?: Client; transport?: | StdioClientTransport | SSEClientTransport | StreamableHTTPClientTransport | WebSocketClientTransport; httpTransports?: StreamableHTTPServerTransport[]; sseTransports?: SSEServerTransport[]; config: McpServerConfig; } // 默认配置 export const DEFAULT_CONFIG: Config = { sseServer: { enabled: true, endpoint: "/sse", messageEndpoint: "/messages", }, enableHttpServer: true, wsServer: { enabled: true, pathPrefix: "/ws", }, pathPrefix: "/mcp", hotReload: false, version: false, apiKey: "", port: 3000, host: "0.0.0.0", corsAllowOrigins: ["*"], mcpServers: {}, }; // 全局变量 export let config: Config = { ...DEFAULT_CONFIG }; let servers: Map<string, ServerInstance> = new Map(); let configFilePath: string = "settings.json"; let configWatcher: StatWatcher | null = null; // 加载配置文件 function loadConfigFile(filePath: string): Config { try { const fileContent = readFileSync(filePath, "utf-8"); const fileConfig = JSON.parse(fileContent) as Config; return { ...DEFAULT_CONFIG, ...fileConfig }; } catch (error) { console.warn(`Failed to load config file ${filePath}:`, error); process.exit(1); } } // 从环境变量加载配置 function loadEnvConfig(): Partial<Config> { return { apiKey: process.env.BRIDGE_API_TOKEN, port: process.env.BRIDGE_API_PORT ? parseInt(process.env.BRIDGE_API_PORT) : undefined, pathPrefix: process.env.BRIDGE_STREAMABLE_HTTP_PATH, }; } // 初始化所有MCP服务器 async function initializeServers(config: Config) { if (!config.mcpServers) return; cleanup(); // 清理现有服务器 for (const [serverName, instance] of servers) { try { instance?.server?.close(); instance?.client?.close(); instance?.transport?.close(); if (instance.httpTransports) { instance.httpTransports.forEach((transport) => transport.close()); } if (instance.sseTransports) { instance.sseTransports.forEach((transport) => transport.close()); } } catch (error) { console.error(`Error closing server ${serverName}:`, error); } } servers.clear(); // 创建新服务器 for (const [serverName, serverConfig] of Object.entries(config.mcpServers)) { if ( !Object.keys(serverConfig).includes("url") && !Object.keys(serverConfig).includes("command") && !Object.keys(serverConfig).includes("wsUrl") && !Object.keys(serverConfig).includes("httpUrl") && !Object.keys(serverConfig).includes("sseUrl") ) { throw new Error( "url, command, wsUrl, httpUrl, sseUrl are required,configuration is invalid, please check the configuration file", ); } try { // const instance = await createMcpServer(serverName, serverConfig); servers.set(serverName, { config: serverConfig }); console.log(`✅ Server '${serverName}' initialized successfully`); } catch (error) { console.error(`❌ Failed to initialize server '${serverName}':`, error); process.exit(1); } } } // 设置配置文件监听 function setupConfigWatcher(configFilePath: string) { if (configWatcher) { unwatchFile(configFilePath); } if (config.hotReload) { configWatcher = watchFile(configFilePath, { interval: 1000 }, () => { console.log("🔄 Configuration file changed, reloading..."); reloadConfiguration(); }); console.log(`👀 Watching for configuration changes in: ${configFilePath}`); } } // 重新加载配置 async function reloadConfiguration() { const cliConfig = parseCommandLineArgs(); const fileConfig = loadConfigFile(configFilePath); const envConfig = loadEnvConfig(); // console.log(JSON.stringify(cliConfig, null, 4)); // console.log(JSON.stringify(fileConfig, null, 4)); // console.log(JSON.stringify(envConfig, null, 4)); config = mergeConfigs(cliConfig, fileConfig, envConfig); console.log("📋 Configuration reloaded:", JSON.stringify(config, null, 2)); // 重新设置配置监听 setupConfigWatcher(configFilePath); // 重新初始化服务器 await initializeServers(config); await server.close(); cleanup(); server = await main(); } const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); //@ts-ignore const packageJson = await fs.promises.readFile( join(__dirname, "./package.json"), { encoding: "utf-8", }, ); const packageJsonObj = JSON.parse(packageJson); // 主函数 async function main() { // 解析命令行参数 const cliConfig = parseCommandLineArgs(); // 显示版本信息 if (cliConfig.version) { console.log("MCP Streamable HTTP Bridge version " + packageJsonObj.version); process.exit(0); } // 确定配置文件路径 if (cliConfig.config) { configFilePath = cliConfig.config; } // 加载配置 const fileConfig = loadConfigFile(configFilePath); const envConfig = loadEnvConfig(); // console.log(JSON.stringify(cliConfig, null, 4)); // console.log(JSON.stringify(fileConfig, null, 4)); // console.log(JSON.stringify(envConfig, null, 4)); config = mergeConfigs(cliConfig, fileConfig, envConfig); console.log("📋 Configuration:", JSON.stringify(config, null, 2)); // 设置配置文件监听 setupConfigWatcher(configFilePath); // 初始化服务器 await initializeServers(config); // 创建Express应用 const app = express(); // 添加日志中间件 app.use(morgan("combined")); // CORS配置 app.use( cors({ origin: config.corsAllowOrigins, exposedHeaders: ["Mcp-Session-Id"], allowedHeaders: ["Content-Type", "mcp-session-id", "Authorization"], }), ); app.use(express.json()); app.use(authenticateToken(config)); const transports = new Map<string, StreamableHTTPServerTransport>(); const sseTransports = new Map<string, SSEServerTransport>(); const pathPrefix = config.pathPrefix || "/mcp"; for (const [key, value] of servers) { if (config.enableHttpServer) { // 处理MCP请求 console.log( "registering pathPrefix", pathPrefix + "/" + key, pathPrefix + "/" + encodeURIComponent(key), ); app.all(pathPrefix + "/" + encodeURIComponent(key), async (req, res) => { const sessionId = req.headers["mcp-session-id"] as string; let transport: StreamableHTTPServerTransport; if (sessionId && transports.has(sessionId)) { transport = transports.get(sessionId)!; } else if (!sessionId && isInitializeRequest(req.body)) { // 新的初始化请求 transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), onsessioninitialized: (sessionId) => { transports.set(transport.sessionId!, transport); console.log(`New mcp session initialized: ${sessionId}`); }, }); // 选择第一个可用的服务器实例 const serverInstance = value; if (!serverInstance) { return res.status(500).json({ jsonrpc: "2.0", error: { code: -32003, message: "No MCP servers available", }, id: null, }); } serverInstance.httpTransports ??= []; serverInstance.httpTransports.push(transport); // 清理传输 transport.onclose = () => { if (transport.sessionId) { console.log(`Session closed: ${transport.sessionId}`); transports.delete(transport.sessionId); serverInstance.httpTransports ??= []; serverInstance.httpTransports = serverInstance.httpTransports .filter((t) => t !== transport); } }; transport.onerror = (error) => { if (transport.sessionId) { console.log(`Session errored: ${transport.sessionId}`); transports.delete(transport.sessionId); } console.error("Transport errored", error); }; const serverName = key; const serverConfig = value.config; // 初始化MCP服务器,懒加载实现 if (!serverInstance?.server) { console.log("Initializing MCP server", serverName, serverConfig); const instance = await createMcpServer(serverName, serverConfig); serverInstance.server = instance.server; serverInstance.client = instance.client; serverInstance.transport = instance.transport; } else { console.log( "already Initialized MCP server", serverName, serverConfig, ); } // 连接到MCP服务器 //@ts-ignore await serverInstance.server.connect(transport); console.log("transport", transport); } else { // 无效请求 return res.status(400).json({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: No valid session ID provided", }, id: null, }); } await transport.handleRequest(req, res, req.body); }); } // 如果启用了SSE服务器,添加SSE相关路由 const serverConfig = value.config; if (config.sseServer && config.sseServer.enabled) { const sseEndpoint = config.sseServer.endpoint || "/sse"; const messageEndpoint = config.sseServer.messageEndpoint || "/messages"; // SSE端点 app.get(`${sseEndpoint}/${encodeURIComponent(key)}`, async (req, res) => { try { const serverInstance = value; if (!serverInstance) { return res.status(500).json({ jsonrpc: "2.0", error: { code: -32003, message: "No MCP servers available", }, id: null, }); } const serverName = key; // 初始化MCP服务器(如果尚未初始化) if (!serverInstance?.server) { console.log( "Initializing MCP server for SSE", serverName, serverConfig, ); const instance = await createMcpServer(serverName, serverConfig); serverInstance.server = instance.server; serverInstance.client = instance.client; serverInstance.transport = instance.transport; } // 创建SSE传输 const sseTransport = new SSEServerTransport( messageEndpoint + `/${encodeURIComponent(key)}`, res, ); serverInstance.sseTransports ??= []; serverInstance.sseTransports.push(sseTransport); // 存储SSE传输 sseTransports.set(sseTransport.sessionId, sseTransport); console.log(`New SSE session initialized: ${sseTransport.sessionId}`); // 设置响应关闭时的清理逻辑 res.on("close", () => { if (serverInstance.sseTransports?.includes(sseTransport)) { serverInstance.sseTransports = serverInstance.sseTransports .filter((t) => t !== sseTransport); } sseTransports.delete(sseTransport.sessionId); console.log(`SSE session closed: ${sseTransport.sessionId}`); }); // 连接到MCP服务器 //@ts-ignore await serverInstance.server.connect(sseTransport); console.log("sseTransport connected", sseTransport); } catch (error) { console.error(`Error handling SSE connection:`, error); if (!res.headersSent) { res.status(500).json({ error: "Internal server error" }); } } }); // SSE消息端点 app.post( `${messageEndpoint}/${encodeURIComponent(key)}`, async (req, res) => { try { const sessionId = req.query.sessionId as string; const sseTransport = sseTransports.get(sessionId); if (sseTransport) { await sseTransport.handlePostMessage(req, res, req.body); } else { res .status(400) .json({ error: "No SSE transport found for sessionId" }); } } catch (error) { console.error(`Error handling SSE message:`, error); if (!res.headersSent) { res.status(500).json({ error: "Internal server error" }); } } }, ); } } // 启动服务器 const port = config.port || 3000; const host = config.host || "0.0.0.0"; console.log("📋 Configuration:", JSON.stringify(config, null, 4)); const server = createServer( (request: IncomingMessage, response: ServerResponse) => { app(request, response); }, ); function validateBearerToken(token: string) { // 示例:简单对比 return token === (config.apiKey ?? process.env.HTTP_API_TOKEN); } server.on("upgrade", async (request, socket, head) => { try { console.log("receive upgrade request"); console.log({ head: head.toString(), url: request.url, method: request.method, headers: request.headers, }); if (!config.wsServer?.enabled) { socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); socket.destroy(); return; } if (config.apiKey ?? process.env.HTTP_API_TOKEN) { const authHeader = request.headers["authorization"]; if (!authHeader || !authHeader.startsWith("Bearer ")) { socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); socket.destroy(); return; } const token = authHeader.slice("Bearer ".length); // 去掉 "Bearer " if (!validateBearerToken(token)) { socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); socket.destroy(); return; } } //@ts-ignore if (!request.url?.startsWith(config.wsServer?.pathPrefix ?? "/ws")) { socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); socket.destroy(); return; } const mcpservername = request.url?.substring( (config.wsServer?.pathPrefix ?? "/ws").length + 1, ); const mcpserverconfig = config.mcpServers ?.[decodeURIComponent(mcpservername)]; if (!mcpserverconfig) { socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); socket.destroy(); return; } const value = servers.get(mcpservername); if (!value) { socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); socket.destroy(); return; } const serverConfig = value.config; // 初始化MCP服务器,懒加载实现 const serverInstance = value; const serverName = mcpservername; if (!serverInstance?.server) { console.log("Initializing MCP server", serverName, serverConfig); const instance = await createMcpServer(serverName, serverConfig); serverInstance.server = instance.server; serverInstance.client = instance.client; serverInstance.transport = instance.transport; } else { console.log( "already Initialized MCP server", serverName, serverConfig, ); } const options: WebSocketServerTransportOptions = { onMessage(message) { console.log("wsTransport message", JSON.stringify(message, null, 4)); }, onError(error) { console.error("wsTransport error", error); }, onOpen(socket) { console.log("wsTransport opened", socket.url); }, onsessionclosed: (sessionId) => { console.log("wsTransport sessionClosed", sessionId); }, onsessioninitialized: (sessionId) => { console.log("wsTransport sessionInitialized", sessionId); }, onClose: (socket) => { console.log("wsTransport closed", socket.url); wss.close(); }, onConnection: (socket) => { console.log("wsTransport connected", socket.url); }, path: (config.wsServer?.pathPrefix ?? "/ws") + "/" + encodeURIComponent(serverName), noServer: true, verifyClient: !(config.apiKey ?? process.env.HTTP_API_TOKEN) ? undefined : async function (info, callback) { const request = info.req as IncomingMessage; const authHeader = request.headers["authorization"]; if (!authHeader || !authHeader.startsWith("Bearer ")) { console.log("no authHeader,verifyClient failed"); callback(false); return; } const token = authHeader.slice("Bearer ".length); const result = validateBearerToken(token); console.log("verifyClient result", result); callback(result); }, }; const wss = new WebSocketServer({ ...options }); wss.on("error", (error) => { console.error("WebSocketServerTransport error", error); }); wss.on("connection", async (ws, request) => { const wsTransport = new WebSocketServerTransport(ws, request, options); console.log("wsserverTransport", wsTransport); if (!wsTransport) { throw new Error("wsTransport is not defined"); } const mcpserverinstance = serverInstance.server; if (!mcpserverinstance) { throw new Error("mcpserverinstance is not defined"); } // const wss = wsTransport.wss; // if (!wss) { // throw new Error("wss is not defined"); // } //@ts-ignore await mcpserverinstance.connect(wsTransport); console.log("mcpserverinstance connect", mcpserverinstance); console.log("wsTransport connected", wsTransport); wsservertransports.add(wsTransport); console.log("wsTransport connected", wsTransport.sessionId); }); wss.handleUpgrade(request, socket, head, (ws) => { wss.emit("connection", ws, request); }); } catch (error) { console.error("wsTransport error", error); } }); server.on("error", (err) => { console.error(`Error starting server: ${err.message}`); cleanup(); process.exit(1); }); server.listen(port, host, (/* err */) => { // if (err) { // console.error("Failed to start server:", err); // process.exit(1); // } if (config.enableHttpServer) { console.log( `🚀 MCP Bridge (stdio ↔ Streamable HTTP) \n listening on http://${host}:${port}${pathPrefix}`, ); } if (config.apiKey) { console.log(`🔒 API Key authentication enabled`); } else { console.log(`🔒 API Key authentication disabled`); } if (config.hotReload) { console.log(`🔄 Hot reload enabled`); } console.log( `📦 Configured MCP servers: ${ Object.keys(config.mcpServers || {}).join( ", ", ) }`, ); // 打印所有MCP HTTP端点 if (config.wsServer?.enabled) { console.log("🌐 Available MCP ws endpoints:"); for (const [key] of servers) { const endpoint = `${config.wsServer?.pathPrefix ?? "/ws"}/${ encodeURIComponent(key) }`; const encodedEndpoint = endpoint; console.log(key, `\n http://${host}:${port}${encodedEndpoint}`); } } if (config.enableHttpServer) { console.log("🌐 Available MCP HTTP endpoints:"); for (const [key] of servers) { const endpoint = `${pathPrefix}/${encodeURIComponent(key)}`; const encodedEndpoint = endpoint; console.log(key, `\n http://${host}:${port}${encodedEndpoint}`); } } if (config.sseServer && config.sseServer.enabled) { console.log("🌐 Available MCP SSE endpoints:"); const sseEndpoint = config.sseServer.endpoint || "/sse"; const messageEndpoint = config.sseServer.messageEndpoint || "/messages"; for (const [key] of servers) { console.log( key, `\nSSE Endpoint: \n http://${host}:${port}${sseEndpoint}/${ encodeURIComponent( key, ) }`, "\n", key, `\nMessage Endpoint: \n http://${host}:${port}${messageEndpoint}/${ encodeURIComponent( key, ) }`, ); } } }); return { async close() { cleanup(); return new Promise((resolve, reject) => { server.close((err) => { if (err) { reject(err); } console.log("server closed"); resolve(true); }); }); }, }; } // 启动应用 let server = await main().catch((error) => { console.error("Failed to start application:", error); process.exit(1); }); function cleanup() { for (const transport of wsservertransports) { transport.close(); } wsservertransports.clear(); }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/masx200/mcp-demo-streamable-http-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server