import express, { Response } from "express";
import { randomUUID } from "crypto";
import { StreamableHTTPServerTransport }
from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { createMcpServer } from "../server/createMcpServer.js";
import { transport } from "../../hub.js";
import { logReq, logRes } from "../../hub.js";
import { activeSessions, sessionServers, sessionMcpServers } from "../../hub.js";
import { updateActiveConnections } from "../../hub.js";
import { triggerAttachedResources } from "../resource/triggerAttachedResources.js";
/**
 * Minimal surface of a transport as used by this handler: both the shared hub
 * transport and the per-session transports are driven through `handleRequest`.
 */
interface McpRequestTarget {
  handleRequest(req: express.Request, res: express.Response, parsedBody?: unknown): unknown;
}

/**
 * Replies with a JSON-RPC error envelope. Does nothing once headers have been
 * sent, because a second status/body write would throw.
 */
const sendJsonRpcError = (res: express.Response, status: number, message: string, id: unknown): void => {
  if (res.headersSent) {
    return;
  }
  res.status(status).json({
    jsonrpc: "2.0",
    error: { code: -32000, message },
    id: id ?? null,
  });
};

/**
 * Hands the request to `target`, passing the parsed body only for POST.
 * Both synchronous throws and promise rejections from the transport are logged
 * and answered with a 500 (when still possible) instead of surfacing as an
 * unhandled rejection.
 */
const forwardToTransport = (
  endpoint: string,
  target: McpRequestTarget,
  req: express.Request,
  res: express.Response,
): void => {
  const onFailure = (err: unknown): void => {
    console.error(`[${endpoint}] Transport failed to handle request:`, err);
    sendJsonRpcError(res, 500, "Internal server error", req.body?.id);
  };
  try {
    const parsedBody = req.method === "POST" ? req.body : undefined;
    void Promise.resolve(target.handleRequest(req, res, parsedBody)).catch(onFailure);
  } catch (err: unknown) {
    onFailure(err);
  }
};

/**
 * Creates a dedicated MCP server plus streamable-HTTP transport for
 * `sessionKey`, connects them, and registers both in the hub maps.
 *
 * @returns the connected transport for the session
 */
const openSessionTransport = async (sessionKey: string): Promise<StreamableHTTPServerTransport> => {
  const sessionServer = await createMcpServer(sessionKey);
  const sessionTransport = new StreamableHTTPServerTransport({
    // Reuse the id this handler already associated with the client.
    sessionIdGenerator: () => sessionKey,
  });
  await sessionServer.connect(sessionTransport);
  sessionServers.set(sessionKey, sessionTransport);
  sessionMcpServers.set(sessionKey, sessionServer);
  return sessionTransport;
};

/**
 * Builds the Express handler for one MCP endpoint.
 *
 * Per request the handler:
 *  - resolves the session id from `mcp-session-id` / `x-session-id`, generating
 *    one (and writing it to `mcp-session-id`) when the client sent none;
 *  - maintains the active-connection counter and request/response logs;
 *  - for `tools/call`, records tool usage on the session and, when the response
 *    ends, fires `triggerAttachedResources` with the parsed outcome;
 *  - routes `initialize` to a freshly created per-session server/transport,
 *    other requests to the existing session transport, and otherwise attempts
 *    recovery (POST), answers 404 (GET), or falls back to the shared transport.
 *
 * @param endpoint label used as a prefix in log lines
 * @returns an Express request handler
 */
