/**
* Database service for PT-MCP Knowledge Graph
*
* Uses sql.js (SQLite WASM) for pure JavaScript SQLite implementation
* without native dependencies.
*/
import initSqlJs, { Database as SqlJsDatabase } from 'sql.js';
import { readFile, writeFile, mkdir } from 'fs/promises';
import { existsSync } from 'fs';
import { join, dirname } from 'path';
import {
SCHEMA_SQL,
Entity,
YAGOMapping,
SchemaAnnotation,
QueryCache,
ProgrammingConcept,
RDFTriple,
} from './schema.js';
/** Statement handle produced by sql.js `Database.prepare`. */
type SqlJsStatement = ReturnType<SqlJsDatabase['prepare']>;
/** Parameters accepted by sql.js `Statement.bind` / `Statement.run`. */
type SqlBindParams = Parameters<SqlJsStatement['bind']>[0];
/** Row object returned by sql.js `Statement.getAsObject`. */
type SqlRow = ReturnType<SqlJsStatement['getAsObject']>;

/** Matches SQLite's `CURRENT_TIMESTAMP` text form: UTC, but with no zone marker. */
const SQLITE_UTC_TIMESTAMP = /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(\.\d+)?$/;

/**
 * Convert a stored timestamp column into a `Date`.
 *
 * SQLite's `CURRENT_TIMESTAMP` yields `YYYY-MM-DD HH:MM:SS` in UTC, but that
 * form is not ISO-8601 and JavaScript engines parse it as *local* time. It is
 * therefore rewritten to `YYYY-MM-DDTHH:MM:SSZ` before parsing. Any other
 * non-empty value (for example the `toISOString()` strings this module writes)
 * is handed to `Date` unchanged.
 *
 * @param value - Raw column value.
 * @returns `undefined` for empty/NULL values, otherwise the parsed `Date`.
 */
function parseSqliteTimestamp(value: string | number | null | undefined): Date | undefined {
  if (!value) return undefined;
  if (typeof value === 'string' && SQLITE_UTC_TIMESTAMP.test(value)) {
    return new Date(`${value.replace(' ', 'T')}Z`);
  }
  return new Date(value);
}

/**
 * Database manager for knowledge graph storage.
 *
 * Holds an in-memory sql.js database that is loaded from, and persisted to, a
 * single file at `dbPath`. Every mutating method writes the whole database back
 * to disk with {@link KnowledgeGraphDatabase.save} before its promise resolves.
 */
export class KnowledgeGraphDatabase {
  private db: SqlJsDatabase | null = null;
  private readonly dbPath: string;
  /** sql.js module returned by `initSqlJs()`; loaded lazily by `initialize()`. */
  private SQL: Awaited<ReturnType<typeof initSqlJs>> | null = null;

  /**
   * @param dbPath - Database file location. Defaults to
   *   `<cwd>/data/knowledge-graph.db`.
   */
  constructor(dbPath?: string) {
    this.dbPath = dbPath || join(process.cwd(), 'data', 'knowledge-graph.db');
  }

  /**
   * Initialize the database.
   *
   * If the file at `dbPath` exists, it is loaded. Otherwise a new database is
   * created, `SCHEMA_SQL` is applied and the result is saved. Calling this
   * again reloads from disk; the previously open handle is closed first so its
   * memory is released.
   *
   * NOTE(review): the schema is only applied to newly created files. Tables
   * added to `SCHEMA_SQL` later will not appear in existing databases. Confirm
   * whether a migration step is expected here.
   */
  async initialize(): Promise<void> {
    if (!this.SQL) {
      this.SQL = await initSqlJs();
    }
    const SQL = this.SQL;
    this.close();

    if (existsSync(this.dbPath)) {
      this.db = new SQL.Database(await readFile(this.dbPath));
      return;
    }
    this.db = new SQL.Database();
    await this.createSchema();
    await this.save();
  }

  /**
   * Create the database schema by executing `SCHEMA_SQL`.
   *
   * @throws Error if the database is not initialized.
   */
  private async createSchema(): Promise<void> {
    this.requireDb().exec(SCHEMA_SQL);
  }

  /**
   * Save the whole in-memory database to `dbPath`, creating its directory if
   * needed.
   *
   * sql.js `export()` closes and reopens the underlying connection. That frees
   * outstanding prepared statements and resets connection-level PRAGMAs, so
   * per-connection state such as `last_insert_rowid()` or `getRowsModified()`
   * must be read *before* calling this.
   *
   * @throws Error if the database is not initialized.
   */
  async save(): Promise<void> {
    const data = this.requireDb().export();
    // `recursive: true` makes this a no-op when the directory already exists.
    await mkdir(dirname(this.dbPath), { recursive: true });
    await writeFile(this.dbPath, Buffer.from(data));
  }

