Skip to main content
Glama

documcp

by tosin2013
kg-storage.ts (16.3 kB)
/**
 * Knowledge Graph Storage Module
 * Implements Phase 1.1: Enhanced Storage Format
 *
 * Provides persistent storage for knowledge graph entities and relationships
 * using separate JSONL files with safety mechanisms:
 *  - every file starts with a marker line, and existing files without one are
 *    never overwritten by `initialize()`;
 *  - saves go to a uniquely named temp file that is fsync'ed and then renamed
 *    over the target, so a reader never observes a half-written file;
 *  - optional timestamped backups are taken before each save.
 */

import { promises as fs } from "fs";
import { join, dirname } from "path";
import { GraphNode, GraphEdge } from "./knowledge-graph.js";
import { SCHEMA_METADATA } from "./schemas.js";

/** Prefix shared by every marker line; used to recognise our own files. */
const MARKER_PREFIX = "# DOCUMCP_KNOWLEDGE_GRAPH";

// File markers for safety
const ENTITY_FILE_MARKER = `${MARKER_PREFIX}_ENTITIES v${SCHEMA_METADATA.version}`;
const RELATIONSHIP_FILE_MARKER = `${MARKER_PREFIX}_RELATIONSHIPS v${SCHEMA_METADATA.version}`;

/** Number of backups retained per file type by default. */
const DEFAULT_BACKUP_RETENTION = 10;

/** Flush the pending write buffer once it reaches roughly this many characters. */
const WRITE_CHUNK_CHARS = 1 << 20;

/** Number of bytes read from the start of a file when sniffing its marker line. */
const MARKER_PROBE_BYTES = 4096;

type BackupType = "entities" | "relationships";

export interface KGStorageConfig {
  storageDir: string;
  backupOnWrite?: boolean;
  validateOnRead?: boolean;
}

export interface KGStorageStats {
  entityCount: number;
  relationshipCount: number;
  lastModified: string;
  schemaVersion: string;
  fileSize: {
    entities: number;
    relationships: number;
  };
}

/** Extract a human-readable message from any thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** True when `error` is a Node.js system error carrying the given `code`. */
function hasErrorCode(error: unknown, code: string): boolean {
  return (
    typeof error === "object" &&
    error !== null &&
    (error as { code?: unknown }).code === code
  );
}

/**
 * Build a unique sibling temp path for `targetPath`.
 * Uniqueness (pid + time + random) keeps concurrent writers of the same target
 * from interleaving into one shared temp file.
 */
function tempPathFor(targetPath: string): string {
  const nonce = Math.random().toString(36).slice(2, 10);
  return `${targetPath}.${process.pid}.${Date.now()}.${nonce}.tmp`;
}

export class KGStorage {
  private readonly config: Required<KGStorageConfig>;
  private readonly entityFilePath: string;
  private readonly relationshipFilePath: string;
  private readonly backupDir: string;

  constructor(config: KGStorageConfig) {
    // `??` rather than spread-defaults so an explicit `undefined` still gets the default.
    this.config = {
      ...config,
      backupOnWrite: config.backupOnWrite ?? true,
      validateOnRead: config.validateOnRead ?? true,
    };

    this.entityFilePath = join(
      config.storageDir,
      "knowledge-graph-entities.jsonl",
    );
    this.relationshipFilePath = join(
      config.storageDir,
      "knowledge-graph-relationships.jsonl",
    );
    this.backupDir = join(config.storageDir, "backups");
  }

  /**
   * Initialize storage (create directories and files if needed).
   *
   * @throws Error prefixed with "Failed to initialize KG storage:" when a
   *   directory cannot be created or an existing file lacks the DocuMCP marker.
   */
  async initialize(): Promise<void> {
    try {
      await fs.mkdir(this.config.storageDir, { recursive: true });

      if (this.config.backupOnWrite) {
        await fs.mkdir(this.backupDir, { recursive: true });
      }

      await this.initializeFile(this.entityFilePath, ENTITY_FILE_MARKER);
      await this.initializeFile(
        this.relationshipFilePath,
        RELATIONSHIP_FILE_MARKER,
      );
    } catch (error) {
      throw new Error(
        `Failed to initialize KG storage: ${describeError(error)}`,
      );
    }
  }

