Skip to main content
Glama

1MCP Server

mcpLoadingManager.ts15.6 kB
import { EventEmitter } from 'events'; import { ClientManager } from '@src/core/client/clientManager.js'; import { AuthProviderTransport, OutboundConnections } from '@src/core/types/index.js'; import logger, { debugIf } from '@src/logger/logger.js'; import { LoadingState, LoadingStateEvent, LoadingStateTracker, LoadingSummary } from './loadingStateTracker.js'; /** * Configuration options for MCP loading behavior */ export interface McpLoadingConfig { /** Maximum time to wait for each server (ms) */ readonly serverTimeoutMs: number; /** Maximum number of retry attempts per server */ readonly maxRetries: number; /** Initial delay between retries (ms) */ readonly retryDelayMs: number; /** Maximum number of servers to initialize concurrently */ readonly maxConcurrentLoads: number; /** Whether to continue loading other servers if some fail */ readonly continueOnFailure: boolean; /** Whether to enable background retry for failed servers */ readonly enableBackgroundRetry: boolean; /** Interval for background retry attempts (ms) */ readonly backgroundRetryIntervalMs: number; } /** * Default configuration for MCP loading */ export const DEFAULT_LOADING_CONFIG: McpLoadingConfig = { serverTimeoutMs: 30000, // 30 seconds per server maxRetries: 3, retryDelayMs: 2000, // 2 seconds initial delay maxConcurrentLoads: 5, // Load 5 servers at once continueOnFailure: true, enableBackgroundRetry: true, backgroundRetryIntervalMs: 60000, // Retry every minute }; /** * Result of loading a specific server */ interface ServerLoadResult { readonly name: string; readonly success: boolean; readonly error?: Error; readonly duration: number; readonly retryCount: number; } /** * Events emitted by McpLoadingManager */ export const enum McpLoadingEvent { LoadingStarted = 'loading-started', ServerLoading = 'server-loading', ServerLoaded = 'server-loaded', ServerFailed = 'server-failed', OAuthRequired = 'oauth-required', LoadingProgress = 'loading-progress', LoadingComplete = 'loading-complete', BackgroundRetry = 'background-retry', } export interface McpLoadingEvents { [McpLoadingEvent.LoadingStarted]: (serverNames: string[]) => void; [McpLoadingEvent.ServerLoading]: (name: string) => void; [McpLoadingEvent.ServerLoaded]: (name: string, result: ServerLoadResult) => void; [McpLoadingEvent.ServerFailed]: (name: string, result: ServerLoadResult) => void; [McpLoadingEvent.OAuthRequired]: (name: string, authUrl?: string) => void; [McpLoadingEvent.LoadingProgress]: (summary: LoadingSummary) => void; [McpLoadingEvent.LoadingComplete]: (summary: LoadingSummary) => void; [McpLoadingEvent.BackgroundRetry]: (name: string, attempt: number) => void; } /** * Manages asynchronous loading of MCP servers without blocking HTTP server startup * * This manager coordinates the initialization of multiple MCP servers in parallel, * provides real-time status updates, and handles retries and error recovery. * The HTTP server can start immediately while this manager loads servers in the background. * * @example * ```typescript * const manager = new McpLoadingManager(clientManager, config); * manager.on(McpLoadingEvent.LoadingComplete, (summary) => { * console.log(`${summary.ready}/${summary.totalServers} servers ready`); * }); * * // Start loading asynchronously * const loadingPromise = manager.startAsyncLoading(transports); * * // HTTP server can start immediately * const expressServer = new ExpressServer(serverManager); * expressServer.start(); * * // Optionally wait for loading to complete * await loadingPromise; * ``` */ export class McpLoadingManager extends EventEmitter { private clientManager: ClientManager; private config: McpLoadingConfig; private stateTracker: LoadingStateTracker; private loadingSemaphore: Map<string, Promise<ServerLoadResult>> = new Map(); private backgroundRetryTimer?: ReturnType<typeof setTimeout>; private isShuttingDown: boolean = false; private abortControllers: Map<string, AbortController> = new Map(); constructor(clientManager: ClientManager, config: Partial<McpLoadingConfig> = {}) { super(); this.clientManager = clientManager; this.config = { ...DEFAULT_LOADING_CONFIG, ...config }; this.stateTracker = new LoadingStateTracker(); // Forward state tracker events this.stateTracker.on(LoadingStateEvent.ServerStateChanged, (name, info) => { if (info.state === LoadingState.Ready) { this.emit(McpLoadingEvent.ServerLoaded, name, { name, success: true, duration: info.duration || 0, retryCount: info.retryCount, }); } else if (info.state === LoadingState.Failed) { this.emit(McpLoadingEvent.ServerFailed, name, { name, success: false, error: info.error, duration: info.duration || 0, retryCount: info.retryCount, }); } else if (info.state === LoadingState.AwaitingOAuth) { this.emit(McpLoadingEvent.OAuthRequired, name, info.authorizationUrl); } }); this.stateTracker.on(LoadingStateEvent.LoadingProgress, (summary) => { this.emit(McpLoadingEvent.LoadingProgress, summary); }); this.stateTracker.on(LoadingStateEvent.LoadingComplete, (summary) => { this.emit(McpLoadingEvent.LoadingComplete, summary); this.setupBackgroundRetry(); }); this.setMaxListeners(100); // Allow many listeners } /** * Start asynchronous loading of MCP servers * Returns immediately, loading happens in background */ public async startAsyncLoading(transports: Record<string, AuthProviderTransport>): Promise<OutboundConnections> { const serverNames = Object.keys(transports); if (serverNames.length === 0) { logger.info('No MCP servers to load'); return new Map(); } logger.info(`Starting async loading of ${serverNames.length} MCP servers`); this.stateTracker.startLoading(serverNames); this.emit(McpLoadingEvent.LoadingStarted, serverNames); // Start loading servers with concurrency control this.loadServersWithConcurrency(transports); // Return current connections (may be empty initially) return this.clientManager.getClients(); } /** * Load servers with concurrency control */ private async loadServersWithConcurrency(transports: Record<string, AuthProviderTransport>): Promise<void> { const serverEntries = Object.entries(transports); const semaphore = new Map<string, Promise<void>>(); // Process servers in batches based on maxConcurrentLoads for (let i = 0; i < serverEntries.length; i += this.config.maxConcurrentLoads) { const batch = serverEntries.slice(i, i + this.config.maxConcurrentLoads); // Start all servers in this batch const batchPromises = batch.map(([name, transport]) => { const loadPromise = this.loadSingleServer(name, transport); semaphore.set(name, loadPromise); return loadPromise; }); // Wait for this batch to complete before starting next batch await Promise.allSettled(batchPromises); // Clean up completed promises for (const [name] of batch) { semaphore.delete(name); } if (this.isShuttingDown) { logger.info('Loading cancelled due to shutdown'); break; } } logger.info('Initial server loading phase completed'); } /** * Load a single server with retry logic */ private async loadSingleServer(name: string, transport: AuthProviderTransport): Promise<void> { if (this.isShuttingDown) return; this.emit(McpLoadingEvent.ServerLoading, name); this.stateTracker.updateServerState(name, LoadingState.Loading, { progress: { phase: 'initializing', message: 'Starting server connection' }, }); let lastError: Error | undefined; let retryCount = 0; while (retryCount <= this.config.maxRetries && !this.isShuttingDown) { try { this.stateTracker.updateServerState(name, LoadingState.Loading, { progress: { phase: retryCount > 0 ? 'retrying' : 'connecting', message: retryCount > 0 ? `Retry attempt ${retryCount}` : 'Connecting to server', }, }); // Attempt to create and connect client await this.createClientWithTimeout(name, transport); // Success! this.stateTracker.updateServerState(name, LoadingState.Ready); logger.info(`Successfully loaded MCP server: ${name} (${retryCount} retries)`); return; } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)); retryCount++; this.stateTracker.incrementRetryCount(name); // Handle OAuth case specially if (lastError.name === 'OAuthRequiredError') { logger.info(`OAuth required for ${name}`); this.stateTracker.updateServerState(name, LoadingState.AwaitingOAuth, { error: lastError, }); return; // Don't retry OAuth errors } // Handle other errors logger.warn(`Failed to load ${name} (attempt ${retryCount}): ${lastError.message}`); if (retryCount <= this.config.maxRetries && !this.isShuttingDown) { const delay = this.config.retryDelayMs * Math.pow(2, retryCount - 1); // Exponential backoff logger.info(`Retrying ${name} in ${delay}ms...`); await this.sleep(delay); } } } // All retries exhausted this.stateTracker.updateServerState(name, LoadingState.Failed, { error: lastError || new Error('Unknown error'), }); if (this.config.continueOnFailure) { logger.error(`Failed to load ${name} after ${this.config.maxRetries} retries, continuing with other servers`); } else { logger.error(`Failed to load ${name}, stopping loading process`); throw lastError; } } /** * Create client with timeout and cancellation support */ private async createClientWithTimeout(name: string, transport: AuthProviderTransport): Promise<void> { // Create abort controller for this specific server loading operation const abortController = new AbortController(); this.abortControllers.set(name, abortController); try { const timeoutPromise = new Promise<never>((_, reject) => { const timeoutId = setTimeout(() => { abortController.abort(); reject(new Error(`Timeout loading ${name} after ${this.config.serverTimeoutMs}ms`)); }, this.config.serverTimeoutMs); // Clear timeout if operation is aborted abortController.signal.addEventListener('abort', () => { clearTimeout(timeoutId); reject(new Error(`Loading ${name} was cancelled`)); }); }); const loadPromise = this.clientManager.createSingleClient(name, transport, abortController.signal); await Promise.race([loadPromise, timeoutPromise]); } finally { // Clean up abort controller this.abortControllers.delete(name); } } /** * Set up background retry for failed servers */ private setupBackgroundRetry(): void { if (!this.config.enableBackgroundRetry || this.isShuttingDown) { return; } this.backgroundRetryTimer = setInterval(() => { this.performBackgroundRetry(); }, this.config.backgroundRetryIntervalMs); logger.info('Background retry enabled for failed servers'); } /** * Perform background retry for failed servers */ private async performBackgroundRetry(): Promise<void> { if (this.isShuttingDown) return; const failedServers = this.stateTracker.getServersByState(LoadingState.Failed); if (failedServers.length === 0) { return; } logger.info(`Background retry for ${failedServers.length} failed servers`); // Retry a subset of failed servers to avoid overwhelming the system const serversToRetry = failedServers.slice(0, 3); // Retry max 3 at a time for (const serverInfo of serversToRetry) { if (this.isShuttingDown) break; const transport = this.clientManager.getTransport(serverInfo.name); if (transport) { this.emit(McpLoadingEvent.BackgroundRetry, serverInfo.name, serverInfo.retryCount + 1); // Don't wait for completion, let it run in background this.loadSingleServer(serverInfo.name, transport).catch((error) => { debugIf(() => ({ message: `Background retry failed for ${serverInfo.name}: ${error.message}` })); }); } } } /** * Get current loading state tracker */ public getStateTracker(): LoadingStateTracker { return this.stateTracker; } /** * Get current loading summary */ public getSummary(): LoadingSummary { return this.stateTracker.getSummary(); } /** * Check if a specific server is ready */ public isServerReady(name: string): boolean { const state = this.stateTracker.getServerState(name); return state?.state === LoadingState.Ready; } /** * Get list of ready servers */ public getReadyServers(): string[] { return this.stateTracker.getServersByState(LoadingState.Ready).map((s) => s.name); } /** * Get list of failed servers */ public getFailedServers(): string[] { return this.stateTracker.getServersByState(LoadingState.Failed).map((s) => s.name); } /** * Cancel loading of a specific server */ public cancelServerLoading(serverName: string): void { const abortController = this.abortControllers.get(serverName); if (abortController) { logger.info(`Cancelling loading of server: ${serverName}`); abortController.abort(); this.stateTracker.updateServerState(serverName, LoadingState.Cancelled); } else { logger.warn(`No active loading operation found for server: ${serverName}`); } } /** * Cancel loading of multiple servers */ public cancelServersLoading(serverNames: string[]): void { for (const serverName of serverNames) { this.cancelServerLoading(serverName); } } /** * Cancel all currently loading servers */ public cancelAllLoading(): void { const loadingServers = Array.from(this.abortControllers.keys()); if (loadingServers.length > 0) { logger.info(`Cancelling loading of ${loadingServers.length} servers`); this.cancelServersLoading(loadingServers); } } /** * Get list of servers that are currently being loaded and can be cancelled */ public getCancellableServers(): string[] { return Array.from(this.abortControllers.keys()); } /** * Shutdown the loading manager */ public shutdown(): void { this.isShuttingDown = true; if (this.backgroundRetryTimer) { clearInterval(this.backgroundRetryTimer); this.backgroundRetryTimer = undefined; } // Cancel any active loading operations this.cancelAllLoading(); // Update state for any remaining pending/loading servers const pendingServers = this.stateTracker.getServersByState(LoadingState.Pending); const loadingServers = this.stateTracker.getServersByState(LoadingState.Loading); for (const server of [...pendingServers, ...loadingServers]) { this.stateTracker.updateServerState(server.name, LoadingState.Cancelled); } logger.info('MCP loading manager shutdown complete'); } /** * Utility method for sleeping */ private sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1mcp-app/agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server