index.ts•7.26 kB
import express, { Request, Response } from 'express';
import { randomUUID } from "node:crypto";
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { InMemoryEventStore } from './event-store.js';
import cors from 'cors';
import { MCPServer } from '../src/servers/mcp-server';
/**
* This example server demonstrates backwards compatibility with:
* 2. The Streamable HTTP transport (protocol version 2025-03-26)
*
* It maintains a single MCP server instance but exposes two transport options:
* - /mcp: The new Streamable HTTP endpoint (supports GET/POST/DELETE)
*/
const getServer = async (accessToken: string) => {
const server = new MCPServer({
props: {
accessToken: accessToken,
instanceUrl: process.env.THOUGHTSPOT_INSTANCE_URL as string,
clientName: {
clientId: 'express-mcp-server',
clientName: 'Express MCP Server',
registrationDate: Date.now()
}
}
});
await server.init();
return server;
}
const getAccessTokenFromAuthorizationHeader = (authorizationHeader: string) => {
if (!authorizationHeader) {
throw new Error("Authorization header not found");
}
const accessToken = authorizationHeader.split(" ")[1];
if (!accessToken) {
throw new Error("Access token not found in authorization header");
}
return accessToken;
}
// Create Express application
const app = express();
app.use(express.json());
// Configure CORS to expose Mcp-Session-Id header for browser-based clients
app.use(cors({
origin: '*', // Allow all origins - adjust as needed for production
exposedHeaders: ['Mcp-Session-Id']
}));
// Store transports by session ID
const transports: Record<string, StreamableHTTPServerTransport | SSEServerTransport> = {};
//=============================================================================
// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26)
//=============================================================================
// Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint
app.all('/mcp', async (req: Request, res: Response) => {
console.log(`Received ${req.method} request to /mcp`);
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// Check if the transport is of the correct type
const existingTransport = transports[sessionId];
if (existingTransport instanceof StreamableHTTPServerTransport) {
// Reuse existing transport
transport = existingTransport;
} else {
// Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport)
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session exists but uses a different transport protocol',
},
id: null,
});
return;
}
} else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) {
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: (sessionId) => {
// Store the transport by session ID when session is initialized
console.log(`StreamableHTTP session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});
// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};
const accessToken = getAccessTokenFromAuthorizationHeader(req.headers['authorization'] as string);
// Connect the transport to the MCP server
const server = await getServer(accessToken);
await server.connect(transport);
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}
// Handle the request with the transport
await transport.handleRequest(req, res, req.body);
} catch (error: any) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: error?.message || 'Internal server error',
},
id: null,
});
}
}
});
app.use((err: any, req: Request, res: Response, next: Function) => {
console.error(err.stack)
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: err?.message || 'Internal server error',
},
id: null,
});
})
if (!process.env.THOUGHTSPOT_INSTANCE_URL) {
throw new Error("THOUGHTSPOT_INSTANCE_URL environment variable is not set");
}
// Start the server
const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 3000;
app.listen(PORT, () => {
console.log(`MCP server listening on port ${PORT}`);
console.log(`
==============================================
SUPPORTED TRANSPORT OPTIONS:
1. Streamable Http(Protocol version: 2025-03-26)
Endpoint: /mcp
Methods: GET, POST, DELETE
Usage:
- Initialize with POST to /mcp
- Establish SSE stream with GET to /mcp
- Send requests with POST to /mcp
- Terminate session with DELETE to /mcp
==============================================
`);
});
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});