/**
* postgres-mcp - Connection Pool Manager
*
* Wraps pg connection pooling with health monitoring,
* statistics tracking, and graceful shutdown support.
*/
import pg from 'pg';
import type { PoolClient, QueryResult as PgQueryResult } from 'pg';
import type { PoolConfig, PoolStats, HealthStatus } from '../types/index.js';
import { PoolError, ConnectionError } from '../types/index.js';
import { logger } from '../utils/logger.js';
/**
* Settings accepted by ConnectionPool.
*
* The five connection fields are required; everything else is optional and
* falls back to the defaults applied in ConnectionPool.initialize().
*/
export interface ConnectionPoolConfig {
/** Server host name or address, passed through to the pg pool unchanged. */
host: string;
/** Server TCP port, passed through to the pg pool unchanged. */
port: number;
/** Role used to authenticate. */
user: string;
/** Password for `user`. */
password: string;
/** Database to connect to; also included in the initialization log entry. */
database: string;
/**
* Pool sizing and timeout overrides. Fields left unset default to
* max=10, min=0, idleTimeoutMillis=10000, connectionTimeoutMillis=10000
* and allowExitOnIdle=true.
*/
pool?: PoolConfig | undefined;
/**
* TLS settings. `true` is translated to `{ rejectUnauthorized: false }`,
* `false`/`undefined` leave the pool's ssl option unset, and any other
* value is handed to pg as-is.
*/
ssl?: pg.ConnectionConfig['ssl'] | undefined;
/**
* Forwarded as pg's `statement_timeout` only when greater than zero
* (presumably milliseconds, as pg defines that option - confirm with callers).
*/
statementTimeout?: number | undefined;
/** Sent as `application_name`; defaults to 'postgres-mcp'. */
applicationName?: string | undefined;
}
/**
 * Connection pool wrapper with statistics and health monitoring.
 *
 * Owns at most one `pg.Pool`. The pool is created by `initialize()`, used via
 * `query()` / `getConnection()`, and destroyed by `shutdown()`. After a
 * successful shutdown the instance may be initialized again.
 */
export class ConnectionPool {
    /** Underlying pg pool; `null` before `initialize()` and after a successful `shutdown()`. */
    private pool: pg.Pool | null = null;
    private readonly config: ConnectionPoolConfig;
    /** Last known counters; gauges are refreshed from the live pool in `getStats()`. */
    private stats: PoolStats = {
        total: 0,
        active: 0,
        idle: 0,
        waiting: 0,
        totalQueries: 0
    };
    /** Set when `shutdown()` starts; cleared again by the next successful `initialize()`. */
    private shuttingDown = false;
    /** In-flight shutdown, shared so overlapping `shutdown()` calls end the pool only once. */
    private shutdownPromise: Promise<void> | null = null;

    constructor(config: ConnectionPoolConfig) {
        this.config = config;
    }

    /**
     * Initialize the connection pool.
     *
     * Builds the pg pool, attaches event handlers and runs a probe query to
     * verify connectivity. Calling it while a pool already exists only logs a
     * warning.
     *
     * @throws ConnectionError if the pool cannot be created or the probe query
     *   fails; the half-built pool is ended and discarded first.
     */
    async initialize(): Promise<void> {
        if (this.pool !== null) {
            logger.warn('Connection pool already initialized');
            return;
        }
        logger.info('Initializing PostgreSQL connection pool', {
            host: this.config.host,
            port: this.config.port,
            database: this.config.database
        });
        try {
            const pool = new pg.Pool(this.buildPoolConfig());
            this.pool = pool;
            this.registerPoolEvents(pool);
            const version = await this.probeServerVersion(pool);
            // A fresh, verified pool is usable again even if this instance
            // was shut down earlier.
            this.shuttingDown = false;
            logger.info('PostgreSQL connection pool initialized', { version });
        } catch (error) {
            // Clean up pool on initialization failure
            if (this.pool !== null) {
                try {
                    await this.pool.end();
                } catch {
                    // Ignore cleanup errors
                }
                this.pool = null;
            }
            const message = error instanceof Error ? error.message : 'Unknown error';
            logger.error('Failed to initialize connection pool', { error: message });
            throw new ConnectionError(`Failed to connect to PostgreSQL: ${message}`);
        }
    }

