Skip to main content
Glama
serverManager.ts (22.5 kB)
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';

import { processEnvironment } from '@src/config/envProcessor.js';
import { setupCapabilities } from '@src/core/capabilities/capabilityManager.js';
import { ClientManager } from '@src/core/client/clientManager.js';
import { InstructionAggregator } from '@src/core/instructions/instructionAggregator.js';
import type { OutboundConnection } from '@src/core/types/client.js';
import {
  AuthProviderTransport,
  InboundConnection,
  InboundConnectionConfig,
  MCPServerParams,
  OperationOptions,
  OutboundConnections,
  ServerStatus,
} from '@src/core/types/index.js';
import {
  type ClientConnection,
  PresetNotificationService,
} from '@src/domains/preset/services/presetNotificationService.js';
import logger, { debugIf } from '@src/logger/logger.js';
import { enhanceServerWithLogging } from '@src/logger/mcpLoggingEnhancer.js';
import { createTransports, inferTransportType } from '@src/transport/transportFactory.js';
import { executeOperation } from '@src/utils/core/operationExecution.js';

/**
 * Process-wide singleton that tracks two sets of connections:
 *
 * - inbound connections: one MCP `Server` instance per client session, keyed by session id;
 * - outbound connections: clients/transports towards managed MCP servers, keyed by server name.
 *
 * Obtain the instance through {@link ServerManager.getOrCreateInstance} or
 * {@link ServerManager.current}; the constructor is private.
 */
export class ServerManager {
  private static instance: ServerManager | undefined;

  /** Inbound (client-facing) connections keyed by session id. */
  private inboundConns: Map<string, InboundConnection> = new Map();
  private serverConfig: { name: string; version: string };
  private serverCapabilities: { capabilities: Record<string, unknown> };
  /** Outbound connections keyed by server name. */
  private outboundConns: OutboundConnections = new Map<string, OutboundConnection>();
  /** Outbound transports keyed by server name. */
  private transports: Record<string, Transport> = {};
  /** In-flight `connectTransport` promises keyed by session id; used to de-duplicate concurrent connects. */
  private connectionSemaphore: Map<string, Promise<void>> = new Map();
  /** Session ids currently inside `disconnectTransport`; guards against re-entrant disconnects. */
  private disconnectingIds: Set<string> = new Set();
  private instructionAggregator?: InstructionAggregator;
  private clientManager?: ClientManager;
  /** Servers started through `startServer`, keyed by server name. */
  private mcpServers: Map<string, { transport: AuthProviderTransport; config: MCPServerParams }> = new Map();

  private constructor(
    config: { name: string; version: string },
    capabilities: { capabilities: Record<string, unknown> },
    outboundConns: OutboundConnections,
    transports: Record<string, Transport>,
  ) {
    this.serverConfig = config;
    this.serverCapabilities = capabilities;
    this.outboundConns = outboundConns;
    this.transports = transports;
    this.clientManager = ClientManager.getOrCreateInstance();
  }

  /**
   * Return the singleton, creating it from the given arguments on first use.
   * When an instance already exists the arguments are ignored.
   */
  public static getOrCreateInstance(
    config: { name: string; version: string },
    capabilities: { capabilities: Record<string, unknown> },
    outboundConns: OutboundConnections,
    transports: Record<string, Transport>,
  ): ServerManager {
    if (!ServerManager.instance) {
      ServerManager.instance = new ServerManager(config, capabilities, outboundConns, transports);
    }
    return ServerManager.instance;
  }

  /**
   * The already-created singleton.
   * @throws Error if `getOrCreateInstance` has not been called yet.
   */
  public static get current(): ServerManager {
    if (!ServerManager.instance) {
      throw new Error('ServerManager not initialized');
    }
    return ServerManager.instance;
  }

