MCP GitHub Issue Server

/**
 * Database connection pool implementation
 */
import { Database, open } from 'sqlite';
import { Logger } from '../../../logging/index.js';
import { ErrorCodes, createError } from '../../../errors/index.js';
import { StorageConfig } from '../../../types/storage.js';
import { MonitoringConfig } from '../../monitoring/index.js';
import { ConnectionStateManager } from './state.js';
import { WALManager } from '../wal/manager.js';
import { join } from 'path';
import crypto from 'crypto';
import { isDatabaseError, isTransientError } from '../../../utils/error-utils.js';

/** Book-keeping record the pool keeps for every open database handle. */
interface PoolConnection {
  db: Database;
  id: string;
  inUse: boolean;
  /** Epoch ms of the last acquire or release. */
  lastUsed: number;
  /** Epoch ms at which the connection record was created. */
  createdAt: number;
  errorCount: number;
  usageCount: number;
  /** Accumulated ms between acquire and release. */
  totalUsageTime: number;
  lastError?: Error;
}

/** Backoff settings consumed by `ConnectionPool.retryOperation`. */
interface RetryOptions {
  maxAttempts: number;
  /** Delay in ms before the first retry. */
  initialDelay: number;
  /** Upper bound in ms for the delay between retries. */
  maxDelay: number;
  /** Multiplier applied to the delay after every retry. */
  backoffFactor: number;
}

const DEFAULT_RETRY_OPTIONS: RetryOptions = {
  maxAttempts: 5,
  initialDelay: 100,
  maxDelay: 2000,
  backoffFactor: 2,
};

/**
 * Pool of SQLite connections to a single database file.
 *
 * Connections are created lazily up to `maxConnections` (hard-capped at 5),
 * handed out by {@link ConnectionPool.getConnection} and returned with
 * {@link ConnectionPool.releaseConnection}. A periodic cleanup closes idle,
 * over-age or errored connections that are not currently in use.
 */
export class ConnectionPool {
  private readonly logger: Logger;
  private readonly config: StorageConfig;
  private readonly connections: Map<string, PoolConnection>;
  private readonly minConnections: number;
  private readonly maxConnections: number;
  private readonly idleTimeout: number;
  private readonly maxAge: number = 30 * 60 * 1000; // 30 minutes max connection age
  private readonly stateManager: ConnectionStateManager;
  private cleanupInterval: NodeJS.Timeout | null;
  private readonly _dbPath: string;
  private readonly connectionIds = new WeakMap<Database, string>();
  private readonly verifiedConnections = new Set<string>();
  private isInitialized = false;
  private totalConnectionsCreated: number = 0;
  private totalConnectionErrors: number = 0;

  /**
   * @param config - Storage configuration; `baseDir` and `name` determine the
   *   database file path, `monitoring.healthCheck` feeds the state manager.
   * @param options - Pool sizing. Falsy values fall back to the defaults
   *   (min 1, max 5, idle timeout 30000 ms); `maxConnections` is capped at 5.
   */
  constructor(
    config: StorageConfig & { monitoring?: MonitoringConfig },
    options: {
      minConnections?: number;
      maxConnections?: number;
      idleTimeout?: number;
    } = {}
  ) {
    // Increase max listeners to prevent warning
    process.setMaxListeners(20);

    const minConnections = options.minConnections || 1;
    const maxConnections = Math.min(options.maxConnections || 5, 5); // Cap at 5 connections
    const idleTimeout = options.idleTimeout || 30000; // 30 seconds idle timeout

    this.logger = Logger.getInstance().child({
      component: 'ConnectionPool',
      context: {
        database: config.name || 'default',
        minConnections,
        maxConnections,
        idleTimeout,
      },
    });

    this.config = config;
    this.connections = new Map();
    this.minConnections = minConnections;
    this.maxConnections = maxConnections;
    this.idleTimeout = idleTimeout;
    this.cleanupInterval = null;
    this._dbPath = join(config.baseDir || './data', `${config.name || 'default'}.db`);

    // Initialize state manager with monitoring config
    this.stateManager = ConnectionStateManager.getInstance({
      errorThreshold: config.monitoring?.healthCheck?.errorThreshold,
      responseTimeThreshold: config.monitoring?.healthCheck?.responseTimeThreshold,
    });

    this.logger.info('Connection pool created', {
      config: {
        dbPath: this._dbPath,
        minConnections: this.minConnections,
        maxConnections: this.maxConnections,
        idleTimeout: this.idleTimeout,
        maxAge: this.maxAge,
      },
      context: {
        operation: 'create',
        timestamp: Date.now(),
      },
    });
  }

