Skip to main content
Glama
YobieBen
by YobieBen
mcp-client-manager.ts17.4 kB
/**
 * MCP Client Manager Module
 *
 * Author: Yobie Benjamin
 * Version: 0.2
 * Date: July 28, 2025
 *
 * Manages MCP client connections to external services.
 * This module handles the hub's role as an MCP client connecting to services like Stripe.
 */

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';
import winston from 'winston';
import { ServiceRegistry, ServiceMetadata } from '../registry/service-registry.js';
import type { Tool, Resource, Prompt } from '@modelcontextprotocol/sdk/types.js';

/**
 * Module logger.
 *
 * A winston logger created without any transport discards every record, so a
 * Console transport is attached. All levels are routed to stderr so that log
 * output cannot interleave with data written to stdout.
 */
const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.simple(),
  transports: [
    new winston.transports.Console({
      stderrLevels: ['error', 'warn', 'info', 'http', 'verbose', 'debug', 'silly']
    })
  ]
});

/**
 * MCP service configuration
 * Defines how to connect to an external MCP service
 */
export interface MCPServiceConfig {
  id: string;
  name: string;
  description: string;
  transport: 'stdio' | 'websocket' | 'http';
  endpoint: string;
  command?: string; // For stdio transport
  args?: string[]; // For stdio transport
  url?: string; // For websocket/http transport
  headers?: Record<string, string>; // For http transport
  reconnectPolicy?: {
    maxRetries: number;
    retryDelayMs: number;
    backoffMultiplier: number;
  };
}

/** Reconnect policy applied when a service configuration does not supply one. */
const DEFAULT_RECONNECT_POLICY: NonNullable<MCPServiceConfig['reconnectPolicy']> = {
  maxRetries: 5,
  retryDelayMs: 5000,
  backoffMultiplier: 2
};

/** Transport instances that `createTransport` can produce. */
type ClientTransport = StdioClientTransport | WebSocketClientTransport;

/**
 * Active MCP client connection
 * Represents a live connection to an external MCP service
 */
interface MCPClientConnection {
  serviceId: string;
  config: MCPServiceConfig;
  client: Client;
  transport: ClientTransport;
  connected: boolean;
  capabilities?: {
    tools: boolean;
    resources: boolean;
    prompts: boolean;
    completion: boolean;
  };
}

/**
 * Manages all MCP client connections from the hub to external services
 * Handles connection lifecycle, protocol communication, and error recovery
 */
export class MCPClientManager {
  private readonly registry: ServiceRegistry;
  private readonly connections: Map<string, MCPClientConnection>;
  private readonly reconnectTimers: Map<string, NodeJS.Timeout>;

  /**
   * Number of reconnection attempts already scheduled per service id.
   * Kept outside the connection object so that the count also exists for
   * services whose very first connection attempt failed.
   */
  private readonly reconnectAttempts: Map<string, number>;

  /**
   * @param registry Service registry that is updated as services connect,
   *   disconnect, or fail.
   */
  constructor(registry: ServiceRegistry) {
    this.registry = registry;
    this.connections = new Map();
    this.reconnectTimers = new Map();
    this.reconnectAttempts = new Map();
  }

  /**
   * Connect to multiple MCP services in parallel.
   *
   * A failure for one service is logged and does not prevent the others from
   * connecting; this method itself never rejects because of a failed service.
   *
   * @param configs Service configurations to connect to.
   */
  async connectToServices(configs: MCPServiceConfig[]): Promise<void> {
    logger.info(`Connecting to ${configs.length} MCP services...`);

    const connectionPromises = configs.map(config =>
      this.connectToService(config).catch(error => {
        logger.error(`Failed to connect to ${config.id}:`, error);
        return null;
      })
    );

    await Promise.all(connectionPromises);

    logger.info(`Connected to ${this.connections.size} services`);
  }

