/**
* Streamable HTTP Transport for MCP Server (MCP 2025-11-25 Spec)
*
* Uses the official SDK StreamableHTTPServerTransport with:
* - Session management via Mcp-Session-Id header
* - Server-Sent Events (SSE) for streaming
* - Event store for resumability
* - OAuth 2.1 protected resource metadata
* - Health check endpoints (liveness/readiness)
* - DNS rebinding protection via SDK middleware
*/
import express, { Express, Request, Response, NextFunction } from 'express';
import { randomUUID } from 'node:crypto';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { createMcpExpressApp } from '@modelcontextprotocol/sdk/server/express.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import type { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { createLogger } from '../shared/logger.js';
import { getConfig } from '../config/index.js';
import { mountHealthEndpoints, registerHealthCheck, CommonHealthChecks } from '../shared/health.js';
import { JwtValidator, extractBearerToken, createJwtValidatorFromEnv } from '../shared/jwt.js';
import { AuthenticationError } from '../shared/errors.js';
// Module-wide logger, created once with the 'http-transport' label.
const logger = createLogger('http-transport');
// Configuration snapshot taken at module load; this file reads debugMode,
// serverName and serverVersion from it.
const config = getConfig();
/**
 * In-memory event log used to resume interrupted streams.
 *
 * Events are grouped per stream ID. Event IDs come from a single counter
 * ("evt_1", "evt_2", ...) shared by every stream in this store instance.
 * Nothing is persisted and nothing is ever evicted.
 */
class InMemoryEventStore {
  private events: Map<string, Array<{ eventId: string; message: unknown }>> = new Map();
  private eventIdCounter = 0;

  /**
   * Append a message to a stream's log.
   *
   * @param streamId - Stream the message belongs to.
   * @param message - Payload to remember for later replay.
   * @returns The ID assigned to the stored event.
   */
  async storeEvent(streamId: string, message: unknown): Promise<string> {
    this.eventIdCounter += 1;
    const eventId = `evt_${this.eventIdCounter}`;

    const streamLog = this.events.get(streamId) ?? [];
    streamLog.push({ eventId, message });
    this.events.set(streamId, streamLog);

    return eventId;
  }

  /**
   * Re-send every event stored after `lastEventId` on the stream that owns it.
   *
   * @param lastEventId - Last event ID the client has already seen.
   * @param param1 - Carries `send`, which is awaited once per replayed event, in order.
   * @returns The owning stream's ID, or an empty string when the event ID is unknown.
   */
  async replayEventsAfter(
    lastEventId: string,
    { send }: { send: (eventId: string, message: unknown) => Promise<void> }
  ): Promise<string> {
    for (const [streamId, streamLog] of this.events) {
      const seenAt = streamLog.findIndex((entry) => entry.eventId === lastEventId);
      if (seenAt === -1) {
        continue;
      }

      // Walk the live array by index so events appended while replaying are
      // delivered as well.
      let cursor = seenAt + 1;
      while (cursor < streamLog.length) {
        const entry = streamLog[cursor];
        if (entry) {
          await send(entry.eventId, entry.message);
        }
        cursor += 1;
      }
      return streamId;
    }

    return '';
  }

  /**
   * Look up which stream an event belongs to.
   *
   * @param eventId - Event ID previously returned by `storeEvent`.
   * @returns The stream ID, or `undefined` when no stream holds that event.
   */
  async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
    for (const [streamId, streamLog] of this.events) {
      for (const entry of streamLog) {
        if (entry.eventId === eventId) {
          return streamId;
        }
      }
    }
    return undefined;
  }
}
/**
 * Store for active transports by session ID.
 *
 * Declared at module level, so it is shared by every app built in this module
 * and by the session helper functions exported from this file.
 */
const transports = new Map<string, StreamableHTTPServerTransport>();
/**
 * OAuth 2.1 Protected Resource Metadata (RFC 9728)
 *
 * Shape of the JSON document returned by getProtectedResourceMetadata.
 */
