manager.tsโข11.2 kB
/**
* Connection manager for handling multiple MCP server connections
*/
import { EventEmitter } from "events";
import { ServerConfig } from "../types/config.js";
import {
Connection,
ConnectionEventType,
ConnectionEventCallback,
ConnectionEvent,
ConnectionStatus,
ConnectionPoolConfig,
IConnectionManager,
IConnectionPool,
} from "./types.js";
import { ConnectionPool } from "./pool.js";
import { ConnectionFactory } from "./factory.js";
import { HealthMonitor } from "./healthMonitor.js";
import { Logger, createChildLogger } from "../utils/logging.js";
import { RecoveryCoordinator } from "../errors/recovery.js";
/**
* Connection manager orchestrates connections to multiple MCP servers
*/
export class ConnectionManager
extends EventEmitter
implements IConnectionManager
{
private _pool: IConnectionPool;
private _healthMonitor: HealthMonitor;
private _logger: Logger;
private _recoveryCoordinator: RecoveryCoordinator;
private servers: Record<string, ServerConfig> = {};
private isInitialized = false;
private isStarted = false;
constructor(
poolConfig: ConnectionPoolConfig = {},
connectionFactory?: ConnectionFactory
) {
super();
this._pool = new ConnectionPool(poolConfig, connectionFactory);
this._healthMonitor = new HealthMonitor();
this._logger = createChildLogger({ module: "ConnectionManager" });
this._recoveryCoordinator = new RecoveryCoordinator();
this.setupPoolEventForwarding();
}
/**
* Get the connection pool
*/
get pool(): IConnectionPool {
return this._pool;
}
/**
* Get the status of all connections
*/
get status(): Record<string, ConnectionStatus> {
return this._pool.status;
}
/**
* Initialize the connection manager with server configurations
*/
async initialize(servers: Record<string, ServerConfig>): Promise<void> {
if (this.isInitialized) {
throw new Error("Connection manager is already initialized");
}
// Deduplicate server names (case-insensitive, trimmed) before adding to pool
const dedupedServers: Record<string, ServerConfig> = {};
const seen = new Set<string>();
for (const [serverName, config] of Object.entries(servers)) {
const normalized = serverName.trim().toLowerCase();
if (seen.has(normalized)) {
this._logger.error(
`Duplicate server name detected during initialization: "${serverName}"`,
{
serverName,
}
);
continue;
}
seen.add(normalized);
dedupedServers[serverName] = config;
}
this.servers = dedupedServers;
// Validate all server configurations
this.validateServerConfigurations();
// Add all servers to the pool concurrently
const serverEntries = Object.entries(this.servers);
const results = await Promise.allSettled(
serverEntries.map(([serverName, config]) =>
this._pool.addConnection(serverName, config)
)
);
// Log failures but continue initialization
results.forEach((result, index) => {
const [serverName] = serverEntries[index];
if (result.status === "rejected") {
this._logger.error(
`โ ๏ธ Failed to initialize server "${serverName}": ${(result.reason as Error).message}`
);
// Remove failed server from registry
delete this.servers[serverName];
}
});
this.isInitialized = true;
this.emit("initialized", {
serverCount: Object.keys(this.servers).length,
servers: Object.keys(this.servers),
});
}
/**
* Connect to a specific server
*/
async connect(serverName: string): Promise<void> {
this.ensureInitialized();
const connection = this._pool.getConnection(serverName);
if (!connection) {
throw new Error(`Server "${serverName}" not found in pool`);
}
if (connection.isConnected()) {
return;
}
try {
await connection.connect();
} catch (error) {
this._logger.error(`Failed to connect to server "${serverName}"`, {
error,
});
throw error;
}
}
/**
* Disconnect from a specific server
*/
async disconnect(serverName: string): Promise<void> {
this.ensureInitialized();
const connection = this._pool.getConnection(serverName);
if (!connection) {
return;
}
try {
await connection.disconnect();
} catch (error) {
this._logger.error(`Failed to disconnect from server "${serverName}"`, {
error,
});
throw error;
}
}
/**
* Reconnect to a specific server
*/
async reconnect(serverName: string): Promise<void> {
this.ensureInitialized();
await this.disconnect(serverName);
await this.connect(serverName);
}
/**
* Get a specific connection
*/
getConnection(serverName: string): Connection | undefined {
return this._pool.getConnection(serverName);
}
/**
* Get names of all connected servers
*/
getConnectedServers(): string[] {
return this._pool.getConnectedServers();
}
/**
* Check if a specific server is connected
*/
isServerConnected(serverName: string): boolean {
const connection = this._pool.getConnection(serverName);
return connection?.isConnected() ?? false;
}
/**
* Start the connection manager
*/
async start(): Promise<void> {
this.ensureInitialized();
if (this.isStarted) {
return;
}
try {
await this._pool.start();
this.isStarted = true;
this.emit("started", {
serverCount: Object.keys(this.servers).length,
connectedServers: this.getConnectedServers(),
});
} catch (error) {
this._logger.error("Failed to start connection manager", { error });
throw error;
}
}
/**
* Stop the connection manager
*/
async stop(): Promise<void> {
if (!this.isStarted) {
return;
}
try {
await this._pool.stop();
this.isStarted = false;
this.emit("stopped", {
serverCount: Object.keys(this.servers).length,
});
} catch (error) {
this._logger.error("Failed to stop connection manager", { error });
throw error;
}
}
/**
* Add a new server configuration
*/
async addServer(serverName: string, config: ServerConfig): Promise<void> {
this.ensureInitialized();
if (this.servers[serverName]) {
const error = new Error(
`โ Server name conflict detected: "${serverName}" already exists.\n` +
`๐ก Resolution: Use a unique server name or remove the existing server first.\n` +
`๐ Existing servers: ${Object.keys(this.servers).join(", ")}`
);
this._logger.error("Server name conflict detected", {
serverName,
existingServers: Object.keys(this.servers),
});
throw error;
}
this.servers[serverName] = config;
await this._pool.addConnection(serverName, config);
// Add to health monitor
const connection = this._pool.getConnection(serverName);
if (connection) {
this._healthMonitor.addConnection(connection);
}
// Auto-connect if manager is started
if (this.isStarted) {
try {
await this.connect(serverName);
} catch (error) {
this._logger.warn(
`Failed to auto-connect to new server "${serverName}"`,
{ error }
);
}
}
this.emit("serverAdded", { serverName, config });
}
/**
* Remove a server configuration
*/
async removeServer(serverName: string): Promise<void> {
this.ensureInitialized();
if (!this.servers[serverName]) {
return;
}
await this._pool.removeConnection(serverName);
delete this.servers[serverName];
this.emit("serverRemoved", { serverName });
}
/**
* Get server configuration
*/
getServerConfig(serverName: string): ServerConfig | undefined {
return this.servers[serverName];
}
/**
* Get all server names
*/
getServerNames(): string[] {
return Object.keys(this.servers);
}
/**
* Get connection statistics
*/
getStats() {
const serverNames = this.getServerNames();
const connectedServers = this.getConnectedServers();
return {
totalServers: serverNames.length,
connectedServers: connectedServers.length,
disconnectedServers: serverNames.length - connectedServers.length,
connectionRate:
serverNames.length > 0
? connectedServers.length / serverNames.length
: 0,
activeConnections: this._pool.activeConnections,
poolSize: this._pool.size,
};
}
/**
* Validate server configurations
*/
private validateServerConfigurations(): void {
for (const [serverName, config] of Object.entries(this.servers)) {
if (!config || typeof config !== "object") {
throw new Error(`Invalid configuration for server "${serverName}"`);
}
if (
!config.type ||
(config.type !== "stdio" &&
config.type !== "http" &&
config.type !== "sse")
) {
throw new Error(
`Invalid transport type for server "${serverName}": ${(config as any).type}`
);
}
}
}
/**
* Setup event forwarding from the pool
*/
private setupPoolEventForwarding(): void {
const eventTypes: ConnectionEventType[] = [
"connecting",
"connected",
"disconnected",
"reconnecting",
"failed",
"error",
];
eventTypes.forEach((eventType) => {
this._pool.on(eventType, (event: ConnectionEvent) => {
this.emit(eventType, event);
});
});
// Forward pool-specific events
this._pool.on("poolStarted" as any, () => {
this.emit("poolStarted" as any);
});
this._pool.on("poolStopped" as any, () => {
this.emit("poolStopped" as any);
});
this._pool.on("connectionAdded" as any, (data: any) => {
this.emit("connectionAdded" as any, data);
});
this._pool.on("connectionRemoved" as any, (data: any) => {
this.emit("connectionRemoved" as any, data);
});
}
/**
* Ensure the manager is initialized
*/
private ensureInitialized(): void {
if (!this.isInitialized) {
throw new Error(
"Connection manager not initialized. Call initialize() first."
);
}
}
/**
* Override EventEmitter methods for type safety
*/
on(event: ConnectionEventType, listener: ConnectionEventCallback): this;
on(event: string, listener: (...args: any[]) => void): this;
on(event: string | ConnectionEventType, listener: any): this {
return super.on(event, listener);
}
off(event: ConnectionEventType, listener: ConnectionEventCallback): this;
off(event: string, listener: (...args: any[]) => void): this;
off(event: string | ConnectionEventType, listener: any): this {
return super.off(event, listener);
}
emit(event: ConnectionEventType, data: ConnectionEvent): boolean;
emit(event: string, ...args: any[]): boolean;
emit(event: string | ConnectionEventType, ...args: any[]): boolean {
return super.emit(event, ...args);
}
}