    /** Translate this instance's configuration into pg pool options, applying defaults. */
    private buildPoolConfig(): pg.PoolConfig {
        const poolConfig: pg.PoolConfig = {
            host: this.config.host,
            port: this.config.port,
            user: this.config.user,
            password: this.config.password,
            database: this.config.database,
            max: this.config.pool?.max ?? 10,
            min: this.config.pool?.min ?? 0,
            idleTimeoutMillis: this.config.pool?.idleTimeoutMillis ?? 10000,
            connectionTimeoutMillis: this.config.pool?.connectionTimeoutMillis ?? 10000,
            allowExitOnIdle: this.config.pool?.allowExitOnIdle ?? true,
            application_name: this.config.applicationName ?? 'postgres-mcp'
        };
        if (this.config.ssl === true) {
            // NOTE(review): `ssl: true` yields an encrypted connection whose
            // server certificate is NOT verified. Pass an explicit TLS options
            // object when verification is required.
            poolConfig.ssl = { rejectUnauthorized: false };
        } else if (this.config.ssl !== undefined && this.config.ssl !== false) {
            poolConfig.ssl = this.config.ssl;
        }
        if (this.config.statementTimeout !== undefined && this.config.statementTimeout > 0) {
            poolConfig.statement_timeout = this.config.statementTimeout;
        }
        return poolConfig;
    }

    /** Attach handlers that keep the local counters in step with pool events and log pool errors. */
    private registerPoolEvents(pool: pg.Pool): void {
        pool.on('connect', () => {
            this.stats.total++;
            logger.debug('New connection established');
        });
        pool.on('acquire', () => {
            this.stats.active++;
            this.stats.idle = Math.max(0, this.stats.idle - 1);
        });
        pool.on('release', () => {
            this.stats.active = Math.max(0, this.stats.active - 1);
            this.stats.idle++;
        });
        pool.on('remove', () => {
            this.stats.total = Math.max(0, this.stats.total - 1);
            this.stats.idle = Math.max(0, this.stats.idle - 1);
        });
        pool.on('error', (err) => {
            logger.error('Pool error', { error: err.message });
        });
    }

    /** Run the connectivity probe and return the reported server version, or 'unknown'. */
    private async probeServerVersion(pool: pg.Pool): Promise<string> {
        const client = await pool.connect();
        try {
            const result = await client.query('SELECT version()');
            const row = result.rows[0] as { version?: string } | undefined;
            return row?.version ?? 'unknown';
        } finally {
            // Always hand the client back: a client left checked out would
            // keep the cleanup `pool.end()` in initialize() from completing.
            client.release();
        }
    }

    /**
     * Get a connection from the pool.
     *
     * The caller owns the returned client and must give it back, e.g. through
     * `releaseConnection()`.
     *
     * @throws PoolError if the pool is not initialized, is shutting down, or
     *   the underlying pool fails to hand out a client.
     */
    async getConnection(): Promise<PoolClient> {
        if (this.pool === null) {
            throw new PoolError('Connection pool not initialized');
        }
        if (this.shuttingDown) {
            throw new PoolError('Connection pool is shutting down');
        }
        this.stats.waiting++;
        try {
            return await this.pool.connect();
        } catch (error) {
            const message = error instanceof Error ? error.message : 'Unknown error';
            throw new PoolError(`Failed to acquire connection: ${message}`);
        } finally {
            this.stats.waiting = Math.max(0, this.stats.waiting - 1);
        }
    }

    /**
     * Release a connection back to the pool.
     *
     * Never throws: a failure to release is logged as a warning.
     */
    releaseConnection(client: PoolClient): void {
        try {
            client.release();
        } catch (error) {
            logger.warn('Error releasing connection', {
                error: error instanceof Error ? error.message : 'Unknown error'
            });
        }
    }