export const handleMcpRequest = (endpoint: string) => (req: express.Request, res: express.Response): void => {
  const start = Date.now();
  const id = req.body?.id ?? "null";
  const method = req.body?.method ?? req.method;

  // Header values may be typed as string[]; settle on a single string once so
  // every map lookup and log line below uses the same key.
  const rawSessionHeader = req.headers["mcp-session-id"] || req.headers["x-session-id"];
  const suppliedSessionId = Array.isArray(rawSessionHeader) ? rawSessionHeader[0] : rawSessionHeader;
  const sessionId: string = suppliedSessionId || randomUUID();
  if (!suppliedSessionId) {
    req.headers["mcp-session-id"] = sessionId;
    console.error(`[${endpoint}] Generated session ID for client: ${sessionId}`);
  }

  updateActiveConnections(+1);
  const activeOnEntry = updateActiveConnections(0);
  logReq(`← id=${id}\t${method}\tsession=${sessionId}\tactive=${activeOnEntry}\t${endpoint}`);

  // Track tool calls and trigger attached resources once the response ends.
  if (method === "tools/call" && req.body?.params) {
    const toolName = req.body.params.name;
    const toolArgs = req.body.params.arguments || {};
    console.error(`[${endpoint}] Tool call detected: ${toolName} in session: ${sessionId}`);

    const trackedSession = activeSessions.get(sessionId);
    if (trackedSession) {
      trackedSession.toolsUsed.push(toolName);
      trackedSession.lastActivity = Date.now();
      activeSessions.set(sessionId, trackedSession);
    }

    const originalEnd = res.end.bind(res);
    let responseProcessed = false;

    // Wrap `end` so the final chunk can be inspected exactly once.
    // NOTE(review): only a body passed to `end` itself is inspected; anything
    // written earlier via `res.write` is not seen here.
    res.end = function (chunk?: any, encoding?: any, cb?: any): Response {
      if (responseProcessed) {
        return originalEnd(chunk, encoding, cb);
      }
      responseProcessed = true;

      let success = true;
      let result = null;
      try {
        // `end(cb)` passes a callback in the first slot; that is not a body.
        if (chunk && typeof chunk !== "function") {
          const responseData = JSON.parse(chunk.toString());
          success = !responseData.error && !responseData.result?.isError;
          result = responseData.result;
          const session = success ? activeSessions.get(sessionId) : undefined;
          if (session) {
            session.successfulTasks++;
            activeSessions.set(sessionId, session);
            console.error(`[${endpoint}] Updated session ${sessionId}: toolsUsed=${session.toolsUsed.length}, successfulTasks=${session.successfulTasks}`);
          }
        }
      } catch (e: unknown) {
        console.error(`[${endpoint}] Error parsing response for resource trigger:`, e);
      }

      // Fire-and-forget: resource triggers must not delay the response.
      void triggerAttachedResources(sessionId, toolName, toolArgs, result, success)
        .catch((error: unknown) => console.error(`[${endpoint}] Resource trigger failed:`, error));

      return originalEnd(chunk, encoding, cb);
    };
  }

  // "finish" is not emitted when the client disconnects before the response
  // completes, so also listen for "close" and run the accounting exactly once.
  let accounted = false;
  const settleRequest = (completed: boolean): void => {
    if (accounted) {
      return;
    }
    accounted = true;
    updateActiveConnections(-1);
    const activeOnExit = updateActiveConnections(0);
    const took = Date.now() - start;
    const tag = !completed ? "ABORT" : res.statusCode >= 400 ? "ERR" : "OK";
    logRes(`→ id=${id}\t${tag} (${took} ms)\tactive=${activeOnExit}\t${endpoint}`);
    if (res.statusCode >= 400) {
      console.error(`[${endpoint}] Error ${res.statusCode} for session ${sessionId}, method: ${method}`);
    }
  };
  res.once("finish", () => settleRequest(true));
  res.once("close", () => settleRequest(false));

  // Initialize requests always get a brand-new server instance for the session.
  if (req.body?.method === "initialize") {
    openSessionTransport(sessionId)
      .then((sessionTransport) => {
        console.error(`[${endpoint}] Created dedicated server for session: ${sessionId}`);
        activeSessions.set(sessionId, {
          startTime: Date.now(),
          lastActivity: Date.now(),
          clientInfo: req.body.params?.clientInfo || { name: 'Unknown Client', version: '1.0.0' },
          capabilities: req.body.params?.capabilities || {},
          toolsUsed: [],
          successfulTasks: 0,
          achievements: []
        });
        forwardToTransport(endpoint, sessionTransport, req, res);
      })
      .catch((err: unknown) => {
        console.error(`[${endpoint}] Failed to create session server:`, err);
        sendJsonRpcError(res, 500, "Failed to initialize session", req.body?.id);
      });
    return;
  }

  // Non-initialize requests use the session-specific transport when one exists.
  const existingTransport = sessionServers.get(sessionId);
  if (existingTransport) {
    const session = activeSessions.get(sessionId);
    if (session) {
      session.lastActivity = Date.now();
      activeSessions.set(sessionId, session);
    }
    forwardToTransport(endpoint, existingTransport, req, res);
    return;
  }

  // No transport for this session: drop any stale bookkeeping first.
  if (activeSessions.has(sessionId)) {
    console.error(`[${endpoint}] Cleaning up stale session: ${sessionId}`);
    activeSessions.delete(sessionId);
    sessionServers.delete(sessionId);
  }

  if (req.method === "POST" && req.body) {
    // Try to recover by standing up a fresh server/transport under the same id.
    console.error(`[${endpoint}] Attempting session recovery for ${sessionId}`);
    openSessionTransport(sessionId)
      .then((recoveredTransport) => {
        console.error(`[${endpoint}] Recovered session: ${sessionId}`);
        activeSessions.set(sessionId, {
          startTime: Date.now(),
          lastActivity: Date.now(),
          clientInfo: { name: 'Recovered Client', version: '1.0.0' },
          capabilities: {},
          toolsUsed: [],
          successfulTasks: 0,
          achievements: []
        });
        forwardToTransport(endpoint, recoveredTransport, req, res);
      })
      .catch((err: unknown) => {
        console.error(`[${endpoint}] Failed to recover session:`, err);
        // Last resort: let the shared transport answer.
        forwardToTransport(endpoint, transport, req, res);
      });
    return;
  }

  console.error(`[${endpoint}] No session transport found for ${sessionId}, method: ${method}`);
  if (req.method === "GET") {
    // VS Code expects a proper error response for GET requests.
    res.status(404).json({
      error: 'Session not found. Please initialize a new session.',
      sessionId: sessionId,
      method: method
    });
    return;
  }

  // Any other verb is delegated to the shared transport.
  forwardToTransport(endpoint, transport, req, res);
};