/**
* Replication Resource
*
* Primary/replica status, replication slots, WAL status, and lag monitoring.
*/
import type { PostgresAdapter } from '../PostgresAdapter.js';
import type { ResourceDefinition, RequestContext } from '../../../types/index.js';
/**
 * Payload returned by the replication resource handler.
 */
interface ReplicationInfo {
  /** Server role as determined by the handler: 'replica', 'primary' or 'standalone'. */
  role: string;
  /** Human-readable summary of the replication state. */
  statusMessage: string;
  /** Rows read from pg_replication_slots (left empty when the server is a replica). */
  replicationSlots: Record<string, unknown>[];
  /** Rows read from pg_stat_replication (left empty when the server is a replica). */
  replicationStats: Record<string, unknown>[];
  /** WAL position details, or a note explaining why they are unavailable. */
  walStatus: Record<string, unknown>;
  /** Replay delay as a string; only populated when the server is a replica. */
  replicationDelay?: string;
}
/**
 * Converts the `replication_delay` value returned by the driver into a string.
 *
 * Strings are passed through unchanged, null/undefined becomes 'Unknown', and
 * any other value (for example an interval object) is JSON-serialized.
 */
function formatReplicationDelay(delay: unknown): string {
  if (delay == null) {
    return 'Unknown';
  }
  if (typeof delay === 'string') {
    return delay;
  }
  return JSON.stringify(delay);
}

/**
 * Creates the `postgres://replication` resource.
 *
 * The handler reports the server role (replica, primary or standalone),
 * replication slots and connected replicas (non-replica servers only),
 * the replay delay (replica only), and the WAL position.
 *
 * @param adapter - Adapter used to run the diagnostic queries.
 * @returns The resource definition whose handler resolves to the replication info.
 */
export function createReplicationResource(adapter: PostgresAdapter): ResourceDefinition {
  return {
    uri: 'postgres://replication',
    name: 'Replication Status',
    description: 'Primary/replica status, replication slots, WAL status, and lag monitoring',
    mimeType: 'application/json',
    handler: async (_uri: string, _context: RequestContext) => {
      // A server in recovery is a replica; anything else is a primary or standalone.
      const roleResult = await adapter.executeQuery('SELECT pg_is_in_recovery() as is_replica');
      const isReplica = roleResult.rows?.[0]?.['is_replica'] === true;

      let role: string;
      let statusMessage: string;
      let replicationSlots: Record<string, unknown>[] = [];
      let replicationStats: Record<string, unknown>[] = [];

      if (isReplica) {
        role = 'replica';
        statusMessage = 'This server is a replica receiving data from a primary server.';
      } else {
        // Not in recovery: look for replication slots and connected replicas.
        const slotsResult = await adapter.executeQuery(`
          SELECT
            slot_name,
            slot_type,
            database,
            active,
            restart_lsn,
            confirmed_flush_lsn,
            wal_status,
            safe_wal_size
          FROM pg_replication_slots
        `);
        replicationSlots = slotsResult.rows ?? [];

        const statsResult = await adapter.executeQuery(`
          SELECT
            client_addr,
            application_name,
            state,
            sync_state,
            replay_lsn,
            write_lag,
            flush_lag,
            replay_lag
          FROM pg_stat_replication
        `);
        replicationStats = statsResult.rows ?? [];

        const hasReplicationSlots = replicationSlots.length > 0;
        const hasConnectedReplicas = replicationStats.length > 0;

        if (hasReplicationSlots || hasConnectedReplicas) {
          role = 'primary';
          const activeSlots = replicationSlots.filter(slot => slot['active'] === true).length;
          if (hasConnectedReplicas) {
            statusMessage = `Primary server with ${replicationStats.length.toString()} connected replica(s).`;
          } else if (activeSlots === 0) {
            // No connected replicas here, so at least one slot exists and none is active.
            statusMessage = `Primary server with ${replicationSlots.length.toString()} replication slot(s) but no connected replicas. Check replica connectivity.`;
          } else {
            statusMessage = 'Primary server configured for replication.';
          }
        } else {
          // Neither a replica nor any sign of replication activity.
          role = 'standalone';
          statusMessage = 'Standalone PostgreSQL server with no replication configured. This is expected if you do not require high availability or read replicas.';
        }
      }

      const replicationInfo: ReplicationInfo = {
        role,
        replicationSlots,
        replicationStats,
        walStatus: {},
        statusMessage
      };

      if (isReplica) {
        // Time elapsed since the last replayed transaction was committed on the primary.
        const lagResult = await adapter.executeQuery(`
          SELECT
            now() - pg_last_xact_replay_timestamp() AS replication_delay
        `);
        replicationInfo.replicationDelay = formatReplicationDelay(
          lagResult.rows?.[0]?.['replication_delay']
        );
      }

      // pg_current_wal_lsn() cannot be executed during recovery, so a replica
      // reports the last received/replayed WAL positions instead.
      const walQuery = isReplica
        ? `
          SELECT
            pg_last_wal_receive_lsn() as last_wal_receive_lsn,
            pg_last_wal_replay_lsn() as last_wal_replay_lsn
        `
        : `
          SELECT
            pg_current_wal_lsn() as current_wal_lsn,
            pg_walfile_name(pg_current_wal_lsn()) as current_wal_file
        `;

      try {
        const walResult = await adapter.executeQuery(walQuery);
        replicationInfo.walStatus = walResult.rows?.[0] ?? {};
      } catch (error) {
        // WAL status is best-effort: keep the rest of the report, but say why
        // it is missing rather than blaming replica mode unconditionally.
        replicationInfo.walStatus = {
          note: isReplica
            ? 'WAL position unavailable (replica mode)'
            : 'WAL position unavailable',
          error: error instanceof Error ? error.message : String(error)
        };
      }

      return replicationInfo;
    }
  };
}