Skip to main content
Glama

Letta MCP Server

by oculairmedia
sse-transport.js12.2 kB
import express from 'express'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; import { createLogger } from '../core/logger.js'; /** * Run the server using SSE transport * @param {Object} server - The LettaServer instance */ export async function runSSE(server) { const logger = createLogger('sse-transport'); try { const app = express(); let transport; let isConnected = false; let reconnectAttempts = 0; let lastClientId = null; const maxReconnectAttempts = 10; const reconnectDelay = 2000; // 2 seconds initial delay const activeConnections = new Map(); // Track active connections // Generate a unique client ID const generateClientId = () => { return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; }; // Function to handle connection and reconnection const connectTransport = async (req, res) => { const clientIp = req.headers['x-forwarded-for'] || req.socket.remoteAddress; const clientId = generateClientId(); lastClientId = clientId; try { logger.info(`Establishing SSE transport for client ${clientId} (${clientIp})`); transport = new SSEServerTransport('/message', res); // Store connection info activeConnections.set(clientId, { transport, ip: clientIp, connectedAt: new Date(), req, res, }); await server.server.connect(transport); isConnected = true; reconnectAttempts = 0; logger.info(`SSE transport connected successfully for client ${clientId}`); return { success: true, clientId }; } catch (error) { logger.error(`Failed to connect SSE transport for client ${clientId}:`, error); activeConnections.delete(clientId); isConnected = false; return { success: false, clientId }; } }; // Function to handle reconnection with exponential backoff const attemptReconnect = async (req, res) => { if (reconnectAttempts >= maxReconnectAttempts) { logger.error( `Maximum reconnection attempts (${maxReconnectAttempts}) reached. Giving up.`, ); return { success: false }; } reconnectAttempts++; const delay = reconnectDelay * Math.pow(1.5, reconnectAttempts - 1); logger.info( `Attempting to reconnect (${reconnectAttempts}/${maxReconnectAttempts}) in ${delay}ms...`, ); return new Promise((resolve) => { setTimeout(async () => { try { const result = await connectTransport(req, res); logger.info( `Reconnection attempt ${reconnectAttempts} ${result.success ? 'successful' : 'failed'} for client ${result.clientId}`, ); resolve(result); } catch (error) { logger.error( `Reconnection attempt ${reconnectAttempts} failed with error:`, error, ); resolve({ success: false }); } }, delay); }); }; // Function to clean up a connection const cleanupConnection = async (clientId) => { if (activeConnections.has(clientId)) { activeConnections.get(clientId); logger.info(`Cleaning up connection for client ${clientId}`); try { // Remove from active connections activeConnections.delete(clientId); // Log connection stats logger.info(`Connection stats: ${activeConnections.size} active connections`); } catch (error) { logger.error(`Error cleaning up connection for client ${clientId}:`, error); } } }; app.get('/sse', async (req, res) => { const clientIp = req.headers['x-forwarded-for'] || req.socket.remoteAddress; const wasDisconnected = !isConnected && transport !== undefined; if (wasDisconnected) { logger.info( `Received SSE connection request from ${clientIp} after previous disconnection`, ); logger.info('Attempting to reconnect (previous connection was lost)'); } else { logger.info(`Received new SSE connection request from ${clientIp}`); } // Set headers for SSE res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); // Connect transport const result = await connectTransport(req, res); if (!result.success) { logger.info('Initial connection failed, attempting to reconnect...'); await attemptReconnect(req, res); } else if (wasDisconnected) { logger.info( `Successfully reconnected after previous disconnection (client ${result.clientId})`, ); } // Store the client ID for cleanup const clientId = result.clientId; // Send initial ping to confirm connection try { res.write(': connected\n\n'); } catch (error) { logger.error(`Error sending initial ping to client ${clientId}:`, error); } req.on('close', async () => { logger.info(`SSE connection closed for client ${clientId}`); isConnected = false; // Clean up the connection await cleanupConnection(clientId); // Only attempt to reconnect if the server is still running if (server.server && !server.server.closed) { logger.info( 'Connection lost, waiting for new client connection to reconnect...', ); // We can't use the closed response object for reconnection // Instead, we'll set a flag to indicate we should try to reconnect on the next connection reconnectAttempts = 0; // Reset for next connection attempt } }); server.server.onclose = async () => { logger.info('Server closing...'); isConnected = false; await server.server.close(); }; }); app.post('/message', async (req, res) => { try { logger.info('Received message'); if (!transport || !isConnected) { logger.error('No active SSE connection'); res.status(503).json({ error: 'No active SSE connection' }); return; } await transport.handlePostMessage(req, res); } catch (error) { logger.error('Error handling message:', error); // If error is related to connection, mark as disconnected if ( error.message && (error.message.includes('connection') || error.message.includes('transport') || error.message.includes('closed')) ) { isConnected = false; logger.info('Connection error detected, marking as disconnected'); logger.info('Will attempt to reconnect on next client connection'); // Reset reconnect attempts for next connection reconnectAttempts = 0; // If we have a last client ID, clean it up if (lastClientId) { await cleanupConnection(lastClientId); } } res.status(500).json({ error: 'Internal server error' }); } }); // Add a health check endpoint app.get('/health', (req, res) => { res.json({ status: 'ok', connected: isConnected, activeConnections: activeConnections.size, reconnectAttempts, maxReconnectAttempts, uptime: process.uptime(), }); }); // Set up a ping interval to keep connections alive const pingInterval = setInterval(() => { if (activeConnections.size > 0) { logger.info(`Sending ping to ${activeConnections.size} active connections`); for (const [clientId, connection] of activeConnections.entries()) { try { if (connection.res && !connection.res.finished) { connection.res.write(': ping\n\n'); } } catch (error) { logger.error(`Error sending ping to client ${clientId}:`, error); isConnected = false; cleanupConnection(clientId); } } } }, 30000); // Send ping every 30 seconds const PORT = process.env.PORT || 3001; const httpServer = app.listen(PORT, () => { logger.info(`Letta SSE server is running on port ${PORT}`); logger.info(`API credentials: ${server.apiBase ? 'Available' : 'Not available'}`); logger.info( `Reconnection enabled: max attempts=${maxReconnectAttempts}, initial delay=${reconnectDelay}ms`, ); logger.info('Connection tracking: enabled with ping interval (30s)'); }); const cleanup = async () => { logger.info('Starting cleanup process...'); // Clear the ping interval if (pingInterval) { logger.info('Clearing ping interval'); clearInterval(pingInterval); } // Clean up all active connections logger.info(`Cleaning up ${activeConnections.size} active connections`); for (const [clientId] of activeConnections.entries()) { try { logger.info(`Closing connection for client ${clientId}`); activeConnections.delete(clientId); } catch (error) { logger.error(`Error cleaning up connection for client ${clientId}:`, error); } } // Close the HTTP server if (httpServer) { logger.info('Closing HTTP server...'); httpServer.close(); } // Close the MCP server if (server.server) { logger.info('Closing MCP server...'); await server.server.close(); } logger.info('Cleanup complete, exiting process'); if (process.env.NODE_ENV !== 'test') { process.exit(0); } }; process.on('SIGINT', cleanup); process.on('SIGTERM', cleanup); process.on('uncaughtException', async (error) => { logger.error('Uncaught exception:', error); await cleanup(); }); // Return the server instance for testing return httpServer; } catch (err) { const error = err instanceof Error ? err : new Error(String(err)); logger.error('Failed to start SSE server:', error); if (process.env.NODE_ENV !== 'test') { process.exit(1); } throw error; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/oculairmedia/Letta-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server