  /** Test utility: force-disconnect every inbound session and drop the singleton. */
  public static resetInstance(): void {
    if (ServerManager.instance) {
      // Clean up existing connections with forced close
      for (const [sessionId] of ServerManager.instance.inboundConns) {
        ServerManager.instance.disconnectTransport(sessionId, true);
      }
      ServerManager.instance.inboundConns.clear();
      ServerManager.instance.connectionSemaphore.clear();
      ServerManager.instance.disconnectingIds.clear();
    }
    ServerManager.instance = undefined;
  }

  /**
   * Set the instruction aggregator instance and subscribe to its
   * `instructions-changed` event.
   * @param aggregator The instruction aggregator to use
   */
  public setInstructionAggregator(aggregator: InstructionAggregator): void {
    this.instructionAggregator = aggregator;

    // Listen for instruction changes and update existing server instances
    aggregator.on('instructions-changed', () => {
      this.updateServerInstructions();
    });

    debugIf('Instruction aggregator set for ServerManager');
  }

  /**
   * React to an instruction change. Currently this only logs: existing server
   * instances are not modified, new instructions apply to sessions created later.
   */
  private updateServerInstructions(): void {
    logger.info(`Server instructions have changed. Active sessions: ${this.inboundConns.size}`);

    for (const sessionId of this.inboundConns.keys()) {
      try {
        // Note: The MCP SDK doesn't provide a direct way to update instructions
        // on an existing server instance. Instructions are set during server construction.
        // For now, we'll log this for future server instances.
        debugIf(() => ({ message: `Instructions changed notification for session ${sessionId}`, meta: { sessionId } }));
      } catch (error) {
        logger.warn(`Failed to process instruction change for session ${sessionId}: ${error}`);
      }
    }
  }

  /**
   * Connect an inbound transport to a fresh server instance for `sessionId`.
   *
   * If a connect for the same session is already in flight, this waits for it and
   * returns; if the session is already registered, it returns immediately.
   *
   * @param transport Transport to attach to the new server instance
   * @param sessionId Key under which the connection is tracked
   * @param opts Connection options, also used to filter instructions
   * @throws Whatever the underlying connection attempt throws, including the timeout error
   */
  public async connectTransport(transport: Transport, sessionId: string, opts: InboundConnectionConfig): Promise<void> {
    // Check if a connection is already in progress for this session
    const existingConnection = this.connectionSemaphore.get(sessionId);
    if (existingConnection) {
      logger.warn(`Connection already in progress for session ${sessionId}, waiting...`);
      await existingConnection;
      return;
    }

    // Check if transport is already connected
    if (this.inboundConns.has(sessionId)) {
      logger.warn(`Transport already connected for session ${sessionId}`);
      return;
    }

    // Create connection promise to prevent race conditions
    const connectionPromise = this.performConnection(transport, sessionId, opts);
    this.connectionSemaphore.set(sessionId, connectionPromise);

    try {
      await connectionPromise;
    } finally {
      // Clean up the semaphore entry
      this.connectionSemaphore.delete(sessionId);
    }
  }