  /**
   * Ensure `filePath` exists and is one of our JSONL files.
   * A missing file is created containing only `marker`; an existing file whose
   * first line lacks the DocuMCP prefix is rejected rather than overwritten.
   */
  private async initializeFile(
    filePath: string,
    marker: string,
  ): Promise<void> {
    let firstLine: string;
    try {
      firstLine = await this.readFirstLine(filePath);
    } catch (error) {
      if (hasErrorCode(error, "ENOENT")) {
        await fs.writeFile(filePath, marker + "\n", "utf-8");
        return;
      }
      throw error;
    }

    if (!firstLine.startsWith(MARKER_PREFIX)) {
      throw new Error(
        `File ${filePath} is not a DocuMCP knowledge graph file. ` +
          `Refusing to overwrite to prevent data loss.`,
      );
    }
  }

  /**
   * Read the first line of a file.
   * Only the first MARKER_PROBE_BYTES bytes are read (enough for a marker), so
   * sniffing a large data file does not load it into memory; a longer first
   * line is returned truncated.
   */
  private async readFirstLine(filePath: string): Promise<string> {
    const handle = await fs.open(filePath, "r");
    try {
      const buffer = Buffer.alloc(MARKER_PROBE_BYTES);
      const { bytesRead } = await handle.read(buffer, 0, buffer.length, 0);
      const head = buffer.toString("utf-8", 0, bytesRead);
      const newlineAt = head.indexOf("\n");
      return newlineAt === -1 ? head : head.slice(0, newlineAt);
    } finally {
      await handle.close();
    }
  }

  /**
   * Atomically replace `filePath` with `marker` followed by one JSON line per record.
   * Data is streamed in ~1M-character chunks through a single handle to a unique
   * temp file, fsync'ed, then renamed over the target. On any failure the temp
   * file is removed and the error is rethrown.
   */
  private async writeJsonlAtomic(
    filePath: string,
    marker: string,
    records: readonly unknown[],
  ): Promise<void> {
    await fs.mkdir(dirname(filePath), { recursive: true });
    const tempFile = tempPathFor(filePath);

    try {
      const handle = await fs.open(tempFile, "w");
      try {
        let pending = marker + "\n";
        for (const record of records) {
          pending += JSON.stringify(record) + "\n";
          if (pending.length >= WRITE_CHUNK_CHARS) {
            // FileHandle.writeFile writes at the current position and loops
            // until the whole chunk is written.
            await handle.writeFile(pending, "utf-8");
            pending = "";
          }
        }
        if (pending.length > 0) {
          await handle.writeFile(pending, "utf-8");
        }
        // Make the data durable before the rename publishes it.
        await handle.sync();
      } finally {
        await handle.close();
      }

      await fs.rename(tempFile, filePath);
    } catch (error) {
      await fs.unlink(tempFile).catch(() => undefined);
      throw error;
    }
  }

  /**
   * Read a JSONL file into records.
   * Blank lines and lines starting with "#" (the marker) are skipped. Lines that
   * fail to parse or validate are logged and skipped. A missing file yields [].
   *
   * @param label Singular noun used in per-line error logs ("entity", "relationship").
   */
  private async readJsonl<T>(
    filePath: string,
    validate: (record: T) => void,
    label: string,
  ): Promise<T[]> {
    let content: string;
    try {
      content = await fs.readFile(filePath, "utf-8");
    } catch (error) {
      if (hasErrorCode(error, "ENOENT")) {
        return []; // File doesn't exist yet
      }
      throw error;
    }

    const records: T[] = [];
    for (const line of content.split("\n")) {
      const trimmed = line.trim();
      // A JSON value can never start with "#", so this only drops marker/comment lines.
      if (!trimmed || trimmed.startsWith("#")) {
        continue;
      }
      try {
        const record = JSON.parse(line) as T;
        if (this.config.validateOnRead) {
          validate(record);
        }
        records.push(record);
      } catch (error) {
        console.error(`Failed to parse ${label} line: ${line}`, error);
      }
    }
    return records;
  }

