Skip to main content
Glama

myAI Memory Sync

by Jktfe
direct-wrapper.js•22.9 kB
#!/usr/bin/env node /** * Wrapper script for Direct MCP server * * This script runs the Direct MCP server as a child process and filters its output * to ensure that only valid JSON-RPC messages are sent to stdout while all * other output is redirected to stderr. This prevents Claude Desktop from * encountering JSON parsing errors. * * Special handling for Claude Desktop's connection pattern: * Claude Desktop has a unique behavior where it: * 1. Connects and sends an initialize request * 2. Receives the initialize response * 3. Immediately closes the transport * 4. Sends a SIGTERM signal * 5. Attempts to reconnect shortly after * * This wrapper is designed to handle this pattern gracefully. */ import { spawn } from 'node:child_process'; import { createInterface } from 'node:readline'; import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; // Get current directory const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); // Path to the actual server script const serverPath = join(__dirname, 'direct-server.js'); // Debug mode const DEBUG = process.env.DEBUG === 'true'; // Configuration const AUTO_SHUTDOWN_TIMEOUT = 5 * 60 * 1000; // 5 minutes of inactivity before shutdown const KEEP_ALIVE_INTERVAL = 15 * 1000; // 15 seconds between keep-alive pings const RECONNECT_GRACE_PERIOD = 10 * 1000; // 10 seconds grace period for reconnects const SIGNAL_IGNORE_PERIOD = 5 * 1000; // 5 seconds to ignore signals after initialize // Log to stderr function log(message) { process.stderr.write(`[direct-wrapper] ${message}\n`); } function debug(message) { if (DEBUG) { process.stderr.write(`[direct-wrapper:debug] ${message}\n`); } } // Log that we're starting log(`Starting direct server from ${serverPath}`); // Keep track of the server process let serverProcess = null; let isShuttingDown = false; let pendingRequests = new Map(); // Track pending requests by ID let lastInitializeResponse = null; // Store the last initialize response let lastInitializeTime = 0; // Track when the last initialize response was sent let ignoreSignalsTimeout = null; // Timeout to ignore signals after initialize let clientStdinClosed = false; // Track if client stdin has closed let reconnectAttemptTimeout = null; // Timeout for reconnect attempts let keepAliveInterval = null; // Interval for sending keep-alive pings to client let lastToolsResponse = null; // Store the last listTools response let claudeDesktopMode = true; // Special mode for Claude Desktop's connection pattern let lastActivityTime = Date.now(); // Track last activity time let inactivityTimeout = null; // Timeout for auto-shutdown after inactivity // Function to start the server process function startServer() { // Spawn the server process with environment variables serverProcess = spawn('node', [serverPath], { stdio: ['pipe', 'pipe', 'pipe'], env: { ...process.env, MCP_WRAPPER_MODE: 'true', NODE_OPTIONS: '--trace-warnings', DEBUG: DEBUG ? 'true' : 'false' } }); // Create readline interfaces for stdout and stderr const stdoutRl = createInterface({ input: serverProcess.stdout, terminal: false }); const stderrRl = createInterface({ input: serverProcess.stderr, terminal: false }); // Handle stdout from the server stdoutRl.on('line', (line) => { try { // Skip empty lines if (!line.trim()) { return; } // Update last activity time lastActivityTime = Date.now(); resetInactivityTimeout(); // Try to parse as JSON to validate try { const parsed = JSON.parse(line); // Only forward valid JSON-RPC messages if (parsed && typeof parsed === 'object' && parsed.jsonrpc === '2.0') { debug(`Forwarding server response: ${line.substring(0, 100)}${line.length > 100 ? '...' : ''}`); // Store initialize response for potential reconnects if (parsed.result && parsed.result.serverInfo) { lastInitializeResponse = line; lastInitializeTime = Date.now(); debug('Stored initialize response for future reconnects'); // Set a timeout to ignore SIGTERM signals for a short period after initialize if (ignoreSignalsTimeout) { clearTimeout(ignoreSignalsTimeout); } ignoreSignalsTimeout = setTimeout(() => { debug('Signal ignore period ended'); ignoreSignalsTimeout = null; }, SIGNAL_IGNORE_PERIOD); // If client stdin is closed (disconnected), try to keep the connection alive if (clientStdinClosed) { debug('Client stdin is closed, but we received an initialize response. Setting up keep-alive mechanism.'); setupKeepAlive(); } } // Store listTools response for potential reconnects if (parsed.result && parsed.result.tools) { lastToolsResponse = line; debug('Stored listTools response for future reconnects'); } // Remove from pending requests if (parsed.id !== undefined) { pendingRequests.delete(parsed.id); } // Write to stdout even if client stdin is closed // This ensures that if Claude Desktop is still listening, it will get the response process.stdout.write(line + '\n'); // In Claude Desktop mode, if this is an initialize response, prepare for the expected disconnect if (claudeDesktopMode && parsed.result && parsed.result.serverInfo) { debug('Claude Desktop mode: Preparing for expected disconnect after initialize response'); // Claude Desktop will disconnect after initialize, so we'll prepare for reconnection setTimeout(() => { if (!clientStdinClosed) { debug('Claude Desktop mode: Still connected after initialize response'); } }, 1000); } } else { // Not a valid JSON-RPC message, redirect to stderr process.stderr.write(`[direct-wrapper:redirect:not-jsonrpc] ${line}\n`); } } catch (err) { // Not valid JSON, redirect to stderr process.stderr.write(`[direct-wrapper:redirect:not-json] ${line}\n`); } } catch (err) { // Error processing line process.stderr.write(`[direct-wrapper:error] Error processing stdout line: ${err.message}\n`); } }); // Redirect all stderr output from the server stderrRl.on('line', (line) => { // Skip keep-alive debug messages to reduce noise if (line.includes('Keep-alive check') || line.includes('Sending keep-alive ping')) { if (DEBUG) { process.stderr.write(`[direct-wrapper:stderr] ${line}\n`); } } else { process.stderr.write(`[direct-wrapper:stderr] ${line}\n`); } }); // Handle server process exit serverProcess.on('exit', (code, signal) => { log(`Server process exited with code ${code} and signal ${signal}`); // If we're not intentionally shutting down, restart the server if (!isShuttingDown) { log('Restarting server process...'); setTimeout(() => { startServer(); // Resend any pending requests after restart if (pendingRequests.size > 0) { log(`Resending ${pendingRequests.size} pending requests after restart`); setTimeout(() => { for (const [id, request] of pendingRequests.entries()) { debug(`Resending request with id ${id}`); if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(request + '\n'); } } }, 1000); // Wait a bit for the server to initialize } }, 1000); // Restart after a short delay } else { // If we are shutting down, exit the wrapper process.exit(code || 0); } }); // Handle server process errors serverProcess.on('error', (err) => { log(`Server process error: ${err.message}`); // If we're not intentionally shutting down, restart the server if (!isShuttingDown) { log('Restarting server process after error...'); setTimeout(startServer, 1000); // Restart after a short delay } else { // If we are shutting down, exit the wrapper process.exit(1); } }); // Set up inactivity timeout resetInactivityTimeout(); } // Set up inactivity timeout to shut down after a period of inactivity function resetInactivityTimeout() { // Clear any existing timeout if (inactivityTimeout) { clearTimeout(inactivityTimeout); } // Set up a new timeout inactivityTimeout = setTimeout(() => { const inactivityTime = Date.now() - lastActivityTime; log(`No activity for ${Math.round(inactivityTime / 1000)} seconds, shutting down`); // Shut down gracefully isShuttingDown = true; // Clear all intervals and timeouts clearAllTimeouts(); // Kill the server process if (serverProcess) { serverProcess.kill('SIGTERM'); } // Exit after a short delay setTimeout(() => { process.exit(0); }, 1000); }, AUTO_SHUTDOWN_TIMEOUT); } // Clear all timeouts and intervals function clearAllTimeouts() { if (keepAliveInterval) { clearInterval(keepAliveInterval); keepAliveInterval = null; } if (reconnectAttemptTimeout) { clearTimeout(reconnectAttemptTimeout); reconnectAttemptTimeout = null; } if (ignoreSignalsTimeout) { clearTimeout(ignoreSignalsTimeout); ignoreSignalsTimeout = null; } if (inactivityTimeout) { clearTimeout(inactivityTimeout); inactivityTimeout = null; } } // Set up keep-alive mechanism to maintain connection with Claude Desktop function setupKeepAlive() { // Clear any existing keep-alive interval if (keepAliveInterval) { clearInterval(keepAliveInterval); } // Set up a new keep-alive interval keepAliveInterval = setInterval(() => { // If we have a cached initialize response, send a dummy notification to keep the connection alive if (lastInitializeResponse && !isShuttingDown) { debug('Sending keep-alive notification to client'); // Send a notification (no id) that won't require a response const keepAliveMsg = { jsonrpc: '2.0', method: 'notifications/keepAlive', params: { timestamp: Date.now() } }; try { process.stdout.write(JSON.stringify(keepAliveMsg) + '\n'); } catch (err) { debug(`Error sending keep-alive notification: ${err.message}`); } } }, KEEP_ALIVE_INTERVAL); } // Function to attempt reconnection with Claude Desktop function attemptReconnect() { // Clear any existing reconnect timeout if (reconnectAttemptTimeout) { clearTimeout(reconnectAttemptTimeout); reconnectAttemptTimeout = null; } // If we have a cached initialize response and client stdin is closed if (lastInitializeResponse && clientStdinClosed && !isShuttingDown) { debug('Attempting to reconnect with Claude Desktop'); // Send the cached initialize response again try { // Safely write to stdout, catching any EPIPE errors try { process.stdout.write(lastInitializeResponse + '\n'); } catch (err) { if (err.code === 'EPIPE') { // EPIPE means the other end of the pipe is closed debug('EPIPE error when trying to write initialize response - output stream is closed'); } else { // Re-throw other errors throw err; } } } catch (err) { debug(`Error sending initialize response: ${err.message}`); } // If we have a cached listTools response, send that too after a short delay if (lastToolsResponse) { setTimeout(() => { debug('Sending cached listTools response'); try { // Safely write to stdout, catching any EPIPE errors try { process.stdout.write(lastToolsResponse + '\n'); } catch (err) { if (err.code === 'EPIPE') { // EPIPE means the other end of the pipe is closed debug('EPIPE error when trying to write tools response - output stream is closed'); } else { // Re-throw other errors throw err; } } } catch (err) { debug(`Error sending tools response: ${err.message}`); } }, 100); } // Set up keep-alive mechanism setupKeepAlive(); // Schedule next reconnect attempt reconnectAttemptTimeout = setTimeout(attemptReconnect, 30000); // Try again in 30 seconds } } // Start the server initially startServer(); // Handle stdin (from client) and forward to server const stdinRl = createInterface({ input: process.stdin, terminal: false }); stdinRl.on('line', (line) => { try { // Update last activity time lastActivityTime = Date.now(); resetInactivityTimeout(); // Reset client stdin closed flag since we're receiving data if (clientStdinClosed) { log('Client reconnected (received stdin data)'); clientStdinClosed = false; // Clear reconnect attempt timeout if it exists if (reconnectAttemptTimeout) { clearTimeout(reconnectAttemptTimeout); reconnectAttemptTimeout = null; } } // Skip empty lines if (!line.trim()) { return; } // Try to parse as JSON to validate try { const parsed = JSON.parse(line); // Only forward valid JSON-RPC messages if (parsed && typeof parsed === 'object' && parsed.jsonrpc === '2.0') { debug(`Forwarded client request: ${line.substring(0, 100)}${line.length > 100 ? '...' : ''}`); // Check for Claude Desktop client info if (parsed.method === 'initialize' && parsed.params?.clientInfo?.name === 'claude-ai') { log('Detected Claude Desktop client, enabling Claude Desktop mode'); claudeDesktopMode = true; } // Special handling for initialize requests if we have a cached response if (parsed.method === 'initialize' && lastInitializeResponse && !isShuttingDown) { debug('Detected initialize request with cached response available'); // If this is a reconnect and we have a cached response that's less than 30 seconds old if (Date.now() - lastInitializeTime < 30000) { // Still send the request to the server to maintain state if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(line + '\n'); } // Also immediately respond with the cached response to prevent client timeout debug('Sending cached initialize response'); try { // Safely write to stdout, catching any EPIPE errors try { process.stdout.write(lastInitializeResponse + '\n'); } catch (err) { if (err.code === 'EPIPE') { // EPIPE means the other end of the pipe is closed debug('EPIPE error when trying to write initialize response - output stream is closed'); } else { // Re-throw other errors throw err; } } } catch (err) { debug(`Error sending initialize response: ${err.message}`); } // In Claude Desktop mode, also send cached listTools response if (claudeDesktopMode && lastToolsResponse) { // Create a new listTools request with the next ID const listToolsRequest = { jsonrpc: '2.0', method: 'listTools', id: parsed.id + 1 }; // Send listTools request to server to maintain state if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(JSON.stringify(listToolsRequest) + '\n'); } // Also immediately respond with cached listTools response // But update the ID to match the new request try { const toolsResponse = JSON.parse(lastToolsResponse); toolsResponse.id = parsed.id + 1; debug('Sending cached listTools response with updated ID'); try { // Safely write to stdout, catching any EPIPE errors try { process.stdout.write(JSON.stringify(toolsResponse) + '\n'); } catch (err) { if (err.code === 'EPIPE') { // EPIPE means the other end of the pipe is closed debug('EPIPE error when trying to write tools response - output stream is closed'); } else { // Re-throw other errors throw err; } } } catch (err) { debug(`Error sending tools response: ${err.message}`); } } catch (err) { debug(`Error updating cached listTools response ID: ${err.message}`); } } return; } else { debug('Cached initialize response is too old, forwarding to server'); } } // Special handling for listTools requests if we have a cached response if (parsed.method === 'listTools' && lastToolsResponse && !isShuttingDown) { debug('Detected listTools request with cached response available'); // Update the ID in the cached response to match the request try { const toolsResponse = JSON.parse(lastToolsResponse); toolsResponse.id = parsed.id; // Still send the request to the server to maintain state if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(line + '\n'); } // Also immediately respond with the cached response debug('Sending cached listTools response with updated ID'); try { // Safely write to stdout, catching any EPIPE errors try { process.stdout.write(JSON.stringify(toolsResponse) + '\n'); } catch (err) { if (err.code === 'EPIPE') { // EPIPE means the other end of the pipe is closed debug('EPIPE error when trying to write tools response - output stream is closed'); } else { // Re-throw other errors throw err; } } } catch (err) { debug(`Error sending tools response: ${err.message}`); } return; } catch (err) { debug(`Error updating cached listTools response ID: ${err.message}`); } } // Store request if it has an ID (for potential resending) if (parsed.id !== undefined) { pendingRequests.set(parsed.id, line); } // Make sure we have a server process to send to if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(line + '\n'); } else { log('Server process not available, starting new one...'); startServer(); // Queue the message to be sent after a short delay setTimeout(() => { if (serverProcess && serverProcess.stdin.writable) { serverProcess.stdin.write(line + '\n'); } else { log('Failed to send message to server process'); } }, 500); } } else { // Not a valid JSON-RPC message, log warning process.stderr.write(`[direct-wrapper:warning] Received non-JSON-RPC message from client: ${line}\n`); } } catch (err) { // Not valid JSON, log warning process.stderr.write(`[direct-wrapper:warning] Received non-JSON input from client: ${line}\n`); } } catch (err) { // Error processing line process.stderr.write(`[direct-wrapper:error] Error processing stdin line: ${err.message}\n`); } }); // Handle process signals process.on('SIGINT', () => { log('Received SIGINT'); // If we're in the signal ignore period after initialize, don't forward if (ignoreSignalsTimeout) { log('Ignoring SIGINT during post-initialize grace period'); return; } log('Forwarding SIGINT to server'); isShuttingDown = true; pendingRequests.clear(); // Clear pending requests on shutdown // Clear all timeouts and intervals clearAllTimeouts(); if (serverProcess) { serverProcess.kill('SIGINT'); } else { process.exit(0); } }); process.on('SIGTERM', () => { log('Received SIGTERM'); // If we're in the signal ignore period after initialize, don't forward if (ignoreSignalsTimeout) { log('Ignoring SIGTERM during post-initialize grace period'); return; } // In Claude Desktop mode, if we have a recent initialize response, ignore SIGTERM if (claudeDesktopMode && lastInitializeResponse && (Date.now() - lastInitializeTime < 10000)) { log('Claude Desktop mode: Ignoring SIGTERM after recent initialize (expected disconnect pattern)'); return; } log('Forwarding SIGTERM to server'); isShuttingDown = true; pendingRequests.clear(); // Clear pending requests on shutdown // Clear all timeouts and intervals clearAllTimeouts(); if (serverProcess) { serverProcess.kill('SIGTERM'); } else { process.exit(0); } }); // Handle stdin closing (client disconnected) process.stdin.on('end', () => { log('Client stdin closed, but keeping wrapper alive for potential reconnects'); clientStdinClosed = true; // In Claude Desktop mode, this is expected after initialize if (claudeDesktopMode && lastInitializeResponse && (Date.now() - lastInitializeTime < 10000)) { log('Claude Desktop mode: Expected disconnect after initialize, waiting for reconnect'); } // Start reconnect attempts attemptReconnect(); }); // Keep the process alive even if stdin closes process.stdin.resume(); // Log that we're ready log('Direct wrapper initialized and ready');

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jktfe/myAImemory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server