  /**
   * Connect to a single MCP service.
   *
   * On success the connection is stored, the service is registered in the
   * registry, monitoring handlers are installed, any pending retry state is
   * cleared, and a connection previously stored under the same id is closed.
   *
   * On failure the half-open transport is closed, a reconnection is scheduled
   * according to the service's reconnect policy, and the error is rethrown.
   *
   * @param config Configuration of the service to connect to.
   * @throws Whatever error transport creation or the MCP handshake produced.
   */
  async connectToService(config: MCPServiceConfig): Promise<void> {
    // Headers may carry credentials, so they are never written to the log.
    const loggableConfig = config.headers ? { ...config, headers: '[redacted]' } : config;
    logger.info(`Connecting to MCP service: ${config.id}`, loggableConfig);

    let transport: ClientTransport | undefined;

    try {
      transport = await this.createTransport(config);

      const client = new Client(
        {
          name: `llama-hub-client-${config.id}`,
          version: '0.2.0'
        },
        {
          capabilities: {}
        }
      );

      await client.connect(transport);

      // The server's capabilities and implementation info are read from the
      // client after connect() has resolved.
      const serverCapabilities: Record<string, unknown> = client.getServerCapabilities() ?? {};
      const serverVersion = client.getServerVersion();

      const connection: MCPClientConnection = {
        serviceId: config.id,
        config,
        client,
        transport,
        connected: true,
        capabilities: {
          tools: Boolean(serverCapabilities.tools),
          resources: Boolean(serverCapabilities.resources),
          prompts: Boolean(serverCapabilities.prompts),
          // Accept both spellings of the completion capability key.
          completion: Boolean(serverCapabilities.completions ?? serverCapabilities.completion)
        }
      };

      const previous = this.connections.get(config.id);
      this.connections.set(config.id, connection);

      this.registry.registerService({
        id: config.id,
        name: config.name,
        description: config.description,
        version: serverVersion?.version || 'unknown',
        endpoint: config.endpoint,
        transport: config.transport,
        capabilities: connection.capabilities,
        status: 'connected',
        lastHeartbeat: new Date()
      });

      this.setupConnectionMonitoring(connection);

      // A successful connection ends the current retry cycle.
      this.clearReconnectState(config.id);

      // Release the connection this one replaces. Its close handler is ignored
      // because the map already points at the new connection.
      if (previous && previous !== connection) {
        previous.connected = false;
        try {
          await this.closeConnection(previous);
        } catch (closeError) {
          logger.warn(`Error closing previous connection for ${config.id}:`, closeError);
        }
      }

      logger.info(`Successfully connected to ${config.id}`);
    } catch (error) {
      logger.error(`Failed to connect to ${config.id}:`, error);

      // Do not leave a transport (e.g. a spawned child process) behind unless
      // it belongs to the connection that is currently stored.
      if (transport && this.connections.get(config.id)?.transport !== transport) {
        try {
          await transport.close();
        } catch (closeError) {
          logger.warn(`Error closing transport for ${config.id}:`, closeError);
        }
      }

      this.scheduleReconnect(config);

      throw error;
    }
  }

  /**
   * Create the transport instance described by a service configuration.
   *
   * @param config Service configuration; `command` is required for `stdio`
   *   and `url` is required for `websocket`.
   * @returns A transport that has not been started yet.
   * @throws Error if a required field is missing, if the transport is `http`
   *   (not implemented), or if the transport type is unknown.
   */
  private async createTransport(config: MCPServiceConfig): Promise<ClientTransport> {
    switch (config.transport) {
      case 'stdio':
        // Local executable: the command line comes from the configuration.
        if (!config.command) {
          throw new Error('Stdio transport requires command');
        }
        return new StdioClientTransport({
          command: config.command,
          args: config.args ?? []
        });

      case 'websocket':
        if (!config.url) {
          throw new Error('WebSocket transport requires URL');
        }
        return new WebSocketClientTransport(new URL(config.url));

      case 'http':
        throw new Error('HTTP transport not yet implemented');

      default:
        throw new Error(`Unknown transport type: ${String(config.transport)}`);
    }
  }

  /**
   * Install error and close handlers on a connection's client.
   * Both handlers route into `handleDisconnection`.
   *
   * @param connection Connection whose client should be monitored.
   */
  private setupConnectionMonitoring(connection: MCPClientConnection): void {
    const { client, serviceId } = connection;

    client.onerror = (error) => {
      logger.error(`Connection error for ${serviceId}:`, error);
      this.handleDisconnection(connection);
    };

    client.onclose = () => {
      logger.warn(`Connection closed for ${serviceId}`);
      this.handleDisconnection(connection);
    };
  }

