// graph-repo.js • 19 kB
/**
* @file postgres/graph-repo.js
* @description
* PostgreSQL implementation of the knowledge graph repository (strict pgvector).
*/
/**
 * Knowledge graph repository backed by PostgreSQL.
 *
 * Entities, observations and relations live in the `entities`, `observations`
 * and `relations` tables; observation embeddings live in `obs_vec` and are
 * compared with the pgvector `<=>` operator.
 *
 * @implements {import('../graph-repository.js').GraphRepository}
 */
export class PostgresGraphRepository {
  /** @type {*|null} */
  #pool = null;

  /**
   * Creates a new PostgresGraphRepository.
   * @param {import('pg').Pool} pool - PostgreSQL connection pool. Must expose
   *   `query(sql, params)` and `connect()`.
   */
  constructor(pool) {
    this.#pool = pool;
  }

  /**
   * Formats a sequence of numbers as a pgvector text literal.
   * @private
   * @param {ArrayLike<number> & {join: function(string): string}} values
   *   Array or typed array of vector components.
   * @returns {string} Vector string in format "[x,y,z,...]".
   */
  #toVectorLiteral(values) {
    return `[${values.join(',')}]`;
  }

  /**
   * Converts a Buffer of packed float32 values (platform byte order) to a
   * pgvector-compatible string.
   * @private
   * @param {Buffer} buffer - Buffer containing a float32 array.
   * @returns {string} Vector string in format "[x,y,z,...]".
   */
  #bufferToVector(buffer) {
    const width = Float32Array.BYTES_PER_ELEMENT;
    const count = Math.floor(buffer.byteLength / width);
    const start = buffer.byteOffset;
    // A Float32Array view requires a byte offset that is a multiple of 4 and
    // throws a RangeError otherwise. Buffers that are slices of a larger
    // ArrayBuffer can start anywhere, so copy the bytes out when unaligned.
    const floats = start % width === 0
      ? new Float32Array(buffer.buffer, start, count)
      : new Float32Array(buffer.buffer.slice(start, start + count * width));
    return this.#toVectorLiteral(floats);
  }

  /**
   * Executes a SQL query on the pool and returns its rows.
   * @async
   * @private
   * @param {string} sql - SQL query string.
   * @param {Array} [params=[]] - Query parameters.
   * @returns {Promise<Array>} Array of result rows.
   */
  async #query(sql, params = []) {
    const pool = /** @type {any} */ (this.#pool);
    const result = await pool.query(sql, params);
    return result.rows;
  }

  /**
   * Shapes raw entity / observation / relation rows into the graph structure
   * returned by {@link readGraph} and {@link openNodes}.
   * @private
   * @param {Array<{id: *, name: string, entitytype: string}>} entities
   * @param {Array<{entity_id: *, content: string}>} observations
   * @param {Array<{from_name: string, to_name: string, relationtype: string}>} relations
   * @returns {{entities: Array<{name: string, entityType: string, observations: string[]}>, relations: Array<{from: string, to: string, relationType: string}>}}
   */
  #toGraph(entities, observations, relations) {
    // Group once instead of filtering the whole observation list per entity.
    // Keys are stringified so an id returned as a number by one query and as
    // a string by another still refers to the same entity.
    const contentByEntity = new Map();
    for (const { entity_id: entityId, content } of observations) {
      const key = String(entityId);
      const bucket = contentByEntity.get(key);
      if (bucket) {
        bucket.push(content);
      } else {
        contentByEntity.set(key, [content]);
      }
    }
    return {
      entities: entities.map((entity) => ({
        name: entity.name,
        entityType: entity.entitytype,
        observations: contentByEntity.get(String(entity.id)) ?? [],
      })),
      relations: relations.map((relation) => ({
        from: relation.from_name,
        to: relation.to_name,
        relationType: relation.relationtype,
      })),
    };
  }

  /**
   * Runs the nearest-neighbour query against `obs_vec`.
   * @async
   * @private
   * @param {number[]} unitVector - Query embedding as an array of numbers.
   * @param {number} limit - Maximum number of rows to return.
   * @returns {Promise<Array<{entity_id: number, distance: number, similarity: number}>>}
   *   One item per matching row, ordered by ascending `<=>` distance;
   *   `similarity` is `1 - distance`.
   */
  async #semanticRows(unitVector, limit) {
    const rows = await this.#query(
      `SELECT entity_id,
              embedding <=> $1::vector AS distance
       FROM obs_vec
       WHERE embedding IS NOT NULL
       ORDER BY embedding <=> $1::vector LIMIT $2`,
      [this.#toVectorLiteral(unitVector), limit]
    );
    return rows.map((row) => {
      const distance = Number(row.distance);
      return {
        entity_id: Number(row.entity_id),
        distance,
        similarity: 1 - distance,
      };
    });
  }

  /**
   * Retrieves entity ID by name.
   * @async
   * @param {string} name - Entity name.
   * @returns {Promise<number|null>}
   *   Entity ID if found, null otherwise.
   */
  async getEntityId(name) {
    const rows = await this.#query('SELECT id FROM entities WHERE name = $1', [name]);
    return rows.length ? rows[0].id : null;
  }

  /**
   * Creates a new entity.
   * @async
   * @param {string} name - Entity name.
   * @param {string} entityType - Entity type.
   * @returns {Promise<number>} The ID of the created entity.
   */
  async createEntity(name, entityType) {
    const rows = await this.#query(
      `INSERT INTO entities(name, entitytype)
       VALUES ($1, $2) RETURNING id`,
      [name, entityType]
    );
    return rows[0].id;
  }

  /**
   * Gets or creates an entity ID.
   *
   * An existing entity is returned untouched. Otherwise the entity is
   * inserted; if another writer created the same name in between, the upsert
   * overwrites that row's `entitytype` so that `RETURNING id` still yields it.
   * @async
   * @param {string} name - Entity name.
   * @param {string} entityType - Entity type.
   * @returns {Promise<number>}
   *   Existing or newly created entity ID.
   */
  async getOrCreateEntityId(name, entityType) {
    const existing = await this.getEntityId(name);
    if (existing !== null) {
      return existing;
    }
    const rows = await this.#query(
      `INSERT INTO entities(name, entitytype)
       VALUES ($1, $2) ON CONFLICT(name)
       DO
       UPDATE SET entitytype = EXCLUDED.entitytype
       RETURNING id`,
      [name, entityType]
    );
    return rows[0].id;
  }

  /**
   * Inserts an observation for an entity.
   * @async
   * @param {number} entityId - Entity ID.
   * @param {string} content - Observation content.
   * @returns {Promise<{inserted: boolean, observationId: number|null}>}
   *   `inserted` is false when the (entity, content) pair already existed; in
   *   that case `observationId` is the existing row's ID, or null if it could
   *   not be found by the follow-up lookup.
   */
  async insertObservation(entityId, content) {
    const insertedRows = await this.#query(
      `INSERT INTO observations(entity_id, content)
       VALUES ($1, $2) ON CONFLICT(entity_id, content) DO NOTHING
       RETURNING id`,
      [entityId, content]
    );
    if (insertedRows.length) {
      return { inserted: true, observationId: insertedRows[0].id };
    }
    // DO NOTHING returns no row on conflict, so look the existing one up.
    const existingRows = await this.#query(
      `SELECT id
       FROM observations
       WHERE entity_id = $1
         AND content = $2`,
      [entityId, content]
    );
    return {
      inserted: false,
      observationId: existingRows.length ? existingRows[0].id : null,
    };
  }

  /**
   * Inserts or updates observation embeddings in the vector table.
   *
   * All rows are written inside a single transaction on one pooled client.
   * @async
   * @param {Array<{observationId: number, entityId: number, embedding: Buffer}>} rows
   *   Array of observation vectors to insert.
   * @returns {Promise<void>}
   * @throws {Error} Rethrows the first query error after attempting a rollback.
   */
  async insertObservationVectors(rows) {
    if (!rows.length) {
      return;
    }
    const client = /** @type {import('pg').PoolClient} */ (await this.#pool.connect());
    // Set only when ROLLBACK itself fails; passing it to release() tells the
    // pool to discard the client instead of reusing a broken connection.
    let releaseError;
    try {
      await client.query('BEGIN');
      for (const row of rows) {
        await client.query(
          `INSERT INTO obs_vec(observation_id, entity_id, embedding)
           VALUES ($1, $2, $3::vector) ON CONFLICT(observation_id)
           DO
           UPDATE SET embedding = EXCLUDED.embedding,
             entity_id = EXCLUDED.entity_id`,
          [row.observationId, row.entityId, this.#bufferToVector(row.embedding)]
        );
      }
      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        // Keep the original failure as the error the caller sees.
        releaseError = rollbackError;
      }
      throw error;
    } finally {
      client.release(releaseError);
    }
  }

  /**
   * Creates a relation between two entities.
   * @async
   * @param {number} fromId - Source entity ID.
   * @param {number} toId - Target entity ID.
   * @param {string} relationType - Type of relation.
   * @returns {Promise<boolean>}
   *   True if relation was created, false if it already exists.
   */
  async createRelation(fromId, toId, relationType) {
    const rows = await this.#query(
      `INSERT INTO relations(from_id, to_id, relationtype)
       VALUES ($1, $2, $3) ON CONFLICT(from_id, to_id, relationtype) DO NOTHING
       RETURNING id`,
      [fromId, toId, relationType]
    );
    return rows.length > 0;
  }

  /**
   * Deletes entities by their names.
   * @async
   * @param {string[]} names - Array of entity names to delete.
   * @returns {Promise<void>}
   */
  async deleteEntities(names) {
    if (!names.length) {
      return;
    }
    await this.#query(
      `DELETE
       FROM entities
       WHERE name = ANY ($1)`,
      [names]
    );
  }

  /**
   * Deletes relations between entities.
   *
   * Relations whose source or target entity name cannot be resolved are
   * skipped.
   * @async
   * @param {Array<{from: string, to: string, relationType: string}>} relations
   *   Array of relations to delete with entity names and relation type.
   * @returns {Promise<void>}
   */
  async deleteRelations(relations) {
    for (const relation of relations) {
      const [fromId, toId] = await Promise.all([
        this.getEntityId(relation.from),
        this.getEntityId(relation.to),
      ]);
      // getEntityId signals "not found" with null; compare explicitly so a
      // falsy but valid id is not mistaken for a missing entity.
      if (fromId === null || toId === null) {
        continue;
      }
      await this.#query(
        `DELETE
         FROM relations
         WHERE from_id = $1
           AND to_id = $2
           AND relationtype = $3`,
        [fromId, toId, relation.relationType]
      );
    }
  }

  /**
   * Deletes specific observations from an entity.
   * @async
   * @param {number} entityId - Entity ID from which to delete observations.
   * @param {string[]} observations - Array of observation content strings to delete.
   * @returns {Promise<void>}
   */
  async deleteObservations(entityId, observations) {
    if (!observations.length) {
      return;
    }
    await this.#query(
      `DELETE
       FROM observations
       WHERE entity_id = $1
         AND content = ANY ($2)`,
      [entityId, observations]
    );
  }

  /**
   * Retrieves the complete knowledge graph including all entities and relations.
   * @async
   * @returns {Promise<{entities: Array<{name: string, entityType: string, observations: string[]}>, relations: Array<{from: string, to: string, relationType: string}>}>}
   *   Object containing all entities with their observations and all relations.
   */
  async readGraph() {
    // The three reads do not depend on each other, so issue them together.
    const [entities, observations, relations] = await Promise.all([
      this.#query('SELECT * FROM entities', []),
      this.#query('SELECT entity_id, content FROM observations', []),
      this.#query(
        `SELECT r.from_id,
                r.to_id,
                r.relationtype,
                ef.name AS from_name,
                et.name AS to_name
         FROM relations r
                JOIN entities ef ON ef.id = r.from_id
                JOIN entities et ON et.id = r.to_id`,
        []
      ),
    ]);
    return this.#toGraph(entities, observations, relations);
  }

  /**
   * Retrieves detailed information for specified entities by their names.
   * @async
   * @param {string[]} names - Array of entity names to retrieve.
   * @returns {Promise<{entities: Array<{name: string, entityType: string, observations: string[]}>, relations: Array<{from: string, to: string, relationType: string}>}>}
   *   The matching entities with their observations, plus only those
   *   relations whose both endpoints are among the matching entities.
   */
  async openNodes(names) {
    if (!names.length) {
      return { entities: [], relations: [] };
    }
    const entities = await this.#query(
      `SELECT *
       FROM entities
       WHERE name = ANY ($1)`,
      [names]
    );
    if (!entities.length) {
      return { entities: [], relations: [] };
    }
    const ids = entities.map((entity) => entity.id);
    const [observations, relations] = await Promise.all([
      this.#query(
        `SELECT entity_id, content
         FROM observations
         WHERE entity_id = ANY ($1)`,
        [ids]
      ),
      this.#query(
        `SELECT r.from_id,
                r.to_id,
                r.relationtype,
                ef.name AS from_name,
                et.name AS to_name
         FROM relations r
                JOIN entities ef ON ef.id = r.from_id
                JOIN entities et ON et.id = r.to_id
         WHERE r.from_id = ANY ($1)
           AND r.to_id = ANY ($1)`,
        [ids]
      ),
    ]);
    return this.#toGraph(entities, observations, relations);
  }

  /**
   * Performs semantic search using vector similarity.
   * @async
   * @param {number[]} unitVector - Embedding vector.
   * @param {number} topK - Maximum number of results to return.
   * @returns {Promise<Array<{entity_id: number, distance: number, similarity: number}>>}
   *   Matches ordered by ascending distance; `similarity` is `1 - distance`.
   */
  async semanticSearch(unitVector, topK) {
    return this.#semanticRows(unitVector, topK);
  }

  /**
   * Fetches detailed metadata for specified entities including access statistics.
   *
   * `created_at`, `last_accessed` and `access_count` are aggregated over the
   * entity's observations and are null for an entity without observations;
   * `importance` falls back to 'normal'.
   *
   * NOTE(review): `access_count` is a SQL SUM; depending on the column type
   * the driver may return it as a string rather than a number — confirm.
   * NOTE(review): the importance subquery orders by `last_accessed DESC`
   * without `NULLS LAST`, and PostgreSQL sorts NULLs first for DESC, so a
   * never-accessed observation can be picked — confirm this is intended.
   * @async
   * @param {number[]} entityIds - Array of entity IDs to fetch details for.
   * @returns {Promise<Array<{entity_id: number, name: string, entitytype: string, created_at: Date, last_accessed: Date, access_count: number, importance: string}>>}
   *   Array of entities with their metadata and access statistics.
   */
  async fetchEntitiesWithDetails(entityIds) {
    if (!entityIds.length) {
      return [];
    }
    const normalizedIds = entityIds.map((id) => Number(id));
    return this.#query(
      `SELECT e.id                 AS entity_id,
              e.name,
              e.entitytype,
              MIN(o.created_at)    AS created_at,
              MAX(o.last_accessed) AS last_accessed,
              SUM(o.access_count)  AS access_count,
              COALESCE(
                (SELECT o2.importance
                 FROM observations o2
                 WHERE o2.entity_id = e.id
                 ORDER BY o2.last_accessed DESC
                 LIMIT 1),
                'normal'
              )                    AS importance
       FROM entities e
              LEFT JOIN observations o ON o.entity_id = e.id
       WHERE e.id = ANY ($1)
       GROUP BY e.id, e.name, e.entitytype`,
      [normalizedIds]
    );
  }

  /**
   * Retrieves entity IDs that were recently accessed, sorted by last access time.
   * @async
   * @param {number} limit - Maximum number of entity IDs to return.
   * @returns {Promise<number[]>}
   *   Distinct entity IDs ordered by their most recent observation access,
   *   most recent first.
   */
  async getRecentlyAccessedEntities(limit) {
    // PostgreSQL rejects SELECT DISTINCT combined with an ORDER BY expression
    // that is not in the select list, so de-duplicate with GROUP BY and rank
    // each entity by its latest access instead.
    const rows = await this.#query(
      `SELECT entity_id
       FROM observations
       WHERE last_accessed IS NOT NULL
       GROUP BY entity_id
       ORDER BY MAX(last_accessed) DESC
       LIMIT $1`,
      [limit]
    );
    return rows.map((row) => row.entity_id);
  }

  /**
   * Updates access statistics for specified entities.
   *
   * Increments `access_count` and sets `last_accessed` to the current time on
   * every observation of the given entities.
   * @async
   * @param {number[]} entityIds - Array of entity IDs to update.
   * @returns {Promise<void>}
   */
  async updateAccessStats(entityIds) {
    if (!entityIds.length) {
      return;
    }
    const normalizedIds = entityIds.map((id) => Number(id));
    await this.#query(
      `UPDATE observations
       SET access_count  = COALESCE(access_count, 0) + 1,
           last_accessed = NOW()
       WHERE entity_id = ANY ($1)`,
      [normalizedIds]
    );
  }

  /**
   * Sets the importance level for all observations of an entity.
   * @async
   * @param {number} entityId - Entity ID to update importance for.
   * @param {string} importance - Importance level (e.g., 'critical', 'important', 'normal').
   * @returns {Promise<boolean>}
   *   True if any observations were updated, false otherwise.
   */
  async setImportance(entityId, importance) {
    const rows = await this.#query(
      `UPDATE observations
       SET importance = $1
       WHERE entity_id = $2 RETURNING id`,
      [importance, entityId]
    );
    return rows.length > 0;
  }

  /**
   * Retrieves entity IDs for a list of entity names.
   * @async
   * @param {string[]} names - Array of entity names to look up.
   * @returns {Promise<Map<string, string>>}
   *   Map of entity names to their IDs as strings. Unknown names are absent.
   */
  async getEntityIdsByNames(names) {
    if (!names.length) {
      return new Map();
    }
    const rows = await this.#query(
      `SELECT name, id
       FROM entities
       WHERE name = ANY ($1)`,
      [names]
    );
    return new Map(rows.map((row) => [row.name, row.id.toString()]));
  }

  /**
   * Retrieves entity names for a list of entity IDs.
   * @async
   * @param {number[]} ids - Array of entity IDs to look up.
   * @returns {Promise<Map<string, string>>}
   *   Map of entity IDs (as strings) to their names. Unknown IDs are absent.
   */
  async getEntityNamesByIds(ids) {
    if (!ids.length) {
      return new Map();
    }
    const normalizedIds = ids.map((id) => Number(id));
    const rows = await this.#query(
      `SELECT id, name
       FROM entities
       WHERE id = ANY ($1)`,
      [normalizedIds]
    );
    return new Map(rows.map((row) => [row.id.toString(), row.name]));
  }

  /**
   * Retrieves all relations involving specified entities.
   * @async
   * @param {number[]} entityIds - Array of entity IDs to get relations for.
   * @returns {Promise<Array<{from_id: number, to_id: number}>>}
   *   Array of relations where entity is either source or target.
   */
  async getRelationsForEntityIds(entityIds) {
    if (!entityIds.length) {
      return [];
    }
    const normalizedIds = entityIds.map((id) => Number(id));
    return this.#query(
      `SELECT from_id, to_id
       FROM relations
       WHERE from_id = ANY ($1)
          OR to_id = ANY ($1)`,
      [normalizedIds]
    );
  }
}