import pg from "pg";
import { logger } from "../utils/logger.js";
import type { ThoughtRecord } from "./structuredThinking.js";
import type { SQLitePlannerService } from "./sqlitePlanner.js";
const { Client } = pg;
export interface SyncStatus {
readonly lastSyncAt: string;
readonly pendingLocal: number;
readonly pendingRemote: number;
readonly conflictsDetected: number;
readonly syncState: "idle" | "syncing" | "error";
readonly lastError?: string;
}
export interface SyncConfig {
readonly postgresConnectionString?: string;
readonly syncIntervalMs: number;
readonly batchSize: number;
readonly conflictResolution: "last-write-wins" | "manual" | "merge";
readonly enableAutoSync: boolean;
}
export interface ConflictRecord {
readonly thoughtId: string;
readonly localVersion: ThoughtRecord;
readonly remoteVersion: ThoughtRecord;
readonly detectedAt: string;
}
/**
* DatabaseSyncService manages synchronization between local SQLite cache
* and production PostgreSQL database.
*
* **Architecture**:
* - SQLite: Fast local cache, offline resilience, command queue
* - PostgreSQL: Source of truth, distributed coordination, audit trail
*
* **Sync Strategy**:
* - Background worker pushes local changes every N seconds
* - Pulls remote changes on startup and periodically
* - Conflict resolution via configurable strategies
* - Offline queue persists commands when network unavailable
*/
export class DatabaseSyncService {
private readonly config: SyncConfig;
private readonly localPlanner: SQLitePlannerService;
private syncIntervalTimer: NodeJS.Timeout | null = null;
private currentlySyncing = false;
private pgClient: pg.Client | null = null;
private lastSyncTimestamp: Date | null = null;
private status: SyncStatus = {
lastSyncAt: new Date().toISOString(),
pendingLocal: 0,
pendingRemote: 0,
conflictsDetected: 0,
syncState: "idle",
};
public constructor(
localPlanner: SQLitePlannerService,
config: Partial<SyncConfig> = {},
) {
this.localPlanner = localPlanner;
this.config = {
syncIntervalMs: config.syncIntervalMs ?? 60000, // Default 1 minute
batchSize: config.batchSize ?? 50,
conflictResolution: config.conflictResolution ?? "last-write-wins",
enableAutoSync: config.enableAutoSync ?? true,
postgresConnectionString: config.postgresConnectionString,
};
if (this.config.enableAutoSync && this.config.postgresConnectionString) {
this.startAutoSync();
}
}
/**
* Start background sync worker
*/
public startAutoSync(): void {
if (this.syncIntervalTimer) {
logger.warn("Auto-sync already running");
return;
}
if (!this.config.postgresConnectionString) {
logger.warn("Cannot start auto-sync: PostgreSQL connection string not configured");
return;
}
logger.info("Starting auto-sync", {
intervalMs: this.config.syncIntervalMs,
batchSize: this.config.batchSize,
});
this.syncIntervalTimer = setInterval(() => {
void this.syncNow();
}, this.config.syncIntervalMs);
}
/**
* Stop background sync worker
*/
public stopAutoSync(): void {
if (this.syncIntervalTimer) {
clearInterval(this.syncIntervalTimer);
this.syncIntervalTimer = null;
logger.info("Auto-sync stopped");
}
}
/**
* Manually trigger immediate sync
*/
public async syncNow(): Promise<SyncStatus> {
if (this.currentlySyncing) {
logger.debug("Sync already in progress, skipping");
return this.status;
}
if (!this.config.postgresConnectionString) {
logger.warn("Cannot sync: PostgreSQL not configured");
return this.status;
}
this.currentlySyncing = true;
this.status = { ...this.status, syncState: "syncing" };
try {
logger.debug("Starting sync cycle");
// Phase 1: Push local changes to PostgreSQL
await this.pushLocalChanges();
// Phase 2: Pull remote changes from PostgreSQL
await this.pullRemoteChanges();
// Phase 3: Resolve conflicts
await this.resolveConflicts();
this.status = {
lastSyncAt: new Date().toISOString(),
pendingLocal: 0,
pendingRemote: 0,
conflictsDetected: this.status.conflictsDetected,
syncState: "idle",
};
logger.info("Sync completed successfully", this.status);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.status = {
...this.status,
syncState: "error",
lastError: errorMessage,
};
logger.error("Sync failed", { error: errorMessage });
} finally {
this.currentlySyncing = false;
}
return this.status;
}
/**
* Get current sync status
*/
public getStatus(): SyncStatus {
return { ...this.status };
}
/**
* Check if PostgreSQL is configured and reachable
*/
public async checkPostgresHealth(): Promise<boolean> {
if (!this.config.postgresConnectionString) {
return false;
}
try {
await this.ensureConnection();
const result = await this.pgClient!.query("SELECT 1");
return result.rowCount === 1;
} catch (error) {
logger.error("PostgreSQL health check failed", { error });
return false;
}
}
/**
* Ensure PostgreSQL connection is established
*/
private async ensureConnection(): Promise<void> {
if (this.pgClient) {
return;
}
if (!this.config.postgresConnectionString) {
throw new Error("PostgreSQL connection string not configured");
}
this.pgClient = new Client({
connectionString: this.config.postgresConnectionString,
});
await this.pgClient.connect();
logger.info("PostgreSQL connection established for DatabaseSyncService");
// Create structured_thoughts table if it doesn't exist
await this.pgClient.query(`
CREATE TABLE IF NOT EXISTS structured_thoughts (
id TEXT PRIMARY KEY,
stage TEXT NOT NULL,
thought TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
ordering INTEGER NOT NULL,
metadata JSONB,
server_id TEXT,
synced_at TIMESTAMPTZ DEFAULT NOW()
)
`);
// Create index for efficient sync queries
await this.pgClient.query(`
CREATE INDEX IF NOT EXISTS idx_structured_thoughts_synced_at
ON structured_thoughts(synced_at)
`);
}
/**
* Push local SQLite changes to PostgreSQL
*/
private async pushLocalChanges(): Promise<void> {
await this.ensureConnection();
// Get all thoughts from local SQLite
const localThoughts = this.localPlanner.getAllThoughts();
if (localThoughts.length === 0) {
logger.debug("No local thoughts to push");
return;
}
const serverId = process.env.IT_MCP_SERVER_ID ?? "unknown";
// Batch insert/update to PostgreSQL
const batchSize = this.config.batchSize;
for (let i = 0; i < localThoughts.length; i += batchSize) {
const batch = localThoughts.slice(i, i + batchSize);
for (const thought of batch) {
await this.pgClient!.query(
`
INSERT INTO structured_thoughts (id, stage, thought, timestamp, ordering, metadata, server_id, synced_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
ON CONFLICT (id) DO UPDATE SET
stage = EXCLUDED.stage,
thought = EXCLUDED.thought,
timestamp = EXCLUDED.timestamp,
ordering = EXCLUDED.ordering,
metadata = EXCLUDED.metadata,
server_id = EXCLUDED.server_id,
synced_at = NOW()
`,
[
thought.id,
thought.stage,
thought.thought,
thought.timestamp,
thought.order,
thought.metadata ? JSON.stringify(thought.metadata) : null,
serverId,
],
);
}
logger.debug("Pushed batch to PostgreSQL", {
batchStart: i,
batchSize: batch.length,
});
}
logger.info("Pushed local changes to PostgreSQL", {
thoughtsCount: localThoughts.length,
});
}
/**
* Pull remote PostgreSQL changes to local SQLite
*/
private async pullRemoteChanges(): Promise<void> {
await this.ensureConnection();
// Query for thoughts modified since last sync
const sinceTimestamp = this.lastSyncTimestamp?.toISOString() ?? "1970-01-01T00:00:00Z";
const result = await this.pgClient!.query(
`
SELECT id, stage, thought, timestamp, ordering, metadata
FROM structured_thoughts
WHERE synced_at > $1
ORDER BY synced_at ASC
`,
[sinceTimestamp],
);
if (result.rows.length === 0) {
logger.debug("No remote changes to pull");
return;
}
// Insert remote thoughts into local SQLite
for (const row of result.rows) {
const thought: ThoughtRecord = {
id: row.id,
stage: row.stage,
order: row.ordering,
thought: row.thought,
timestamp: row.timestamp,
metadata: row.metadata ? JSON.parse(row.metadata as string) : undefined,
};
// Use upsert to handle conflicts
this.localPlanner.upsertThought(thought);
}
logger.info("Pulled remote changes from PostgreSQL", {
thoughtsCount: result.rows.length,
});
this.lastSyncTimestamp = new Date();
}
/**
* Resolve conflicts between local and remote versions
*/
private async resolveConflicts(): Promise<void> {
// With upsert strategy and timestamp-based sync, conflicts are automatically
// resolved by last-write-wins (newer timestamp overwrites older)
//
// Future enhancement: Could track conflicts in a separate table for manual review
logger.debug("Conflict resolution using strategy", {
strategy: this.config.conflictResolution,
});
// For now, conflicts are implicitly resolved during push/pull
// by comparing timestamps and keeping the newer version
}
/**
* Get list of unresolved conflicts
*/
public async getConflicts(): Promise<ConflictRecord[]> {
// TODO: Query local conflicts table
logger.debug("Get conflicts (not yet implemented)");
return [];
}
/**
* Manually resolve a conflict
*/
public async resolveConflict(
thoughtId: string,
resolution: "keep-local" | "keep-remote" | "merge",
): Promise<void> {
logger.debug("Manual conflict resolution (not yet implemented)", {
thoughtId,
resolution,
});
// TODO: Apply resolution and remove from conflicts table
}
/**
* Cleanup: stop sync worker and close PostgreSQL connection
*/
public async destroy(): Promise<void> {
this.stopAutoSync();
if (this.pgClient) {
await this.pgClient.end();
this.pgClient = null;
logger.info("PostgreSQL connection closed");
}
logger.info("DatabaseSyncService destroyed");
}
}