Skip to main content
Glama

GitMCP

server.js21.6 kB
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { registerTools } from "./tools/index.js"; import { storeSession, sessionExists, queueMessage, subscribeToSessionMessages, subscribeToResponse, publishResponse, getActiveSubscribers, } from "./utils/sessionStore.js"; import { parseRawBody } from "./utils/bodyParser.js"; import { Socket } from "net"; import { Readable } from "stream"; import { IncomingMessage, ServerResponse } from "http"; // Generate a unique instance ID for this serverless function const INSTANCE_ID = `instance-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; console.info(`Server initialized with instance ID: ${INSTANCE_ID}`); // For local instances only - doesn't work across serverless invocations let activeTransports = {}; // Track session health let sessionHealthChecks = {}; // Get max duration from vercel.json config const maxDuration = 59; export default async function handler(req, res) { const requestId = Math.random().toString(36).substring(2, 15); console.info(`[${INSTANCE_ID}:${requestId}] New request: ${req.method} ${req.url}`); const adjustedUrl = new URL(req.url || "", `http://${req.headers.host}`); console.debug(`[${INSTANCE_ID}:${requestId}] Adjusted URL: ${adjustedUrl.toString()}`); if (req.method === "GET") { try { console.info(`[${INSTANCE_ID}:${requestId}] Handling GET request for SSE connection`); // Add response headers for SSE res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); console.debug(`[${INSTANCE_ID}:${requestId}] SSE headers set`); // Instantiate the MCP server. const mcp = new McpServer({ name: `MCP SSE Server for ${req.url}`, version: "1.0.0", }); console.debug(`[${INSTANCE_ID}:${requestId}] MCP server instantiated`); if (!req.headers.host) { throw new Error("Missing host header"); } // Register the "fetch_documentation" tool. registerTools(mcp, req.headers.host, req.url); console.debug(`[${INSTANCE_ID}:${requestId}] Tools registered`); // Create an SSE transport. const endpoint = "/message"; const transport = new SSEServerTransport(endpoint, res); console.debug(`[${INSTANCE_ID}:${requestId}] SSE transport created`); try { console.debug(`[${INSTANCE_ID}:${requestId}] Connecting MCP server to transport`); await mcp.connect(transport); console.info(`[${INSTANCE_ID}:${requestId}] MCP server connected to transport`); } catch (error) { console.error(`[${INSTANCE_ID}:${requestId}] Failed to connect MCP server to transport:`, error); throw error; } const sessionId = transport.sessionId; console.info(`[${INSTANCE_ID}:${requestId}] Session established: ${sessionId}`); // Store in local map (for same-instance handling) activeTransports[sessionId] = transport; console.debug(`[${INSTANCE_ID}:${requestId}] Transport stored in map. Active transports: ${Object.keys(activeTransports).length}`); // Setup context-based logging for this session // This collects logs from async operations and flushes them periodically let logs = []; // This ensures that logs in async contexts (like Redis subscribers) // are captured and logged in the proper request context function logInContext(severity, ...messages) { logs.push({ type: severity, messages: [`[${INSTANCE_ID}:${requestId}:${sessionId}]`, ...messages], }); } // Periodically flush logs to the console const logInterval = setInterval(() => { if (logs.length > 0) { for (const log of logs) { console[log.type].apply(console, log.messages); } logs = []; } }, 100); try { // Store in Redis (for cross-instance handling) logInContext("debug", `Storing session in Redis`); await storeSession(sessionId, { host: req.headers.host, userAgent: req.headers["user-agent"], createdAt: new Date().toISOString(), instanceId: INSTANCE_ID, requestId, }); logInContext("debug", `Session stored in Redis`); } catch (error) { logInContext("error", `Failed to store session in Redis:`, error); // Continue despite Redis storage failure } // Subscribe to session messages using Redis PubSub try { logInContext("debug", `Subscribing to messages for session`); const unsubscribe = await subscribeToSessionMessages(sessionId, async (request) => { try { logInContext("info", `Processing message: ${request.requestId} on instance ${INSTANCE_ID}`); // Create a fake IncomingMessage object with the stored data const fReq = createFakeIncomingMessage({ method: request.method || "POST", url: request.url || req.url, headers: request.headers || {}, body: request.body, }); const syntheticRes = new ServerResponse(fReq); let status = 200; let body = ""; // Capture the response status and body syntheticRes.writeHead = (statusCode) => { status = statusCode; return syntheticRes; }; syntheticRes.end = (b) => { body = typeof b === "string" ? b : JSON.stringify(b); return syntheticRes; }; // Process the message with the transport logInContext("debug", `Processing request ${request.requestId} on instance ${INSTANCE_ID}`); try { await transport.handlePostMessage(fReq, syntheticRes); logInContext("debug", `Transport processed message successfully: ${request.requestId}`); } catch (e) { logInContext("error", `Transport error processing message ${request.requestId}:`, e); status = 500; body = JSON.stringify({ error: e instanceof Error ? e.message : String(e), }); } // Publish the response back to Redis logInContext("debug", `Publishing response for ${request.requestId} with status ${status} from instance ${INSTANCE_ID}`); await publishResponse(sessionId, request.requestId, status, body); if (status >= 200 && status < 300) { logInContext("info", `Request ${request.requestId} succeeded with status ${status}`); } else { logInContext("error", `Request ${request.requestId} failed with status ${status}: ${body}`); } } catch (error) { logInContext("error", `Error processing message:`, error); // Publish error response try { await publishResponse(sessionId, request.requestId, 500, JSON.stringify({ error: error instanceof Error ? error.message : String(error), })); logInContext("info", `Published error response for ${request.requestId}`); } catch (pubError) { logInContext("error", `Failed to publish error response: ${pubError}`); } } }); logInContext("info", `Subscribed successfully to messages on instance ${INSTANCE_ID}. Session ID: ${sessionId}`); // Clean up when the connection closes req.on("close", async () => { logInContext("info", `SSE connection closing on instance ${INSTANCE_ID}`); clearInterval(logInterval); delete activeTransports[sessionId]; // Clean up health check if it exists if (sessionHealthChecks[sessionId]) { clearInterval(sessionHealthChecks[sessionId]); delete sessionHealthChecks[sessionId]; } if (unsubscribe) { try { await unsubscribe(); logInContext("debug", `Unsubscribed from Redis channels`); } catch (error) { logInContext("error", `Error unsubscribing from Redis channels:`, error); } } // Flush remaining logs for (const log of logs) { console[log.type].apply(console, log.messages); } console.info(`[${INSTANCE_ID}:${requestId}] SSE connection closed, sessionId: ${sessionId}`); }); } catch (error) { console.error(`[${INSTANCE_ID}:${requestId}] Failed to subscribe to messages for session ${sessionId}:`, error); throw error; } // Set up a timeout for the maximum duration of the serverless function let resolveTimeout; const waitPromise = new Promise((resolve) => { resolveTimeout = resolve; // End the connection slightly before the serverless function times out setTimeout(() => { logInContext("info", `Max duration reached (${maxDuration}s), closing connection`); resolve("max duration reached"); }, (maxDuration - 5) * 1000); }); req.on("close", () => resolveTimeout?.("client hung up")); // Wait for either timeout or client disconnect const closeReason = await waitPromise; console.info(`[${INSTANCE_ID}:${requestId}] Connection closed: ${closeReason}`); // Final cleanup clearInterval(logInterval); // Return a proper response to end the function res.status(200).end(); } catch (error) { console.error(`[${INSTANCE_ID}:${requestId}] MCP SSE Server error:`, error); try { res.write(`data: ${JSON.stringify({ error: error instanceof Error ? error.message : String(error), })}\n\n`); res.end(); } catch (writeError) { console.error(`[${INSTANCE_ID}:${requestId}] Failed to write error response:`, writeError); } } return; } // POST /message?sessionId=...: handle incoming messages. if (req.method === "POST" && adjustedUrl.pathname.endsWith("/message")) { const sessionId = adjustedUrl.searchParams.get("sessionId"); const messageTraceId = Math.random().toString(36).substring(2, 10); console.info(`[${INSTANCE_ID}:${requestId}] POST message for session ${sessionId} (trace: ${messageTraceId})`); if (!sessionId) { console.error(`[${INSTANCE_ID}:${requestId}] Missing sessionId parameter`); res.status(400).json({ error: "Missing sessionId parameter" }); return; } try { // Check if we have the transport in this instance - re-enable direct handling // if (activeTransports[sessionId]) { // // We can handle it directly in this instance // console.info(`[${INSTANCE_ID}:${requestId}] Handling POST message for session ${sessionId} directly in this instance (trace: ${messageTraceId})`); // try { // await activeTransports[sessionId].handlePostMessage(req, res); // console.info(`[${INSTANCE_ID}:${requestId}] Successfully handled direct message for session ${sessionId} (trace: ${messageTraceId})`); // return; // } catch (directError) { // console.error(`[${INSTANCE_ID}:${requestId}] Error handling direct message for ${sessionId} (trace: ${messageTraceId}):`, directError); // // Fall through to Redis handling if direct handling fails // } // } // Direct handling is explicitly disabled to diagnose Redis-based handling console.debug(`[${INSTANCE_ID}:${requestId}] Direct handling is disabled, using Redis-based message handling (trace: ${messageTraceId})`); console.debug(`[${INSTANCE_ID}:${requestId}] Checking if session ${sessionId} exists in Redis (trace: ${messageTraceId})`); const sessionValid = await sessionExists(sessionId); if (!sessionValid) { console.error(`[${INSTANCE_ID}:${requestId}] No active SSE session found for ${sessionId} (trace: ${messageTraceId})`); res .status(400) .json({ error: "No active SSE session for the provided sessionId" }); return; } // Check if there are active subscribers for this session const activeSubscribers = await getActiveSubscribers(sessionId); console.info(`[${INSTANCE_ID}:${requestId}] Session ${sessionId} has ${activeSubscribers} active subscribers (trace: ${messageTraceId})`); if (activeSubscribers === 0) { console.error(`[${INSTANCE_ID}:${requestId}] No active subscribers for session ${sessionId} (trace: ${messageTraceId})`); res.status(503).json({ error: "The session exists but has no active subscribers. The SSE connection may have been terminated.", }); return; } console.debug(`[${INSTANCE_ID}:${requestId}] Session ${sessionId} exists, parsing message body (trace: ${messageTraceId})`); const rawBody = await parseRawBody(req); const message = JSON.parse(rawBody.toString("utf8")); console.debug(`[${INSTANCE_ID}:${requestId}] Parsed message for session ${sessionId} (trace: ${messageTraceId})`); // Queue the message via Redis PubSub console.debug(`[${INSTANCE_ID}:${requestId}] Queueing message for session ${sessionId} from instance ${INSTANCE_ID} (trace: ${messageTraceId})`); const messageRequestId = await queueMessage(sessionId, message, req.headers, req.method, req.url); console.info(`[${INSTANCE_ID}:${requestId}] Message queued for session ${sessionId}, requestId: ${messageRequestId} (trace: ${messageTraceId})`); // We need to ensure we don't have concurrent requests competing for the same response // Use a flag to ensure only one response handler updates the response let hasResponded = false; // Set up a subscription to listen for a response let responseTimeout; console.debug(`[${INSTANCE_ID}:${requestId}] Setting up response subscription for ${sessionId}:${messageRequestId} (trace: ${messageTraceId})`); const unsubscribe = await subscribeToResponse(sessionId, messageRequestId, (response) => { console.info(`[${INSTANCE_ID}:${requestId}] Response received for ${sessionId}:${messageRequestId}, status: ${response.status} (trace: ${messageTraceId})`); if (responseTimeout) { clearTimeout(responseTimeout); } // Ensure we only respond once if (hasResponded) { console.warn(`[${INSTANCE_ID}:${requestId}] Already responded to client for ${messageRequestId}, skipping duplicate response (trace: ${messageTraceId})`); return; } hasResponded = true; // Return the response to the client try { res.status(response.status).send(response.body); console.debug(`[${INSTANCE_ID}:${requestId}] Response sent to client for ${sessionId}:${messageRequestId} (trace: ${messageTraceId})`); } catch (error) { console.error(`[${INSTANCE_ID}:${requestId}] Error sending response to client for ${messageRequestId} (trace: ${messageTraceId}):`, error); } // Clean up the subscription unsubscribe().catch((err) => { console.error(`[${INSTANCE_ID}:${requestId}] Error unsubscribing from response channel for ${messageRequestId} (trace: ${messageTraceId}):`, err); }); }); // Add a timeout for the response - using 10 seconds for all requests responseTimeout = setTimeout(async () => { if (hasResponded) { console.debug(`[${INSTANCE_ID}:${requestId}] Already responded for ${messageRequestId}, not sending timeout response (trace: ${messageTraceId})`); return; } hasResponded = true; console.warn(`[${INSTANCE_ID}:${requestId}] Request timed out waiting for response: ${sessionId}:${messageRequestId} (trace: ${messageTraceId})`); // Return 202 to indicate message was accepted but is still being processed res.status(202).json({ status: "accepted", message: "Message accepted but processing in another instance", requestId: messageRequestId, trace: messageTraceId, }); // Clean up the subscription after responding, but don't wait for it unsubscribe().catch((err) => { console.error(`[${INSTANCE_ID}:${requestId}] Error unsubscribing after timeout for ${messageRequestId} (trace: ${messageTraceId}):`, err); }); }, 6000); // 6 seconds for all requests // Clean up subscription when request is closed req.on("close", async () => { console.debug(`[${INSTANCE_ID}:${requestId}] Client closed connection for ${sessionId}:${messageRequestId} (trace: ${messageTraceId})`); if (responseTimeout) { clearTimeout(responseTimeout); } if (!hasResponded) { await unsubscribe().catch((err) => { console.error(`[${INSTANCE_ID}:${requestId}] Error unsubscribing on close for ${messageRequestId} (trace: ${messageTraceId}):`, err); }); } }); } catch (error) { console.error(`[${INSTANCE_ID}:${requestId}] Error handling POST message (trace: ${messageTraceId}):`, error); res.status(500).json({ error: error instanceof Error ? error.message : String(error), }); } return; } console.error(`[${INSTANCE_ID}:${requestId}] Not found: ${req.method} ${req.url}`); res.status(404).end("Not found"); } // Create a fake IncomingMessage function createFakeIncomingMessage(options = {}) { const { method = "GET", url = "/", headers = {}, body = null, socket = new Socket(), } = options; // Create a readable stream that will be used as the base for IncomingMessage const readable = new Readable(); readable._read = () => { }; // Required implementation // Add the body content if provided if (body) { if (typeof body === "string") { readable.push(body); } else if (Buffer.isBuffer(body)) { readable.push(body); } else { readable.push(JSON.stringify(body)); } readable.push(null); // Signal the end of the stream } // Create the IncomingMessage instance const req = new IncomingMessage(socket); // Set the properties req.method = method; req.url = url; req.headers = headers; // Copy over the stream methods req.push = readable.push.bind(readable); req.read = readable.read.bind(readable); req.on = readable.on.bind(readable); req.pipe = readable.pipe.bind(readable); return req; } //# sourceMappingURL=server.js.map

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/idosal/git-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server