Skip to main content
Glama

Google Cloud MCP Server

by andyl25
http.js (12.8 kB)
/**
 * HTTP Transport for MCP Server
 *
 * Implements the Streamable HTTP transport as defined in the MCP specification:
 * https://modelcontextprotocol.io/docs/concepts/transports#streamable-http
 */
import express from 'express';
import { randomUUID } from 'crypto';

/** How long a POSTed JSON-RPC request waits for its response before failing. */
const REQUEST_TIMEOUT_MS = 30000;
/** Interval between SSE keep-alive comments on an open stream. */
const KEEP_ALIVE_INTERVAL_MS = 30000;

/**
 * HTTP Transport implementation for MCP Server using Streamable HTTP.
 *
 * State kept by the transport:
 *  - `sessions`: session id -> { id, createdAt, lastActivity }
 *  - `sseConnections`: session id -> { sessionId, response, keepAlive, lastEventId }
 *    (at most one tracked SSE stream per session)
 *  - `pendingRequests`: JSON-RPC request id -> { id, resolve, reject, timeout }
 *
 * The owner wires `onmessage` to receive incoming JSON-RPC messages and calls
 * `send()` to deliver responses and server-initiated messages.
 */
export class HttpTransport {
  app;
  sessions = new Map();
  sseConnections = new Map();
  pendingRequests = new Map();
  port;
  httpServer;
  eventIdCounter = 0;
  // Transport interface callbacks
  onclose;
  onerror;
  onmessage;

  /**
   * @param {number} [port=3000] TCP port the server binds to when `start()` is called.
   */
  constructor(port = 3000) {
    this.port = port;
    this.app = express();
    this.setupMiddleware();
    this.setupRoutes();
  }

  /**
   * Setup Express middleware: JSON body parsing, CORS headers and Origin validation.
   */
  setupMiddleware() {
    // Parse JSON bodies
    this.app.use(express.json());

    // CORS middleware
    this.app.use((req, res, next) => {
      res.header('Access-Control-Allow-Origin', '*');
      res.header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS');
      res.header('Access-Control-Allow-Headers', 'Content-Type, Mcp-Session-Id, Last-Event-ID, Accept');
      // Mcp-Session-Id is not a CORS-safelisted response header; without this
      // a browser client cannot read the session id set on the initialize response.
      res.header('Access-Control-Expose-Headers', 'Mcp-Session-Id');
      if (req.method === 'OPTIONS') {
        res.sendStatus(200);
        return;
      }
      next();
    });

    // Security middleware - validate Origin header to prevent DNS rebinding attacks
    this.app.use((req, res, next) => {
      const origin = req.headers.origin;
      if (origin && !this.isValidOrigin(origin)) {
        res.status(403).json({ error: 'Forbidden origin' });
        return;
      }
      next();
    });
  }

  /**
   * Validate origin header to prevent DNS rebinding attacks.
   *
   * @param {string} origin Value of the request's Origin header.
   * @returns {boolean} true when the origin's hostname is `localhost`,
   *   `127.0.0.1` or ends with `.local`; false otherwise or when unparsable.
   */
  isValidOrigin(origin) {
    try {
      const { hostname } = new URL(origin);
      return hostname === 'localhost' || hostname === '127.0.0.1' || hostname.endsWith('.local');
    } catch {
      // Not a parsable URL: treat as untrusted.
      return false;
    }
  }

  /**
   * Setup Express routes.
   */
  setupRoutes() {
    // Main MCP endpoint - handles POST, GET and DELETE
    this.app.post('/mcp', this.handleMcpPost.bind(this));
    this.app.get('/mcp', this.handleMcpGet.bind(this));
    this.app.delete('/mcp', this.handleMcpDelete.bind(this));

    // Health check endpoint
    this.app.get('/health', (req, res) => {
      res.json({ status: 'ok', timestamp: new Date().toISOString() });
    });
  }