interface ProtectedResourceMetadata {
// Identifier of this protected resource (filled with the server's base URL).
resource: string;
// Authorization server URLs clients are pointed to for obtaining tokens.
authorization_servers: string[];
// Scope names advertised for this resource.
scopes_supported: string[];
// Ways a bearer token may be presented; this file only advertises 'header'.
bearer_methods_supported: string[];
}
/**
 * Build the protected resource metadata document for this server.
 *
 * @param baseUrl - URL advertised as the `resource` identifier.
 * @returns Metadata listing one authorization server (taken from the
 *   MCP_SERVER_AUTH_SERVER environment variable when it is set, otherwise a
 *   placeholder URL), the fixed scope list, and header-only bearer usage.
 */
function getProtectedResourceMetadata(baseUrl: string): ProtectedResourceMetadata {
  const authorizationServer =
    process.env['MCP_SERVER_AUTH_SERVER'] ?? 'https://auth.example.com';

  const metadata: ProtectedResourceMetadata = {
    resource: baseUrl,
    authorization_servers: [authorizationServer],
    scopes_supported: ['read', 'write', 'admin'],
    bearer_methods_supported: ['header'],
  };
  return metadata;
}
/**
 * JWT authentication middleware.
 *
 * Each request is handled in this order:
 * 1. Debug mode and no Authorization header: passed through unauthenticated.
 * 2. No validator configured: passed through (there is nothing to validate
 *    against).
 * 3. No bearer token: 401 with a `WWW-Authenticate` challenge that points at
 *    the protected resource metadata document.
 * 4. Bearer token present: validated. On success the claims are attached to
 *    `req.auth` and the request continues; on failure the response is a 401
 *    carrying a `WWW-Authenticate` challenge with `error="invalid_token"`.
 *
 * @param jwtValidator - Validator used to check bearer tokens, or `null` to
 *   disable token checks.
 * @returns An Express middleware function.
 */
function createJwtAuthMiddleware(
  jwtValidator: JwtValidator | null
): (req: Request, res: Response, next: NextFunction) => Promise<void> {
  return async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    // Skip auth in debug mode when the client sent no credentials at all.
    const authHeader = req.get('Authorization');
    if (config.debugMode && !authHeader) {
      next();
      return;
    }

    // Without a validator there is nothing to check the token against.
    if (!jwtValidator) {
      next();
      return;
    }

    // Every 401 below advertises where the resource metadata can be fetched.
    const resourceMetadata = `${req.protocol}://${req.get('host') ?? 'localhost'}/.well-known/oauth-protected-resource`;

    const token = extractBearerToken(authHeader);
    if (!token) {
      res.set('WWW-Authenticate', `Bearer resource_metadata="${resourceMetadata}"`);
      res.status(401).json({ error: 'Authorization required' });
      return;
    }

    let claims: unknown;
    try {
      claims = await jwtValidator.validate(token);
    } catch (error) {
      // A rejected token must also carry the challenge header, not only the
      // missing-token case.
      res.set(
        'WWW-Authenticate',
        `Bearer error="invalid_token", resource_metadata="${resourceMetadata}"`
      );
      if (error instanceof AuthenticationError) {
        res.status(401).json({ error: error.message });
        return;
      }
      logger.error('JWT validation error', { error: String(error) });
      res.status(401).json({ error: 'Invalid token' });
      return;
    }

    // Attach claims to request for downstream use. next() is called outside
    // the try block so a downstream failure is never reported as a bad token.
    (req as Request & { auth?: unknown }).auth = claims;
    next();
  };
}
/**
 * Create Express app for Streamable HTTP transport.
 *
 * The returned app exposes:
 * - the health endpoints mounted by `mountHealthEndpoints`
 * - `GET /.well-known/oauth-protected-resource` (RFC 9728 metadata)
 * - `GET /.well-known/mcp` (server name, version and protocol version)
 * - `POST /mcp` for JSON-RPC requests, `GET /mcp` for the SSE stream and
 *   `DELETE /mcp` for session termination
 *
 * Sessions are tracked in the module-level `transports` map, keyed by the
 * value of the `mcp-session-id` header. The 'memory' and 'eventLoop' health
 * checks are registered on every call.
 *
 * @param options - Optional settings.
 * @param options.allowedHosts - Host allow-list forwarded to the SDK app
 *   factory.
 * @param options.requireAuth - When true, the `/mcp` routes are guarded by
 *   the JWT middleware.
 * @param options.mcpServer - Server that each newly created session
 *   transport is connected to.
 * @returns The configured Express application (not yet listening).
 */