  /**
   * Close the database connection. This is a no-op if it is not open.
   */
  close(): void {
    if (this.db) {
      this.db.close();
      this.db = null;
    }
  }

  // ===== Internal helpers =====

  /** Return the open handle, or throw if `initialize()` has not completed. */
  private requireDb(): SqlJsDatabase {
    if (!this.db) throw new Error('Database not initialized');
    return this.db;
  }

  /**
   * Prepare `sql`, pass the statement to `fn`, and free it afterwards. The
   * statement is freed even if binding, stepping or row parsing throws, so its
   * WASM memory is never leaked.
   */
  private withStatement<T>(sql: string, fn: (stmt: SqlJsStatement) => T): T {
    const stmt = this.requireDb().prepare(sql);
    try {
      return fn(stmt);
    } finally {
      stmt.free();
    }
  }

  /** Execute a write statement once with `params`. */
  private runStatement(sql: string, params?: SqlBindParams): void {
    this.withStatement(sql, (stmt) => {
      stmt.run(params);
    });
  }

  /** Return the first row matching `sql`/`params`, converted by `parse`, or `null`. */
  private queryOne<T>(sql: string, params: SqlBindParams, parse: (row: SqlRow) => T): T | null {
    return this.withStatement(sql, (stmt) => {
      stmt.bind(params);
      return stmt.step() ? parse(stmt.getAsObject()) : null;
    });
  }

  /** Return every row matching `sql`/`params`, each converted by `parse`. */
  private queryAll<T>(sql: string, params: SqlBindParams, parse: (row: SqlRow) => T): T[] {
    return this.withStatement(sql, (stmt) => {
      stmt.bind(params);
      const rows: T[] = [];
      while (stmt.step()) {
        rows.push(parse(stmt.getAsObject()));
      }
      return rows;
    });
  }

  /**
   * Return the id of the row most recently inserted on this connection. This
   * must be read before `save()`, which reopens the connection.
   */
  private lastInsertRowId(): number {
    const value = this.requireDb().exec('SELECT last_insert_rowid() as id')[0]?.values[0]?.[0];
    if (value === undefined || value === null) {
      throw new Error('Could not read last_insert_rowid()');
    }
    return Number(value);
  }

  // ===== Entity Operations =====

  /**
   * Insert an entity and persist the database.
   *
   * @returns The new row id.
   */
  async insertEntity(entity: Entity): Promise<number> {
    this.runStatement(
      'INSERT INTO entities (name, type, source_file, metadata) VALUES (?, ?, ?, ?)',
      [
        entity.name,
        entity.type,
        entity.source_file || null,
        entity.metadata ? JSON.stringify(entity.metadata) : null,
      ],
    );
    const id = this.lastInsertRowId();
    await this.save();
    return id;
  }

  /** Find the first entity with the given name, or `null`. */
  findEntityByName(name: string): Entity | null {
    return this.queryOne('SELECT * FROM entities WHERE name = ?', [name], (row) =>
      this.parseEntity(row),
    );
  }

  /** Find an entity by id, or `null`. */
  findEntityById(id: number): Entity | null {
    return this.queryOne('SELECT * FROM entities WHERE id = ?', [id], (row) =>
      this.parseEntity(row),
    );
  }

  /** Find all entities of the given type. */
  findEntitiesByType(type: string): Entity[] {
    return this.queryAll('SELECT * FROM entities WHERE type = ?', [type], (row) =>
      this.parseEntity(row),
    );
  }

  /** Map an `entities` row to an `Entity`, decoding JSON metadata. */
  private parseEntity(row: any): Entity {
    return {
      id: row.id,
      name: row.name,
      type: row.type,
      source_file: row.source_file,
      metadata: row.metadata ? JSON.parse(row.metadata) : undefined,
      created_at: parseSqliteTimestamp(row.created_at),
    };
  }

  // ===== YAGO Mapping Operations =====