  /**
   * Mark a connection as disconnected, update the registry, and schedule a
   * reconnection.
   *
   * Calls are ignored when the connection has already been handled or is no
   * longer the one stored for its service id (it was replaced by a reconnect
   * or removed by `disconnectService`). This keeps an error followed by a
   * close from being processed twice and keeps an intentional disconnect from
   * triggering a reconnect.
   *
   * @param connection Connection that reported an error or close.
   */
  private handleDisconnection(connection: MCPClientConnection): void {
    const isCurrent = this.connections.get(connection.serviceId) === connection;
    if (!isCurrent || !connection.connected) {
      return;
    }

    connection.connected = false;

    this.registry.updateServiceStatus(connection.serviceId, 'disconnected');

    this.scheduleReconnect(connection.config);
  }

  /**
   * Schedule a reconnection attempt with exponential backoff.
   *
   * The delay is `retryDelayMs * backoffMultiplier ^ attempts`, where
   * `attempts` is the number of retries already scheduled in the current
   * cycle. Once `maxRetries` is reached, the service is marked `error` in the
   * registry and nothing further is scheduled.
   *
   * @param config Configuration of the service to reconnect to.
   */
  private scheduleReconnect(config: MCPServiceConfig): void {
    const policy = config.reconnectPolicy ?? DEFAULT_RECONNECT_POLICY;
    const attempts = this.reconnectAttempts.get(config.id) ?? 0;

    if (attempts >= policy.maxRetries) {
      logger.error(`Max reconnection attempts reached for ${config.id}`);
      this.registry.updateServiceStatus(config.id, 'error');
      return;
    }

    const delay = policy.retryDelayMs * Math.pow(policy.backoffMultiplier, attempts);

    logger.info(`Scheduling reconnection for ${config.id} in ${delay}ms (attempt ${attempts + 1})`);

    // Only one pending retry per service.
    const existingTimer = this.reconnectTimers.get(config.id);
    if (existingTimer) {
      clearTimeout(existingTimer);
    }

    // Count the attempt when it is scheduled so that the next failure, which
    // reschedules from inside connectToService, sees the updated value.
    this.reconnectAttempts.set(config.id, attempts + 1);

    const timer: NodeJS.Timeout = setTimeout(() => {
      void this.attemptReconnect(config, timer);
    }, delay);

    this.reconnectTimers.set(config.id, timer);
  }

  /**
   * Run one scheduled reconnection attempt.
   *
   * @param config Configuration of the service to reconnect to.
   * @param timer The timer that triggered this attempt; used to make sure only
   *   this attempt's own timer entry is removed afterwards.
   */
  private async attemptReconnect(config: MCPServiceConfig, timer: NodeJS.Timeout): Promise<void> {
    logger.info(`Attempting reconnection to ${config.id}`);

    try {
      await this.connectToService(config);
    } catch {
      // connectToService has already logged the failure and scheduled the
      // next retry (or marked the service as failed).
    } finally {
      // A failed attempt may already have registered a newer timer under the
      // same id; that entry must stay so it can still be cancelled.
      if (this.reconnectTimers.get(config.id) === timer) {
        this.reconnectTimers.delete(config.id);
      }
    }
  }

  /**
   * Cancel any pending reconnection for a service and reset its retry count.
   *
   * @param serviceId Id of the service whose retry state is discarded.
   */
  private clearReconnectState(serviceId: string): void {
    const timer = this.reconnectTimers.get(serviceId);
    if (timer) {
      clearTimeout(timer);
    }
    this.reconnectTimers.delete(serviceId);
    this.reconnectAttempts.delete(serviceId);
  }

  /**
   * Close a connection's client and then its transport.
   *
   * @param connection Connection to close.
   * @throws Whatever error either close call produced.
   */
  private async closeConnection(connection: MCPClientConnection): Promise<void> {
    await connection.client.close();
    await connection.transport.close();
  }