export function createHttpTransport(options?: {
  allowedHosts?: string[];
  requireAuth?: boolean;
  mcpServer?: Server;
}): Express {
  // Use SDK's createMcpExpressApp for DNS rebinding protection
  const app = options?.allowedHosts
    ? createMcpExpressApp({ allowedHosts: options.allowedHosts })
    : createMcpExpressApp();

  // Parse JSON bodies
  app.use(express.json());

  // Mount health check endpoints
  mountHealthEndpoints(app);

  // Register default health checks
  registerHealthCheck('memory', CommonHealthChecks.memory(512));
  registerHealthCheck('eventLoop', CommonHealthChecks.eventLoop(100));

  // OAuth Protected Resource Metadata (RFC 9728)
  app.get('/.well-known/oauth-protected-resource', (req: Request, res: Response) => {
    const baseUrl = `${req.protocol}://${req.get('host') ?? 'localhost'}`;
    res.json(getProtectedResourceMetadata(baseUrl));
  });

  // MCP endpoint discovery (MCP 2025-11-25)
  app.get('/.well-known/mcp', (_req: Request, res: Response) => {
    res.json({
      name: config.serverName,
      version: config.serverVersion,
      protocol_version: '2025-11-25',
      capabilities: {
        tools: {},
        resources: {},
        prompts: {},
      },
    });
  });

  // Set up JWT validation if configured
  const jwtValidator = options?.requireAuth ? createJwtValidatorFromEnv() : null;
  if (options?.requireAuth && !jwtValidator) {
    // The auth middleware lets requests through when it has no validator, so
    // make this misconfiguration visible instead of failing open silently.
    logger.error(
      'requireAuth is enabled but no JWT validator is available; /mcp requests will not be authenticated'
    );
  }
  const authMiddleware = createJwtAuthMiddleware(jwtValidator);

  // MCP POST endpoint - handles JSON-RPC requests
  const mcpPostHandler = async (req: Request, res: Response): Promise<void> => {
    const sessionId = req.get('mcp-session-id');
    try {
      // Reuse existing transport for session
      const existingTransport = sessionId ? transports.get(sessionId) : undefined;
      if (existingTransport) {
        await existingTransport.handleRequest(req, res, req.body);
        return;
      }

      if (!sessionId && isInitializeRequest(req.body)) {
        // New initialization request - create transport with resumability
        const eventStore = new InMemoryEventStore();
        const transport = new StreamableHTTPServerTransport({
          sessionIdGenerator: () => randomUUID(),
          eventStore,
          onsessioninitialized: (newSessionId: string) => {
            logger.info('Session initialized', { sessionId: newSessionId });
            transports.set(newSessionId, transport);
          },
          onsessionclosed: (closedSessionId: string) => {
            logger.info('Session closed', { sessionId: closedSessionId });
            transports.delete(closedSessionId);
          },
        });

        // Drop the map entry when the transport closes for any reason
        transport.onclose = () => {
          const sid = transport.sessionId;
          if (sid && transports.has(sid)) {
            logger.debug('Transport closed, removing from map', { sessionId: sid });
            transports.delete(sid);
          }
        };

        // Connect to MCP server if provided
        // NOTE(review): the same Server instance is connected to every new
        // session's transport — confirm the SDK supports that usage.
        if (options?.mcpServer) {
          await options.mcpServer.connect(transport);
        }
        await transport.handleRequest(req, res, req.body);
        return;
      }

      // Invalid request: unknown session ID, or no session ID on a request
      // that is not an initialize request.
      // NOTE(review): the MCP spec appears to call for 404 when a supplied
      // session ID is unknown; 400 is kept here for backward compatibility.
      res.status(400).json({
        jsonrpc: '2.0',
        error: {
          code: -32000,
          message: 'Bad Request: No valid session ID provided',
        },
        id: null,
      });
    } catch (error) {
      logger.error('Error handling MCP request', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({
          jsonrpc: '2.0',
          error: {
            code: -32603,
            message: 'Internal server error',
          },
          id: null,
        });
      }
    }
  };

  // MCP GET endpoint - SSE stream
  const mcpGetHandler = async (req: Request, res: Response): Promise<void> => {
    const sessionId = req.get('mcp-session-id');
    const transport = sessionId ? transports.get(sessionId) : undefined;
    if (!sessionId || !transport) {
      res.status(400).json({ error: 'Invalid or missing session ID' });
      return;
    }

    const lastEventId = req.get('last-event-id');
    if (lastEventId) {
      logger.debug('Client reconnecting with Last-Event-ID', { sessionId, lastEventId });
    }

    // Guarded like the POST and DELETE handlers so a failure here cannot
    // escape as an unhandled promise rejection.
    try {
      await transport.handleRequest(req, res);
    } catch (error) {
      logger.error('Error handling MCP SSE request', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({ error: 'Error establishing SSE stream' });
      }
    }
  };

  // MCP DELETE endpoint - session termination
  const mcpDeleteHandler = async (req: Request, res: Response): Promise<void> => {
    const sessionId = req.get('mcp-session-id');
    const transport = sessionId ? transports.get(sessionId) : undefined;
    if (!sessionId || !transport) {
      res.status(400).json({ error: 'Invalid or missing session ID' });
      return;
    }

    logger.info('Session termination requested', { sessionId });
    try {
      await transport.handleRequest(req, res);
    } catch (error) {
      logger.error('Error handling session termination', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({ error: 'Error processing session termination' });
      }
    }
  };

  // Register routes with optional auth
  if (options?.requireAuth) {
    app.post('/mcp', authMiddleware, mcpPostHandler);
    app.get('/mcp', authMiddleware, mcpGetHandler);
    app.delete('/mcp', authMiddleware, mcpDeleteHandler);
  } else {
    app.post('/mcp', mcpPostHandler);
    app.get('/mcp', mcpGetHandler);
    app.delete('/mcp', mcpDeleteHandler);
  }

  return app;
}
/**
 * Get active session count.
 *
 * @returns The number of entries currently held in the module-level
 *   `transports` map.
 */