    /**
     * Execute a query using a pooled connection.
     *
     * Every attempt is counted in `totalQueries`, including ones that fail.
     * Only the first 100 characters of the SQL text are logged.
     *
     * @param sql SQL text to execute.
     * @param params Optional positional parameter values.
     * @returns The pg query result.
     * @throws PoolError if the pool is not initialized or is shutting down;
     *   errors raised by the query itself are logged and rethrown unchanged.
     */
    async query<T extends Record<string, unknown>[]>(
        sql: string,
        params?: unknown[]
    ): Promise<PgQueryResult<T[number]>> {
        if (this.pool === null) {
            throw new PoolError('Connection pool not initialized');
        }
        // Same guard as getConnection(): no new work once shutdown has begun.
        if (this.shuttingDown) {
            throw new PoolError('Connection pool is shutting down');
        }
        const startTime = Date.now();
        this.stats.totalQueries++;
        try {
            const result = await this.pool.query<T[number]>(sql, params);
            logger.debug('Query executed', {
                sql: sql.substring(0, 100),
                rowCount: result.rowCount,
                durationMs: Date.now() - startTime
            });
            return result;
        } catch (error) {
            const message = error instanceof Error ? error.message : 'Unknown error';
            logger.error('Query failed', { sql: sql.substring(0, 100), error: message });
            throw error;
        }
    }

    /**
     * Get pool statistics.
     *
     * While a pool exists the gauges are refreshed from it, with `active`
     * derived as total minus idle. Otherwise the last recorded values are
     * returned.
     *
     * @returns A copy of the counters; mutating it does not affect the pool.
     */
    getStats(): PoolStats {
        if (this.pool !== null) {
            // Update stats from pool if available
            this.stats.total = this.pool.totalCount;
            this.stats.idle = this.pool.idleCount;
            this.stats.waiting = this.pool.waitingCount;
            this.stats.active = this.stats.total - this.stats.idle;
        }
        return { ...this.stats };
    }

    /**
     * Check pool health.
     *
     * Runs a small query and measures its round-trip time. Never throws:
     * failures are reported through the returned status.
     */
    async checkHealth(): Promise<HealthStatus> {
        if (this.pool === null || this.shuttingDown) {
            return {
                connected: false,
                error: this.shuttingDown ? 'Pool is shutting down' : 'Pool not initialized'
            };
        }
        const startTime = Date.now();
        try {
            const result = await this.pool.query('SELECT version(), current_database()');
            const latencyMs = Date.now() - startTime;
            const row = result.rows[0] as { version?: string; current_database?: string } | undefined;
            return {
                connected: true,
                latencyMs,
                version: row?.version ?? undefined,
                poolStats: this.getStats(),
                details: {
                    database: row?.current_database
                }
            };
        } catch (error) {
            return {
                connected: false,
                error: error instanceof Error ? error.message : 'Unknown error',
                latencyMs: Date.now() - startTime
            };
        }
    }

    /**
     * Gracefully shutdown the pool.
     *
     * Does nothing when no pool exists. Calls that overlap an in-progress
     * shutdown wait for that same shutdown instead of ending the pool again.
     *
     * @throws Whatever error the underlying pool raises while ending; the
     *   error is logged before being rethrown.
     */
    async shutdown(): Promise<void> {
        if (this.shutdownPromise !== null) {
            return this.shutdownPromise;
        }
        const pool = this.pool;
        if (pool === null) {
            return;
        }
        const pending = this.closePool(pool);
        this.shutdownPromise = pending;
        try {
            await pending;
        } finally {
            this.shutdownPromise = null;
        }
    }

    /** Mark the instance as shutting down, end the given pool and drop the reference to it. */
    private async closePool(pool: pg.Pool): Promise<void> {
        logger.info('Shutting down connection pool...');
        this.shuttingDown = true;
        try {
            await pool.end();
            this.pool = null;
            logger.info('Connection pool shut down successfully');
        } catch (error) {
            logger.error('Error during pool shutdown', {
                error: error instanceof Error ? error.message : 'Unknown error'
            });
            throw error;
        }
    }

    /**
     * Check if pool is initialized, i.e. a pool exists and shutdown has not begun.
     */
    isInitialized(): boolean {
        return this.pool !== null && !this.shuttingDown;
    }

    /**
     * Check if pool is shutting down. Stays true after shutdown until the
     * instance is successfully initialized again.
     */
    isClosing(): boolean {
        return this.shuttingDown;
    }
}