  /**
   * Look up the live connection for a service.
   *
   * @param serviceId Id of the service.
   * @returns The stored connection, which is flagged as connected.
   * @throws Error if the service is unknown or currently disconnected.
   */
  private requireConnection(serviceId: string): MCPClientConnection {
    const connection = this.connections.get(serviceId);
    if (!connection || !connection.connected) {
      throw new Error(`Service ${serviceId} is not connected`);
    }
    return connection;
  }

  /**
   * List tools available from a service.
   *
   * @param serviceId Id of a connected service.
   * @returns The tools reported by the service (empty if none were returned).
   * @throws Error if the service is not connected or the request fails.
   */
  async listServiceTools(serviceId: string): Promise<Tool[]> {
    const connection = this.requireConnection(serviceId);

    try {
      const response = await connection.client.listTools();
      return response.tools || [];
    } catch (error) {
      logger.error(`Failed to list tools for ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * Execute a tool on a service.
   *
   * @param serviceId Id of a connected service.
   * @param toolName Name of the tool to call.
   * @param args Arguments forwarded to the tool. (Named `args` because
   *   `arguments` is not a legal parameter name inside a class body.)
   * @returns The `content` field of the tool call response.
   * @throws Error if the service is not connected or the call fails.
   */
  async executeServiceTool(
    serviceId: string,
    toolName: string,
    args: Record<string, any>
  ): Promise<any> {
    const connection = this.requireConnection(serviceId);

    logger.debug(`Executing tool ${toolName} on ${serviceId}`, { arguments: args });

    try {
      const response = await connection.client.callTool({
        name: toolName,
        arguments: args
      });

      return response.content;
    } catch (error) {
      logger.error(`Tool execution failed on ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * List resources available from a service.
   *
   * @param serviceId Id of a connected service.
   * @returns The resources reported by the service (empty if none were returned).
   * @throws Error if the service is not connected or the request fails.
   */
  async listServiceResources(serviceId: string): Promise<Resource[]> {
    const connection = this.requireConnection(serviceId);

    try {
      const response = await connection.client.listResources();
      return response.resources || [];
    } catch (error) {
      logger.error(`Failed to list resources for ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * Read a resource from a service and return its textual content.
   *
   * @param serviceId Id of a connected service.
   * @param uri URI of the resource to read.
   * @returns The `text` of every content entry that has one, joined with
   *   newlines; an empty string if no entry carries text.
   * @throws Error if the service is not connected or the request fails.
   */
  async readServiceResource(serviceId: string, uri: string): Promise<string> {
    const connection = this.requireConnection(serviceId);

    try {
      const response = await connection.client.readResource({ uri });

      // Content entries carry either `text` or `blob` and have no `type`
      // discriminator, so textual entries are detected by the `text` field.
      const texts: string[] = [];
      for (const item of response.contents ?? []) {
        const text = (item as { text?: unknown }).text;
        if (typeof text === 'string') {
          texts.push(text);
        }
      }

      return texts.join('\n');
    } catch (error) {
      logger.error(`Failed to read resource from ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * List prompts available from a service.
   *
   * @param serviceId Id of a connected service.
   * @returns The prompts reported by the service (empty if none were returned).
   * @throws Error if the service is not connected or the request fails.
   */
  async listServicePrompts(serviceId: string): Promise<Prompt[]> {
    const connection = this.requireConnection(serviceId);

    try {
      const response = await connection.client.listPrompts();
      return response.prompts || [];
    } catch (error) {
      logger.error(`Failed to list prompts for ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * Get a specific prompt from a service.
   *
   * @param serviceId Id of a connected service.
   * @param promptName Name of the prompt to fetch.
   * @param args Optional prompt arguments. (Named `args` because `arguments`
   *   is not a legal parameter name inside a class body.)
   * @returns The prompt response exactly as returned by the client.
   * @throws Error if the service is not connected or the request fails.
   */
  async getServicePrompt(
    serviceId: string,
    promptName: string,
    args?: Record<string, string>
  ): Promise<any> {
    const connection = this.requireConnection(serviceId);

    try {
      const response = await connection.client.getPrompt({
        name: promptName,
        arguments: args
      });

      return response;
    } catch (error) {
      logger.error(`Failed to get prompt from ${serviceId}:`, error);
      throw error;
    }
  }

  /**
   * Get a summary of a service's current state.
   *
   * Tools and resources are only queried when the service advertised the
   * matching capability; otherwise the corresponding count is 0. The two
   * queries run in parallel.
   *
   * @param serviceId Id of the service.
   * @returns `{ status: 'disconnected' }` when there is no live connection,
   *   `{ status: 'connected', toolCount, resourceCount, capabilities }` on
   *   success, or `{ status: 'error', error }` when a query fails.
   */
  async getServiceContext(serviceId: string): Promise<any> {
    const connection = this.connections.get(serviceId);
    if (!connection || !connection.connected) {
      return { status: 'disconnected' };
    }

    try {
      const capabilities = connection.capabilities;

      const toolsPromise: Promise<Tool[]> =
        capabilities?.tools === false ? Promise.resolve([]) : this.listServiceTools(serviceId);
      const resourcesPromise: Promise<Resource[]> =
        capabilities?.resources === false
          ? Promise.resolve([])
          : this.listServiceResources(serviceId);

      const [tools, resources] = await Promise.all([toolsPromise, resourcesPromise]);

      return {
        status: 'connected',
        toolCount: tools.length,
        resourceCount: resources.length,
        capabilities
      };
    } catch (error) {
      logger.error(`Failed to get context from ${serviceId}:`, error);
      return { status: 'error', error: String(error) };
    }
  }

  /**
   * Disconnect from a service.
   *
   * Cancels any pending reconnection, removes the connection from tracking,
   * closes the client and transport (errors are logged, not thrown), and
   * unregisters the service from the registry.
   *
   * @param serviceId Id of the service to disconnect.
   */
  async disconnectService(serviceId: string): Promise<void> {
    // Retry state is cleared first so that a service which never managed to
    // connect stops retrying as well.
    const hadPendingReconnect = this.reconnectTimers.has(serviceId);
    this.clearReconnectState(serviceId);

    const connection = this.connections.get(serviceId);
    if (!connection) {
      if (hadPendingReconnect) {
        logger.info(`Cancelled pending reconnection for ${serviceId}`);
      } else {
        logger.warn(`Cannot disconnect unknown service: ${serviceId}`);
      }
      return;
    }

    logger.info(`Disconnecting from ${serviceId}`);

    // Stop tracking before closing: the close handler then sees a connection
    // that is no longer current and does not schedule a reconnect.
    this.connections.delete(serviceId);
    connection.connected = false;

    try {
      await this.closeConnection(connection);
    } catch (error) {
      logger.error(`Error disconnecting from ${serviceId}:`, error);
    }

    this.registry.unregisterService(serviceId);
  }

  /**
   * Disconnect from all services, including services that only have a
   * reconnection pending.
   */
  async disconnectAll(): Promise<void> {
    logger.info('Disconnecting from all services');

    const serviceIds = new Set<string>([
      ...this.connections.keys(),
      ...this.reconnectTimers.keys()
    ]);

    await Promise.all(Array.from(serviceIds, serviceId => this.disconnectService(serviceId)));

    logger.info('All services disconnected');
  }

  /**
   * Get connection statistics.
   *
   * @returns Counts over the tracked connections: `connected` for live ones,
   *   `reconnecting` for disconnected ones with a retry timer registered,
   *   `disconnected` for the rest, and `total` for all of them.
   */
  getConnectionStats(): {
    total: number;
    connected: number;
    disconnected: number;
    reconnecting: number;
  } {
    let connected = 0;
    let disconnected = 0;
    let reconnecting = 0;

    for (const connection of this.connections.values()) {
      if (connection.connected) {
        connected++;
      } else if (this.reconnectTimers.has(connection.serviceId)) {
        reconnecting++;
      } else {
        disconnected++;
      }
    }

    return {
      total: this.connections.size,
      connected,
      disconnected,
      reconnecting
    };
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YobieBen/llama-maverick-hub-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server