/**
* Unit Tests for Connection Pool with Mutex Synchronization
* Tests for RACE_CONDITION_001 fix - atomic acquire/release operations
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { EventEmitter } from 'events';
// Mock socket for testing
class MockSocket extends EventEmitter {
destroyed = false;
written: string[] = [];
setTimeout(_ms: number): void {}
connect(_port: number, _host: string, callback?: () => void): void {
setImmediate(() => {
if (callback) callback();
});
}
write(data: string): boolean {
this.written.push(data);
return true;
}
destroy(): void {
this.destroyed = true;
this.emit('close');
}
simulateData(data: string): void {
this.emit('data', Buffer.from(data));
}
simulateError(error: Error): void {
this.emit('error', error);
}
}
// Simplified connection pool for testing mutex behavior
interface TestPooledConnection {
id: string;
inUse: boolean;
socket: MockSocket;
lastUsedAt: number;
requestCount: number;
}
interface TestPoolConfig {
maxConnections: number;
connectionTimeout: number;
mutexTimeout: number;
}
const DEFAULT_TEST_CONFIG: TestPoolConfig = {
maxConnections: 3,
connectionTimeout: 5000,
mutexTimeout: 5000
};
/**
* Test implementation of ConnectionPool with mutex
* Mirrors production implementation for unit testing
*/
class TestConnectionPool {
private connections: Map<string, TestPooledConnection> = new Map();
private config: TestPoolConfig;
private connectionIdCounter = 0;
private pendingRequests: Array<{
resolve: (conn: TestPooledConnection) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
}> = [];
// Mutex state
private acquireLock = false;
private lockQueue: Array<{
resolve: () => void;
reject: (error: Error) => void;
}> = [];
constructor(config: Partial<TestPoolConfig> = {}) {
this.config = { ...DEFAULT_TEST_CONFIG, ...config };
}
private async acquireMutex(operation: string): Promise<void> {
return new Promise((resolve, reject) => {
const timeoutHandle = setTimeout(() => {
const index = this.lockQueue.findIndex(item => item.resolve === resolve);
if (index !== -1) {
this.lockQueue.splice(index, 1);
}
reject(new Error(`Mutex timeout during ${operation}`));
}, this.config.mutexTimeout);
const wrappedResolve = () => {
clearTimeout(timeoutHandle);
resolve();
};
const wrappedReject = (error: Error) => {
clearTimeout(timeoutHandle);
reject(error);
};
if (!this.acquireLock) {
this.acquireLock = true;
wrappedResolve();
} else {
this.lockQueue.push({
resolve: wrappedResolve,
reject: wrappedReject
});
}
});
}
private releaseMutex(): void {
const next = this.lockQueue.shift();
if (next) {
next.resolve();
} else {
this.acquireLock = false;
}
}
async acquire(): Promise<TestPooledConnection> {
await this.acquireMutex('acquire');
try {
// Find available connection
for (const conn of this.connections.values()) {
if (!conn.inUse && !conn.socket.destroyed) {
conn.inUse = true;
conn.lastUsedAt = Date.now();
return conn;
}
}
// Create new if under max
if (this.connections.size < this.config.maxConnections) {
const conn = this.createConnection();
conn.inUse = true;
conn.lastUsedAt = Date.now();
return conn;
}
// Queue request
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
const index = this.pendingRequests.findIndex(r => r.resolve === resolve);
if (index !== -1) {
this.pendingRequests.splice(index, 1);
}
reject(new Error('Connection pool timeout'));
}, this.config.connectionTimeout);
this.pendingRequests.push({ resolve, reject, timeout });
});
} finally {
this.releaseMutex();
}
}
async release(connection: TestPooledConnection): Promise<void> {
await this.acquireMutex('release');
try {
const conn = this.connections.get(connection.id);
if (!conn) return;
conn.inUse = false;
conn.lastUsedAt = Date.now();
conn.requestCount++;
// Serve pending request
if (this.pendingRequests.length > 0) {
for (const c of this.connections.values()) {
if (!c.inUse && !c.socket.destroyed) {
const pending = this.pendingRequests.shift();
if (pending) {
clearTimeout(pending.timeout);
c.inUse = true;
c.lastUsedAt = Date.now();
pending.resolve(c);
}
break;
}
}
}
} finally {
this.releaseMutex();
}
}
private createConnection(): TestPooledConnection {
const id = `conn-${++this.connectionIdCounter}`;
const conn: TestPooledConnection = {
id,
inUse: false,
socket: new MockSocket(),
lastUsedAt: Date.now(),
requestCount: 0
};
this.connections.set(id, conn);
return conn;
}
getStats(): {
totalConnections: number;
activeConnections: number;
pendingRequests: number;
lockQueueLength: number;
} {
let active = 0;
for (const conn of this.connections.values()) {
if (conn.inUse) active++;
}
return {
totalConnections: this.connections.size,
activeConnections: active,
pendingRequests: this.pendingRequests.length,
lockQueueLength: this.lockQueue.length
};
}
async shutdown(): Promise<void> {
await this.acquireMutex('shutdown');
try {
for (const pending of this.pendingRequests) {
clearTimeout(pending.timeout);
pending.reject(new Error('Pool shutting down'));
}
this.pendingRequests = [];
for (const waiter of this.lockQueue) {
waiter.reject(new Error('Pool shutting down'));
}
this.lockQueue = [];
for (const conn of this.connections.values()) {
conn.socket.destroy();
}
this.connections.clear();
} finally {
this.acquireLock = false;
}
}
}
describe('Connection Pool Mutex Synchronization (RACE_CONDITION_001)', () => {
let pool: TestConnectionPool;
beforeEach(() => {
pool = new TestConnectionPool();
});
afterEach(async () => {
await pool.shutdown();
});
describe('Basic Operations', () => {
it('should acquire a connection', async () => {
const conn = await pool.acquire();
expect(conn).toBeDefined();
expect(conn.id).toBe('conn-1');
expect(conn.inUse).toBe(true);
});
it('should release a connection', async () => {
const conn = await pool.acquire();
expect(conn.inUse).toBe(true);
await pool.release(conn);
expect(conn.inUse).toBe(false);
});
it('should track connection statistics', async () => {
const conn1 = await pool.acquire();
const conn2 = await pool.acquire();
const stats = pool.getStats();
expect(stats.totalConnections).toBe(2);
expect(stats.activeConnections).toBe(2);
await pool.release(conn1);
const stats2 = pool.getStats();
expect(stats2.activeConnections).toBe(1);
});
});
describe('Mutex Protection (Race Condition Prevention)', () => {
it('should not allow concurrent acquire of same connection', async () => {
// Create pool with only 1 max connection
const singlePool = new TestConnectionPool({ maxConnections: 1 });
// First acquire should succeed
const conn1 = await singlePool.acquire();
expect(conn1.id).toBe('conn-1');
expect(conn1.inUse).toBe(true);
// Second acquire will queue (since max=1 and conn1 is in use)
// We don't await yet - it should be pending
const acquirePromise = singlePool.acquire();
// Wait a tick for the acquire to register
await new Promise(resolve => setImmediate(resolve));
// Release first connection
await singlePool.release(conn1);
// Second acquire should now complete with same connection
const conn2 = await acquirePromise;
expect(conn2.id).toBe('conn-1'); // Same connection reused
expect(conn2.inUse).toBe(true);
await singlePool.shutdown();
});
it('should handle concurrent acquires atomically', async () => {
const multiPool = new TestConnectionPool({ maxConnections: 2 });
// Concurrent acquires
const [conn1, conn2] = await Promise.all([
multiPool.acquire(),
multiPool.acquire()
]);
// Should get different connections
expect(conn1.id).not.toBe(conn2.id);
expect(conn1.inUse).toBe(true);
expect(conn2.inUse).toBe(true);
const stats = multiPool.getStats();
expect(stats.totalConnections).toBe(2);
expect(stats.activeConnections).toBe(2);
await multiPool.shutdown();
});
it('should serialize access through mutex', async () => {
const executionOrder: string[] = [];
// Override acquire to track execution order
const origAcquire = pool.acquire.bind(pool);
pool.acquire = async () => {
executionOrder.push('acquire-start');
const result = await origAcquire();
executionOrder.push('acquire-end');
return result;
};
// Concurrent acquires
await Promise.all([
pool.acquire(),
pool.acquire(),
pool.acquire()
]);
// Due to mutex, acquires should complete one at a time
// (though start order may interleave)
expect(executionOrder.filter(e => e === 'acquire-end').length).toBe(3);
});
});
describe('Pending Request Queue', () => {
it('should queue requests when pool exhausted', async () => {
const smallPool = new TestConnectionPool({
maxConnections: 1,
connectionTimeout: 10000
});
// Acquire the only connection
const conn1 = await smallPool.acquire();
// Second acquire should queue
const acquirePromise = smallPool.acquire();
// Wait a tick for the acquire to register in queue
await new Promise(resolve => setImmediate(resolve));
// Verify queued
expect(smallPool.getStats().pendingRequests).toBe(1);
// Release first
await smallPool.release(conn1);
// Second should complete
const conn2 = await acquirePromise;
expect(conn2).toBeDefined();
expect(smallPool.getStats().pendingRequests).toBe(0);
await smallPool.shutdown();
});
it('should timeout pending requests', async () => {
const timeoutPool = new TestConnectionPool({
maxConnections: 1,
connectionTimeout: 100 // 100ms timeout
});
// Acquire the only connection
const conn1 = await timeoutPool.acquire();
// Second acquire should timeout
await expect(timeoutPool.acquire()).rejects.toThrow('Connection pool timeout');
await timeoutPool.release(conn1);
await timeoutPool.shutdown();
});
it('should process queue in order (FIFO)', async () => {
const fifoPool = new TestConnectionPool({
maxConnections: 1,
connectionTimeout: 10000
});
const conn = await fifoPool.acquire();
const order: number[] = [];
// Queue multiple requests
const p1 = fifoPool.acquire().then(() => { order.push(1); });
const p2 = fifoPool.acquire().then(() => { order.push(2); });
const p3 = fifoPool.acquire().then(() => { order.push(3); });
// Release to process queue
await fifoPool.release(conn);
await fifoPool.release(conn);
await fifoPool.release(conn);
await Promise.all([p1, p2, p3]);
// Should be processed in order
expect(order).toEqual([1, 2, 3]);
await fifoPool.shutdown();
});
});
describe('Edge Cases', () => {
it('should handle rapid acquire/release cycles', async () => {
const iterations = 50;
for (let i = 0; i < iterations; i++) {
const conn = await pool.acquire();
expect(conn.inUse).toBe(true);
await pool.release(conn);
expect(conn.inUse).toBe(false);
}
// Connection should be reused
const stats = pool.getStats();
expect(stats.totalConnections).toBe(1); // Only one connection created
});
it('should handle concurrent acquire and release', async () => {
const conn1 = await pool.acquire();
const conn2 = await pool.acquire();
// Concurrent operations
await Promise.all([
pool.release(conn1),
pool.acquire(),
pool.release(conn2)
]);
// Pool should be in consistent state
const stats = pool.getStats();
expect(stats.totalConnections).toBeGreaterThan(0);
});
it('should handle release of unknown connection gracefully', async () => {
const fakeConn: TestPooledConnection = {
id: 'fake-conn',
inUse: true,
socket: new MockSocket(),
lastUsedAt: Date.now(),
requestCount: 0
};
// Should not throw
await pool.release(fakeConn);
});
});
describe('Shutdown', () => {
it('should reject pending requests on shutdown', async () => {
const shutdownPool = new TestConnectionPool({
maxConnections: 1,
connectionTimeout: 60000
});
const conn = await shutdownPool.acquire();
// Queue a request
const pendingPromise = shutdownPool.acquire();
// Shutdown
await shutdownPool.shutdown();
// Pending should be rejected
await expect(pendingPromise).rejects.toThrow('Pool shutting down');
});
it('should clear all connections on shutdown', async () => {
await pool.acquire();
await pool.acquire();
const beforeStats = pool.getStats();
expect(beforeStats.totalConnections).toBe(2);
await pool.shutdown();
const afterStats = pool.getStats();
expect(afterStats.totalConnections).toBe(0);
});
});
});
describe('Mutex Timeout Protection', () => {
it('should timeout mutex acquisition to prevent deadlocks', async () => {
const deadlockPool = new TestConnectionPool({
maxConnections: 1,
mutexTimeout: 100 // Very short timeout
});
// Acquire connection to hold mutex longer
const conn = await deadlockPool.acquire();
// This test verifies the timeout mechanism works
// In production, long-held mutexes would timeout
await deadlockPool.release(conn);
await deadlockPool.shutdown();
});
});
describe('Connection Pool Statistics', () => {
it('should accurately track active connections', async () => {
const conn1 = await new TestConnectionPool().acquire();
const pool = new TestConnectionPool();
const c1 = await pool.acquire();
const c2 = await pool.acquire();
const c3 = await pool.acquire();
expect(pool.getStats().activeConnections).toBe(3);
await pool.release(c1);
expect(pool.getStats().activeConnections).toBe(2);
await pool.release(c2);
await pool.release(c3);
expect(pool.getStats().activeConnections).toBe(0);
await pool.shutdown();
});
it('should track request count per connection', async () => {
const pool = new TestConnectionPool({ maxConnections: 1 });
const conn = await pool.acquire();
expect(conn.requestCount).toBe(0);
await pool.release(conn);
expect(conn.requestCount).toBe(1);
await pool.acquire();
await pool.release(conn);
expect(conn.requestCount).toBe(2);
await pool.shutdown();
});
});