/**
* MCP Client Manager Module
*
* Author: Yobie Benjamin
* Version: 0.2
* Date: July 28, 2025
*
* Manages MCP client connections to external services.
* This module handles the hub's role as an MCP client connecting to services like Stripe.
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';
import winston from 'winston';
import { ServiceRegistry, ServiceMetadata } from '../registry/service-registry.js';
import type { Tool, Resource, Prompt } from '@modelcontextprotocol/sdk/types.js';
const logger = winston.createLogger({
level: 'debug',
format: winston.format.simple()
});
/**
* MCP service configuration
* Defines how to connect to an external MCP service
*/
export interface MCPServiceConfig {
id: string;
name: string;
description: string;
transport: 'stdio' | 'websocket' | 'http';
endpoint: string;
command?: string; // For stdio transport
args?: string[]; // For stdio transport
url?: string; // For websocket/http transport
headers?: Record<string, string>; // For http transport
reconnectPolicy?: {
maxRetries: number;
retryDelayMs: number;
backoffMultiplier: number;
};
}
/**
* Active MCP client connection
* Represents a live connection to an external MCP service
*/
interface MCPClientConnection {
serviceId: string;
config: MCPServiceConfig;
client: Client;
transport: any; // Transport instance
connected: boolean;
reconnectAttempts: number;
capabilities?: {
tools: boolean;
resources: boolean;
prompts: boolean;
completion: boolean;
};
}
/**
* Manages all MCP client connections from the hub to external services
* Handles connection lifecycle, protocol communication, and error recovery
*/
export class MCPClientManager {
private registry: ServiceRegistry;
private connections: Map<string, MCPClientConnection>;
private reconnectTimers: Map<string, NodeJS.Timeout>;
constructor(registry: ServiceRegistry) {
/**
* Initialize with service registry
* Registry tracks all available services
*/
this.registry = registry;
this.connections = new Map();
this.reconnectTimers = new Map();
}
/**
* Connect to multiple MCP services
* Establishes connections based on service configurations
*/
async connectToServices(configs: MCPServiceConfig[]): Promise<void> {
logger.info(`Connecting to ${configs.length} MCP services...`);
/**
* Connect to each service in parallel
* Failures in one connection don't block others
*/
const connectionPromises = configs.map(config =>
this.connectToService(config).catch(error => {
logger.error(`Failed to connect to ${config.id}:`, error);
return null;
})
);
await Promise.all(connectionPromises);
logger.info(`Connected to ${this.connections.size} services`);
}
/**
* Connect to a single MCP service
* Establishes connection based on transport type
*/
async connectToService(config: MCPServiceConfig): Promise<void> {
logger.info(`Connecting to MCP service: ${config.id}`, config);
try {
/**
* Create transport based on configuration
* Different services use different transport mechanisms
*/
const transport = await this.createTransport(config);
/**
* Create MCP client instance
* This client will communicate using the MCP protocol
*/
const client = new Client(
{
name: `llama-hub-client-${config.id}`,
version: '0.2.0'
},
{
capabilities: {}
}
);
/**
* Connect the client to the transport
* Establishes the MCP protocol connection
*/
await client.connect(transport);
/**
* Query service capabilities
* Determine what features the service supports
*/
const serverInfo = await client.getServerInfo();
const capabilities = serverInfo.capabilities || {};
/**
* Store connection information
* Track active connection for future use
*/
const connection: MCPClientConnection = {
serviceId: config.id,
config,
client,
transport,
connected: true,
reconnectAttempts: 0,
capabilities: {
tools: !!capabilities.tools,
resources: !!capabilities.resources,
prompts: !!capabilities.prompts,
completion: !!capabilities.completion
}
};
this.connections.set(config.id, connection);
/**
* Register service in the registry
* Make service available to the hub
*/
this.registry.registerService({
id: config.id,
name: config.name,
description: config.description,
version: serverInfo.serverInfo?.version || 'unknown',
endpoint: config.endpoint,
transport: config.transport,
capabilities: connection.capabilities,
status: 'connected',
lastHeartbeat: new Date()
});
/**
* Set up connection monitoring
* Handle disconnections and errors
*/
this.setupConnectionMonitoring(connection);
logger.info(`Successfully connected to ${config.id}`);
} catch (error) {
logger.error(`Failed to connect to ${config.id}:`, error);
/**
* Schedule reconnection attempt
* Try to recover from connection failure
*/
this.scheduleReconnect(config);
throw error;
}
}
/**
* Create transport instance based on configuration
* Different transport types for different services
*/
private async createTransport(config: MCPServiceConfig): Promise<any> {
switch (config.transport) {
case 'stdio':
/**
* Stdio transport for local processes
* Used for services running as local executables
*/
if (!config.command) {
throw new Error('Stdio transport requires command');
}
return new StdioClientTransport({
command: config.command,
args: config.args || []
});
case 'websocket':
/**
* WebSocket transport for network services
* Used for services accessible over WebSocket
*/
if (!config.url) {
throw new Error('WebSocket transport requires URL');
}
return new WebSocketClientTransport(new URL(config.url));
case 'http':
/**
* HTTP transport would be implemented here
* For REST-based MCP services
*/
throw new Error('HTTP transport not yet implemented');
default:
throw new Error(`Unknown transport type: ${config.transport}`);
}
}
/**
* Set up monitoring for a connection
* Handles disconnections and automatic reconnection
*/
private setupConnectionMonitoring(connection: MCPClientConnection): void {
const { client, serviceId } = connection;
/**
* Monitor for connection errors
* Attempt reconnection on failure
*/
client.onerror = (error) => {
logger.error(`Connection error for ${serviceId}:`, error);
this.handleDisconnection(connection);
};
client.onclose = () => {
logger.warn(`Connection closed for ${serviceId}`);
this.handleDisconnection(connection);
};
}
/**
* Handle service disconnection
* Updates status and schedules reconnection
*/
private handleDisconnection(connection: MCPClientConnection): void {
connection.connected = false;
/**
* Update service status in registry
* Mark service as disconnected
*/
this.registry.updateServiceStatus(connection.serviceId, 'disconnected');
/**
* Schedule reconnection attempt
* Try to restore connection automatically
*/
this.scheduleReconnect(connection.config);
}
/**
* Schedule reconnection attempt for a service
* Implements exponential backoff for retries
*/
private scheduleReconnect(config: MCPServiceConfig): void {
const connection = this.connections.get(config.id);
const policy = config.reconnectPolicy || {
maxRetries: 5,
retryDelayMs: 5000,
backoffMultiplier: 2
};
/**
* Check if we should attempt reconnection
* Respect maximum retry limit
*/
const attempts = connection?.reconnectAttempts || 0;
if (attempts >= policy.maxRetries) {
logger.error(`Max reconnection attempts reached for ${config.id}`);
this.registry.updateServiceStatus(config.id, 'error');
return;
}
/**
* Calculate delay with exponential backoff
* Increases delay between retry attempts
*/
const delay = policy.retryDelayMs * Math.pow(policy.backoffMultiplier, attempts);
logger.info(`Scheduling reconnection for ${config.id} in ${delay}ms (attempt ${attempts + 1})`);
/**
* Clear existing timer if any
* Prevent multiple reconnection attempts
*/
const existingTimer = this.reconnectTimers.get(config.id);
if (existingTimer) {
clearTimeout(existingTimer);
}
/**
* Schedule reconnection
* Attempt to restore connection after delay
*/
const timer = setTimeout(async () => {
logger.info(`Attempting reconnection to ${config.id}`);
try {
await this.connectToService(config);
// Reset attempts on successful connection
const conn = this.connections.get(config.id);
if (conn) {
conn.reconnectAttempts = 0;
}
} catch (error) {
// Update attempts and let it reschedule
const conn = this.connections.get(config.id);
if (conn) {
conn.reconnectAttempts = attempts + 1;
}
}
this.reconnectTimers.delete(config.id);
}, delay);
this.reconnectTimers.set(config.id, timer);
}
/**
* List tools available from a service
* Queries the service for its tool catalog
*/
async listServiceTools(serviceId: string): Promise<Tool[]> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
try {
const response = await connection.client.listTools();
return response.tools || [];
} catch (error) {
logger.error(`Failed to list tools for ${serviceId}:`, error);
throw error;
}
}
/**
* Execute a tool on a service
* Sends tool execution request via MCP protocol
*/
async executeServiceTool(
serviceId: string,
toolName: string,
arguments: Record<string, any>
): Promise<any> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
logger.debug(`Executing tool ${toolName} on ${serviceId}`, { arguments });
try {
const response = await connection.client.callTool({
name: toolName,
arguments
});
return response.content;
} catch (error) {
logger.error(`Tool execution failed on ${serviceId}:`, error);
throw error;
}
}
/**
* List resources available from a service
* Queries the service for available resources
*/
async listServiceResources(serviceId: string): Promise<Resource[]> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
try {
const response = await connection.client.listResources();
return response.resources || [];
} catch (error) {
logger.error(`Failed to list resources for ${serviceId}:`, error);
throw error;
}
}
/**
* Read a resource from a service
* Fetches resource content via MCP protocol
*/
async readServiceResource(serviceId: string, uri: string): Promise<string> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
try {
const response = await connection.client.readResource({ uri });
// Extract text content from response
const textContent = response.contents
?.filter(c => c.type === 'text')
.map(c => c.text)
.join('\n');
return textContent || '';
} catch (error) {
logger.error(`Failed to read resource from ${serviceId}:`, error);
throw error;
}
}
/**
* List prompts available from a service
* Queries the service for available prompts
*/
async listServicePrompts(serviceId: string): Promise<Prompt[]> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
try {
const response = await connection.client.listPrompts();
return response.prompts || [];
} catch (error) {
logger.error(`Failed to list prompts for ${serviceId}:`, error);
throw error;
}
}
/**
* Get a specific prompt from a service
* Fetches prompt template with arguments
*/
async getServicePrompt(
serviceId: string,
promptName: string,
arguments?: Record<string, string>
): Promise<any> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
throw new Error(`Service ${serviceId} is not connected`);
}
try {
const response = await connection.client.getPrompt({
name: promptName,
arguments
});
return response;
} catch (error) {
logger.error(`Failed to get prompt from ${serviceId}:`, error);
throw error;
}
}
/**
* Get service-specific context
* Queries service for contextual information
*/
async getServiceContext(serviceId: string): Promise<any> {
const connection = this.connections.get(serviceId);
if (!connection || !connection.connected) {
return { status: 'disconnected' };
}
try {
// Query service for any contextual information
// This could be service-specific metadata, state, etc.
const tools = await this.listServiceTools(serviceId);
const resources = await this.listServiceResources(serviceId);
return {
status: 'connected',
toolCount: tools.length,
resourceCount: resources.length,
capabilities: connection.capabilities
};
} catch (error) {
logger.error(`Failed to get context from ${serviceId}:`, error);
return { status: 'error', error: String(error) };
}
}
/**
* Disconnect from a service
* Cleanly closes the connection to an MCP service
*/
async disconnectService(serviceId: string): Promise<void> {
const connection = this.connections.get(serviceId);
if (!connection) {
logger.warn(`Cannot disconnect unknown service: ${serviceId}`);
return;
}
logger.info(`Disconnecting from ${serviceId}`);
try {
/**
* Close the client connection
* Properly terminate MCP protocol session
*/
await connection.client.close();
/**
* Clean up transport if needed
* Release transport resources
*/
if (connection.transport && typeof connection.transport.close === 'function') {
await connection.transport.close();
}
} catch (error) {
logger.error(`Error disconnecting from ${serviceId}:`, error);
}
/**
* Clean up connection tracking
* Remove from active connections
*/
this.connections.delete(serviceId);
/**
* Cancel any pending reconnection
* Stop reconnection attempts
*/
const timer = this.reconnectTimers.get(serviceId);
if (timer) {
clearTimeout(timer);
this.reconnectTimers.delete(serviceId);
}
/**
* Unregister from service registry
* Remove service from available services
*/
this.registry.unregisterService(serviceId);
}
/**
* Disconnect from all services
* Clean shutdown of all connections
*/
async disconnectAll(): Promise<void> {
logger.info('Disconnecting from all services');
const disconnectPromises = Array.from(this.connections.keys()).map(
serviceId => this.disconnectService(serviceId)
);
await Promise.all(disconnectPromises);
logger.info('All services disconnected');
}
/**
* Get connection statistics
* Provides overview of connection states
*/
getConnectionStats(): {
total: number;
connected: number;
disconnected: number;
reconnecting: number;
} {
let connected = 0;
let disconnected = 0;
let reconnecting = 0;
for (const connection of this.connections.values()) {
if (connection.connected) {
connected++;
} else if (this.reconnectTimers.has(connection.serviceId)) {
reconnecting++;
} else {
disconnected++;
}
}
return {
total: this.connections.size,
connected,
disconnected,
reconnecting
};
}
}