  /**
   * Insert or update the YAGO mapping for `mapping.entity_id`, refresh
   * `cached_at`, and persist the database.
   */
  async upsertYAGOMapping(mapping: YAGOMapping): Promise<void> {
    this.runStatement(
      `
INSERT INTO yago_mappings (entity_id, yago_uri, yago_type, confidence, facts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(entity_id) DO UPDATE SET
yago_uri = excluded.yago_uri,
yago_type = excluded.yago_type,
confidence = excluded.confidence,
facts = excluded.facts,
cached_at = CURRENT_TIMESTAMP
`,
      [
        mapping.entity_id,
        mapping.yago_uri,
        mapping.yago_type || null,
        mapping.confidence,
        mapping.facts ? JSON.stringify(mapping.facts) : null,
      ],
    );
    await this.save();
  }

  /** Find the YAGO mapping for an entity id, or `null`. */
  findYAGOMappingByEntityId(entityId: number): YAGOMapping | null {
    return this.queryOne('SELECT * FROM yago_mappings WHERE entity_id = ?', [entityId], (row) =>
      this.parseYAGOMapping(row),
    );
  }

  /** Map a `yago_mappings` row to a `YAGOMapping`, decoding JSON facts. */
  private parseYAGOMapping(row: any): YAGOMapping {
    return {
      entity_id: row.entity_id,
      yago_uri: row.yago_uri,
      yago_type: row.yago_type,
      confidence: row.confidence,
      facts: row.facts ? JSON.parse(row.facts) : undefined,
      cached_at: parseSqliteTimestamp(row.cached_at),
    };
  }

  // ===== Schema.org Annotation Operations =====

  /**
   * Insert or update the Schema.org annotation for `annotation.entity_id` and
   * persist the database. `context_url` defaults to `https://schema.org`.
   */
  async upsertSchemaAnnotation(annotation: SchemaAnnotation): Promise<void> {
    this.runStatement(
      `
INSERT INTO schema_annotations (entity_id, schema_type, properties, context_url)
VALUES (?, ?, ?, ?)
ON CONFLICT(entity_id) DO UPDATE SET
schema_type = excluded.schema_type,
properties = excluded.properties,
context_url = excluded.context_url
`,
      [
        annotation.entity_id,
        annotation.schema_type,
        JSON.stringify(annotation.properties),
        annotation.context_url || 'https://schema.org',
      ],
    );
    await this.save();
  }

  /** Find the Schema.org annotation for an entity id, or `null`. */
  findSchemaAnnotationByEntityId(entityId: number): SchemaAnnotation | null {
    return this.queryOne(
      'SELECT * FROM schema_annotations WHERE entity_id = ?',
      [entityId],
      (row) => this.parseSchemaAnnotation(row),
    );
  }

  /** Map a `schema_annotations` row to a `SchemaAnnotation`. */
  private parseSchemaAnnotation(row: any): SchemaAnnotation {
    return {
      entity_id: row.entity_id,
      schema_type: row.schema_type,
      properties: JSON.parse(row.properties),
      context_url: row.context_url,
    };
  }

  // ===== Query Cache Operations =====

  /**
   * Cache a query result keyed by `query_hash` and persist the database.
   * `expires_at` is stored as an ISO-8601 UTC string (`toISOString()`).
   */
  async cacheQuery(cache: QueryCache): Promise<void> {
    this.runStatement(
      `
INSERT INTO query_cache (query_hash, query, results, expires_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(query_hash) DO UPDATE SET
results = excluded.results,
created_at = CURRENT_TIMESTAMP,
expires_at = excluded.expires_at
`,
      [
        cache.query_hash,
        cache.query,
        JSON.stringify(cache.results),
        cache.expires_at.toISOString(),
      ],
    );
    await this.save();
  }

  /**
   * Get a cached query result that has not yet expired, or `null`.
   *
   * `expires_at` is stored as `YYYY-MM-DDTHH:MM:SS.sssZ`, while
   * `datetime('now')` yields `YYYY-MM-DD HH:MM:SS`. Comparing those as text is
   * wrong (`'T' > ' '` kept entries alive until the end of their expiry day),
   * so both sides are normalized with `julianday()`.
   */
  getCachedQuery(queryHash: string): QueryCache | null {
    return this.queryOne(
      `
SELECT * FROM query_cache
WHERE query_hash = ? AND julianday(expires_at) > julianday('now')
`,
      [queryHash],
      (row) => this.parseQueryCache(row),
    );
  }

  /**
   * Delete expired cache entries and persist the database.
   *
   * Entries not returned by `getCachedQuery` (expiry at or before now) are
   * removed. The comparison uses `julianday()` for the same reason as in
   * `getCachedQuery`.
   *
   * @returns Number of rows deleted.
   */
  async cleanExpiredCache(): Promise<number> {
    const removed = this.withStatement(
      `DELETE FROM query_cache WHERE julianday(expires_at) <= julianday('now')`,
      (stmt) => {
        stmt.run();
        // Read before save(): export() reopens the connection and resets the counter.
        return this.requireDb().getRowsModified();
      },
    );
    await this.save();
    return removed;
  }