export function getSessionCount(): number {
return transports.size;
}
/**
 * Shut down every tracked transport (intended for server shutdown).
 *
 * All closes are started before any is awaited. A failing close is logged
 * with its session ID and does not prevent the others from completing.
 * Afterwards the session map is emptied.
 *
 * @returns A promise that resolves once every close attempt has settled.
 */
export async function closeAllTransports(): Promise<void> {
  const pendingCloses: Promise<void>[] = [];

  transports.forEach((transport, sessionId) => {
    logger.debug('Closing transport', { sessionId });
    const closing = transport.close().catch((error) => {
      logger.error('Error closing transport', { sessionId, error: String(error) });
    });
    pendingCloses.push(closing);
  });

  await Promise.all(pendingCloses);
  transports.clear();
  logger.info('All transports closed');
}
/**
 * Start the periodic session report.
 *
 * No sessions are removed here; the timer only writes the current session
 * count to the debug log every five minutes. The function is retained for
 * API compatibility.
 *
 * @returns The interval handle, which the caller can pass to `clearInterval`.
 */
export function startSessionCleanup(): NodeJS.Timeout {
  const reportIntervalMs = 5 * 60 * 1000;

  const reportSessionCount = (): void => {
    logger.debug('Session count', { active: transports.size });
  };

  return setInterval(reportSessionCount, reportIntervalMs);
}
/**
 * Clear all sessions (for testing).
 *
 * Only empties the module-level `transports` map; the transports themselves
 * are not closed here.
 */
export function clearSessions(): void {
transports.clear();
}