Skip to main content
Glama
mcpConnectionManager.ts16 kB
import { readFileSync } from "fs"; import pm2 from "pm2"; import { resolveFromBase, getBasePath, resolveFromUserData, } from "@/helpers/paths"; import { isAbsolute } from "path"; import { readConfig, getSocketPath } from "./config"; // Internal PM2 connection ref-count and caching let pm2Connected = false; let pm2LeaseCount = 0; let pm2ListCache: { timestamp: number; data: any[] } | null = null; const PM2_LIST_TTL_MS = 500; // keep very fresh while cutting repeated IPC function actuallyConnectToPm2(): Promise<void> { return new Promise<void>((resolve, reject) => { pm2.connect((err) => { if (err) { reject(new Error(`Failed to connect to PM2: ${err.message}`)); return; } pm2Connected = true; resolve(); }); }); } function actuallyDisconnectFromPm2(): void { pm2.disconnect(); pm2Connected = false; } async function getPm2ListInternal(): Promise<any[]> { return new Promise<any[]>((resolve, reject) => { pm2.list((err, list) => { if (err) { reject(new Error(`Failed to get process list: ${err.message}`)); return; } resolve(list); }); }); } // Types for MCP client and transport export interface McpClient { connect: (transport: any) => Promise<void>; listTools: () => Promise<any>; callTool: (options: { name: string; arguments: any }) => Promise<any>; close: () => Promise<void>; } export interface McpTransport { close?: () => Promise<void>; } export interface ConnectionResources { client: McpClient; transport: McpTransport; disconnect: () => Promise<void>; } // Connection pool to maintain persistent connections export class McpConnectionPool { private connections: Map<string, ConnectionResources> = new Map(); private connectionPromises: Map<string, Promise<ConnectionResources | null>> = new Map(); private readonly poolName: string; constructor(poolName: string = "default") { this.poolName = poolName; } async getConnection(mcpName: string): Promise<ConnectionResources | null> { // Use namespaced key to avoid conflicts with other pools const connectionKey = `${this.poolName}:${mcpName}`; // Check if we already have a connection const existing = this.connections.get(connectionKey); if (existing) { try { // Test if connection is still alive - first check transport, then make a simple call if ( existing.transport && typeof (existing.transport as any).testConnection === "function" ) { const isHealthy = await (existing.transport as any).testConnection(); if (!isHealthy) { throw new Error("Transport connection test failed"); } } // Also test if the MCP client is responsive await existing.client.listTools(); return existing; } catch (error) { // Connection is dead, remove it and create a new one console.warn(`[${mcpName}] Connection is dead, recreating...`, error); this.connections.delete(connectionKey); try { await existing.disconnect(); } catch (disconnectError) { // Ignore disconnect errors for dead connections console.debug( `[${mcpName}] Ignored disconnect error for dead connection:`, disconnectError ); } } } // Check if we're already creating a connection const existingPromise = this.connectionPromises.get(connectionKey); if (existingPromise) { return existingPromise; } // Create new connection const connectionPromise = this.createConnection(mcpName, connectionKey); this.connectionPromises.set(connectionKey, connectionPromise); try { const connection = await connectionPromise; if (connection) { this.connections.set(connectionKey, connection); } return connection; } finally { this.connectionPromises.delete(connectionKey); } } private async createConnection( mcpName: string, connectionKey: string ): Promise<ConnectionResources | null> { try { // Dynamically import the MCP SDK const ClientModule = await import( "@modelcontextprotocol/sdk/client/index.js" ); const Client = ClientModule.Client; // Connect to PM2 await connectToPm2(); // Check if the process is running const isRunning = await checkProcessStatus(mcpName); if (!isRunning) { await disconnectFromPm2(); return null; } // Get package configuration const { cwdAbsolute, env, cmd, cmdArgs } = getPackageConfig(mcpName); // Create an MCP client with pool-specific name const client = new Client({ name: `furi_${this.poolName}`, version: "0.0.1", }); // Check if Unix socket is available (created by our wrapper) const socketPath = getSocketPath(mcpName); const fs = await import("fs"); let transport: any; let transportType: "unix" | "stdio" = "stdio"; if (socketPath && fs.existsSync(socketPath)) { try { // Try to use Unix socket transport const { UnixSocketTransport } = await import("./UnixSocketTransport"); transport = new UnixSocketTransport(socketPath); // We'll connect later when client.connect() is called transportType = "unix"; } catch (error) { console.warn( `[${mcpName}] Failed to create Unix socket transport, falling back to stdio:`, error ); // Do not fall back to stdio, as we need to maintain the PM2 process relationship console.error( `[${mcpName}] Failed to create Unix socket transport and no fallback available` ); throw error; // Re-throw to properly handle the error } } else { // Unix socket not found - log this for debugging if (process.env.DEBUG || process.env.VERBOSE) { console.log( `[${mcpName}] Unix socket not found at ${socketPath}, using stdio transport` ); } // Unix socket not found - we cannot proceed console.error( `[${mcpName}] Unix socket not found${ socketPath ? ` at ${socketPath}` : "" } and no fallback available` ); throw new Error( `Unix socket not available for ${mcpName}. Make sure the MCP is running with 'furi start ${mcpName}'` ); } try { await client.connect(transport); } catch (error) { const connectError = error as Error; console.error( `[${mcpName}] Error during client.connect:`, connectError ); // Add more context to the error throw new Error( `Failed to connect to ${mcpName} MCP: ${connectError.message}` ); } const disconnect = async () => { try { // Remove from pool this.connections.delete(connectionKey); // Close transport if it exists if (transport && typeof transport.close === "function") { await transport.close(); } // Close client if it exists if (client && typeof client.close === "function") { await client.close(); } // Disconnect from PM2 await disconnectFromPm2(); } catch (cleanupError) { console.error("Error during cleanup:", cleanupError); } }; return { client, transport, disconnect, }; } catch (error: any) { console.error( `[${mcpName}] Error creating connection: ${ error.message || String(error) }` ); // Cleanup on error try { await disconnectFromPm2(); } catch (cleanupError) { console.error("Error during cleanup:", cleanupError); } return null; } } async closeConnection(mcpName: string): Promise<void> { const connectionKey = `${this.poolName}:${mcpName}`; const connection = this.connections.get(connectionKey); if (connection) { this.connections.delete(connectionKey); try { await connection.disconnect(); } catch (error) { console.error(`[${mcpName}] Error closing connection:`, error); } } } async closeAllConnections(): Promise<void> { const connections = Array.from(this.connections.entries()); this.connections.clear(); await Promise.all( connections.map(async ([connectionKey, connection]) => { try { await connection.disconnect(); } catch (error) { console.error(`[${connectionKey}] Error closing connection:`, error); } }) ); } getPoolStats(): { poolName: string; activeConnections: number; pendingConnections: number; } { return { poolName: this.poolName, activeConnections: this.connections.size, pendingConnections: this.connectionPromises.size, }; } } // Global connection pool instance for CLI/API calls (separate from aggregator) const connectionPool = new McpConnectionPool("furi"); // Export function to get pooled connection export const getPooledConnection = ( mcpName: string ): Promise<ConnectionResources | null> => { return connectionPool.getConnection(mcpName); }; // Export function to close specific connection export const closePooledConnection = (mcpName: string): Promise<void> => { return connectionPool.closeConnection(mcpName); }; // Export function to close all connections (useful for cleanup) export const closeAllPooledConnections = (): Promise<void> => { return connectionPool.closeAllConnections(); }; // Export function to get pool statistics (for debugging) export const getPoolStats = () => { return connectionPool.getPoolStats(); }; // Export function to check if Unix socket is available for an MCP export const isUnixSocketAvailable = (mcpName: string): boolean => { const socketPath = getSocketPath(mcpName); if (!socketPath) { return false; } const fs = require("fs"); const exists = fs.existsSync(socketPath); if (!exists && (process.env.DEBUG || process.env.VERBOSE)) { console.log(`[${mcpName}] Unix socket not found at ${socketPath}`); return false; } return exists; }; // Export function to get connection info for an MCP export const getConnectionInfo = async ( mcpName: string ): Promise<{ isRunning: boolean; hasUnixSocket: boolean; socketPath: string; transportType?: "unix" | "stdio"; }> => { const socketPath = getSocketPath(mcpName) || resolveFromUserData(`/transport/furi_${mcpName.replace("/", "-")}.sock`); try { await connectToPm2(); const isRunning = await checkProcessStatus(mcpName); await disconnectFromPm2(); return { isRunning, hasUnixSocket: isUnixSocketAvailable(mcpName), socketPath, transportType: isUnixSocketAvailable(mcpName) ? "unix" : "stdio", }; } catch (error) { return { isRunning: false, hasUnixSocket: false, socketPath, }; } }; // Connect to PM2 with ref-count; fast no-op if already connected export const connectToPm2 = async (): Promise<void> => { pm2LeaseCount += 1; if (pm2Connected) return; await actuallyConnectToPm2(); }; // Disconnect from PM2; only actually disconnect when all leases released export const disconnectFromPm2 = async (): Promise<void> => { if (pm2LeaseCount > 0) pm2LeaseCount -= 1; if (pm2LeaseCount === 0 && pm2Connected) { actuallyDisconnectFromPm2(); } }; // Forcefully reset PM2 connection (e.g., on shutdown) export const resetPm2Connection = (): void => { pm2LeaseCount = 0; if (pm2Connected) { actuallyDisconnectFromPm2(); } pm2ListCache = null; }; // Get PM2 process list with short-lived cache export const getPm2List = async (): Promise<any[]> => { const now = Date.now(); if (pm2ListCache && now - pm2ListCache.timestamp < PM2_LIST_TTL_MS) { return pm2ListCache.data; } const list = await getPm2ListInternal(); pm2ListCache = { timestamp: now, data: list }; return list; }; // Get PM2 process info without caching (precise) export const getPm2ProcessInfo = async (processName: string): Promise<any> => { return new Promise<any>((resolve, reject) => { pm2.describe(processName, (err, proc) => { if (err) { reject(new Error(`Failed to get process info: ${err.message}`)); return; } resolve(proc[0]); }); }); }; // Check if process is running and valid export const checkProcessStatus = async ( mcpName: string, spinner?: any ): Promise<boolean> => { const processName = `furi_${mcpName.replace("/", "-")}`; const list = await getPm2List(); const processEntry = list.find((p) => p.name === processName); if (!processEntry) { if (spinner) { spinner.error( `[${mcpName}] Server not running \n \x1b[2mStart it first with furi start ${mcpName}\x1b[0m` ); } return false; } if (processEntry.pm2_env.status !== "online") { if (spinner) { spinner.error( `[${mcpName}] Process is not running (status: ${processEntry.pm2_env.status})` ); } return false; } return true; }; // Get configuration for a package export const getPackageConfig = ( mcpName: string ): { cwdAbsolute: string; env: Record<string, string>; cmd: string; cmdArgs: string[]; } => { const configPath = resolveFromUserData("configuration.json"); const config = JSON.parse(readFileSync(configPath, "utf-8")); const mcpConfig = config[mcpName] || (config.installed && config.installed[mcpName]); if (!mcpConfig) { throw new Error(`[${mcpName}] Configuration not found in getPackageConfig`); } let cwdAbsolute: string; if (mcpConfig.source) { cwdAbsolute = mcpConfig.source; if (!isAbsolute(cwdAbsolute)) { // console.warn( // `[${mcpName}] mcpConfig.source was not an absolute path. Resolving from base. Source: ${cwdAbsolute}` // ); cwdAbsolute = resolveFromUserData(cwdAbsolute); } } else { // console.warn( // `[${mcpName}] mcpConfig.source is not defined. Falling back to default installed path structure.` // ); const parts = mcpName.split("/"); const owner = parts[0]; const repo = parts[1]; if (owner && repo) { cwdAbsolute = resolveFromUserData("installed", owner, repo); } else { cwdAbsolute = resolveFromUserData("installed", mcpName); } } // Prepare environment variables const env: Record<string, string> = {}; // Add relevant environment variables from process.env if (process.env.PATH) env.PATH = process.env.PATH; if (process.env.NODE_ENV) env.NODE_ENV = process.env.NODE_ENV; if (process.env.HOME) env.HOME = process.env.HOME; if (process.env.USER) env.USER = process.env.USER; // Add package-specific environment variables if (mcpConfig?.env) { // Only include non-undefined values for (const [key, value] of Object.entries(mcpConfig.env)) { if (value !== undefined && value !== null) { env[key] = String(value); } } } // Get the command and args from the config file const configCmd = mcpConfig?.run || "npm run start"; const [cmd, ...cmdArgs] = configCmd.split(" "); return { cwdAbsolute, env, cmd, cmdArgs }; }; // Setup MCP connection (legacy function, kept for backward compatibility) export const setupMcpConnection = async ( mcpName: string, spinner?: any ): Promise<ConnectionResources | null> => { // For backward compatibility, use the pooled connection const connection = await getPooledConnection(mcpName); if (!connection && spinner) { spinner.error( `[${mcpName}] Failed to connect to MCP server. Make sure it's running with 'furi start ${mcpName}'` ); } return connection; };

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ashwwwin/furi'

If you have feedback or need assistance with the MCP directory API, please join our Discord server