  /**
   * Get the database file path
   */
  get databasePath(): string {
    return this._dbPath;
  }

  /**
   * Get the unique ID for a database connection.
   *
   * If the handle has no ID yet, a random UUID is generated and remembered
   * for it.
   */
  getConnectionId(db: Database): string {
    let id = this.connectionIds.get(db);
    if (!id) {
      id = crypto.randomUUID();
      this.connectionIds.set(db, id);
    }
    return id;
  }

  /**
   * Initialize the connection pool.
   *
   * Opens a bootstrap connection (with retries), enables WAL through the
   * WAL manager, forces a checkpoint, creates the first pooled connection,
   * then starts monitoring and the periodic idle cleanup. Calling it again
   * while initialized is a no-op.
   *
   * @throws A `STORAGE_INIT` error (via `createError`) wrapping any failure.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      return;
    }

    const initStart = Date.now();
    let initDb: Database | undefined;

    try {
      this.logger.info('Initializing connection pool', {
        context: {
          operation: 'initialize',
          timestamp: initStart,
        },
      });

      // First-time initialization
      const sqlite3 = await import('sqlite3');

      // Create initial database connection with retries
      initDb = await this.retryOperation(
        async () => {
          const db = await open({
            filename: this._dbPath,
            driver: sqlite3.default.Database,
            mode: sqlite3.default.OPEN_READWRITE | sqlite3.default.OPEN_CREATE,
          });

          // Configure low-level driver settings when the handle exposes a driver
          const driver = (db as any).driver;
          driver?.configure?.('busyTimeout', 5000);

          return db;
        },
        'initialize',
        {
          maxAttempts: 10, // More attempts for initialization
          initialDelay: 200,
          maxDelay: 3000,
          backoffFactor: 1.5,
        }
      );

      // Enable WAL mode and configure database
      const walManager = WALManager.getInstance(this._dbPath);
      await walManager.enableWAL(initDb);
      await walManager.checkpoint(initDb); // Force checkpoint before proceeding

      // Basic verification
      await initDb.get('SELECT 1');

      // Create single initial connection (createConnection registers it in the pool)
      await this.createConnection();

      // Close initial connection now that we have our pool connection
      await initDb.close();
      initDb = undefined;

      this.isInitialized = true;

      // Start monitoring with reduced intervals
      this.stateManager.startMonitoring();
      this.cleanupInterval = setInterval(() => {
        // Cleanup is async; log unexpected failures rather than leaving an
        // unhandled promise rejection behind.
        this.cleanupIdleConnections().catch(error => {
          this.logger.error('Idle connection cleanup failed', error, {
            context: {
              operation: 'cleanup',
              timestamp: Date.now(),
            },
          });
        });
      }, this.idleTimeout);

      const duration = Date.now() - initStart;
      this.logger.info('Connection pool initialized', {
        duration,
        connections: this.connections.size,
        context: {
          operation: 'initialize',
          timestamp: Date.now(),
        },
      });
    } catch (error) {
      // Clean up initial connection if something failed
      if (initDb) {
        await initDb.close().catch(() => {});
        initDb = undefined;
      }

      const duration = Date.now() - initStart;
      this.logger.error('Failed to initialize connection pool', error, {
        duration,
        isTransient: isTransientError(error),
        isDatabaseError: isDatabaseError(error),
        context: {
          operation: 'initialize',
          timestamp: Date.now(),
        },
      });

      throw createError(
        ErrorCodes.STORAGE_INIT,
        'Failed to initialize connection pool',
        'initialize',
        error instanceof Error ? error.message : String(error),
        {
          duration,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
        }
      );
    }
  }

  /**
   * Retry an operation with exponential backoff.
   *
   * Only errors for which `isTransientError` returns true are retried; any
   * other error is rethrown unchanged on the spot.
   *
   * @param operation - Async work to attempt.
   * @param operationName - Label used in log entries and the final error.
   * @param options - Attempt count and backoff settings.
   * @returns The value produced by the first successful attempt.
   * @throws A `STORAGE_ERROR` (via `createError`) once all attempts have
   *   failed with transient errors.
   */
  private async retryOperation<T>(
    operation: () => Promise<T>,
    operationName: string,
    options: RetryOptions = DEFAULT_RETRY_OPTIONS
  ): Promise<T> {
    let lastError: Error | undefined;
    let delay = options.initialDelay;

    for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
      try {
        return await operation();
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));

