PostgreSQL MCP Server
by HenkDz
import { DatabaseConnection } from '../utils/connection.js';
interface MonitoringResult {
timestamp: string;
metrics: {
database: DatabaseMetrics;
tables: Record<string, TableMetrics>;
queries: ActiveQueryInfo[];
locks: LockInfo[];
replication?: ReplicationInfo[];
};
alerts: Alert[];
}
interface DatabaseMetrics {
name: string;
size: string;
connections: {
active: number;
idle: number;
total: number;
max: number;
};
uptime: string;
transactions: {
committed: number;
rolledBack: number;
};
cacheHitRatio: number;
}
interface TableMetrics {
name: string;
size: string;
rowCount: number;
deadTuples: number;
lastVacuum: string | null;
lastAnalyze: string | null;
scanCount: number;
indexUseRatio: number;
}
interface ActiveQueryInfo {
pid: number;
username: string;
database: string;
startTime: string;
duration: number;
state: string;
query: string;
waitEvent?: string;
}
interface LockInfo {
relation: string;
mode: string;
granted: boolean;
pid: number;
username: string;
query: string;
}
interface ReplicationInfo {
clientAddr: string;
state: string;
sentLsn: string;
writeLsn: string;
flushLsn: string;
replayLsn: string;
writeLag: string | null;
flushLag: string | null;
replayLag: string | null;
}
interface Alert {
level: 'info' | 'warning' | 'critical';
message: string;
context?: Record<string, unknown>;
}
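// Example of an alert this tool can emit (values are illustrative only):
// { level: 'warning', message: 'High connection usage: 85.0%', context: { current: 85, max: 100 } }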
/**
 * Get real-time monitoring information for a PostgreSQL database.
 *
 * Alert threshold units: connectionPercentage and deadTuplesPercentage are
 * percentages (0-100), cacheHitRatio is a ratio (0-1), longRunningQuerySeconds
 * is in seconds, and vacuumAge is in days since the last vacuum.
 */
export async function monitorDatabase(
connectionString: string,
options: {
includeTables?: boolean;
includeQueries?: boolean;
includeLocks?: boolean;
includeReplication?: boolean;
alertThresholds?: {
connectionPercentage?: number;
longRunningQuerySeconds?: number;
cacheHitRatio?: number;
deadTuplesPercentage?: number;
vacuumAge?: number;
};
} = {}
): Promise<MonitoringResult> {
const db = DatabaseConnection.getInstance();
const alerts: Alert[] = [];
try {
await db.connect(connectionString);
// Get timestamp
const now = new Date();
const timestamp = now.toISOString();
// Get database metrics
const dbMetrics = await getDatabaseMetrics(db);
// Check connection threshold
const connectionPercentage = (dbMetrics.connections.total / dbMetrics.connections.max) * 100;
if (options.alertThresholds?.connectionPercentage &&
connectionPercentage > options.alertThresholds.connectionPercentage) {
alerts.push({
level: connectionPercentage > 90 ? 'critical' : 'warning',
message: `High connection usage: ${connectionPercentage.toFixed(1)}%`,
context: {
current: dbMetrics.connections.total,
max: dbMetrics.connections.max
}
});
}
// Check cache hit ratio
if (options.alertThresholds?.cacheHitRatio &&
dbMetrics.cacheHitRatio < options.alertThresholds.cacheHitRatio) {
alerts.push({
level: dbMetrics.cacheHitRatio < 0.8 ? 'critical' : 'warning',
message: `Low cache hit ratio: ${(dbMetrics.cacheHitRatio * 100).toFixed(1)}%`,
context: {
current: dbMetrics.cacheHitRatio
}
});
}
// Get table metrics if requested
const tableMetrics: Record<string, TableMetrics> = {};
if (options.includeTables) {
const tables = await getTableMetrics(db);
for (const table of tables) {
tableMetrics[table.name] = table;
// Check for tables with high dead tuple percentage
if (options.alertThresholds?.deadTuplesPercentage) {
const deadTuplePercentage = table.rowCount > 0
? (table.deadTuples / table.rowCount) * 100
: 0;
if (deadTuplePercentage > options.alertThresholds.deadTuplesPercentage) {
alerts.push({
level: deadTuplePercentage > 30 ? 'critical' : 'warning',
message: `High dead tuple percentage in table ${table.name}: ${deadTuplePercentage.toFixed(1)}%`,
context: {
table: table.name,
deadTuples: table.deadTuples,
totalRows: table.rowCount
}
});
}
}
// Check for tables that haven't been vacuumed recently
if (options.alertThresholds?.vacuumAge && table.lastVacuum) {
const lastVacuumDate = new Date(table.lastVacuum);
const daysSinceVacuum = Math.floor((now.getTime() - lastVacuumDate.getTime()) / (1000 * 60 * 60 * 24));
if (daysSinceVacuum > options.alertThresholds.vacuumAge) {
alerts.push({
level: 'warning',
message: `Table ${table.name} hasn't been vacuumed in ${daysSinceVacuum} days`,
context: {
table: table.name,
lastVacuum: table.lastVacuum
}
});
}
}
}
}
// Get active queries if requested
let activeQueries: ActiveQueryInfo[] = [];
if (options.includeQueries) {
activeQueries = await getActiveQueries(db);
// Check for long-running queries
if (options.alertThresholds?.longRunningQuerySeconds) {
const longRunningQueries = activeQueries.filter(
q => q.duration > options.alertThresholds!.longRunningQuerySeconds!
);
for (const query of longRunningQueries) {
alerts.push({
level: query.duration > options.alertThresholds.longRunningQuerySeconds * 2 ? 'critical' : 'warning',
message: `Long-running query (${query.duration.toFixed(1)}s) by ${query.username}`,
context: {
pid: query.pid,
duration: query.duration,
query: query.query.substring(0, 100) + (query.query.length > 100 ? '...' : '')
}
});
}
}
}
// Get lock information if requested
let locks: LockInfo[] = [];
if (options.includeLocks) {
locks = await getLockInfo(db);
// Alert on ungranted locks (sessions waiting to acquire a lock)
const waitingLocks = locks.filter(l => !l.granted);
if (waitingLocks.length > 0) {
alerts.push({
level: 'warning',
message: `${waitingLocks.length} ungranted locks detected (lock waits)`,
context: {
count: waitingLocks.length
}
});
}
}
// Get replication information if requested
let replication: ReplicationInfo[] = [];
if (options.includeReplication) {
replication = await getReplicationInfo(db);
// Alert on replication lag
for (const replica of replication) {
if (replica.replayLag) {
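// replay_lag::text is a PostgreSQL interval such as '00:05:23.1234'.
// This parse assumes an HH:MM:SS form and ignores any leading day component.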
const lagMatch = replica.replayLag.match(/(\d+):(\d+):(\d+)/);
if (lagMatch) {
const hours = parseInt(lagMatch[1]);
const minutes = parseInt(lagMatch[2]);
if (hours > 0 || minutes > 5) {
alerts.push({
level: hours > 0 ? 'critical' : 'warning',
message: `High replication lag for ${replica.clientAddr}: ${replica.replayLag}`,
context: {
clientAddr: replica.clientAddr,
lag: replica.replayLag
}
});
}
}
}
}
}
return {
timestamp,
metrics: {
database: dbMetrics,
tables: tableMetrics,
queries: activeQueries,
locks,
replication: options.includeReplication ? replication : undefined
},
alerts
};
} catch (error) {
return {
timestamp: new Date().toISOString(),
metrics: {
database: {
name: '',
size: '',
connections: { active: 0, idle: 0, total: 0, max: 0 },
uptime: '',
transactions: { committed: 0, rolledBack: 0 },
cacheHitRatio: 0
},
tables: {},
queries: [],
locks: []
},
alerts: [{
level: 'critical',
message: `Monitoring error: ${error instanceof Error ? error.message : String(error)}`
}]
};
} finally {
await db.disconnect();
}
}
/**
* Get database-level metrics
*/
async function getDatabaseMetrics(db: DatabaseConnection): Promise<DatabaseMetrics> {
// Get database name and size
const dbInfo = await db.query<{ datname: string; size: string }>(
`SELECT
current_database() as datname,
pg_size_pretty(pg_database_size(current_database())) as size`
);
// Get connection information
const connectionInfo = await db.query<{ max_conn: string }>(
`SELECT
setting as max_conn
FROM pg_settings
WHERE name = 'max_connections'`
);
const activeConns = await db.query<{ count: string }>(
`SELECT count(*) FROM pg_stat_activity WHERE state = 'active' AND pid <> pg_backend_pid()`
);
const idleConns = await db.query<{ count: string }>(
`SELECT count(*) FROM pg_stat_activity WHERE state = 'idle'`
);
const totalConns = await db.query<{ count: string }>(
`SELECT count(*) FROM pg_stat_activity`
);
// Get uptime (interval since postmaster start, not the start timestamp)
const uptime = await db.query<{ uptime: string }>(
`SELECT (now() - pg_postmaster_start_time())::text as uptime`
);
// Get transaction stats
const txStats = await db.query<{ xact_commit: string; xact_rollback: string }>(
`SELECT xact_commit, xact_rollback
FROM pg_stat_database
WHERE datname = current_database()`
);
// Get cache hit ratio
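// Ratio of shared-buffer hits to total block requests: blks_hit / (blks_hit + blks_read).
// Note: blks_read counts blocks fetched from outside shared buffers, which may still be served by the OS page cache.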
const cacheHit = await db.query<{ ratio: number }>(
`SELECT
CASE WHEN blks_hit + blks_read = 0 THEN 0
ELSE blks_hit::float / (blks_hit + blks_read)::float
END as ratio
FROM pg_stat_database
WHERE datname = current_database()`
);
return {
name: dbInfo[0].datname,
size: dbInfo[0].size,
connections: {
active: parseInt(activeConns[0].count),
idle: parseInt(idleConns[0].count),
total: parseInt(totalConns[0].count),
max: parseInt(connectionInfo[0].max_conn)
},
uptime: uptime[0].uptime,
transactions: {
committed: parseInt(txStats[0].xact_commit),
rolledBack: parseInt(txStats[0].xact_rollback)
},
cacheHitRatio: cacheHit[0].ratio
};
}
/**
* Get table-level metrics
*/
async function getTableMetrics(db: DatabaseConnection): Promise<TableMetrics[]> {
const tableStats = await db.query<{
relname: string;
size: string;
n_live_tup: string;
n_dead_tup: string;
last_vacuum: string | null;
last_analyze: string | null;
seq_scan: string;
idx_scan: string;
}>(
`SELECT
c.relname,
pg_size_pretty(pg_total_relation_size(c.oid)) as size,
s.n_live_tup,
s.n_dead_tup,
s.last_vacuum,
s.last_analyze,
s.seq_scan,
coalesce(s.idx_scan, 0) as idx_scan -- idx_scan is NULL for tables without indexes
FROM pg_class c
JOIN pg_stat_user_tables s ON s.relid = c.oid
WHERE c.relkind = 'r'
ORDER BY c.relname`
);
return tableStats.map(table => ({
name: table.relname,
size: table.size,
rowCount: parseInt(table.n_live_tup),
deadTuples: parseInt(table.n_dead_tup),
lastVacuum: table.last_vacuum,
lastAnalyze: table.last_analyze,
scanCount: parseInt(table.seq_scan),
indexUseRatio: parseInt(table.seq_scan) + parseInt(table.idx_scan) > 0
? parseInt(table.idx_scan) / (parseInt(table.seq_scan) + parseInt(table.idx_scan))
: 0
}));
}
/**
* Get information about active queries
*/
async function getActiveQueries(db: DatabaseConnection): Promise<ActiveQueryInfo[]> {
const queries = await db.query<{
pid: string;
usename: string;
datname: string;
query_start: string;
state: string;
wait_event: string | null;
query: string;
}>(
`SELECT
pid,
usename,
datname,
query_start::text,
state,
wait_event,
query
FROM pg_stat_activity
WHERE state != 'idle'
AND pid <> pg_backend_pid()
ORDER BY query_start`
);
const now = new Date();
return queries.map(q => {
const startTime = new Date(q.query_start);
const durationSeconds = (now.getTime() - startTime.getTime()) / 1000;
return {
pid: parseInt(q.pid),
username: q.usename,
database: q.datname,
startTime: q.query_start,
duration: durationSeconds,
state: q.state,
waitEvent: q.wait_event || undefined,
query: q.query
};
});
}
/**
* Get information about locks
*/
async function getLockInfo(db: DatabaseConnection): Promise<LockInfo[]> {
const locks = await db.query<{
relation: string;
mode: string;
granted: string;
pid: string;
usename: string;
query: string;
}>(
`SELECT
CASE
WHEN l.relation IS NOT NULL THEN (SELECT relname FROM pg_class WHERE oid = l.relation)
ELSE 'transactionid'
END as relation,
l.mode,
l.granted::text,
l.pid,
a.usename,
a.query
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.pid <> pg_backend_pid()
ORDER BY relation, mode`
);
return locks.map(lock => ({
relation: lock.relation,
mode: lock.mode,
granted: lock.granted === 'true', // boolean::text yields 'true'/'false', not 't'/'f'
pid: parseInt(lock.pid),
username: lock.usename,
query: lock.query
}));
}
/**
* Get information about replication
*/
async function getReplicationInfo(db: DatabaseConnection): Promise<ReplicationInfo[]> {
const replication = await db.query<{
client_addr: string | null;
state: string;
sent_lsn: string;
write_lsn: string;
flush_lsn: string;
replay_lsn: string;
write_lag: string | null;
flush_lag: string | null;
replay_lag: string | null;
}>(
`SELECT
client_addr,
state,
sent_lsn::text,
write_lsn::text,
flush_lsn::text,
replay_lsn::text,
write_lag::text,
flush_lag::text,
replay_lag::text
FROM pg_stat_replication`
);
return replication.map(rep => ({
clientAddr: rep.client_addr || 'local',
state: rep.state,
sentLsn: rep.sent_lsn,
writeLsn: rep.write_lsn,
flushLsn: rep.flush_lsn,
replayLsn: rep.replay_lsn,
writeLag: rep.write_lag,
flushLag: rep.flush_lag,
replayLag: rep.replay_lag
}));
}
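
A minimal usage sketch, not part of the server source: it calls monitorDatabase with every section enabled and prints any alerts. The module path './monitor.js', the connection string, and the threshold values are placeholders chosen for illustration.

// Illustrative caller for the monitoring tool above.
// Assumptions: this file is saved as monitor.js/monitor.ts, and a PostgreSQL
// instance is reachable at the placeholder connection string below.
import { monitorDatabase } from './monitor.js';

async function main(): Promise<void> {
  const result = await monitorDatabase('postgresql://user:password@localhost:5432/mydb', {
    includeTables: true,
    includeQueries: true,
    includeLocks: true,
    includeReplication: false,
    alertThresholds: {
      connectionPercentage: 80,    // percent of max_connections
      longRunningQuerySeconds: 30, // seconds
      cacheHitRatio: 0.95,         // ratio, 0-1
      deadTuplesPercentage: 10,    // percent of live rows
      vacuumAge: 7                 // days since last vacuum
    }
  });

  console.log(`Collected at ${result.timestamp}`);
  for (const alert of result.alerts) {
    console.log(`[${alert.level}] ${alert.message}`);
  }
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});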