  /**
   * Handle POST requests to /mcp endpoint.
   *
   * Messages carrying an `id` are forwarded to `onmessage` and the HTTP
   * response is held until `send()` delivers the matching reply (or the
   * request times out). Messages without an `id` are forwarded and
   * acknowledged immediately.
   */
  async handleMcpPost(req, res) {
    try {
      const sessionId = req.headers['mcp-session-id'];
      const acceptHeader = req.headers.accept || 'application/json';

      // Validate request body
      if (!req.body || typeof req.body !== 'object') {
        res.status(400).json({
          jsonrpc: '2.0',
          error: { code: -32700, message: 'Parse error' },
          id: null
        });
        return;
      }
      const jsonrpcMessage = req.body;

      // Handle session management
      if (this.isInitializeRequest(jsonrpcMessage)) {
        const newSessionId = randomUUID();
        this.sessions.set(newSessionId, {
          id: newSessionId,
          createdAt: new Date(),
          lastActivity: new Date()
        });
        res.setHeader('Mcp-Session-Id', newSessionId);
      } else if (sessionId) {
        // Update session activity
        const session = this.sessions.get(sessionId);
        if (session) {
          session.lastActivity = new Date();
        }
      }

      const expectsResponse = 'id' in jsonrpcMessage && jsonrpcMessage.id !== undefined;
      if (!expectsResponse) {
        // This is a notification, no response expected
        this.onmessage?.(jsonrpcMessage);
        res.status(200).json({ status: 'ok' });
        return;
      }

      // This is a request: forward it and wait for send() to deliver the reply
      const response = await this.dispatchRequest(jsonrpcMessage);

      // Determine if we need streaming based on Accept header
      const needsStreaming = acceptHeader.includes('text/event-stream');
      if (needsStreaming && this.shouldStream(response)) {
        this.writeSSEHeaders(res);
        this.sendSSEMessage(res, response);
        // Keep connection alive if needed
        if (sessionId) {
          this.setupSSEConnection(sessionId, res);
        } else {
          res.end();
        }
      } else {
        // Send single JSON response
        res.json(response);
      }
    } catch (error) {
      if (res.headersSent) {
        // A response is already on the wire (e.g. an SSE stream was started);
        // writing a status/body now would throw, so just terminate it.
        res.end();
        return;
      }
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal error',
          data: error instanceof Error ? error.message : String(error)
        },
        id: null
      });
    }
  }

  /**
   * Register a pending entry for `message.id`, forward the message to
   * `onmessage`, and return a promise for the reply.
   *
   * @param {object} message JSON-RPC request (has an `id`).
   * @returns {Promise<object>} Resolves with the message passed to `send()`
   *   for this id; rejects on timeout, on `close()`, or if `onmessage` throws.
   */
  dispatchRequest(message) {
    const requestId = message.id;
    let entry;
    const responsePromise = new Promise((resolve, reject) => {
      entry = { id: requestId, resolve, reject, timeout: undefined };
    });

    // Remove the map entry only if it is still ours: another in-flight request
    // may have reused the same JSON-RPC id and overwritten it.
    const forget = () => {
      clearTimeout(entry.timeout);
      if (this.pendingRequests.get(requestId) === entry) {
        this.pendingRequests.delete(requestId);
      }
    };

    entry.timeout = setTimeout(() => {
      forget();
      entry.reject(new Error('Request timeout'));
    }, REQUEST_TIMEOUT_MS);
    this.pendingRequests.set(requestId, entry);

    // Forward the message to the MCP server
    try {
      this.onmessage?.(message);
    } catch (error) {
      // Don't leave the entry and its timer behind: the timer would later
      // reject a promise nobody is awaiting (unhandled rejection).
      forget();
      entry.reject(error);
    }
    return responsePromise;
  }

  /**
   * Handle GET requests to /mcp endpoint (for server-initiated SSE streams).
   * Requires a known `Mcp-Session-Id`; otherwise responds 400.
   */
  handleMcpGet(req, res) {
    const sessionId = req.headers['mcp-session-id'];
    const lastEventId = req.headers['last-event-id'];

    if (!sessionId || !this.sessions.has(sessionId)) {
      res.status(400).json({ error: 'Invalid or missing session ID' });
      return;
    }

    // Setup SSE stream
    this.writeSSEHeaders(res);

    // If client is resuming, we could replay messages here
    if (lastEventId) {
      // Implementation for message replay would go here
      // For now, we'll just acknowledge the resume
      this.sendSSEMessage(res, { type: 'resume', lastEventId }, 'resume');
    }

    this.setupSSEConnection(sessionId, res);
  }

  /**
   * Handle DELETE requests to /mcp endpoint (session termination).
   * Always responds 200, whether or not the session existed.
   */
  handleMcpDelete(req, res) {
    const sessionId = req.headers['mcp-session-id'];
    if (sessionId) {
      this.sessions.delete(sessionId);
      // Close any SSE connections for this session
      const sseConn = this.sseConnections.get(sessionId);
      if (sseConn) {
        clearInterval(sseConn.keepAlive);
        sseConn.response.end();
        this.sseConnections.delete(sessionId);
      }
    }
    res.sendStatus(200);
  }

  /**
   * Set the response headers that mark `res` as a Server-Sent Events stream.
   */
  writeSSEHeaders(res) {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
  }

  /**
   * Setup SSE connection management: track `res` as the session's SSE stream,
   * start its keep-alive timer and clean up when the response closes.
   *
   * @param {string} sessionId Session the stream belongs to.
   * @param {object} res Express response already configured as an SSE stream.
   */
  setupSSEConnection(sessionId, res) {
    // Stop the keep-alive of any connection this one replaces
    const existingConn = this.sseConnections.get(sessionId);
    if (existingConn) {
      clearInterval(existingConn.keepAlive);
    }

    const connection = {
      sessionId,
      response: res,
      keepAlive: undefined,
      lastEventId: this.eventIdCounter
    };

    // Setup keep-alive
    connection.keepAlive = setInterval(() => {
      try {
        res.write(': keep-alive\n\n');
      } catch {
        // Connection closed
        this.dropSSEConnection(connection);
      }
    }, KEEP_ALIVE_INTERVAL_MS);

    // Store connection
    this.sseConnections.set(sessionId, connection);

    // Handle connection close. dropSSEConnection only unregisters this exact
    // connection, so a late 'close' from a replaced stream cannot evict the
    // session's newer stream.
    res.on('close', () => {
      this.dropSSEConnection(connection);
    });
  }

  /**
   * Stop a connection's keep-alive timer and unregister it, but only if it is
   * still the connection registered for its session.
   */
  dropSSEConnection(connection) {
    clearInterval(connection.keepAlive);
    if (this.sseConnections.get(connection.sessionId) === connection) {
      this.sseConnections.delete(connection.sessionId);
    }
  }

  /**
   * Send SSE message.
   *
   * @param {object} res Response to write to.
   * @param {*} data Payload; serialized with JSON.stringify onto one `data:` line.
   * @param {string} [eventType='message'] SSE event name; the `event:` line is
   *   omitted for 'keep-alive'.
   * @param {string} [id] Event id; when falsy, the next value of the shared
   *   counter is used.
   */
  sendSSEMessage(res, data, eventType = 'message', id) {
    if (eventType !== 'keep-alive') {
      res.write(`event: ${eventType}\n`);
    }
    const eventId = id || String(++this.eventIdCounter);
    res.write(`id: ${eventId}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  /**
   * Check if message is an initialize request.
   *
   * @returns {boolean} true when `message.method === 'initialize'`.
   */
  isInitializeRequest(message) {
    return 'method' in message && message.method === 'initialize';
  }

  /**
   * Determine if response should be streamed.
   *
   * Currently always false, so POST responses are always sent as plain JSON.
   */
  shouldStream(response) {
    // Implement logic to determine if response should be streamed
    return false; // Default to no streaming
  }

  // Transport interface methods

  /**
   * Start the HTTP server on 127.0.0.1:`port`.
   *
   * @returns {Promise<void>} Resolves once listening; rejects if the server
   *   fails to start. Server errors after startup are reported via `onerror`.
   */
  async start() {
    return new Promise((resolve, reject) => {
      let listening = false;
      try {
        this.httpServer = this.app.listen(this.port, '127.0.0.1', (error) => {
          // Express 5 passes startup errors to this callback; Express 4 passes nothing.
          if (error) {
            reject(error);
            return;
          }
          listening = true;
          console.error(`[INFO] HTTP transport listening on http://127.0.0.1:${this.port}/mcp`);
          resolve();
        });
        this.httpServer.on('error', (error) => {
          if (listening) {
            // The start() promise is already settled; surface the error
            // instead of silently dropping it.
            this.onerror?.(error);
            return;
          }
          reject(error);
        });
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * Send a message (implements Transport interface).
   *
   * A message whose `id` matches a pending POST request completes that HTTP
   * request. Anything else is written to every tracked SSE connection.
   */
  async send(message) {
    // If this is a response to a pending request, resolve the promise
    if ('id' in message && message.id !== undefined) {
      const pending = this.pendingRequests.get(message.id);
      if (pending) {
        clearTimeout(pending.timeout);
        this.pendingRequests.delete(message.id);
        pending.resolve(message);
        return;
      }
    }

    // For server-initiated messages, send to all active SSE connections
    for (const conn of this.sseConnections.values()) {
      try {
        this.sendSSEMessage(conn.response, message, 'message');
      } catch {
        // Connection may have been closed: unregister it and stop its timer
        this.dropSSEConnection(conn);
      }
    }
  }

  /**
   * Close the transport: fail pending requests, end SSE streams, forget
   * sessions, stop the HTTP server (if started), then invoke `onclose`.
   */
  async close() {
    // Clear all pending requests
    for (const pending of this.pendingRequests.values()) {
      clearTimeout(pending.timeout);
      pending.reject(new Error('Transport closed'));
    }
    this.pendingRequests.clear();

    // Close all SSE connections
    for (const conn of this.sseConnections.values()) {
      clearInterval(conn.keepAlive);
      conn.response.end();
    }
    this.sseConnections.clear();

    // Clear sessions
    this.sessions.clear();

    // Close HTTP server. Await rather than return, so the onclose callback
    // below also runs when a server was started.
    if (this.httpServer) {
      await new Promise((resolve) => {
        this.httpServer.close(() => {
          resolve();
        });
      });
    }

    // Call onclose callback
    this.onclose?.();
  }
}
//# sourceMappingURL=http.js.map

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/andyl25/googlecloud-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.