        // Only retry on transient errors
        if (!isTransientError(error)) {
          throw error;
        }

        if (attempt < options.maxAttempts) {
          this.logger.debug(`Retrying ${operationName} after error`, {
            attempt,
            delay,
            error: lastError,
            context: {
              operation: operationName,
              timestamp: Date.now(),
            },
          });

          const waitMs = delay;
          await new Promise(resolve => setTimeout(resolve, waitMs));
          delay = Math.min(delay * options.backoffFactor, options.maxDelay);
        }
      }
    }

    throw createError(
      ErrorCodes.STORAGE_ERROR,
      `${operationName} failed after ${options.maxAttempts} attempts`,
      operationName,
      lastError?.message || 'Operation failed',
      {
        attempts: options.maxAttempts,
        lastError: lastError
          ? {
              name: lastError.name,
              message: lastError.message,
              stack: lastError.stack,
            }
          : undefined,
        isTransient: true,
      }
    );
  }

  /**
   * Find an idle connection that the state manager reports as healthy and
   * without an active transaction, mark it as in use and return it.
   *
   * @returns The claimed connection, or `undefined` when none qualifies.
   */
  private claimIdleConnection(): PoolConnection | undefined {
    for (const [id, conn] of this.connections) {
      if (conn.inUse) {
        continue;
      }
      if (!this.stateManager.isHealthy(id) || this.stateManager.hasActiveTransaction(id)) {
        continue;
      }

      conn.inUse = true;
      conn.lastUsed = Date.now();
      conn.usageCount++;
      this.stateManager.markInUse(id);
      return conn;
    }
    return undefined;
  }

  /**
   * Get a connection from the pool.
   *
   * Order of preference: reuse a healthy idle connection, otherwise create a
   * new one while below `maxConnections`, otherwise poll every 100 ms until
   * a connection frees up or the busy timeout
   * (`config.connection.busyTimeout`, default 2000 ms) elapses.
   *
   * @returns A database handle that must be handed back with
   *   {@link ConnectionPool.releaseConnection}.
   * @throws A `STORAGE_ERROR` (via `createError`) on timeout or when
   *   acquiring/creating a connection fails.
   */
  async getConnection(): Promise<Database> {
    const getStart = Date.now();
    let acquiredConnection: string | undefined;

    try {
      // First try to find a healthy available connection
      const reused = this.claimIdleConnection();
      if (reused) {
        acquiredConnection = reused.id;
        this.logger.debug('Reusing healthy connection', {
          id: reused.id,
          usageCount: reused.usageCount,
          age: Date.now() - reused.createdAt,
          context: {
            operation: 'getConnection',
            timestamp: Date.now(),
          },
        });
        return reused.db;
      }

      // If we haven't reached max connections, create a new one
      if (this.connections.size < this.maxConnections) {
        const conn = await this.createConnection();
        conn.inUse = true;
        // Account for this first hand-out the same way as for a reused connection
        conn.lastUsed = Date.now();
        conn.usageCount++;
        this.stateManager.markInUse(conn.id);
        acquiredConnection = conn.id;
        return conn.db;
      }

      // Otherwise wait for a connection to become available
      return new Promise<Database>((resolve, reject) => {
        let settled = false;
        let pollTimer: NodeJS.Timeout | undefined;

        const timeout = setTimeout(() => {
          // Stop polling: without this a later poll would claim a connection
          // for a caller that has already been rejected and never release it.
          settled = true;
          if (pollTimer) {
            clearTimeout(pollTimer);
          }

          this.logger.warn('Connection acquisition timeout', {
            waitTime: Date.now() - getStart,
            activeConnections: Array.from(this.connections.entries()).map(([id, conn]) => ({
              id,
              inUse: conn.inUse,
              age: Date.now() - conn.createdAt,
              lastUsed: Date.now() - conn.lastUsed,
              hasTransaction: this.stateManager.hasActiveTransaction(id),
            })),
            context: {
              operation: 'getConnection',
              timestamp: Date.now(),
            },
          });

          const timeoutError = new Error('Timed out waiting for available connection');
          this.logger.error('Connection acquisition timeout', timeoutError, {
            waitTime: Date.now() - getStart,
            activeConnections: this.connections.size,
            context: {
              operation: 'getConnection',
              timestamp: Date.now(),
            },
          });

          reject(
            createError(
              ErrorCodes.STORAGE_ERROR,
              'Connection timeout',
              'getConnection',
              'Timed out waiting for available connection',
              {
                waitTime: Date.now() - getStart,
                activeConnections: this.connections.size,
                isTransient: true,
              }
            )
          );
        }, this.config.connection?.busyTimeout || 2000);

        const checkConnection = (): void => {
          if (settled) {
            return;
          }

          try {
            const conn = this.claimIdleConnection();
            if (conn) {
              settled = true;
              clearTimeout(timeout);
              acquiredConnection = conn.id;

              this.logger.debug('Connection became available', {
                id: conn.id,
                waitTime: Date.now() - getStart,
                usageCount: conn.usageCount,
                context: {
                  operation: 'getConnection',
                  timestamp: Date.now(),
                },
              });

              resolve(conn.db);
              return;
            }
          } catch (error) {
            // Surface state-manager failures to the waiting caller
            settled = true;
            clearTimeout(timeout);
            reject(error);
            return;
          }

          pollTimer = setTimeout(checkConnection, 100);
        };

        checkConnection();
      });
    } catch (error) {
      this.logger.error('Failed to acquire connection', error, {
        duration: Date.now() - getStart,
        acquiredConnection,
        activeConnections: this.connections.size,
        isTransient: isTransientError(error),
        isDatabaseError: isDatabaseError(error),
        context: {
          operation: 'getConnection',
          timestamp: Date.now(),
        },
      });

      throw createError(
        ErrorCodes.STORAGE_ERROR,
        'Failed to acquire database connection',
        'getConnection',
        error instanceof Error ? error.message : String(error),
        {
          duration: Date.now() - getStart,
          acquiredConnection,
          activeConnections: this.connections.size,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
        }
      );
    }
  }

  /**
   * Release a connection back to the pool.
   *
   * Handles that do not belong to the pool are ignored.
   */
  releaseConnection(db: Database): void {
    const id = this.getConnectionId(db);
    const conn = this.connections.get(id);
    if (!conn) {
      return;
    }

    // lastUsed was stamped at acquisition, so the difference is the hold time
    const usageDuration = Date.now() - conn.lastUsed;
    conn.inUse = false;
    conn.lastUsed = Date.now();
    conn.totalUsageTime += usageDuration;
    this.stateManager.markAvailable(id);

    this.logger.debug('Connection released', {
      id,
      usageDuration,
      totalUsageTime: conn.totalUsageTime,
      usageCount: conn.usageCount,
      context: {
        operation: 'releaseConnection',
        timestamp: Date.now(),
      },
    });
  }

  /**
   * Create a new connection.
   *
   * Opens the database file (with retries), applies the per-connection
   * PRAGMAs, verifies the handle with `SELECT 1`, then registers it in the
   * pool and with the state manager. The returned record is not marked as
   * in use.
   *
   * @throws A `STORAGE_ERROR` (via `createError`) if opening, configuring or
   *   verifying the connection fails.
   */
  private async createConnection(): Promise<PoolConnection> {
    const createStart = Date.now();
    const sqlite3 = await import('sqlite3');
    const id = crypto.randomUUID();

    try {
      this.logger.debug('Creating new connection', {
        id,
        existingConnections: this.connections.size,
        context: {
          operation: 'createConnection',
          timestamp: createStart,
        },
      });

      const db = await this.retryOperation(async () => {
        const handle = await open({
          filename: this._dbPath,
          driver: sqlite3.default.Database,
          mode: sqlite3.default.OPEN_READWRITE | sqlite3.default.OPEN_CREATE,
        });

        try {
          // Configure low-level driver settings when the handle exposes a driver
          const driver = (handle as any).driver;
          driver?.configure?.('busyTimeout', 5000);

          // Configure connection with optimized memory settings
          await handle.exec('PRAGMA foreign_keys=ON');
          await handle.exec('PRAGMA journal_mode=WAL');
          await handle.exec('PRAGMA synchronous=NORMAL');
          await handle.exec('PRAGMA cache_size=-8000'); // 8MB per connection
          await handle.exec('PRAGMA page_size=4096');
          await handle.exec('PRAGMA mmap_size=67108864'); // 64MB memory mapping
          await handle.exec('PRAGMA temp_store=MEMORY');
          await handle.exec('PRAGMA busy_timeout=5000');
          await handle.exec('PRAGMA threads=2'); // Allow two threads per connection
          await handle.exec('PRAGMA read_uncommitted=0'); // Strict isolation

          return handle;
        } catch (error) {
          // Do not leak the freshly opened handle when configuration fails;
          // a retry will open a new one.
          await handle.close().catch(() => {});
          throw error;
        }
      }, 'createConnection');

      // Store connection ID
      this.connectionIds.set(db, id);

      // Verify the new handle (the ID is freshly generated, so it cannot
      // have been verified before)
      try {
        await db.get('SELECT 1');
        this.verifiedConnections.add(id);
        this.logger.debug('Connection verified', {
          id,
          duration: Date.now() - createStart,
          context: {
            operation: 'verifyConnection',
            timestamp: Date.now(),
          },
        });
      } catch (error) {
        this.logger.error('Failed to verify connection', error, {
          id,
          duration: Date.now() - createStart,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
          context: {
            operation: 'verifyConnection',
            timestamp: Date.now(),
          },
        });

        await db.close().catch(() => {}); // Attempt to close on error

        throw createError(
          ErrorCodes.STORAGE_ERROR,
          'Failed to verify database connection',
          'verifyConnection',
          error instanceof Error ? error.message : String(error),
          {
            connectionId: id,
            duration: Date.now() - createStart,
            isTransient: isTransientError(error),
            isDatabaseError: isDatabaseError(error),
          }
        );
      }

      const conn: PoolConnection = {
        db,
        id,
        inUse: false,
        lastUsed: Date.now(),
        createdAt: Date.now(),
        errorCount: 0,
        usageCount: 0,
        totalUsageTime: 0,
      };

      this.connections.set(id, conn);
      this.stateManager.registerConnection(id);
      this.totalConnectionsCreated++;

      this.logger.info('Created new connection', {
        id,
        duration: Date.now() - createStart,
        totalCreated: this.totalConnectionsCreated,
        activeConnections: this.connections.size,
        context: {
          operation: 'createConnection',
          timestamp: Date.now(),
        },
      });

      return conn;
    } catch (error) {
      this.totalConnectionErrors++;

      this.logger.error('Failed to create connection', error, {
        id,
        duration: Date.now() - createStart,
        totalErrors: this.totalConnectionErrors,
        isTransient: isTransientError(error),
        isDatabaseError: isDatabaseError(error),
        context: {
          operation: 'createConnection',
          timestamp: Date.now(),
        },
      });

      throw createError(
        ErrorCodes.STORAGE_ERROR,
        'Failed to create database connection',
        'createConnection',
        error instanceof Error ? error.message : String(error),
        {
          connectionId: id,
          duration: Date.now() - createStart,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
        }
      );
    }
  }

  /**
   * Clean up idle connections but maintain minimum.
   *
   * A connection is a removal candidate only while it is NOT in use and it
   * is idle past the idle timeout, older than the max age, or has recorded
   * errors. Healthy candidates are kept once removing more would drop the
   * pool below `minConnections`; errored ones are always removed.
   */
  private async cleanupIdleConnections(): Promise<void> {
    const cleanupStart = Date.now();
    const now = Date.now();
    const idsToRemove: string[] = [];

    // Running count so the minimum is honoured across the whole pass, not
    // just against the size at the start of it.
    let remaining = this.connections.size;

    // Find connections to remove
    for (const [id, conn] of this.connections) {
      // Never close a handle a caller is still holding
      if (conn.inUse) {
        continue;
      }

      const idleExpired = now - conn.lastUsed > this.idleTimeout;
      const tooOld = now - conn.createdAt > this.maxAge;
      const errored = conn.errorCount > 0;
      if (!idleExpired && !tooOld && !errored) {
        continue;
      }

      // Keep minimum connections unless they're errored
      if (errored || remaining > this.minConnections) {
        idsToRemove.push(id);
        remaining--;
      }
    }

    if (idsToRemove.length > 0) {
      this.logger.info('Starting connection cleanup', {
        connectionsToRemove: idsToRemove.length,
        totalConnections: this.connections.size,
        context: {
          operation: 'cleanup',
          timestamp: cleanupStart,
        },
      });
    }

    // Remove connections
    let removedCount = 0;
    for (const id of idsToRemove) {
      const conn = this.connections.get(id);
      // Skip entries that vanished or were acquired while an earlier close
      // was being awaited.
      if (!conn || conn.inUse) {
        continue;
      }

      // Detach before closing so getConnection() cannot hand out a handle
      // that is in the middle of being closed.
      this.connections.delete(id);
      this.stateManager.unregisterConnection(id);
      this.connectionIds.delete(conn.db);
      this.verifiedConnections.delete(id);

      try {
        await conn.db.close();
        removedCount++;

        this.logger.debug('Removed connection', {
          id,
          age: now - conn.createdAt,
          idleTime: now - conn.lastUsed,
          errorCount: conn.errorCount,
          usageCount: conn.usageCount,
          totalUsageTime: conn.totalUsageTime,
          context: {
            operation: 'cleanup',
            timestamp: Date.now(),
          },
        });
      } catch (error) {
        this.logger.error('Failed to close connection', error, {
          id,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
          context: {
            operation: 'cleanup',
            timestamp: Date.now(),
          },
        });
      }
    }

    if (idsToRemove.length > 0) {
      this.logger.info('Connection cleanup completed', {
        removedCount,
        remainingConnections: this.connections.size,
        duration: Date.now() - cleanupStart,
        context: {
          operation: 'cleanup',
          timestamp: Date.now(),
        },
      });
    }
  }

  /**
   * Get current pool metrics.
   *
   * @returns The state manager's metrics merged with pool-level connection
   *   counts and a per-connection usage summary.
   */
  getMetrics() {
    const now = Date.now();
    const all = Array.from(this.connections.values());
    const inUse = all.filter(c => c.inUse).length;

    const metrics = {
      ...this.stateManager.getMetrics(),
      connections: {
        total: this.connections.size,
        inUse,
        idle: all.length - inUse,
        totalCreated: this.totalConnectionsCreated,
        totalErrors: this.totalConnectionErrors,
      },
      usage: all.map(conn => ({
        id: conn.id,
        age: now - conn.createdAt,
        usageCount: conn.usageCount,
        totalUsageTime: conn.totalUsageTime,
        errorCount: conn.errorCount,
        lastUsed: now - conn.lastUsed,
      })),
    };

    this.logger.debug('Pool metrics retrieved', {
      metrics,
      context: {
        operation: 'getMetrics',
        timestamp: Date.now(),
      },
    });

    return metrics;
  }

  /**
   * Close all connections.
   *
   * Stops monitoring and the cleanup timer, rolls back any transaction the
   * state manager reports as active, closes every handle (failures are
   * logged, not thrown) and resets the pool so `initialize()` can be called
   * again.
   */
  async close(): Promise<void> {
    const closeStart = Date.now();
    this.stateManager.stopMonitoring();

    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    this.logger.info('Closing connection pool', {
      activeConnections: this.connections.size,
      context: {
        operation: 'close',
        timestamp: closeStart,
      },
    });

    // Force close all connections, even those with transactions
    const closePromises = Array.from(this.connections.entries()).map(async ([id, conn]) => {
      try {
        // Rollback any active transactions
        if (this.stateManager.hasActiveTransaction(id)) {
          try {
            await conn.db.exec('ROLLBACK');
            this.logger.debug('Rolled back transaction on close', {
              id,
              context: {
                operation: 'close',
                timestamp: Date.now(),
              },
            });
          } catch (e) {
            // Ignore rollback errors on close
          }
        }

        await conn.db.close();
        this.stateManager.unregisterConnection(id);
        this.connectionIds.delete(conn.db);

        this.logger.debug('Closed connection', {
          id,
          usageCount: conn.usageCount,
          totalUsageTime: conn.totalUsageTime,
          context: {
            operation: 'close',
            timestamp: Date.now(),
          },
        });
      } catch (error) {
        this.logger.error('Failed to close connection', error, {
          id,
          isTransient: isTransientError(error),
          isDatabaseError: isDatabaseError(error),
          context: {
            operation: 'close',
            timestamp: Date.now(),
          },
        });
      }
    });

    await Promise.all(closePromises);
    this.connections.clear();
    this.verifiedConnections.clear();
    // Allow the pool to be initialized again after it has been closed
    this.isInitialized = false;

    this.logger.info('Connection pool closed', {
      duration: Date.now() - closeStart,
      totalConnectionsCreated: this.totalConnectionsCreated,
      totalConnectionErrors: this.totalConnectionErrors,
      context: {
        operation: 'close',
        timestamp: Date.now(),
      },
    });
  }
}