  /**
   * Save entities to storage, backing up the previous file first if enabled.
   *
   * @throws Error prefixed with "Failed to save entities:".
   */
  async saveEntities(entities: GraphNode[]): Promise<void> {
    try {
      if (this.config.backupOnWrite) {
        await this.backupFile(this.entityFilePath, "entities");
      }
      await this.writeJsonlAtomic(
        this.entityFilePath,
        ENTITY_FILE_MARKER,
        entities,
      );
    } catch (error) {
      throw new Error(`Failed to save entities: ${describeError(error)}`);
    }
  }

  /**
   * Load entities from storage. Returns [] when the file does not exist yet.
   *
   * @throws Error prefixed with "Failed to load entities:" on I/O errors.
   */
  async loadEntities(): Promise<GraphNode[]> {
    try {
      return await this.readJsonl<GraphNode>(
        this.entityFilePath,
        (entity) => this.validateEntity(entity),
        "entity",
      );
    } catch (error) {
      throw new Error(`Failed to load entities: ${describeError(error)}`);
    }
  }

  /**
   * Save relationships to storage, backing up the previous file first if enabled.
   *
   * @throws Error prefixed with "Failed to save relationships:".
   */
  async saveRelationships(relationships: GraphEdge[]): Promise<void> {
    try {
      if (this.config.backupOnWrite) {
        await this.backupFile(this.relationshipFilePath, "relationships");
      }
      await this.writeJsonlAtomic(
        this.relationshipFilePath,
        RELATIONSHIP_FILE_MARKER,
        relationships,
      );
    } catch (error) {
      throw new Error(`Failed to save relationships: ${describeError(error)}`);
    }
  }

  /**
   * Load relationships from storage. Returns [] when the file does not exist yet.
   *
   * @throws Error prefixed with "Failed to load relationships:" on I/O errors.
   */
  async loadRelationships(): Promise<GraphEdge[]> {
    try {
      return await this.readJsonl<GraphEdge>(
        this.relationshipFilePath,
        (relationship) => this.validateRelationship(relationship),
        "relationship",
      );
    } catch (error) {
      throw new Error(`Failed to load relationships: ${describeError(error)}`);
    }
  }

  /**
   * Save complete graph (entities + relationships).
   * The two files are written independently and concurrently; if one save
   * fails, the other may already have been committed.
   */
  async saveGraph(
    entities: GraphNode[],
    relationships: GraphEdge[],
  ): Promise<void> {
    await Promise.all([
      this.saveEntities(entities),
      this.saveRelationships(relationships),
    ]);
  }

  /**
   * Load complete graph (entities + relationships).
   */
  async loadGraph(): Promise<{
    entities: GraphNode[];
    relationships: GraphEdge[];
  }> {
    const [entities, relationships] = await Promise.all([
      this.loadEntities(),
      this.loadRelationships(),
    ]);

    return { entities, relationships };
  }

  /**
   * Copy `filePath` into the backup directory as `<type>-<timestamp>.jsonl`,
   * then prune old backups. Best effort: a missing source file is ignored and
   * other failures are only logged.
   */
  private async backupFile(filePath: string, type: BackupType): Promise<void> {
    try {
      await fs.access(filePath);
      await fs.mkdir(this.backupDir, { recursive: true });

      const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
      const backupPath = join(this.backupDir, `${type}-${timestamp}.jsonl`);
      await fs.copyFile(filePath, backupPath);

      await this.cleanupOldBackups(type);
    } catch (error) {
      if (!hasErrorCode(error, "ENOENT")) {
        console.warn(`Failed to backup file ${filePath}:`, error);
      }
    }
  }

