// sse.ts (5.85 kB)
import express, { Request, Response } from 'express';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { z } from 'zod';
import { CallToolResult } from '@modelcontextprotocol/sdk/types.js';
import cors from 'cors';
/**
* This example server demonstrates the deprecated HTTP+SSE transport
* (protocol version 2024-11-05). It is mainly used for testing backward-compatible clients.
*
* The server exposes two endpoints:
* - /mcp: For establishing the SSE stream (GET)
* - /:sessionId/messages: For receiving client messages (POST)
*/
// Factory that builds a fresh MCP server exposing a single "hello" tool
const getServer = () => {
  // Tool callback: wraps the greeting in the CallToolResult content envelope
  const greet = async ({ name }: { name: string }): Promise<CallToolResult> => ({
    content: [{ type: 'text', text: `Hello World ${name}` }],
  });

  const mcpServer = new McpServer(
    { name: 'simple-sse-server', version: '1.0.0' },
    { capabilities: { logging: {} } },
  );

  // Input schema for the tool: a single string argument called "name"
  const greetArgs = {
    name: z.string().describe('The name to greet'),
  };

  mcpServer.tool(
    'hello',
    'Just return the message "Hello World {name}"',
    greetArgs,
    greet,
  );

  return mcpServer;
};
// Express application with JSON body parsing and CORS enabled
const app = express();
app.use(express.json());
app.use(cors()); // Enable CORS for all routes
// Store transports by session ID.
// The map has a null prototype so that a client-supplied session ID such as
// "constructor" or "toString" can never resolve to an inherited
// Object.prototype member and be mistaken for a registered transport.
const transports: Record<string, SSEServerTransport> = Object.create(null);
// SSE endpoint for establishing the stream.
// Creates one SSEServerTransport per connection, registers it under its
// session ID, connects a fresh MCP server to it and then advertises the
// path-style POST endpoint (/<sessionId>/messages) to the client.
app.get('/mcp', async (_req: Request, res: Response) => {
  console.log('Received GET request to /mcp (establishing SSE stream)');
  // Tracked outside the try block so the catch path can unregister a
  // half-initialised session.
  let registeredSessionId: string | undefined;
  try {
    // Only queue the extra CORS headers here; do NOT call res.writeHead().
    // server.connect() starts the SSE transport, which writes the status line
    // and the text/event-stream headers itself. A second writeHead() would
    // throw ERR_HTTP_HEADERS_SENT and the stream would never be established.
    // NOTE(review): relies on SSEServerTransport.start() calling writeHead —
    // confirm against the SDK version in use.
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Access-Control-Allow-Headers', 'Cache-Control');
    // Create a new SSE transport for the client
    const transport = new SSEServerTransport('/messages', res);
    // Store the transport by session ID
    const sessionId = transport.sessionId;
    transports[sessionId] = transport;
    registeredSessionId = sessionId;
    console.log(`[SSE] Transport created for sessionId: ${sessionId}`);
    // Set up onclose handler to clean up transport when closed
    transport.onclose = () => {
      console.log(`SSE transport closed for session ${sessionId}`);
      delete transports[sessionId];
      console.log(`[SSE] Transport deleted for sessionId: ${sessionId}`);
    };
    // Connect the transport to the MCP server
    const server = getServer();
    await server.connect(transport);
    console.log(`[SSE] Server connected to transport for sessionId: ${sessionId}`);
    // Send endpoint as a plain string event with sessionId in path; this is
    // the URL shape the POST route of this server actually matches.
    res.write(`event: endpoint\ndata: /${sessionId}/messages\n\n`);
    // Send sessionId as a plain string event
    res.write(`event: session\ndata: ${sessionId}\n\n`);
    console.log(`[SSE] Initial endpoint and session events sent for sessionId: ${sessionId}`);
    console.log(`Established SSE stream with session ID: ${sessionId}`);
  } catch (error) {
    console.error('Error establishing SSE stream:', error);
    // Do not leave a dead transport registered for a stream that never came up
    if (registeredSessionId !== undefined) {
      delete transports[registeredSessionId];
    }
    if (!res.headersSent) {
      res.status(500).send('Error establishing SSE stream');
    } else if (!res.writableEnded) {
      // Headers are already out, so a status code can no longer be sent;
      // end the stream rather than leaving the client hanging.
      res.end();
    }
  }
});
// Messages endpoint for receiving client JSON-RPC requests.
// Looks up the transport registered for the session ID in the URL path and
// hands the already-parsed request body to it. Responds 400 when the ID is
// missing, 404 when no transport is registered for it, and 500 when the
// transport throws before a response was started.
app.post('/:sessionId/messages', async (req: Request, res: Response) => {
  console.log('Received POST request to /:sessionId/messages');
  console.log(`[POST] Params:`, req.params);
  console.log(`[POST] Body:`, req.body);
  console.log(`[POST] Headers:`, req.headers);
  console.log(`[POST] Request method:`, req.method);
  console.log(`[POST] Request URL:`, req.url);
  // Extract session ID from URL path parameter
  const sessionId = req.params.sessionId as string | undefined;
  console.log(`[POST] Extracted sessionId from params: ${sessionId}`);
  if (!sessionId) {
    console.error('No session ID provided in request URL');
    res.status(400).json({ error: 'Missing sessionId parameter' });
    return;
  }
  // Own-property lookup only: the session ID is client-controlled, and a plain
  // index such as transports['constructor'] would otherwise return an
  // inherited Object.prototype member and skip the 404 branch.
  const transport = Object.prototype.hasOwnProperty.call(transports, sessionId)
    ? transports[sessionId]
    : undefined;
  if (!transport) {
    console.error(`[POST] No active transport found for session ID: ${sessionId}`);
    console.log(`[POST] Current transports:`, Object.keys(transports));
    res.status(404).json({ error: 'Session not found' });
    return;
  }
  try {
    // Handle the POST message with the transport
    console.log(`[POST] Handling message for sessionId: ${sessionId}`);
    await transport.handlePostMessage(req, res, req.body);
    console.log(`[POST] Message handled for sessionId: ${sessionId}`);
  } catch (error) {
    console.error('Error handling request:', error);
    // Only send an error body if nothing has been written to the response yet
    if (!res.headersSent) {
      res.status(500).json({ error: 'Error handling request' });
    }
  }
});
// Liveness probe: replies with a fixed status plus the current ISO-8601 time
app.get('/health', (_req: Request, res: Response) => {
  const payload = {
    status: 'ok',
    timestamp: new Date().toISOString(),
  };
  res.json(payload);
});
// Bind the HTTP listener; the port comes from the PORT environment variable
// and falls back to 3000 when it is unset or empty
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, function onListening() {
  console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`);
});
// Handle server shutdown gracefully.
// Closes every registered transport, then closes the HTTP server and exits.
// The exit code is 0 on a clean close and 1 if the HTTP server reports an
// error while closing.
let shutdownStarted = false;
const shutdown = async (): Promise<void> => {
  // A repeated signal (e.g. a second Ctrl+C) must not restart the teardown
  if (shutdownStarted) {
    return;
  }
  shutdownStarted = true;
  console.log('Shutting down server...');
  // Close all active transports to properly clean up resources.
  // Iterate over a snapshot of the keys: entries are deleted below, and
  // onclose handlers registered elsewhere in this file also delete from the
  // map, so the map may change while we walk it.
  for (const sessionId of Object.keys(transports)) {
    const transport = transports[sessionId];
    if (!transport) {
      continue; // already removed while an earlier transport was closing
    }
    try {
      console.log(`Closing transport for session ${sessionId}`);
      await transport.close();
    } catch (error) {
      console.error(`Error closing transport for session ${sessionId}:`, error);
    } finally {
      // Drop the entry even when close() failed; the process is going away
      delete transports[sessionId];
    }
  }
  // Close the HTTP server
  server.close((error?: Error) => {
    if (error) {
      // Surface a failed close instead of reporting a clean shutdown
      console.error('Error closing HTTP server:', error);
      process.exit(1);
    }
    console.log('Server shutdown complete');
    process.exit(0);
  });
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);