  /**
   * Run `doConnect` raced against a 30 second timeout. On failure, a connection
   * already stored for the session is marked as `Error` before rethrowing.
   */
  private async performConnection(
    transport: Transport,
    sessionId: string,
    opts: InboundConnectionConfig,
  ): Promise<void> {
    // Set connection timeout
    const connectionTimeoutMs = 30000; // 30 seconds

    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timeoutHandle = setTimeout(
        () => reject(new Error(`Connection timeout for session ${sessionId}`)),
        connectionTimeoutMs,
      );
    });

    try {
      await Promise.race([this.doConnect(transport, sessionId, opts), timeoutPromise]);
    } catch (error) {
      // Update status to Error if connection exists
      const connection = this.inboundConns.get(sessionId);
      if (connection) {
        connection.status = ServerStatus.Error;
        connection.lastError = error instanceof Error ? error : new Error(String(error));
      }

      logger.error(`Failed to connect transport for session ${sessionId}:`, error);
      throw error;
    } finally {
      // Always cancel the timer: otherwise every connect leaves a pending
      // 30 s timer behind, which keeps the event loop alive.
      clearTimeout(timeoutHandle);
    }
  }

  /**
   * Build a server instance for the session, register it, connect the transport
   * and, when a preset is configured, register the client for preset notifications.
   */
  private async doConnect(transport: Transport, sessionId: string, opts: InboundConnectionConfig): Promise<void> {
    // Get filtered instructions based on client's filter criteria using InstructionAggregator
    const filteredInstructions = this.instructionAggregator?.getFilteredInstructions(opts, this.outboundConns) || '';

    // Create server capabilities with filtered instructions
    const serverOptionsWithInstructions = {
      ...this.serverCapabilities,
      instructions: filteredInstructions || undefined,
    };

    // Create a new server instance for this transport
    const server = new Server(this.serverConfig, serverOptionsWithInstructions);

    // Create server info object first
    const serverInfo: InboundConnection = {
      server,
      status: ServerStatus.Connecting,
      connectedAt: new Date(),
      ...opts,
    };

    // Enhance server with logging middleware
    enhanceServerWithLogging(server);

    // Set up capabilities for this server instance
    await setupCapabilities(this.outboundConns, serverInfo);

    // Update the configuration reload service with server info
    // Config reload service removed - handled by ConfigChangeHandler

    // Store the server instance
    this.inboundConns.set(sessionId, serverInfo);

    // Connect the transport to the new server instance
    await server.connect(transport);

    // Update status to Connected after successful connection
    serverInfo.status = ServerStatus.Connected;
    serverInfo.lastConnected = new Date();

    // Register client with preset notification service if preset is used
    if (opts.presetName) {
      const notificationService = PresetNotificationService.getInstance();
      const clientConnection: ClientConnection = {
        id: sessionId,
        presetName: opts.presetName,
        sendNotification: async (method: string, params?: Record<string, unknown>) => {
          try {
            if (serverInfo.status === ServerStatus.Connected && serverInfo.server.transport) {
              await serverInfo.server.notification({ method, params: params || {} });
              debugIf(() => ({ message: 'Sent notification to client', meta: { sessionId, method } }));
            } else {
              logger.warn('Cannot send notification to disconnected client', { sessionId, method });
            }
          } catch (error) {
            logger.error('Failed to send notification to client', {
              sessionId,
              method,
              error: error instanceof Error ? error.message : 'Unknown error',
            });
            throw error;
          }
        },
        isConnected: () => serverInfo.status === ServerStatus.Connected && !!serverInfo.server.transport,
      };

      notificationService.trackClient(clientConnection, opts.presetName);
      logger.info('Registered client for preset notifications', {
        sessionId,
        presetName: opts.presetName,
      });
    }

    logger.info(`Connected transport for session ${sessionId}`);
  }

  /**
   * Remove the inbound connection for `sessionId` and untrack it from preset
   * notifications. Does nothing for an unknown session or while a disconnect for
   * the same session is already running.
   *
   * @param sessionId Session to disconnect
   * @param forceClose When true, also close the underlying transport (e.g. during
   *   shutdown). Leave false when called from a transport `onclose` handler to
   *   avoid recursion.
   */
  public disconnectTransport(sessionId: string, forceClose: boolean = false): void {
    // Prevent recursive disconnection calls
    if (this.disconnectingIds.has(sessionId)) {
      return;
    }

    const server = this.inboundConns.get(sessionId);
    if (server) {
      this.disconnectingIds.add(sessionId);

      try {
        // Update status to Disconnected
        server.status = ServerStatus.Disconnected;

        // Only close the transport if explicitly requested (e.g., during shutdown)
        // Don't close if this is called from an onclose handler to avoid recursion
        if (forceClose && server.server.transport) {
          try {
            // close() is asynchronous, so a plain try/catch would miss a rejected
            // promise; attach a handler so the failure is logged, not unhandled.
            void Promise.resolve(server.server.transport.close()).catch((error: unknown) => {
              logger.error(`Error closing transport for session ${sessionId}:`, error);
            });
          } catch (error) {
            // Synchronous throw from close()
            logger.error(`Error closing transport for session ${sessionId}:`, error);
          }
        }

        // Untrack client from preset notification service
        const notificationService = PresetNotificationService.getInstance();
        notificationService.untrackClient(sessionId);
        debugIf(() => ({ message: 'Untracked client from preset notifications', meta: { sessionId } }));

        this.inboundConns.delete(sessionId);
        // Config reload service removed - handled by ConfigChangeHandler
        logger.info(`Disconnected transport for session ${sessionId}`);
      } finally {
        this.disconnectingIds.delete(sessionId);
      }
    }
  }

  /** Transport attached to the session's server instance, if any. */
  public getTransport(sessionId: string): Transport | undefined {
    return this.inboundConns.get(sessionId)?.server.transport;
  }

  /** New map of session id to transport for every inbound connection that has a transport. */
  public getTransports(): Map<string, Transport> {
    const transports = new Map<string, Transport>();

    for (const [id, server] of this.inboundConns.entries()) {
      if (server.server.transport) {
        transports.set(id, server.server.transport);
      }
    }

    return transports;
  }

  /** Outbound transports keyed by server name (the live record, not a copy). */
  public getClientTransports(): Record<string, Transport> {
    return this.transports;
  }

  /** Outbound connections keyed by server name (the live collection, not a copy). */
  public getClients(): OutboundConnections {
    return this.outboundConns;
  }

  /**
   * Look up an outbound connection by server name.
   * @returns The connection, or undefined when no entry exists for `serverName`.
   */
  public getClient(serverName: string): OutboundConnection | undefined {
    return this.outboundConns.get(serverName);
  }

  /** Number of tracked inbound connections. */
  public getActiveTransportsCount(): number {
    return this.inboundConns.size;
  }

  /** Inbound connection for the session, if tracked. */
  public getServer(sessionId: string): InboundConnection | undefined {
    return this.inboundConns.get(sessionId);
  }

  /** All inbound connections keyed by session id (the live map, not a copy). */
  public getInboundConnections(): Map<string, InboundConnection> {
    return this.inboundConns;
  }

  /** Replace the outbound connections and transports wholesale. */
  public updateClientsAndTransports(newClients: OutboundConnections, newTransports: Record<string, Transport>): void {
    this.outboundConns = newClients;
    this.transports = newTransports;
  }

  /**
   * Executes a server operation through `executeOperation`.
   * @param inboundConn The inbound connection to execute the operation on
   * @param operation The operation to execute
   * @param options Operation options forwarded to `executeOperation`
   * @throws Error when the connection is not `Connected` or has no transport
   */
  public async executeServerOperation<T>(
    inboundConn: InboundConnection,
    operation: (inboundConn: InboundConnection) => Promise<T>,
    options: OperationOptions = {},
  ): Promise<T> {
    // Check connection status before executing operation
    if (inboundConn.status !== ServerStatus.Connected || !inboundConn.server.transport) {
      throw new Error(`Cannot execute operation: server status is ${inboundConn.status}`);
    }

    return executeOperation(() => operation(inboundConn), 'server', options);
  }

  /**
   * Start a new MCP server instance.
   *
   * No-op when the server is already tracked or its config is disabled. If the
   * connection step fails, the server is removed from tracking again and its
   * transport is closed, so a later call can retry.
   *
   * @throws Rethrows any error from transport creation or connection.
   */
  public async startServer(serverName: string, config: MCPServerParams): Promise<void> {
    try {
      logger.info(`Starting MCP server: ${serverName}`);

      // Check if server is already running
      if (this.mcpServers.has(serverName)) {
        logger.warn(`Server ${serverName} is already running`);
        return;
      }

      // Skip disabled servers
      if (config.disabled) {
        logger.info(`Server ${serverName} is disabled, skipping start`);
        return;
      }

      // Process environment variables in config
      const processedConfig = this.processServerConfig(config);

      // Infer transport type if not specified
      const configWithType = inferTransportType(processedConfig, serverName);

      // Create transport for the server
      const transport = await this.createServerTransport(serverName, configWithType);

      // Store server info
      this.mcpServers.set(serverName, {
        transport,
        config: configWithType,
      });

      try {
        // Create client connection to the server using ClientManager
        await this.connectToServer(serverName, transport, configWithType);
      } catch (error) {
        // Roll back: without this the failed server stays marked as running and
        // every subsequent startServer call returns early.
        this.mcpServers.delete(serverName);
        try {
          if (transport.close) {
            await transport.close();
          }
        } catch (closeError) {
          logger.warn(`Error closing transport for server ${serverName} after failed start:`, closeError);
        }
        throw error;
      }

      logger.info(`Successfully started MCP server: ${serverName}`);
    } catch (error) {
      logger.error(`Failed to start MCP server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Stop a server instance: drop its outbound connection, close its transport
   * (close errors are logged, not thrown) and remove it from tracking.
   * No-op when the server is not tracked.
   */
  public async stopServer(serverName: string): Promise<void> {
    try {
      logger.info(`Stopping MCP server: ${serverName}`);

      // Check if server is running
      const serverInfo = this.mcpServers.get(serverName);
      if (!serverInfo) {
        logger.warn(`Server ${serverName} is not running`);
        return;
      }

      // Disconnect client from the server using ClientManager
      await this.disconnectFromServer(serverName);

      // Clean up transport
      const { transport } = serverInfo;
      try {
        if (transport.close) {
          await transport.close();
        }
      } catch (error) {
        logger.warn(`Error closing transport for server ${serverName}:`, error);
      }

      // Remove from tracking
      this.mcpServers.delete(serverName);

      logger.info(`Successfully stopped MCP server: ${serverName}`);
    } catch (error) {
      logger.error(`Failed to stop MCP server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Restart a server instance: stop it if it is tracked, then start it with `config`.
   */
  public async restartServer(serverName: string, config: MCPServerParams): Promise<void> {
    try {
      logger.info(`Restarting MCP server: ${serverName}`);

      // Check if server is currently running and stop it
      const isCurrentlyRunning = this.mcpServers.has(serverName);
      if (isCurrentlyRunning) {
        logger.info(`Stopping existing server ${serverName} before restart`);
        await this.stopServer(serverName);
      }

      // Start the server with new configuration
      await this.startServer(serverName, config);

      logger.info(`Successfully restarted MCP server: ${serverName}`);
    } catch (error) {
      logger.error(`Failed to restart MCP server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Process server configuration to handle environment variables.
   * Returns a shallow copy whose `env` is replaced when processing yields a
   * non-empty environment; on any processing error the original config is returned.
   */
  private processServerConfig(config: MCPServerParams): MCPServerParams {
    try {
      // Create a mutable copy for processing
      const processedConfig = { ...config };

      // Process environment variables if enabled - only pass env-related fields
      const envConfig = {
        inheritParentEnv: config.inheritParentEnv,
        envFilter: config.envFilter,
        env: config.env,
      };

      const processedEnv = processEnvironment(envConfig);

      // Replace environment variables in the config while preserving all other fields
      if (processedEnv.processedEnv && Object.keys(processedEnv.processedEnv).length > 0) {
        processedConfig.env = processedEnv.processedEnv;
      }

      return processedConfig;
    } catch (error) {
      logger.warn(`Failed to process environment variables for server config:`, error);
      return config;
    }
  }

  /**
   * Create a transport for the given server configuration.
   * @throws Error when the factory returns no transport for `serverName`.
   */
  private async createServerTransport(serverName: string, config: MCPServerParams): Promise<AuthProviderTransport> {
    try {
      debugIf(() => ({
        message: `Creating transport for server ${serverName}`,
        meta: { serverName, type: config.type, command: config.command, url: config.url },
      }));

      // Create transport using the factory pattern
      const transports = createTransports({ [serverName]: config });
      const transport = transports[serverName];

      if (!transport) {
        throw new Error(`Failed to create transport for server ${serverName}`);
      }

      debugIf(() => ({
        message: `Successfully created transport for server ${serverName}`,
        meta: { serverName, transportType: config.type },
      }));

      return transport;
    } catch (error) {
      logger.error(`Failed to create transport for server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Connect to a server using ClientManager and, when a client is returned for
   * `serverName`, record it and its transport in the outbound collections.
   */
  private async connectToServer(
    serverName: string,
    transport: AuthProviderTransport,
    _config: MCPServerParams,
  ): Promise<void> {
    try {
      if (!this.clientManager) {
        throw new Error('ClientManager not initialized');
      }

      // Create client connection using the existing ClientManager infrastructure
      const clients = await this.clientManager.createClients({ [serverName]: transport });

      // Update our local outbound connections
      const newClient = clients.get(serverName);
      if (newClient) {
        this.outboundConns.set(serverName, newClient);
        this.transports[serverName] = transport;
      }

      debugIf(() => ({
        message: `Successfully connected to server ${serverName}`,
        meta: { serverName, status: newClient?.status },
      }));
    } catch (error) {
      logger.error(`Failed to connect to server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Disconnect from a server by dropping the local outbound references.
   * The transport itself is closed by `stopServer`.
   */
  private async disconnectFromServer(serverName: string): Promise<void> {
    try {
      if (!this.clientManager) {
        throw new Error('ClientManager not initialized');
      }

      // Remove from outbound connections
      this.outboundConns.delete(serverName);
      delete this.transports[serverName];

      // ClientManager doesn't have explicit disconnect method, so we clean up our references
      // The actual transport cleanup happens in stopServer

      debugIf(() => ({
        message: `Successfully disconnected from server ${serverName}`,
        meta: { serverName },
      }));
    } catch (error) {
      logger.error(`Failed to disconnect from server ${serverName}:`, error);
      throw error;
    }
  }

  /**
   * Get the status of all managed MCP servers.
   * Every tracked server is reported with `running: true`.
   */
  public getMcpServerStatus(): Map<string, { running: boolean; config: MCPServerParams }> {
    const status = new Map<string, { running: boolean; config: MCPServerParams }>();

    for (const [serverName, serverInfo] of this.mcpServers.entries()) {
      status.set(serverName, {
        running: true,
        config: serverInfo.config,
      });
    }

    return status;
  }

  /**
   * Check if a specific MCP server is tracked as running.
   */
  public isMcpServerRunning(serverName: string): boolean {
    return this.mcpServers.has(serverName);
  }

  /**
   * Update metadata for a running server without restarting it.
   *
   * Merges `newConfig` over the stored config. When `newConfig.tags` is set, the
   * tags are also written to the server's transport and to the transport of the
   * matching outbound connection (each only if it already has a `tags` property).
   * When `newConfig.tags` is not set, existing transport tags are left untouched.
   */
  public async updateServerMetadata(serverName: string, newConfig: MCPServerParams): Promise<void> {
    try {
      const serverInfo = this.mcpServers.get(serverName);
      if (!serverInfo) {
        logger.warn(`Cannot update metadata for ${serverName}: server not running`);
        return;
      }

      debugIf(() => ({
        message: `Updating metadata for server ${serverName}`,
        meta: {
          oldConfig: serverInfo.config,
          newConfig,
        },
      }));

      // Update the stored configuration with new metadata
      serverInfo.config = { ...serverInfo.config, ...newConfig };

      // Only touch transport tags when new tags were actually supplied, so a
      // metadata update without tags cannot wipe the existing ones.
      const newTags = newConfig.tags;
      if (newTags) {
        // Update transport metadata if supported
        const { transport } = serverInfo;
        if (transport && 'tags' in transport) {
          transport.tags = newTags;
        }

        // Update outbound connections metadata
        const outboundConn = this.outboundConns.get(serverName);
        if (outboundConn && outboundConn.transport && 'tags' in outboundConn.transport) {
          outboundConn.transport.tags = newTags;
        }
      }

      debugIf(() => ({
        message: `Successfully updated metadata for server ${serverName}`,
        meta: { newTags: newConfig.tags },
      }));
    } catch (error) {
      logger.error(`Failed to update metadata for server ${serverName}:`, error);
      throw error;
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1mcp-app/agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.