  /**
   * List backup file names (not paths) created by `backupFile` for `type`.
   * Only `<type>-*.jsonl` names match, so unrelated files in the directory are
   * never pruned or restored.
   */
  private async listBackups(type: BackupType): Promise<string[]> {
    const files = await fs.readdir(this.backupDir);
    return files.filter(
      (file) => file.startsWith(`${type}-`) && file.endsWith(".jsonl"),
    );
  }

  /**
   * Order backup file names newest first by modification time, using the name
   * (which embeds an ISO timestamp) as a tie-breaker. Files that vanish while
   * being stat'ed are dropped.
   */
  private async sortNewestFirst(fileNames: readonly string[]): Promise<string[]> {
    const stamped = await Promise.all(
      fileNames.map(async (name) => {
        try {
          const stats = await fs.stat(join(this.backupDir, name));
          return { name, mtime: stats.mtime.getTime() };
        } catch {
          return null; // Deleted concurrently
        }
      }),
    );

    return stamped
      .filter(
        (entry): entry is { name: string; mtime: number } => entry !== null,
      )
      .sort(
        (a, b) =>
          b.mtime - a.mtime ||
          (a.name < b.name ? 1 : a.name > b.name ? -1 : 0),
      )
      .map((entry) => entry.name);
  }

  /**
   * Clean up old backup files, keeping the newest `keepCount`.
   * Best effort: failures are logged, not thrown.
   */
  private async cleanupOldBackups(
    type: BackupType,
    keepCount: number = DEFAULT_BACKUP_RETENTION,
  ): Promise<void> {
    try {
      await fs.mkdir(this.backupDir, { recursive: true });

      const newestFirst = await this.sortNewestFirst(
        await this.listBackups(type),
      );
      await Promise.all(
        newestFirst
          .slice(keepCount)
          .map((name) =>
            fs.unlink(join(this.backupDir, name)).catch(() => undefined),
          ),
      );
    } catch (error) {
      if (!hasErrorCode(error, "ENOENT")) {
        console.warn(`Failed to cleanup old backups:`, error);
      }
    }
  }

  /**
   * Get storage statistics.
   * Counts come from a full load (so they honour `validateOnRead`). A missing
   * file reports size 0 and the current time as its modification time.
   */
  async getStatistics(): Promise<KGStorageStats> {
    const [entities, relationships] = await Promise.all([
      this.loadEntities(),
      this.loadRelationships(),
    ]);

    const missingFileStats = () => ({ size: 0, mtime: new Date() });
    const [entitiesStats, relationshipsStats] = await Promise.all([
      fs.stat(this.entityFilePath).catch(missingFileStats),
      fs.stat(this.relationshipFilePath).catch(missingFileStats),
    ]);

    const lastModified = new Date(
      Math.max(
        entitiesStats.mtime.getTime(),
        relationshipsStats.mtime.getTime(),
      ),
    ).toISOString();

    return {
      entityCount: entities.length,
      relationshipCount: relationships.length,
      lastModified,
      schemaVersion: SCHEMA_METADATA.version,
      fileSize: {
        entities: entitiesStats.size,
        relationships: relationshipsStats.size,
      },
    };
  }

