live-monitoring.ts (14.8 kB)
import { z } from 'zod';
import { DatabaseConnection } from '../types.js';
import { Logger } from '../utils/logger.js';
import { executeQuery } from '../utils/database.js';
import { sanitizeIdentifier } from '../utils/sanitize.js';

const GetLiveMetricsSchema = z.object({
  duration: z.number().optional().default(10000),
  interval: z.number().optional().default(1000),
  metrics: z.array(z.enum(['queries', 'connections', 'locks', 'transactions', 'cache'])).optional()
});

const GetHottestTablesSchema = z.object({
  schema: z.string().optional().default('public'),
  limit: z.number().optional().default(10),
  sampleDuration: z.number().optional().default(5000),
  orderBy: z.enum(['seq_scan', 'idx_scan', 'writes', 'size']).optional().default('seq_scan')
});

const GetTableMetricsSchema = z.object({
  table: z.string(),
  schema: z.string().optional().default('public'),
  includeTrends: z.boolean().optional().default(true)
});

interface MetricSnapshot {
  timestamp: string;
  queries: {
    active: number;
    idle: number;
    idleInTransaction: number;
    waiting: number;
  };
  connections: {
    current: number;
    max: number;
    usagePercent: number;
  };
  transactions: {
    committed: number;
    rolledBack: number;
  };
  cache: {
    hitRatio: number;
    blocksHit: number;
    blocksRead: number;
  };
  locks: {
    total: number;
    waiting: number;
  };
}

async function collectMetricSnapshot(
  connection: DatabaseConnection,
  logger: Logger
): Promise<MetricSnapshot> {
  const activityQuery = `
    SELECT
      COUNT(*) FILTER (WHERE state = 'active') as active,
      COUNT(*) FILTER (WHERE state = 'idle') as idle,
      COUNT(*) FILTER (WHERE state = 'idle in transaction') as idle_in_transaction,
      COUNT(*) FILTER (WHERE wait_event IS NOT NULL AND state = 'active') as waiting
    FROM pg_stat_activity
    WHERE backend_type = 'client backend'
  `;

  const connectionsQuery = `
    SELECT
      (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend') as current,
      setting::int as max
    FROM pg_settings
    WHERE name = 'max_connections'
  `;

  const transactionsQuery = `
    SELECT
      xact_commit as committed,
      xact_rollback as rolled_back
    FROM pg_stat_database
    WHERE datname = current_database()
  `;

  const cacheQuery = `
    SELECT
      COALESCE(
        ROUND(
          SUM(blks_hit) * 100.0 / NULLIF(SUM(blks_hit) + SUM(blks_read), 0),
          2
        ),
        0
      ) as hit_ratio,
      SUM(blks_hit) as blocks_hit,
      SUM(blks_read) as blocks_read
    FROM pg_stat_database
    WHERE datname = current_database()
  `;

  const locksQuery = `
    SELECT
      COUNT(*) as total,
      COUNT(*) FILTER (WHERE NOT granted) as waiting
    FROM pg_locks
  `;

  const [activityResult, connectionsResult, transactionsResult, cacheResult, locksResult] = await Promise.all([
    executeQuery(connection, logger, { query: activityQuery, params: [] }),
    executeQuery(connection, logger, { query: connectionsQuery, params: [] }),
    executeQuery(connection, logger, { query: transactionsQuery, params: [] }),
    executeQuery(connection, logger, { query: cacheQuery, params: [] }),
    executeQuery(connection, logger, { query: locksQuery, params: [] })
  ]);

  const activity = activityResult.rows[0];
  const connections = connectionsResult.rows[0];
  const transactions = transactionsResult.rows[0];
  const cache = cacheResult.rows[0];
  const locks = locksResult.rows[0];

  const current = parseInt(connections.current || '0', 10);
  const max = parseInt(connections.max || '100', 10);

  return {
    timestamp: new Date().toISOString(),
    queries: {
      active: parseInt(activity.active || '0', 10),
      idle: parseInt(activity.idle || '0', 10),
      idleInTransaction: parseInt(activity.idle_in_transaction || '0', 10),
      waiting: parseInt(activity.waiting || '0', 10)
    },
    connections: {
      current,
      max,
      usagePercent: Math.round((current / max) * 100)
    },
    transactions: {
      committed: parseInt(transactions.committed || '0', 10),
      rolledBack: parseInt(transactions.rolled_back || '0', 10)
    },
    cache: {
      hitRatio: parseFloat(cache.hit_ratio || '0'),
      blocksHit: parseInt(cache.blocks_hit || '0', 10),
      blocksRead: parseInt(cache.blocks_read || '0', 10)
    },
    locks: {
      total: parseInt(locks.total || '0', 10),
      waiting: parseInt(locks.waiting || '0', 10)
    }
  };
}

export async function getLiveMetrics(
  connection: DatabaseConnection,
  logger: Logger,
  args: z.infer<typeof GetLiveMetricsSchema>
): Promise<any> {
  const { duration, interval } = args;
  logger.info('getLiveMetrics', 'Collecting live metrics', { duration, interval });

  const snapshots: MetricSnapshot[] = [];
  const iterations = Math.floor(duration / interval);

  for (let i = 0; i < iterations; i++) {
    const snapshot = await collectMetricSnapshot(connection, logger);
    snapshots.push(snapshot);
    if (i < iterations - 1) {
      await new Promise(resolve => setTimeout(resolve, interval));
    }
  }

  const first = snapshots[0];
  const last = snapshots[snapshots.length - 1];

  const summary = {
    duration: `${duration}ms`,
    samples: snapshots.length,
    averages: {
      activeQueries: Math.round(snapshots.reduce((sum, s) => sum + s.queries.active, 0) / snapshots.length),
      connectionUsage: Math.round(snapshots.reduce((sum, s) => sum + s.connections.usagePercent, 0) / snapshots.length),
      cacheHitRatio: Math.round(snapshots.reduce((sum, s) => sum + s.cache.hitRatio, 0) / snapshots.length * 10) / 10,
      waitingLocks: Math.round(snapshots.reduce((sum, s) => sum + s.locks.waiting, 0) / snapshots.length * 10) / 10
    },
    peaks: {
      maxActiveQueries: Math.max(...snapshots.map(s => s.queries.active)),
      maxConnections: Math.max(...snapshots.map(s => s.connections.current)),
      maxWaitingLocks: Math.max(...snapshots.map(s => s.locks.waiting))
    },
    deltas: {
      transactionsCommitted: last.transactions.committed - first.transactions.committed,
      transactionsRolledBack: last.transactions.rolledBack - first.transactions.rolledBack,
      blocksRead: last.cache.blocksRead - first.cache.blocksRead
    }
  };

  const issues: string[] = [];
  if (summary.averages.connectionUsage > 80) {
    issues.push(`High connection usage: ${summary.averages.connectionUsage}%`);
  }
  if (summary.averages.cacheHitRatio < 90) {
    issues.push(`Low cache hit ratio: ${summary.averages.cacheHitRatio}%`);
  }
  if (summary.peaks.maxWaitingLocks > 5) {
    issues.push(`Lock contention detected: up to ${summary.peaks.maxWaitingLocks} waiting locks`);
  }

  return {
    summary,
    issues: issues.length > 0 ? issues : undefined,
    snapshots
  };
}

export async function getHottestTables(
  connection: DatabaseConnection,
  logger: Logger,
  args: z.infer<typeof GetHottestTablesSchema>
): Promise<any> {
  const { schema, limit, sampleDuration, orderBy } = args;
  logger.info('getHottestTables', 'Identifying hottest tables', { schema, orderBy });

  const sanitizedSchema = sanitizeIdentifier(schema);

  const beforeQuery = `
    SELECT
      relname as table_name,
      seq_scan,
      seq_tup_read,
      idx_scan,
      idx_tup_fetch,
      n_tup_ins,
      n_tup_upd,
      n_tup_del
    FROM pg_stat_user_tables
    WHERE schemaname = '${sanitizedSchema}'
  `;

  const beforeResult = await executeQuery(connection, logger, { query: beforeQuery, params: [] });
  const beforeStats = new Map(beforeResult.rows.map(r => [r.table_name, r]));

  await new Promise(resolve => setTimeout(resolve, sampleDuration));

  const afterResult = await executeQuery(connection, logger, { query: beforeQuery, params: [] });

  const activity = afterResult.rows.map(after => {
    const before = beforeStats.get(after.table_name) || after;
    const seqScanDelta = parseInt(after.seq_scan || '0', 10) - parseInt(before.seq_scan || '0', 10);
    const idxScanDelta = parseInt(after.idx_scan || '0', 10) - parseInt(before.idx_scan || '0', 10);
    const insertsDelta = parseInt(after.n_tup_ins || '0', 10) - parseInt(before.n_tup_ins || '0', 10);
    const updatesDelta = parseInt(after.n_tup_upd || '0', 10) - parseInt(before.n_tup_upd || '0', 10);
    const deletesDelta = parseInt(after.n_tup_del || '0', 10) - parseInt(before.n_tup_del || '0', 10);

    return {
      table: after.table_name,
      activity: {
        seqScans: seqScanDelta,
        idxScans: idxScanDelta,
        totalScans: seqScanDelta + idxScanDelta,
        writes: insertsDelta + updatesDelta + deletesDelta,
        inserts: insertsDelta,
        updates: updatesDelta,
        deletes: deletesDelta
      },
      totals: {
        seqScans: parseInt(after.seq_scan || '0', 10),
        idxScans: parseInt(after.idx_scan || '0', 10)
      }
    };
  });

  let sortKey: (a: any) => number;
  switch (orderBy) {
    case 'seq_scan':
      sortKey = a => a.activity.seqScans;
      break;
    case 'idx_scan':
      sortKey = a => a.activity.idxScans;
      break;
    case 'writes':
      sortKey = a => a.activity.writes;
      break;
    default:
      sortKey = a => a.activity.totalScans;
  }

  const sorted = activity
    .filter(a => a.activity.totalScans > 0 || a.activity.writes > 0)
    .sort((a, b) => sortKey(b) - sortKey(a))
    .slice(0, limit);

  return {
    schema,
    sampleDuration: `${sampleDuration}ms`,
    orderBy,
    tables: sorted,
    totalTablesWithActivity: sorted.length
  };
}

export async function getTableMetrics(
  connection: DatabaseConnection,
  logger: Logger,
  args: z.infer<typeof GetTableMetricsSchema>
): Promise<any> {
  const { table, schema, includeTrends } = args;
  logger.info('getTableMetrics', 'Getting table metrics', { schema, table });

  const sanitizedSchema = sanitizeIdentifier(schema);
  const sanitizedTable = sanitizeIdentifier(table);

  const statsQuery = `
    SELECT
      relname as table_name,
      seq_scan,
      seq_tup_read,
      idx_scan,
      idx_tup_fetch,
      n_tup_ins,
      n_tup_upd,
      n_tup_del,
      n_tup_hot_upd,
      n_live_tup,
      n_dead_tup,
      n_mod_since_analyze,
      last_vacuum,
      last_autovacuum,
      last_analyze,
      last_autoanalyze,
      vacuum_count,
      autovacuum_count,
      analyze_count,
      autoanalyze_count
    FROM pg_stat_user_tables
    WHERE schemaname = '${sanitizedSchema}' AND relname = '${sanitizedTable}'
  `;

  const sizeQuery = `
    SELECT
      pg_total_relation_size('${sanitizedSchema}.${sanitizedTable}') as total_bytes,
      pg_relation_size('${sanitizedSchema}.${sanitizedTable}') as table_bytes,
      pg_indexes_size('${sanitizedSchema}.${sanitizedTable}'::regclass) as index_bytes
  `;

  const ioQuery = `
    SELECT
      heap_blks_read,
      heap_blks_hit,
      idx_blks_read,
      idx_blks_hit,
      toast_blks_read,
      toast_blks_hit
    FROM pg_statio_user_tables
    WHERE schemaname = '${sanitizedSchema}' AND relname = '${sanitizedTable}'
  `;

  const [statsResult, sizeResult, ioResult] = await Promise.all([
    executeQuery(connection, logger, { query: statsQuery, params: [] }),
    executeQuery(connection, logger, { query: sizeQuery, params: [] }),
    executeQuery(connection, logger, { query: ioQuery, params: [] })
  ]);

  if (statsResult.rows.length === 0) {
    return { error: `Table ${schema}.${table} not found` };
  }

  const stats = statsResult.rows[0];
  const size = sizeResult.rows[0];
  const io = ioResult.rows[0];

  const liveTuples = parseInt(stats.n_live_tup || '0', 10);
  const deadTuples = parseInt(stats.n_dead_tup || '0', 10);
  const heapHit = parseInt(io?.heap_blks_hit || '0', 10);
  const heapRead = parseInt(io?.heap_blks_read || '0', 10);
  const idxHit = parseInt(io?.idx_blks_hit || '0', 10);
  const idxRead = parseInt(io?.idx_blks_read || '0', 10);

  const result: any = {
    table: `${schema}.${table}`,
    size: {
      total: formatBytes(parseInt(size.total_bytes || '0', 10)),
      table: formatBytes(parseInt(size.table_bytes || '0', 10)),
      indexes: formatBytes(parseInt(size.index_bytes || '0', 10))
    },
    rows: {
      live: liveTuples,
      dead: deadTuples,
      deadPercent: liveTuples > 0 ? Math.round((deadTuples / (liveTuples + deadTuples)) * 100) : 0
    },
    scans: {
      sequential: parseInt(stats.seq_scan || '0', 10),
      index: parseInt(stats.idx_scan || '0', 10),
      seqTuplesRead: parseInt(stats.seq_tup_read || '0', 10),
      idxTuplesFetched: parseInt(stats.idx_tup_fetch || '0', 10)
    },
    modifications: {
      inserts: parseInt(stats.n_tup_ins || '0', 10),
      updates: parseInt(stats.n_tup_upd || '0', 10),
      deletes: parseInt(stats.n_tup_del || '0', 10),
      hotUpdates: parseInt(stats.n_tup_hot_upd || '0', 10),
      modsSinceAnalyze: parseInt(stats.n_mod_since_analyze || '0', 10)
    },
    io: {
      heapHitRatio: heapHit + heapRead > 0 ? Math.round((heapHit / (heapHit + heapRead)) * 100) : 100,
      indexHitRatio: idxHit + idxRead > 0 ? Math.round((idxHit / (idxHit + idxRead)) * 100) : 100
    },
    maintenance: {
      lastVacuum: stats.last_vacuum,
      lastAutovacuum: stats.last_autovacuum,
      lastAnalyze: stats.last_analyze,
      lastAutoanalyze: stats.last_autoanalyze,
      vacuumCount: parseInt(stats.vacuum_count || '0', 10),
      analyzeCount: parseInt(stats.analyze_count || '0', 10)
    }
  };

  const recommendations: string[] = [];
  if (result.rows.deadPercent > 20) {
    recommendations.push(`High dead tuple ratio (${result.rows.deadPercent}%) - consider VACUUM`);
  }
  if (result.scans.sequential > result.scans.index * 10 && result.scans.sequential > 100) {
    recommendations.push('High sequential scan ratio - review indexes');
  }
  if (result.io.heapHitRatio < 90) {
    recommendations.push(`Low heap cache hit ratio (${result.io.heapHitRatio}%) - consider increasing shared_buffers`);
  }
  if (result.modifications.modsSinceAnalyze > liveTuples * 0.1) {
    recommendations.push('Many modifications since last ANALYZE - statistics may be stale');
  }
  if (recommendations.length > 0) {
    result.recommendations = recommendations;
  }

  return result;
}

function formatBytes(bytes: number): string {
  if (bytes === 0) return '0 B';
  const k = 1024;
  const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
  const i = Math.floor(Math.log(bytes) / Math.log(k));
  return `${parseFloat((bytes / Math.pow(k, i)).toFixed(2))} ${sizes[i]}`;
}

export const liveMonitoringTools = {
  getLiveMetrics: {
    schema: GetLiveMetricsSchema,
    handler: getLiveMetrics
  },
  getHottestTables: {
    schema: GetHottestTablesSchema,
    handler: getHottestTables
  },
  getTableMetrics: {
    schema: GetTableMetricsSchema,
    handler: getTableMetrics
  }
};
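For context, here is a minimal sketch of how the exported liveMonitoringTools map might be driven outside the MCP request loop: each entry pairs a zod schema with a handler, so raw arguments can be validated (picking up the declared defaults) before the handler is called. The connection and logger values are placeholders in this sketch; in the actual server they would come from the project's own DatabaseConnection and Logger implementations, which are not shown in this file.

import { liveMonitoringTools } from './live-monitoring.js';
import { DatabaseConnection } from '../types.js';
import { Logger } from '../utils/logger.js';

// Hypothetical helper: sample write-heavy tables for 5 seconds and log the result.
async function sampleHotTables(connection: DatabaseConnection, logger: Logger) {
  const { schema, handler } = liveMonitoringTools.getHottestTables;

  // Validate raw tool arguments with the tool's zod schema so defaults apply
  // (schema 'public', 5000 ms sample window unless overridden).
  const args = schema.parse({ orderBy: 'writes', limit: 5 });

  // The handler reads pg_stat_user_tables twice, sampleDuration apart, and
  // returns the tables with the largest activity deltas in between.
  const report = await handler(connection, logger, args);
  console.log(report.tables);
}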

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bluwork/postgres-scout-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.