sse-server.ts•3.95 kB
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { Hono } from "hono";
import type { Context } from "hono";
import { RESPONSE_ALREADY_SENT } from "@hono/node-server/utils/response";
import type { HttpBindings } from "@hono/node-server";
export function createSSEServer(mcpServer: McpServer) {
const app = new Hono<{ Bindings: HttpBindings }>();
const transportMap = new Map<string, SSEServerTransport>();
// SSE连接端点
app.get("/sse", async (c: Context) => {
const res = c.env.outgoing;
const transport = new SSEServerTransport("/messages", res);
transportMap.set(transport.sessionId, transport);
await mcpServer.connect(transport);
return RESPONSE_ALREADY_SENT;
});
// 消息处理端点
app.post("/messages", async (c: Context) => {
const sessionId = c.req.query("sessionId");
if (!sessionId) {
console.error("Message received without sessionId");
return c.json({ error: "sessionId is required" }, 400);
}
const transport = transportMap.get(sessionId);
if (transport) {
transport.handlePostMessage(c.env.incoming, c.env.outgoing);
}
return RESPONSE_ALREADY_SENT;
});
return app;
}
// import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
// import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
// import { Hono } from "hono";
// import { streamSSE } from "hono/streaming";
// import type { Context } from "hono";
// export function createSSEServer(mcpServer: McpServer) {
// const app = new Hono();
// // 中间件:设置SSE必需的头信息
// app.use("/sse/*", async (c, next) => {
// c.header("Content-Type", "text/event-stream");
// c.header("Cache-Control", "no-cache");
// c.header("Connection", "keep-alive");
// await next();
// });
// const transportMap = new Map<string, SSEServerTransport>();
// // SSE连接端点 (改造后)
// app.get("/sse", (c: Context) => {
// return streamSSE(c, async (stream) => {
// // 创建符合SSE协议的流
// const transport = new SSEServerTransport("/messages", {
// write: (data: string | Uint8Array<ArrayBufferLike>) =>
// stream.write(data),
// end: () => stream.close(),
// // on: (event: any, listener: any) => stream.on(event, listener),
// writeHead: (statusCode: number, headers: any) => {
// c.env.outgoing.status = statusCode;
// c.env.outgoing.headers.set(headers);
// },
// } as any);
// transportMap.set(transport.sessionId, transport);
// // 连接保活机制
// const keepAlive = setInterval(() => {
// stream.write(": keep-alive\n\n"); // SSE心跳包
// }, 15000);
// // 连接关闭清理
// stream.onAbort(() => {
// clearInterval(keepAlive);
// transportMap.delete(transport.sessionId);
// });
// await mcpServer.connect(transport);
// });
// });
// // 消息处理端点 (改造后)
// app.post("/messages", async (c: Context) => {
// const sessionId = c.req.query("sessionId");
// if (!sessionId) {
// console.error("Message received without sessionId");
// return c.json({ error: "sessionId is required" }, 400);
// }
// const transport = transportMap.get(sessionId);
// if (!transport) {
// console.error(`No transport found for sessionId: ${sessionId}`);
// return c.json({ error: "Invalid sessionId" }, 400);
// }
// // 使用标准HTTP处理代替底层env操作
// const req = c.req.raw;
// const res = new Response(null, { status: 200 });
// // await transport.handlePostMessage(req, res);
// transport.handlePostMessage(c.env.incoming, c.env.outgoing);
// return new Response(res.body, res);
// });
// return app;
// }