import { EventEmitter } from 'events';
import { Client as SSHClient, ConnectConfig } from 'ssh2';
import { v4 as uuidv4 } from 'uuid';
import { readFileSync } from 'fs';
import { Logger } from '../utils/logger.js';
import {
PooledConnection,
ConnectionPoolConfig,
ConnectionPoolStats,
SSHConnectionOptions,
HealthCheckResult,
} from '../types/index.js';
export interface CircuitBreakerState {
state: 'closed' | 'open' | 'half-open';
failures: number;
lastFailure: Date;
isOpen: boolean;
successCount: number;
nextAttemptTime: number;
responseTimeHistory: number[];
healthScore: number;
}
/**
* Production-ready SSH Connection Pool
* Manages reusable SSH connections with health checks, load balancing, and automatic cleanup
*/
export class ConnectionPool extends EventEmitter {
private connections: Map<string, PooledConnection> = new Map();
private connectionsByHost: Map<string, Set<string>> = new Map();
private roundRobinIndex: Map<string, number> = new Map();
private config: ConnectionPoolConfig;
private logger: Logger;
private healthCheckInterval: NodeJS.Timeout | null = null;
private cleanupInterval: NodeJS.Timeout | null = null;
private circuitBreakers: Map<string, CircuitBreakerState> = new Map();
private connectionHealthHistory: Map<string, HealthCheckResult[]> = new Map();
private metrics: {
totalConnections: number;
connectionsCreated: number;
connectionsDestroyed: number;
healthChecksPerformed: number;
reconnectionAttempts: number;
circuitBreakerTrips: number;
successfulHealthChecks: number;
failedHealthChecks: number;
averageResponseTime: number;
predictiveFailuresPrevented: number;
};
constructor(config: Partial<ConnectionPoolConfig> = {}) {
super();
this.config = {
maxConnectionsPerHost: config.maxConnectionsPerHost ?? 5,
connectionIdleTimeout: config.connectionIdleTimeout ?? 5 * 60 * 1000, // 5 minutes
keepAliveInterval: config.keepAliveInterval ?? 30 * 1000, // 30 seconds
connectionRetryAttempts: config.connectionRetryAttempts ?? 3,
healthCheckInterval: config.healthCheckInterval ?? 60 * 1000, // 1 minute
cleanupInterval: config.cleanupInterval ?? 2 * 60 * 1000, // 2 minutes
enableMetrics: config.enableMetrics ?? true,
enableLogging: config.enableLogging ?? true,
poolingStrategy: config.poolingStrategy ?? 'least-connections',
connectionTimeout: config.connectionTimeout ?? 30 * 1000, // 30 seconds
maxReconnectAttempts: config.maxReconnectAttempts ?? 5,
circuitBreakerThreshold: config.circuitBreakerThreshold ?? 3,
};
this.logger = new Logger('ConnectionPool');
this.metrics = {
totalConnections: 0,
connectionsCreated: 0,
connectionsDestroyed: 0,
healthChecksPerformed: 0,
reconnectionAttempts: 0,
circuitBreakerTrips: 0,
successfulHealthChecks: 0,
failedHealthChecks: 0,
averageResponseTime: 0,
predictiveFailuresPrevented: 0,
};
this.startHealthChecks();
this.startCleanupProcess();
if (this.config.enableLogging) {
this.logger.info('ConnectionPool initialized with config:', this.config);
}
}
/**
* Get or create a connection to the specified host
*/
async getConnection(
options: SSHConnectionOptions
): Promise<PooledConnection> {
const hostKey = `${options.host}:${options.port || 22}:${options.username}`;
// Check circuit breaker
const breaker = this.circuitBreakers.get(hostKey);
if (breaker?.isOpen) {
if (Date.now() < breaker.nextAttemptTime) {
const waitTime = Math.ceil(
(breaker.nextAttemptTime - Date.now()) / 1000
);
throw new Error(
`Circuit breaker is OPEN for ${hostKey}. ` +
`Too many recent failures (${breaker.failures}). ` +
`Wait ${waitTime}s before retry. ` +
`Health score: ${breaker.healthScore}/100`
);
} else {
// Transition to half-open
breaker.state = 'half-open';
breaker.isOpen = false;
breaker.successCount = 0;
if (this.config.enableLogging) {
this.logger.info(
`Circuit breaker ${hostKey} transitioning to half-open state`
);
}
}
}
// Try to get existing healthy connection
const existingConnection = await this.getExistingConnection(
hostKey,
options
);
if (existingConnection) {
existingConnection.lastUsed = new Date();
existingConnection.activeSessionCount++;
if (this.config.enableLogging) {
this.logger.debug(
`Reusing existing connection ${existingConnection.id} for ${hostKey}`
);
}
return existingConnection;
}
// Check if we can create a new connection
const hostConnections = this.connectionsByHost.get(hostKey);
if (
hostConnections &&
hostConnections.size >= this.config.maxConnectionsPerHost
) {
// Try to find a less busy connection
const connections = Array.from(hostConnections)
.map((id) => this.connections.get(id)!)
.filter((conn) => conn.isHealthy)
.sort((a, b) => a.activeSessionCount - b.activeSessionCount);
if (connections.length > 0) {
const connection = connections[0];
connection.lastUsed = new Date();
connection.activeSessionCount++;
if (this.config.enableLogging) {
this.logger.debug(
`Using least busy connection ${connection.id} for ${hostKey}`
);
}
return connection;
}
throw new Error(
`Maximum connections (${this.config.maxConnectionsPerHost}) reached for host ${hostKey}`
);
}
// Create new connection
return await this.createConnection(options);
}
/**
* Release a connection back to the pool
*/
async releaseConnection(connectionId: string): Promise<void> {
const connection = this.connections.get(connectionId);
if (!connection) {
if (this.config.enableLogging) {
this.logger.warn(
`Attempted to release unknown connection ${connectionId}`
);
}
return;
}
connection.activeSessionCount = Math.max(
0,
connection.activeSessionCount - 1
);
connection.lastUsed = new Date();
if (this.config.enableLogging) {
this.logger.debug(
`Released connection ${connectionId}, active sessions: ${connection.activeSessionCount}`
);
}
this.emit('connectionReleased', {
connectionId,
activeSessionCount: connection.activeSessionCount,
});
}
/**
* Close a specific connection
*/
async closeConnection(connectionId: string): Promise<void> {
const connection = this.connections.get(connectionId);
if (!connection) {
return;
}
const hostKey = `${connection.host}:${connection.port}:${connection.username}`;
try {
if (
connection.connection &&
typeof connection.connection.end === 'function'
) {
connection.connection.end();
}
} catch (error) {
if (this.config.enableLogging) {
this.logger.error(`Error closing connection ${connectionId}:`, error);
}
}
// Remove from tracking
this.connections.delete(connectionId);
const hostConnections = this.connectionsByHost.get(hostKey);
if (hostConnections) {
hostConnections.delete(connectionId);
if (hostConnections.size === 0) {
this.connectionsByHost.delete(hostKey);
this.roundRobinIndex.delete(hostKey);
}
}
this.metrics.connectionsDestroyed++;
if (this.config.enableLogging) {
this.logger.info(`Closed connection ${connectionId} to ${hostKey}`);
}
this.emit('connectionClosed', { connectionId, hostKey });
}
/**
* Close all connections
*/
async closeAllConnections(): Promise<void> {
const connectionIds = Array.from(this.connections.keys());
if (this.config.enableLogging) {
this.logger.info(`Closing ${connectionIds.length} connections`);
}
await Promise.all(connectionIds.map((id) => this.closeConnection(id)));
}
/**
* Get connection statistics
*/
getStats(): ConnectionPoolStats {
const connections = Array.from(this.connections.values());
const now = Date.now();
const connectionsByHost: Record<string, number> = {};
this.connectionsByHost.forEach((connections, hostKey) => {
connectionsByHost[hostKey] = connections.size;
});
const averageConnectionAge =
connections.length > 0
? connections.reduce(
(sum, conn) => sum + (now - conn.createdAt.getTime()),
0
) / connections.length
: 0;
return {
totalConnections: connections.length,
activeConnections: connections.filter((c) => c.activeSessionCount > 0)
.length,
idleConnections: connections.filter((c) => c.activeSessionCount === 0)
.length,
healthyConnections: connections.filter((c) => c.isHealthy).length,
unhealthyConnections: connections.filter((c) => !c.isHealthy).length,
connectionsByHost,
averageConnectionAge,
totalReconnectAttempts: this.metrics.reconnectionAttempts,
lastHealthCheckAt: new Date(),
};
}
/**
* Get detailed metrics
*/
getMetrics() {
return {
...this.metrics,
circuitBreakerStates: Object.fromEntries(this.circuitBreakers.entries()),
poolStats: this.getStats(),
};
}
/**
* Shutdown the connection pool
*/
async shutdown(): Promise<void> {
if (this.config.enableLogging) {
this.logger.info('Shutting down connection pool');
}
// Clear intervals
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
// Close all connections
await this.closeAllConnections();
this.removeAllListeners();
}
/**
* Create a new SSH connection
*/
private async createConnection(
options: SSHConnectionOptions
): Promise<PooledConnection> {
const connectionId = uuidv4();
const hostKey = `${options.host}:${options.port || 22}:${options.username}`;
if (this.config.enableLogging) {
this.logger.info(`Creating new connection ${connectionId} to ${hostKey}`);
}
const sshClient = new SSHClient();
const connectConfig: ConnectConfig = {
host: options.host,
port: options.port || 22,
username: options.username,
password: options.password,
privateKey:
options.privateKey ||
(options.privateKeyPath
? readFileSync(options.privateKeyPath)
: undefined),
passphrase: options.passphrase,
readyTimeout: options.readyTimeout || this.config.connectionTimeout,
keepaliveInterval:
options.keepAliveInterval || this.config.keepAliveInterval,
keepaliveCountMax: options.keepAliveCountMax || 3,
algorithms: {
serverHostKey: [
'rsa-sha2-512',
'rsa-sha2-256',
'ssh-rsa',
'ecdsa-sha2-nistp256',
'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521',
'ssh-ed25519',
],
cipher: ['aes128-gcm', 'aes256-gcm', 'aes128-ctr', 'aes256-ctr'],
hmac: ['hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'],
compress: ['none'],
kex: [
'ecdh-sha2-nistp256',
'ecdh-sha2-nistp384',
'ecdh-sha2-nistp521',
'diffie-hellman-group14-sha256',
],
},
};
const connection: PooledConnection = {
id: connectionId,
host: options.host,
port: options.port || 22,
username: options.username,
connection: sshClient,
createdAt: new Date(),
lastUsed: new Date(),
activeSessionCount: 1,
isHealthy: false,
reconnectAttempts: 0,
maxReconnectAttempts: this.config.maxReconnectAttempts,
metadata: {
hostKey,
connectConfig,
},
};
try {
await this.connectSSH(sshClient, connectConfig);
connection.isHealthy = true;
this.connections.set(connectionId, connection);
// Track by host
if (!this.connectionsByHost.has(hostKey)) {
this.connectionsByHost.set(hostKey, new Set());
}
this.connectionsByHost.get(hostKey)!.add(connectionId);
// Setup event handlers
this.setupConnectionHandlers(connection);
this.metrics.connectionsCreated++;
this.metrics.totalConnections++;
if (this.config.enableLogging) {
this.logger.info(
`Successfully created connection ${connectionId} to ${hostKey}`
);
}
this.emit('connectionCreated', { connectionId, hostKey });
return connection;
} catch (error) {
// Handle connection failure
this.handleConnectionFailure(hostKey, error);
throw error;
}
}
/**
* Connect SSH client with timeout handling
*/
private async connectSSH(
client: SSHClient,
config: ConnectConfig
): Promise<void> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
client.destroy();
reject(
new Error(
`Connection timeout after ${this.config.connectionTimeout}ms`
)
);
}, this.config.connectionTimeout);
client.on('ready', () => {
clearTimeout(timeout);
resolve();
});
client.on('error', (error) => {
clearTimeout(timeout);
reject(error);
});
client.connect(config);
});
}
/**
* Setup event handlers for a connection
*/
private setupConnectionHandlers(connection: PooledConnection): void {
const sshClient = connection.connection;
sshClient.on('error', (error: Error) => {
connection.isHealthy = false;
if (this.config.enableLogging) {
this.logger.error(`Connection ${connection.id} error:`, error);
}
this.emit('connectionError', { connectionId: connection.id, error });
});
sshClient.on('close', () => {
connection.isHealthy = false;
if (this.config.enableLogging) {
this.logger.info(`Connection ${connection.id} closed`);
}
this.emit('connectionClosed', { connectionId: connection.id });
// Attempt reconnection if there are active sessions
if (
connection.activeSessionCount > 0 &&
connection.reconnectAttempts < connection.maxReconnectAttempts
) {
this.attemptReconnection(connection);
}
});
sshClient.on('end', () => {
connection.isHealthy = false;
if (this.config.enableLogging) {
this.logger.info(`Connection ${connection.id} ended`);
}
});
}
/**
* Get existing healthy connection using load balancing strategy
*/
private async getExistingConnection(
hostKey: string,
_options: SSHConnectionOptions
): Promise<PooledConnection | null> {
const hostConnections = this.connectionsByHost.get(hostKey);
if (!hostConnections || hostConnections.size === 0) {
return null;
}
const healthyConnections = Array.from(hostConnections)
.map((id) => this.connections.get(id)!)
.filter((conn) => conn && conn.isHealthy);
if (healthyConnections.length === 0) {
return null;
}
// Apply load balancing strategy
switch (this.config.poolingStrategy) {
case 'round-robin':
return this.getRoundRobinConnection(hostKey, healthyConnections);
case 'least-connections':
return healthyConnections.sort(
(a, b) => a.activeSessionCount - b.activeSessionCount
)[0];
case 'random':
return healthyConnections[
Math.floor(Math.random() * healthyConnections.length)
];
default:
return healthyConnections[0];
}
}
/**
* Get connection using round-robin strategy
*/
private getRoundRobinConnection(
hostKey: string,
connections: PooledConnection[]
): PooledConnection {
const currentIndex = this.roundRobinIndex.get(hostKey) || 0;
const connection = connections[currentIndex % connections.length];
this.roundRobinIndex.set(hostKey, (currentIndex + 1) % connections.length);
return connection;
}
/**
* Handle connection failure and update circuit breaker
*/
private handleConnectionFailure(
hostKey: string,
error: Error | unknown
): void {
this.recordCircuitBreakerFailure(hostKey, error);
}
/**
* Record circuit breaker failure with enhanced state management
*/
private recordCircuitBreakerFailure(
hostKey: string,
error: Error | unknown
): void {
let breaker = this.circuitBreakers.get(hostKey);
if (!breaker) {
breaker = {
state: 'closed',
failures: 0,
lastFailure: new Date(),
isOpen: false,
successCount: 0,
nextAttemptTime: 0,
responseTimeHistory: [],
healthScore: 100,
};
this.circuitBreakers.set(hostKey, breaker);
}
breaker.failures++;
breaker.lastFailure = new Date();
breaker.healthScore = Math.max(0, breaker.healthScore - 20);
// Transition to open state if threshold exceeded
if (
breaker.state === 'closed' &&
breaker.failures >= this.config.circuitBreakerThreshold
) {
breaker.state = 'open';
breaker.isOpen = true;
breaker.nextAttemptTime = Date.now() + 60000; // 1 minute window
this.metrics.circuitBreakerTrips++;
if (this.config.enableLogging) {
this.logger.warn(
`Circuit breaker opened for ${hostKey} after ${breaker.failures} failures`
);
}
this.emit('circuitBreakerTripped', {
hostKey,
failures: breaker.failures,
state: breaker.state,
healthScore: breaker.healthScore,
});
} else if (breaker.state === 'half-open') {
// Return to open state from half-open on failure
breaker.state = 'open';
breaker.isOpen = true;
breaker.nextAttemptTime = Date.now() + 60000;
if (this.config.enableLogging) {
this.logger.warn(
`Circuit breaker returned to open state for ${hostKey}`
);
}
}
if (this.config.enableLogging) {
this.logger.error(
`Connection failure for ${hostKey} (failure #${breaker.failures}):`,
error
);
}
}
/**
* Attempt to reconnect a failed connection
*/
private async attemptReconnection(
connection: PooledConnection
): Promise<void> {
if (connection.reconnectAttempts >= connection.maxReconnectAttempts) {
if (this.config.enableLogging) {
this.logger.warn(
`Max reconnection attempts reached for connection ${connection.id}`
);
}
return;
}
connection.reconnectAttempts++;
this.metrics.reconnectionAttempts++;
const delay = Math.min(
1000 * Math.pow(2, connection.reconnectAttempts - 1),
30000
); // Exponential backoff, max 30s
if (this.config.enableLogging) {
this.logger.info(
`Attempting reconnection ${connection.reconnectAttempts}/${connection.maxReconnectAttempts} for ${connection.id} in ${delay}ms`
);
}
setTimeout(async () => {
try {
const newSSHClient = new SSHClient();
const connectConfig = connection.metadata
?.connectConfig as ConnectConfig;
await this.connectSSH(newSSHClient, connectConfig);
// Replace the old connection
connection.connection.destroy();
connection.connection = newSSHClient;
connection.isHealthy = true;
connection.reconnectAttempts = 0;
this.setupConnectionHandlers(connection);
if (this.config.enableLogging) {
this.logger.info(
`Successfully reconnected connection ${connection.id}`
);
}
this.emit('connectionReconnected', { connectionId: connection.id });
} catch (error) {
if (this.config.enableLogging) {
this.logger.error(
`Reconnection failed for connection ${connection.id}:`,
error
);
}
// Try again if we haven't exceeded max attempts
if (connection.reconnectAttempts < connection.maxReconnectAttempts) {
await this.attemptReconnection(connection);
} else {
// Give up and close the connection
await this.closeConnection(connection.id);
}
}
}, delay);
}
/**
* Start periodic health checks
*/
private startHealthChecks(): void {
if (
!this.config.healthCheckInterval ||
this.config.healthCheckInterval <= 0
) {
return;
}
this.healthCheckInterval = setInterval(async () => {
await this.performHealthChecks();
}, this.config.healthCheckInterval);
}
/**
* Perform comprehensive health checks on all connections
*/
private async performHealthChecks(): Promise<void> {
const connections = Array.from(this.connections.values());
if (this.config.enableLogging && connections.length > 0) {
this.logger.debug(
`Performing health checks on ${connections.length} connections`
);
}
const healthCheckPromises = connections.map(async (connection) => {
const startTime = Date.now();
const checkId = `health-${connection.id}-${Date.now()}`;
try {
const healthResult =
await this.performConnectionHealthCheck(connection);
// Update connection health
connection.isHealthy =
healthResult.status === 'healthy' ||
healthResult.status === 'warning';
connection.healthCheckAt = new Date();
// Store health history
this.storeConnectionHealthHistory(connection.id, healthResult);
// Update metrics
this.metrics.healthChecksPerformed++;
if (healthResult.success) {
this.metrics.successfulHealthChecks++;
} else {
this.metrics.failedHealthChecks++;
}
// Update average response time
this.updateAverageResponseTime(healthResult.responseTime);
// Update circuit breaker state
this.updateCircuitBreakerHealth(connection, healthResult);
// Predictive failure analysis
const failureRisk = this.analyzeFailureRisk(
connection.id,
healthResult
);
if (failureRisk > 0.7) {
this.emit('predictive-failure-warning', {
connectionId: connection.id,
risk: failureRisk,
recommendations: this.generateConnectionRecommendations(
connection,
healthResult
),
});
}
// Emit health check result
this.emit('connection-health-check', {
connectionId: connection.id,
result: healthResult,
});
} catch (error) {
connection.isHealthy = false;
this.metrics.failedHealthChecks++;
if (this.config.enableLogging) {
this.logger.warn(
`Health check failed for connection ${connection.id}:`,
error
);
}
// Create failed health result
const failedResult: HealthCheckResult = {
checkId,
checkType: 'network',
timestamp: new Date(),
status: 'critical',
metrics: {},
details: {
message: `Health check failed: ${error instanceof Error ? error.message : String(error)}`,
recoverable: true,
},
duration: Date.now() - startTime,
checks: {
connection: {
checkStatus: 'fail',
message: `Health check failed: ${error instanceof Error ? error.message : String(error)}`,
},
},
overallScore: 0,
};
this.storeConnectionHealthHistory(connection.id, failedResult);
}
});
await Promise.allSettled(healthCheckPromises);
}
/**
* Perform detailed health check on a single connection
*/
private async performConnectionHealthCheck(
connection: PooledConnection
): Promise<HealthCheckResult & { success: boolean; responseTime: number }> {
const startTime = Date.now();
const checkId = `health-${connection.id}-${Date.now()}`;
try {
const sshClient = connection.connection;
if (!sshClient || sshClient.destroyed) {
return {
checkId,
checkType: 'network',
timestamp: new Date(),
status: 'critical',
metrics: { destroyed: 1 },
details: {
message: 'SSH client is destroyed or unavailable',
diagnosis: 'Connection has been terminated',
recommendations: ['Reconnect to restore functionality'],
recoverable: true,
},
duration: Date.now() - startTime,
checks: {
connection: {
checkStatus: 'fail',
message: 'SSH client is destroyed or unavailable',
},
},
overallScore: 0,
success: false,
responseTime: Date.now() - startTime,
};
}
// Perform actual connectivity test
const testResult = await this.testConnectionConnectivity(sshClient);
const responseTime = Date.now() - startTime;
// Calculate health metrics
const metrics = {
responseTime,
activeSessionCount: connection.activeSessionCount,
connectionAge: Date.now() - connection.createdAt.getTime(),
reconnectAttempts: connection.reconnectAttempts,
};
// Determine status based on response time and connectivity
let status: HealthCheckResult['status'] = 'healthy';
const recommendations: string[] = [];
if (!testResult.success) {
status = 'critical';
recommendations.push('Connection failed connectivity test');
recommendations.push('Consider reconnecting or replacing connection');
} else if (responseTime > 10000) {
status = 'unhealthy';
recommendations.push('Very slow response time detected');
recommendations.push('Monitor network conditions');
} else if (responseTime > 5000) {
status = 'warning';
recommendations.push('Slow response time detected');
} else if (connection.reconnectAttempts > 0) {
status = 'warning';
recommendations.push('Connection has recent reconnection history');
}
return {
checkId,
checkType: 'network',
timestamp: new Date(),
status,
metrics,
details: {
message: `Connection health: ${status} (${responseTime}ms response time)`,
diagnosis: testResult.success
? 'Connection is responsive'
: `Connection failed: ${testResult.error}`,
recommendations,
recoverable: status !== 'critical',
},
duration: responseTime,
checks: {
connectivity: {
checkStatus:
status === 'healthy'
? 'pass'
: status === 'warning'
? 'warn'
: 'fail',
message: `Connection health: ${status}`,
value: responseTime,
duration: responseTime,
},
},
overallScore:
status === 'healthy'
? 100
: status === 'warning'
? 75
: status === 'unhealthy'
? 50
: 0,
success: testResult.success,
responseTime,
};
} catch (error) {
return {
checkId,
checkType: 'network',
timestamp: new Date(),
status: 'critical',
metrics: {},
details: {
message: `Health check error: ${error instanceof Error ? error.message : String(error)}`,
diagnosis: 'Unable to assess connection health',
recommendations: [
'Check connection stability',
'Consider reconnection',
],
recoverable: true,
},
duration: Date.now() - startTime,
checks: {
connectivity: {
checkStatus: 'fail',
message: `Health check error: ${error instanceof Error ? error.message : String(error)}`,
},
},
overallScore: 0,
success: false,
responseTime: Date.now() - startTime,
};
}
}
/**
* Test actual connectivity of SSH connection
*/
private async testConnectionConnectivity(
sshClient: SSHClient
): Promise<{ success: boolean; error?: string }> {
return new Promise((resolve) => {
try {
// Simple connectivity test - attempt to create a shell
sshClient.shell({ term: 'xterm-256color' }, (err, stream) => {
if (err) {
resolve({ success: false, error: err.message });
return;
}
// Immediately close the test shell
stream.close();
resolve({ success: true });
});
// Timeout for the test
setTimeout(() => {
resolve({ success: false, error: 'Connectivity test timeout' });
}, 5000);
} catch (error) {
resolve({
success: false,
error: error instanceof Error ? error.message : String(error),
});
}
});
}
/**
* Store connection health history for trend analysis
*/
private storeConnectionHealthHistory(
connectionId: string,
result: HealthCheckResult
): void {
if (!this.connectionHealthHistory.has(connectionId)) {
this.connectionHealthHistory.set(connectionId, []);
}
const history = this.connectionHealthHistory.get(connectionId)!;
history.push(result);
// Keep only last 100 results per connection
if (history.length > 100) {
history.shift();
}
}
/**
* Update circuit breaker with health information
*/
private updateCircuitBreakerHealth(
connection: PooledConnection,
healthResult: HealthCheckResult
): void {
const hostKey = `${connection.host}:${connection.port}:${connection.username}`;
let breaker = this.circuitBreakers.get(hostKey);
if (!breaker) {
breaker = {
state: 'closed',
failures: 0,
lastFailure: new Date(),
isOpen: false,
successCount: 0,
nextAttemptTime: 0,
responseTimeHistory: [],
healthScore: 100,
};
this.circuitBreakers.set(hostKey, breaker);
}
// Update response time history
if (
'responseTime' in healthResult &&
typeof healthResult.responseTime === 'number'
) {
breaker.responseTimeHistory.push(healthResult.responseTime);
if (breaker.responseTimeHistory.length > 50) {
breaker.responseTimeHistory.shift();
}
}
// Calculate health score (0-100)
let healthScore = 100;
if (healthResult.status === 'critical') healthScore = 0;
else if (healthResult.status === 'unhealthy') healthScore = 25;
else if (healthResult.status === 'warning') healthScore = 70;
// Factor in response time performance
if (breaker.responseTimeHistory.length > 0) {
const avgResponseTime =
breaker.responseTimeHistory.reduce((a, b) => a + b, 0) /
breaker.responseTimeHistory.length;
if (avgResponseTime > 5000) healthScore *= 0.8;
else if (avgResponseTime > 2000) healthScore *= 0.9;
}
breaker.healthScore = healthScore;
// Update circuit breaker state based on health
if (healthResult.status === 'healthy') {
if (breaker.state === 'half-open') {
breaker.successCount++;
if (breaker.successCount >= 3) {
breaker.state = 'closed';
breaker.failures = 0;
breaker.isOpen = false;
}
} else if (breaker.state === 'closed') {
breaker.failures = Math.max(0, breaker.failures - 1);
}
} else {
this.recordCircuitBreakerFailure(
hostKey,
new Error(healthResult.details.message)
);
}
}
/**
* Analyze failure risk based on connection history
*/
private analyzeFailureRisk(
connectionId: string,
currentResult: HealthCheckResult
): number {
const history = this.connectionHealthHistory.get(connectionId) || [];
if (history.length < 5) return 0;
let riskScore = 0;
// Recent failures increase risk
const recentResults = history.slice(-10);
const recentFailures = recentResults.filter(
(r) => r.status === 'unhealthy' || r.status === 'critical'
).length;
riskScore += (recentFailures / recentResults.length) * 0.4;
// Degrading response times
if (recentResults.length >= 5) {
const responseTimes = recentResults
.map((r) => r.metrics.responseTime)
.filter((rt) => typeof rt === 'number') as number[];
if (responseTimes.length >= 5) {
const recent = responseTimes.slice(-3);
const older = responseTimes.slice(-6, -3);
const recentAvg = recent.reduce((a, b) => a + b, 0) / recent.length;
const olderAvg = older.reduce((a, b) => a + b, 0) / older.length;
if (recentAvg > olderAvg * 1.5) {
riskScore += 0.3;
}
}
}
// Current status contributes to risk
if (currentResult.status === 'critical') riskScore += 0.3;
else if (currentResult.status === 'unhealthy') riskScore += 0.2;
else if (currentResult.status === 'warning') riskScore += 0.1;
return Math.min(1.0, riskScore);
}
/**
* Generate recommendations for connection health
*/
private generateConnectionRecommendations(
connection: PooledConnection,
healthResult: HealthCheckResult
): string[] {
const recommendations: string[] = [];
if (
healthResult.status === 'critical' ||
healthResult.status === 'unhealthy'
) {
recommendations.push('Connection requires immediate attention');
recommendations.push('Consider reconnecting or replacing connection');
}
if (connection.reconnectAttempts > 0) {
recommendations.push('Monitor connection stability');
recommendations.push('Investigate underlying network issues');
}
if (
healthResult.metrics.responseTime &&
healthResult.metrics.responseTime > 5000
) {
recommendations.push(
'Slow response times detected - check network conditions'
);
recommendations.push('Consider connection pooling optimization');
}
if (connection.activeSessionCount > 10) {
recommendations.push('High session load - consider load balancing');
}
return recommendations;
}
/**
* Update average response time metric
*/
private updateAverageResponseTime(newResponseTime: number): void {
if (this.metrics.successfulHealthChecks === 1) {
this.metrics.averageResponseTime = newResponseTime;
} else {
this.metrics.averageResponseTime =
(this.metrics.averageResponseTime *
(this.metrics.successfulHealthChecks - 1) +
newResponseTime) /
this.metrics.successfulHealthChecks;
}
}
/**
* Start cleanup process for idle connections
*/
private startCleanupProcess(): void {
if (!this.config.cleanupInterval || this.config.cleanupInterval <= 0) {
return;
}
this.cleanupInterval = setInterval(async () => {
await this.cleanupIdleConnections();
}, this.config.cleanupInterval);
}
/**
* Clean up idle connections
*/
private async cleanupIdleConnections(): Promise<void> {
const now = Date.now();
const connectionsToClose: string[] = [];
this.connections.forEach((connection, connectionId) => {
const idleTime = now - connection.lastUsed.getTime();
// Only clean up idle connections with no active sessions
if (
connection.activeSessionCount === 0 &&
idleTime > this.config.connectionIdleTimeout
) {
connectionsToClose.push(connectionId);
}
});
if (connectionsToClose.length > 0) {
if (this.config.enableLogging) {
this.logger.info(
`Cleaning up ${connectionsToClose.length} idle connections`
);
}
await Promise.all(
connectionsToClose.map((id) => this.closeConnection(id))
);
this.emit('connectionsCleanedUp', { count: connectionsToClose.length });
}
}
}