  /**
   * Restore from backup.
   *
   * @param timestamp Optional substring of the backup name. It may be given in
   *   raw ISO form (e.g. "2024-01-01T10:00:00.000Z"); ":" and "." are
   *   normalised to "-" to match the stored file names. When several backups
   *   match, the newest is used. Without a timestamp the newest backup is used.
   * @throws Error prefixed with "Failed to restore from backup:".
   */
  async restoreFromBackup(
    type: BackupType,
    timestamp?: string,
  ): Promise<void> {
    try {
      const newestFirst = await this.sortNewestFirst(
        await this.listBackups(type),
      );
      if (newestFirst.length === 0) {
        throw new Error(`No backups found for ${type}`);
      }

      let backupFile: string | undefined;
      if (timestamp) {
        const normalized = timestamp.replace(/[:.]/g, "-");
        backupFile = newestFirst.find(
          (file) => file.includes(timestamp) || file.includes(normalized),
        );
        if (!backupFile) {
          throw new Error(`Backup with timestamp ${timestamp} not found`);
        }
      } else {
        backupFile = newestFirst[0] as string;
      }

      const backupPath = join(this.backupDir, backupFile);
      const targetPath =
        type === "entities" ? this.entityFilePath : this.relationshipFilePath;

      // Copy to a temp sibling, then rename, so the live file is never half-restored.
      const tempFile = tempPathFor(targetPath);
      try {
        await fs.copyFile(backupPath, tempFile);
        await fs.rename(tempFile, targetPath);
      } catch (error) {
        await fs.unlink(tempFile).catch(() => undefined);
        throw error;
      }

      // Log restoration success (can be monitored)
      if (process.env.DEBUG) {
        // eslint-disable-next-line no-console
        console.log(`Restored ${type} from backup: ${backupFile}`);
      }
    } catch (error) {
      throw new Error(`Failed to restore from backup: ${describeError(error)}`);
    }
  }

  /**
   * Validate entity structure: `id`, `type` and `label` must be truthy.
   */
  private validateEntity(entity: GraphNode): void {
    if (!entity.id || !entity.type || !entity.label) {
      throw new Error(`Invalid entity structure: missing required fields`);
    }
  }

  /**
   * Validate relationship structure: `id`, `source`, `target` and `type` must be truthy.
   */
  private validateRelationship(relationship: GraphEdge): void {
    if (
      !relationship.id ||
      !relationship.source ||
      !relationship.target ||
      !relationship.type
    ) {
      throw new Error(
        `Invalid relationship structure: missing required fields`,
      );
    }
  }

  /**
   * Verify integrity of stored data.
   * Orphaned relationship endpoints are reported as warnings. Duplicate entity
   * IDs are reported as errors and make `valid` false.
   */
  async verifyIntegrity(): Promise<{
    valid: boolean;
    errors: string[];
    warnings: string[];
  }> {
    const errors: string[] = [];
    const warnings: string[] = [];

    try {
      const { entities, relationships } = await this.loadGraph();

      // Check for orphaned relationships
      const entityIds = new Set(entities.map((e) => e.id));
      for (const relationship of relationships) {
        if (!entityIds.has(relationship.source)) {
          warnings.push(
            `Relationship ${relationship.id} references missing source entity: ${relationship.source}`,
          );
        }
        if (!entityIds.has(relationship.target)) {
          warnings.push(
            `Relationship ${relationship.id} references missing target entity: ${relationship.target}`,
          );
        }
      }

      // Check for duplicate entities
      const idCounts = new Map<string, number>();
      for (const entity of entities) {
        idCounts.set(entity.id, (idCounts.get(entity.id) ?? 0) + 1);
      }
      for (const [id, count] of idCounts) {
        if (count > 1) {
          errors.push(`Duplicate entity ID found: ${id} (${count} instances)`);
        }
      }

      return {
        valid: errors.length === 0,
        errors,
        warnings,
      };
    } catch (error) {
      errors.push(`Integrity check failed: ${describeError(error)}`);
      return { valid: false, errors, warnings };
    }
  }

  /**
   * Export graph as JSON (for inspection/debugging).
   */
  async exportAsJSON(): Promise<string> {
    const { entities, relationships } = await this.loadGraph();

    return JSON.stringify(
      {
        metadata: {
          version: SCHEMA_METADATA.version,
          exportDate: new Date().toISOString(),
          entityCount: entities.length,
          relationshipCount: relationships.length,
        },
        entities,
        relationships,
      },
      null,
      2,
    );
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tosin2013/documcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.