db-manager.js•6.92 kB
/**
* @file postgresql/db-manager.js
* @description
* Provides database initialization and management.
*/
import { Pool } from 'pg';
import { PostgresGraphRepository } from './graph-repo.js';
import { PostgresMigrationManager } from './migration-manager.js';
import { migrations } from '../../migrations/postgresql/index.js';
/**
* Manages PostgreSQL database connections and schema.
* @class
*/
export class PostgresDbManager {
/** @type {*} */
#pool = null;
/** @type {import('pg').PoolConfig} */
#config;
/** @type {PostgresGraphRepository|null} */
#repository = null;
/**
* Creates a new PostgresDbManager.
* @param {object} [config={}]
* PostgreSQL connection configuration object.
*/
constructor(config = {}) {
this.#config = this.#sanitizeConfig(config);
}
/**
* Returns an initialized PostgreSQL pool.
* @async
* @returns {Promise<*>}
* The initialized PostgreSQL connection pool.
*/
async db() {
if (!this.#pool) {
this.#pool = new Pool(this.#config);
await this.#initialize();
}
return this.#pool;
}
/**
* Returns a PostgreSQL graph repository.
* @async
* @returns {Promise<PostgresGraphRepository>}
* The initialized graph repository instance.
*/
async graphRepository() {
if (!this.#repository) {
const pool = await this.db();
this.#repository = new PostgresGraphRepository(pool);
}
return this.#repository;
}
/**
* Initializes the database schema and extensions.
* @async
* @private
* @returns {Promise<void>}
* @throws {Error} If pool is not initialized or operations fail.
*/
async #initialize() {
const pool = this.#pool;
if (!pool) {
throw new Error('Postgres pool has not been initialized. Call db() first.');
}
const client = await /** @type {import('pg').Client} */ pool.connect();
try {
await this.#ensureExtensions(client);
await client.query('BEGIN');
await this.#createTables(client);
await this.#createIndexes(client);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
await this.#applyMigrations();
}
/**
* Ensures required PostgreSQL extensions are installed.
* @async
* @private
* @param {import('pg').Client} client - Database client instance.
* @returns {Promise<void>}
* @throws {Error} If pgvector extension is not available.
*/
async #ensureExtensions(client) {
try {
await client.query('CREATE EXTENSION IF NOT EXISTS vector;');
} catch (e) {
throw new Error(
'pgvector is not available in this database. ' +
`Original: ${e.message}`
);
}
}
/**
* Creates necessary database tables for the knowledge graph.
* @async
* @private
* @param {import('pg').Client} client - Database client instance.
* @returns {Promise<void>}
*/
async #createTables(client) {
await client.query(`
CREATE TABLE IF NOT EXISTS entities
(
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
entitytype TEXT NOT NULL
)
`);
await client.query(`
CREATE TABLE IF NOT EXISTS observations (
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
entity_id BIGINT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
content TEXT NOT NULL,
UNIQUE (entity_id, content)
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS relations (
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
from_id BIGINT NOT NULL
REFERENCES entities(id) ON DELETE CASCADE,
to_id BIGINT NOT NULL
REFERENCES entities(id) ON DELETE CASCADE,
relationtype TEXT NOT NULL,
UNIQUE (from_id, to_id, relationtype)
)
`);
await client.query(`
CREATE TABLE IF NOT EXISTS obs_vec (
observation_id BIGINT PRIMARY KEY
REFERENCES observations(id) ON DELETE CASCADE,
entity_id BIGINT NOT NULL
REFERENCES entities(id) ON DELETE CASCADE,
embedding vector(1024)
)
`);
}
/**
* Creates database indexes for optimized search and queries.
* @async
* @private
* @param {import('pg').Client} client - Database client instance.
* @returns {Promise<void>}
*/
async #createIndexes(client) {
await client.query(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE schemaname = current_schema() AND indexname = 'obs_vec_embedding_hnsw'
)
THEN
EXECUTE '
CREATE INDEX obs_vec_embedding_hnsw
ON obs_vec USING hnsw (embedding vector_cosine_ops)
';
END IF;
END
$$;
`);
}
/**
* Sanitizes and normalizes database configuration.
* @private
* @param {object} config - Raw configuration object.
* @returns {object} Sanitized configuration object.
*/
#sanitizeConfig(config) {
const sanitized = {};
if (config.connectionString) {
sanitized.connectionString = config.connectionString;
}
for (const [ key, value ] of Object.entries(config)) {
if (key === 'connectionString') {
continue;
}
if (value === undefined || value === null || value === '') {
continue;
}
sanitized[key] = key === 'port' ? Number(value) : value;
}
return sanitized;
}
/**
* Apply database migrations.
* @returns {Promise<void>}
* @private
*/
async #applyMigrations() {
const migrationManager = new PostgresMigrationManager(this.#pool);
await migrationManager.initialize();
return migrationManager.migrate(migrations, null, true);
}
}