  /** Map a `query_cache` row to a `QueryCache`, decoding JSON results. */
  private parseQueryCache(row: any): QueryCache {
    return {
      query_hash: row.query_hash,
      query: row.query,
      results: JSON.parse(row.results),
      created_at: parseSqliteTimestamp(row.created_at),
      // expires_at is required; empty values fall back to the raw constructor as before.
      expires_at: parseSqliteTimestamp(row.expires_at) ?? new Date(row.expires_at),
    };
  }

  // ===== Programming Concept Operations =====

  /**
   * Insert a programming concept and persist the database.
   *
   * @returns The new row id.
   */
  async insertProgrammingConcept(concept: ProgrammingConcept): Promise<number> {
    this.runStatement(
      `
INSERT INTO programming_concepts (concept_name, category, description, related_concepts, examples, metadata)
VALUES (?, ?, ?, ?, ?, ?)
`,
      [
        concept.concept_name,
        concept.category,
        concept.description || null,
        concept.related_concepts ? JSON.stringify(concept.related_concepts) : null,
        concept.examples ? JSON.stringify(concept.examples) : null,
        concept.metadata ? JSON.stringify(concept.metadata) : null,
      ],
    );
    const id = this.lastInsertRowId();
    await this.save();
    return id;
  }

  /** Find the first programming concept with the given name, or `null`. */
  findProgrammingConceptByName(name: string): ProgrammingConcept | null {
    return this.queryOne(
      'SELECT * FROM programming_concepts WHERE concept_name = ?',
      [name],
      (row) => this.parseProgrammingConcept(row),
    );
  }

  /** Map a `programming_concepts` row to a `ProgrammingConcept`, decoding JSON columns. */
  private parseProgrammingConcept(row: any): ProgrammingConcept {
    return {
      id: row.id,
      concept_name: row.concept_name,
      category: row.category,
      description: row.description,
      related_concepts: row.related_concepts ? JSON.parse(row.related_concepts) : undefined,
      examples: row.examples ? JSON.parse(row.examples) : undefined,
      metadata: row.metadata ? JSON.parse(row.metadata) : undefined,
    };
  }

  // ===== RDF Triple Operations =====

  /**
   * Insert an RDF triple and persist the database. `graph` defaults to
   * `'default'`.
   *
   * @returns The new row id.
   */
  async insertRDFTriple(triple: RDFTriple): Promise<number> {
    this.runStatement(
      `
INSERT INTO rdf_triples (subject, predicate, object, graph)
VALUES (?, ?, ?, ?)
`,
      [triple.subject, triple.predicate, triple.object, triple.graph || 'default'],
    );
    const id = this.lastInsertRowId();
    await this.save();
    return id;
  }

  /** Find all RDF triples with the given subject. */
  findRDFTriplesBySubject(subject: string): RDFTriple[] {
    return this.queryAll('SELECT * FROM rdf_triples WHERE subject = ?', [subject], (row) =>
      this.parseRDFTriple(row),
    );
  }

  /** Map an `rdf_triples` row to an `RDFTriple`. */
  private parseRDFTriple(row: any): RDFTriple {
    return {
      id: row.id,
      subject: row.subject,
      predicate: row.predicate,
      object: row.object,
      graph: row.graph,
      created_at: parseSqliteTimestamp(row.created_at),
    };
  }
}
/**
 * Singleton initialization, stored as a promise so that concurrent callers share
 * one initialization instead of receiving an instance that is still loading.
 */
let instancePromise: Promise<KnowledgeGraphDatabase> | null = null;

/**
 * Get or create the shared, fully initialized database instance.
 *
 * @param dbPath - Database file location. It is only used by the call that
 *   creates the instance; later calls return that instance and ignore it.
 * @returns The initialized database.
 * @throws Propagates errors from `initialize()`. A failed initialization is not
 *   cached, so the next call retries.
 */
export async function getDatabase(dbPath?: string): Promise<KnowledgeGraphDatabase> {
  let pending = instancePromise;
  if (!pending) {
    const db = new KnowledgeGraphDatabase(dbPath);
    pending = db.initialize().then(
      () => db,
      (error: unknown) => {
        // Forget the failed attempt so a later call can retry.
        instancePromise = null;
        throw error;
      },
    );
    instancePromise = pending;
  }
  return pending;
}