Skip to main content
Glama
databaseSync.ts11 kB
import pg from "pg"; import { logger } from "../utils/logger.js"; import type { ThoughtRecord } from "./structuredThinking.js"; import type { SQLitePlannerService } from "./sqlitePlanner.js"; const { Client } = pg; export interface SyncStatus { readonly lastSyncAt: string; readonly pendingLocal: number; readonly pendingRemote: number; readonly conflictsDetected: number; readonly syncState: "idle" | "syncing" | "error"; readonly lastError?: string; } export interface SyncConfig { readonly postgresConnectionString?: string; readonly syncIntervalMs: number; readonly batchSize: number; readonly conflictResolution: "last-write-wins" | "manual" | "merge"; readonly enableAutoSync: boolean; } export interface ConflictRecord { readonly thoughtId: string; readonly localVersion: ThoughtRecord; readonly remoteVersion: ThoughtRecord; readonly detectedAt: string; } /** * DatabaseSyncService manages synchronization between local SQLite cache * and production PostgreSQL database. * * **Architecture**: * - SQLite: Fast local cache, offline resilience, command queue * - PostgreSQL: Source of truth, distributed coordination, audit trail * * **Sync Strategy**: * - Background worker pushes local changes every N seconds * - Pulls remote changes on startup and periodically * - Conflict resolution via configurable strategies * - Offline queue persists commands when network unavailable */ export class DatabaseSyncService { private readonly config: SyncConfig; private readonly localPlanner: SQLitePlannerService; private syncIntervalTimer: NodeJS.Timeout | null = null; private currentlySyncing = false; private pgClient: pg.Client | null = null; private lastSyncTimestamp: Date | null = null; private status: SyncStatus = { lastSyncAt: new Date().toISOString(), pendingLocal: 0, pendingRemote: 0, conflictsDetected: 0, syncState: "idle", }; public constructor( localPlanner: SQLitePlannerService, config: Partial<SyncConfig> = {}, ) { this.localPlanner = localPlanner; this.config = { syncIntervalMs: config.syncIntervalMs ?? 60000, // Default 1 minute batchSize: config.batchSize ?? 50, conflictResolution: config.conflictResolution ?? "last-write-wins", enableAutoSync: config.enableAutoSync ?? true, postgresConnectionString: config.postgresConnectionString, }; if (this.config.enableAutoSync && this.config.postgresConnectionString) { this.startAutoSync(); } } /** * Start background sync worker */ public startAutoSync(): void { if (this.syncIntervalTimer) { logger.warn("Auto-sync already running"); return; } if (!this.config.postgresConnectionString) { logger.warn("Cannot start auto-sync: PostgreSQL connection string not configured"); return; } logger.info("Starting auto-sync", { intervalMs: this.config.syncIntervalMs, batchSize: this.config.batchSize, }); this.syncIntervalTimer = setInterval(() => { void this.syncNow(); }, this.config.syncIntervalMs); } /** * Stop background sync worker */ public stopAutoSync(): void { if (this.syncIntervalTimer) { clearInterval(this.syncIntervalTimer); this.syncIntervalTimer = null; logger.info("Auto-sync stopped"); } } /** * Manually trigger immediate sync */ public async syncNow(): Promise<SyncStatus> { if (this.currentlySyncing) { logger.debug("Sync already in progress, skipping"); return this.status; } if (!this.config.postgresConnectionString) { logger.warn("Cannot sync: PostgreSQL not configured"); return this.status; } this.currentlySyncing = true; this.status = { ...this.status, syncState: "syncing" }; try { logger.debug("Starting sync cycle"); // Phase 1: Push local changes to PostgreSQL await this.pushLocalChanges(); // Phase 2: Pull remote changes from PostgreSQL await this.pullRemoteChanges(); // Phase 3: Resolve conflicts await this.resolveConflicts(); this.status = { lastSyncAt: new Date().toISOString(), pendingLocal: 0, pendingRemote: 0, conflictsDetected: this.status.conflictsDetected, syncState: "idle", }; logger.info("Sync completed successfully", this.status); } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.status = { ...this.status, syncState: "error", lastError: errorMessage, }; logger.error("Sync failed", { error: errorMessage }); } finally { this.currentlySyncing = false; } return this.status; } /** * Get current sync status */ public getStatus(): SyncStatus { return { ...this.status }; } /** * Check if PostgreSQL is configured and reachable */ public async checkPostgresHealth(): Promise<boolean> { if (!this.config.postgresConnectionString) { return false; } try { await this.ensureConnection(); const result = await this.pgClient!.query("SELECT 1"); return result.rowCount === 1; } catch (error) { logger.error("PostgreSQL health check failed", { error }); return false; } } /** * Ensure PostgreSQL connection is established */ private async ensureConnection(): Promise<void> { if (this.pgClient) { return; } if (!this.config.postgresConnectionString) { throw new Error("PostgreSQL connection string not configured"); } this.pgClient = new Client({ connectionString: this.config.postgresConnectionString, }); await this.pgClient.connect(); logger.info("PostgreSQL connection established for DatabaseSyncService"); // Create structured_thoughts table if it doesn't exist await this.pgClient.query(` CREATE TABLE IF NOT EXISTS structured_thoughts ( id TEXT PRIMARY KEY, stage TEXT NOT NULL, thought TEXT NOT NULL, timestamp TIMESTAMPTZ NOT NULL, ordering INTEGER NOT NULL, metadata JSONB, server_id TEXT, synced_at TIMESTAMPTZ DEFAULT NOW() ) `); // Create index for efficient sync queries await this.pgClient.query(` CREATE INDEX IF NOT EXISTS idx_structured_thoughts_synced_at ON structured_thoughts(synced_at) `); } /** * Push local SQLite changes to PostgreSQL */ private async pushLocalChanges(): Promise<void> { await this.ensureConnection(); // Get all thoughts from local SQLite const localThoughts = this.localPlanner.getAllThoughts(); if (localThoughts.length === 0) { logger.debug("No local thoughts to push"); return; } const serverId = process.env.IT_MCP_SERVER_ID ?? "unknown"; // Batch insert/update to PostgreSQL const batchSize = this.config.batchSize; for (let i = 0; i < localThoughts.length; i += batchSize) { const batch = localThoughts.slice(i, i + batchSize); for (const thought of batch) { await this.pgClient!.query( ` INSERT INTO structured_thoughts (id, stage, thought, timestamp, ordering, metadata, server_id, synced_at) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) ON CONFLICT (id) DO UPDATE SET stage = EXCLUDED.stage, thought = EXCLUDED.thought, timestamp = EXCLUDED.timestamp, ordering = EXCLUDED.ordering, metadata = EXCLUDED.metadata, server_id = EXCLUDED.server_id, synced_at = NOW() `, [ thought.id, thought.stage, thought.thought, thought.timestamp, thought.order, thought.metadata ? JSON.stringify(thought.metadata) : null, serverId, ], ); } logger.debug("Pushed batch to PostgreSQL", { batchStart: i, batchSize: batch.length, }); } logger.info("Pushed local changes to PostgreSQL", { thoughtsCount: localThoughts.length, }); } /** * Pull remote PostgreSQL changes to local SQLite */ private async pullRemoteChanges(): Promise<void> { await this.ensureConnection(); // Query for thoughts modified since last sync const sinceTimestamp = this.lastSyncTimestamp?.toISOString() ?? "1970-01-01T00:00:00Z"; const result = await this.pgClient!.query( ` SELECT id, stage, thought, timestamp, ordering, metadata FROM structured_thoughts WHERE synced_at > $1 ORDER BY synced_at ASC `, [sinceTimestamp], ); if (result.rows.length === 0) { logger.debug("No remote changes to pull"); return; } // Insert remote thoughts into local SQLite for (const row of result.rows) { const thought: ThoughtRecord = { id: row.id, stage: row.stage, order: row.ordering, thought: row.thought, timestamp: row.timestamp, metadata: row.metadata ? JSON.parse(row.metadata as string) : undefined, }; // Use upsert to handle conflicts this.localPlanner.upsertThought(thought); } logger.info("Pulled remote changes from PostgreSQL", { thoughtsCount: result.rows.length, }); this.lastSyncTimestamp = new Date(); } /** * Resolve conflicts between local and remote versions */ private async resolveConflicts(): Promise<void> { // With upsert strategy and timestamp-based sync, conflicts are automatically // resolved by last-write-wins (newer timestamp overwrites older) // // Future enhancement: Could track conflicts in a separate table for manual review logger.debug("Conflict resolution using strategy", { strategy: this.config.conflictResolution, }); // For now, conflicts are implicitly resolved during push/pull // by comparing timestamps and keeping the newer version } /** * Get list of unresolved conflicts */ public async getConflicts(): Promise<ConflictRecord[]> { // TODO: Query local conflicts table logger.debug("Get conflicts (not yet implemented)"); return []; } /** * Manually resolve a conflict */ public async resolveConflict( thoughtId: string, resolution: "keep-local" | "keep-remote" | "merge", ): Promise<void> { logger.debug("Manual conflict resolution (not yet implemented)", { thoughtId, resolution, }); // TODO: Apply resolution and remove from conflicts table } /** * Cleanup: stop sync worker and close PostgreSQL connection */ public async destroy(): Promise<void> { this.stopAutoSync(); if (this.pgClient) { await this.pgClient.end(); this.pgClient = null; logger.info("PostgreSQL connection closed"); } logger.info("DatabaseSyncService destroyed"); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/acampkin95/MCPCentralManager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server