Skip to main content
Glama
schema-migration.ts7.81 kB
/** * Schema Migration System * * Manages database schema migrations with versioning and rollback support. * Implements idempotent migrations for PostgreSQL with pgvector. */ import { readFileSync } from "fs"; import { dirname, join } from "path"; import type { PoolClient } from "pg"; import { fileURLToPath } from "url"; import { Logger } from "../utils/logger.js"; import type { DatabaseConnectionManager } from "./connection-manager"; // ES module __dirname equivalent const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); /** * Migration definition */ export interface Migration { version: number; name: string; upFile: string; // Path to SQL file for migration downFile: string; // Path to SQL file for rollback appliedAt?: Date; } /** * Schema Migration System class * Manages database schema creation, migration, and rollback */ export class SchemaMigrationSystem { private dbManager: DatabaseConnectionManager; private migrationsDir: string; private migrations: Migration[]; constructor(dbManager: DatabaseConnectionManager, migrationsDir?: string) { this.dbManager = dbManager; this.migrationsDir = migrationsDir ?? join(__dirname, "migrations"); // Define migrations in order this.migrations = [ { version: 1, name: "initial_schema", upFile: "001_initial_schema.sql", downFile: "001_initial_schema_down.sql", }, { version: 2, name: "create_indexes", upFile: "002_create_indexes.sql", downFile: "002_create_indexes_down.sql", }, { version: 3, name: "reinforcement_history", upFile: "003_reinforcement_history.sql", downFile: "003_reinforcement_history_down.sql", }, { version: 4, name: "full_text_search", upFile: "004_full_text_search.sql", downFile: "004_full_text_search_down.sql", }, ]; } /** * Initialize migration tracking table */ private async initializeMigrationTable(client: PoolClient): Promise<void> { await client.query(` CREATE TABLE IF NOT EXISTS schema_migrations ( version INTEGER PRIMARY KEY, name TEXT NOT NULL, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); } /** * Get current schema version */ async getCurrentVersion(): Promise<number> { const client = await this.dbManager.getConnection(); try { await this.initializeMigrationTable(client); const result = await client.query("SELECT MAX(version) as version FROM schema_migrations"); return result.rows[0]?.version ?? 0; } finally { this.dbManager.releaseConnection(client); } } /** * Run all pending migrations */ async runMigrations(): Promise<void> { const currentVersion = await this.getCurrentVersion(); // Find migrations that need to be applied const pendingMigrations = this.migrations.filter((m) => m.version > currentVersion); if (pendingMigrations.length === 0) { // All migrations are up to date return; } // Running pending migrations for (const migration of pendingMigrations) { await this.applyMigration(migration); } // All migrations completed successfully } /** * Apply a single migration */ private async applyMigration(migration: Migration): Promise<void> { const client = await this.dbManager.beginTransaction(); try { Logger.info(` ⏳ Applying migration ${migration.version}: ${migration.name}...`); // Read migration SQL const sqlPath = join(this.migrationsDir, migration.upFile); const sql = readFileSync(sqlPath, "utf-8"); // Execute the entire SQL file as a single statement // PostgreSQL can handle multiple statements separated by semicolons await client.query(sql); // Record migration await client.query("INSERT INTO schema_migrations (version, name) VALUES ($1, $2)", [ migration.version, migration.name, ]); await this.dbManager.commitTransaction(client); Logger.info(` ✅ Migration ${migration.version} applied successfully`); } catch (error) { await this.dbManager.rollbackTransaction(client); throw new Error( `Failed to apply migration ${migration.version}: ${error instanceof Error ? error.message : String(error)}` ); } } /** * Rollback to a specific version */ async rollbackMigration(targetVersion: number): Promise<void> { const currentVersion = await this.getCurrentVersion(); if (targetVersion >= currentVersion) { Logger.info("✅ Already at or below target version"); return; } // Find migrations to rollback (in reverse order) const migrationsToRollback = this.migrations .filter((m) => m.version > targetVersion && m.version <= currentVersion) .reverse(); if (migrationsToRollback.length === 0) { Logger.info("✅ No migrations to rollback"); return; } Logger.info(`📦 Rolling back ${migrationsToRollback.length} migrations...`); for (const migration of migrationsToRollback) { await this.rollbackSingleMigration(migration); } Logger.info("✅ Rollback completed successfully"); } /** * Rollback a single migration */ private async rollbackSingleMigration(migration: Migration): Promise<void> { const client = await this.dbManager.beginTransaction(); try { Logger.info(` ⏳ Rolling back migration ${migration.version}: ${migration.name}...`); // Read and execute rollback SQL const sqlPath = join(this.migrationsDir, migration.downFile); const sql = readFileSync(sqlPath, "utf-8"); await client.query(sql); // Remove migration record await client.query("DELETE FROM schema_migrations WHERE version = $1", [migration.version]); await this.dbManager.commitTransaction(client); Logger.info(` ✅ Migration ${migration.version} rolled back successfully`); } catch (error) { await this.dbManager.rollbackTransaction(client); throw new Error( `Failed to rollback migration ${migration.version}: ${error instanceof Error ? error.message : String(error)}` ); } } /** * Create memories table */ async createMemoriesTable(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Create embeddings table */ async createEmbeddingsTable(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Create links table */ async createLinksTable(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Create metadata table */ async createMetadataTable(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Create emotions table */ async createEmotionsTable(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Create all indexes */ async createIndexes(): Promise<void> { // This is handled by migration 002 await this.runMigrations(); } /** * Enable pgvector extension */ async enablePgvectorExtension(): Promise<void> { // This is handled by migration 001 await this.runMigrations(); } /** * Reset database (drop all tables and rerun migrations) */ async resetDatabase(): Promise<void> { Logger.info("🗑️ Resetting database..."); // Rollback all migrations await this.rollbackMigration(0); // Rerun all migrations await this.runMigrations(); Logger.info("✅ Database reset complete"); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server