M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
GraphManager.ts (58.3 kB)
/**
 * @module managers/GraphManager
 * @description Core graph database manager for Neo4j operations
 */
import neo4j, { Driver } from 'neo4j-driver';
import type {
  IGraphManager,
  Node,
  Edge,
  NodeType,
  EdgeType,
  SearchOptions,
  BatchDeleteResult,
  GraphStats,
  Subgraph,
  ClearType
} from '../types/index.js';
import { EmbeddingsService } from '../indexing/EmbeddingsService.js';
import { UnifiedSearchService } from './UnifiedSearchService.js';
import { flattenForMCP } from '../tools/mcp/flattenForMCP.js';
import { LLMConfigLoader } from '../config/LLMConfigLoader.js';

/**
 * GraphManager - Core interface to the Neo4j graph database
 *
 * @description Provides a high-level interface for all graph database operations,
 * including CRUD for nodes and edges, search, transactions, and schema management.
 * Automatically handles vector embeddings for semantic search and supports
 * multi-agent coordination with optimistic locking.
 *
 * Features:
 * - Unified node model (all nodes use the Node label)
 * - Automatic embedding generation for semantic search
 * - Hybrid search (vector + BM25 full-text)
 * - Optimistic locking for concurrent access
 * - Batch operations for performance
 * - Graph traversal and subgraph extraction
 *
 * @example
 * ```typescript
 * // Create and initialize
 * const manager = new GraphManager(
 *   'bolt://localhost:7687',
 *   'neo4j',
 *   'password'
 * );
 * await manager.initialize();
 * ```
 *
 * @example
 * ```typescript
 * // Add a node with automatic embeddings
 * const node = await manager.addNode('memory', {
 *   title: 'Important Decision',
 *   content: 'We decided to use PostgreSQL for better ACID compliance'
 * });
 * ```
 *
 * @example
 * ```typescript
 * // Search by meaning (semantic search)
 * const results = await manager.searchNodes('database decisions', {
 *   limit: 10,
 *   types: ['memory']
 * });
 * ```
 */
export class GraphManager implements IGraphManager {
  private driver: Driver;
  private nodeCounter = 0;
  private edgeCounter = 0;
  private embeddingsService: EmbeddingsService | null = null;
  private unifiedSearchService: UnifiedSearchService;
  private isNornicDB: boolean = false;
  private providerDetected: boolean = false;

  constructor(uri: string, user: string, password: string) {
    this.driver = neo4j.driver(uri, neo4j.auth.basic(user, password), {
      maxConnectionPoolSize: 50,
      connectionAcquisitionTimeout: 60000,
      connectionTimeout: 30000
    });

    // Note: Embeddings service initialization is deferred until after provider
    // detection. This happens in initialize() to avoid wasting resources on
    // NornicDB connections.

    // Initialize the unified search service
    this.unifiedSearchService = new UnifiedSearchService(this.driver);
    this.unifiedSearchService.initialize().catch(err => {
      console.warn('⚠️ Failed to initialize unified search service:', err.message);
    });
  }

  /**
   * Get the Neo4j driver instance for direct database access
   *
   * Use this when you need to execute custom Cypher queries or manage
   * transactions that aren't covered by the GraphManager API.
   *
   * @returns Neo4j driver instance
   *
   * @example
   * // Execute a custom Cypher query
   * const driver = graphManager.getDriver();
   * const session = driver.session();
   * try {
   *   const result = await session.run(
   *     'MATCH (n:Node) WHERE n.created > $date RETURN count(n)',
   *     { date: '2024-01-01' }
   *   );
   *   console.log('Nodes created:', result.records[0].get(0));
   * } finally {
   *   await session.close();
   * }
   *
   * @example
   * // Create a custom transaction
   * const driver = graphManager.getDriver();
   * const session = driver.session();
   * const tx = session.beginTransaction();
   * try {
   *   await tx.run('CREATE (n:CustomNode {id: $id})', { id: 'custom-1' });
   *   await tx.run('CREATE (n:CustomNode {id: $id})', { id: 'custom-2' });
   *   await tx.commit();
   * } catch (error) {
   *   await tx.rollback();
   *   throw error;
   * } finally {
   *   await session.close();
   * }
   */
  getDriver(): Driver {
    return this.driver;
  }

  /**
   * Check if connected to NornicDB (vs Neo4j)
   *
   * Both NornicDB and Neo4j return cosine similarity (0-1 range) from
   * db.index.vector.queryNodes. NornicDB additionally supports server-side
   * embedding generation, allowing string queries without client-side embeddings.
   *
   * @returns true if connected to NornicDB, false for Neo4j
   */
  getIsNornicDB(): boolean {
    return this.isNornicDB;
  }

  /**
   * Detect whether we're connected to NornicDB or Neo4j
   *
   * Uses multiple detection methods in priority order:
   * 1. Manual override via the MIMIR_DATABASE_PROVIDER env var
   * 2. Server metadata/version string detection
   * 3. Presence of NornicDB-specific procedures
   *
   * @private
   * @returns Promise that resolves when detection is complete
   */
  private async detectDatabaseProvider(): Promise<void> {
    // Method 1: Check for a manual override
    const manualProvider = process.env.MIMIR_DATABASE_PROVIDER?.toLowerCase();
    if (manualProvider === 'nornicdb') {
      console.log('🔧 Database provider manually set to NornicDB via MIMIR_DATABASE_PROVIDER');
      this.isNornicDB = true;
      return;
    } else if (manualProvider === 'neo4j') {
      console.log('🔧 Database provider manually set to Neo4j via MIMIR_DATABASE_PROVIDER');
      this.isNornicDB = false;
      return;
    }

    // Method 2: Auto-detect via server metadata
    const session = this.driver.session();
    try {
      // Execute a simple query and check the server metadata
      const result = await session.run('RETURN 1 as test');
      const summary = result.summary;

      // Check the server agent string
      const serverInfo = summary.server;
      const serverAgent = serverInfo?.agent || '';
      const serverVersion = serverInfo?.protocolVersion?.toString() || '';

      // NornicDB identifies itself in the server agent
      if (serverAgent.toLowerCase().includes('nornicdb')) {
        console.log(`🗄️ Detected NornicDB (${serverAgent})`);
        this.isNornicDB = true;
        return;
      }

      // Neo4j standard identification
      if (serverAgent.toLowerCase().includes('neo4j')) {
        console.log(`🗄️ Detected Neo4j (${serverAgent})`);
        this.isNornicDB = false;
        return;
      }

      // Fallback: assume Neo4j if we can't detect NornicDB
      console.log(`🗄️ Unable to detect database provider, defaulting to Neo4j (agent: ${serverAgent})`);
      this.isNornicDB = false;
    } catch (error: any) {
      console.warn(`⚠️ Database provider detection failed: ${error.message}`);
      console.log('🗄️ Defaulting to Neo4j');
      this.isNornicDB = false;
    } finally {
      await session.close();
    }
  }
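  // Usage sketch (illustrative, not part of the original file): forcing the
  // provider instead of relying on auto-detection. MIMIR_DATABASE_PROVIDER is
  // the env var read by detectDatabaseProvider() above; the connection values
  // here are hypothetical.
  //
  //   process.env.MIMIR_DATABASE_PROVIDER = 'neo4j'; // or 'nornicdb'
  //   const manager = new GraphManager('bolt://localhost:7687', 'neo4j', 'password');
  //   await manager.initialize(); // logs the manually selected provider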
  /**
   * Initialize the database schema: create indexes, constraints, and vector indexes
   *
   * This method sets up the Neo4j database with all necessary indexes and
   * constraints for optimal performance. It's idempotent and safe to call
   * multiple times.
   *
   * Creates:
   * - Unique constraint on node IDs
   * - Full-text search indexes
   * - Vector indexes for semantic search
   * - File indexing schema
   * - Type indexes for fast filtering
   *
   * @returns Promise that resolves when initialization is complete
   * @throws {Error} If the database connection fails or schema creation fails
   *
   * @example
   * // Initialize on server startup
   * const graphManager = new GraphManager(
   *   'bolt://localhost:7687',
   *   'neo4j',
   *   'password'
   * );
   * await graphManager.initialize();
   * console.log('Database schema initialized');
   *
   * @example
   * // Initialize with error handling
   * try {
   *   await graphManager.initialize();
   *   console.log('✅ Database ready');
   * } catch (error) {
   *   console.error('Failed to initialize database:', error);
   *   process.exit(1);
   * }
   *
   * @example
   * // Safe to call multiple times (idempotent)
   * await graphManager.initialize(); // First call creates schema
   * await graphManager.initialize(); // Second call is a no-op
   * await graphManager.initialize(); // Still safe
   */
  async initialize(): Promise<void> {
    const session = this.driver.session();
    try {
      // Detect the database provider (NornicDB vs Neo4j) on first initialization
      if (!this.providerDetected) {
        await this.detectDatabaseProvider();
        this.providerDetected = true;

        // Initialize the embeddings service only if NOT using NornicDB
        if (!this.isNornicDB) {
          this.embeddingsService = new EmbeddingsService();
          await this.embeddingsService.initialize().catch(err => {
            console.warn('⚠️ Failed to initialize embeddings service:', err.message);
            this.embeddingsService = null;
          });
        } else {
          console.log('🗄️ NornicDB detected - embeddings will be handled by database');
        }
      }

      // Unique constraint on node IDs
      await session.run(`
        CREATE CONSTRAINT node_id_unique IF NOT EXISTS
        FOR (n:Node) REQUIRE n.id IS UNIQUE
      `);

      // Full-text search index
      await session.run(`
        CREATE FULLTEXT INDEX node_search IF NOT EXISTS
        FOR (n:Node) ON EACH [n.properties]
      `);

      // Type index for fast filtering
      await session.run(`
        CREATE INDEX node_type IF NOT EXISTS
        FOR (n:Node) ON (n.type)
      `);

      // ─────────────────────────────────────────────────────────────
      // File Indexing Schema (Phase 1)
      // ─────────────────────────────────────────────────────────────

      // WatchConfig unique ID constraint
      await session.run(`
        CREATE CONSTRAINT watch_config_id_unique IF NOT EXISTS
        FOR (w:WatchConfig) REQUIRE w.id IS UNIQUE
      `);

      // WatchConfig path index
      await session.run(`
        CREATE INDEX watch_config_path IF NOT EXISTS
        FOR (w:WatchConfig) ON (w.path)
      `);

      // File path index (for fast lookups and updates)
      await session.run(`
        CREATE INDEX file_path IF NOT EXISTS
        FOR (f:File) ON (f.path)
      `);

      // Full-text search on file metadata and chunks.
      // Drop the old index if it exists (migration from the v1 architecture).
      try {
        await session.run(`DROP INDEX file_content_search IF EXISTS`);
      } catch (error) {
        // Ignore if the index doesn't exist
      }

      await session.run(`
        CREATE FULLTEXT INDEX file_metadata_search IF NOT EXISTS
        FOR (f:File) ON EACH [f.path, f.name, f.language]
      `);

      // Full-text search on file chunk content
      await session.run(`
        CREATE FULLTEXT INDEX file_chunk_content_search IF NOT EXISTS
        FOR (c:FileChunk) ON EACH [c.text]
      `);

      // Vector index for semantic search - dimensions come from config.
      // Get dimensions from the embeddings config (default: 768 for nomic-embed-text).
      const configLoader = LLMConfigLoader.getInstance();
      const embeddingsConfig = await configLoader.getEmbeddingsConfig();
      const dimensions = embeddingsConfig?.dimensions || 768;
      console.log(`🔧 Creating vector index with ${dimensions} dimensions`);

      await session.run(`
        CREATE VECTOR INDEX node_embedding_index IF NOT EXISTS
        FOR (n:Node) ON (n.embedding)
        OPTIONS {indexConfig: {
          \`vector.dimensions\`: ${dimensions},
          \`vector.similarity_function\`: 'cosine'
        }}
      `);

      // Migration: Add the Node label to existing File nodes and set the type property
      await session.run(`
        MATCH (f:File)
        WHERE NOT f:Node
        SET f:Node, f.type = 'file'
      `);

      console.log('✅ Neo4j schema initialized (with file indexing support)');
    } catch (error: any) {
      console.error('❌ Schema initialization failed:', error.message);
      throw error;
    } finally {
      await session.close();
    }
  }

  /**
   * Test the connection
   */
  async testConnection(): Promise<boolean> {
    const session = this.driver.session();
    try {
      await session.run('RETURN 1');
      return true;
    } catch (error) {
      return false;
    } finally {
      await session.close();
    }
  }
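  // Usage sketch (illustrative, not part of the original file): waiting for the
  // database at startup by combining testConnection() with a simple retry loop.
  // The retry count and 3-second delay are arbitrary choices for the example.
  //
  //   for (let attempt = 1; attempt <= 10; attempt++) {
  //     if (await graphManager.testConnection()) break;
  //     console.log(`Neo4j not ready (attempt ${attempt}), retrying...`);
  //     await new Promise(resolve => setTimeout(resolve, 3000));
  //   }
  //   await graphManager.initialize();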
  // ============================================================================
  // SINGLE OPERATIONS
  // ============================================================================

  /**
   * Extract text content from node properties for embedding generation.
   * Prioritizes content, description, and title, then concatenates all other
   * string values.
   */
  private extractTextContent(properties: Record<string, any>): string {
    const parts: string[] = [];

    // Priority fields first
    if (properties.title && typeof properties.title === 'string') {
      parts.push(`Title: ${properties.title}`);
    }
    if (properties.name && typeof properties.name === 'string') {
      parts.push(`Name: ${properties.name}`);
    }
    if (properties.description && typeof properties.description === 'string') {
      parts.push(`Description: ${properties.description}`);
    }
    if (properties.content && typeof properties.content === 'string') {
      parts.push(`Content: ${properties.content}`);
    }

    // Now include ALL other properties (stringified)
    const systemFields = new Set([
      'id', 'type', 'created', 'updated', 'title', 'name', 'description',
      'content', 'embedding', 'embedding_dimensions', 'embedding_model',
      'has_embedding'
    ]);

    for (const [key, value] of Object.entries(properties)) {
      // Skip system fields and already-included fields
      if (systemFields.has(key)) {
        continue;
      }
      // Skip null/undefined
      if (value === null || value === undefined) {
        continue;
      }

      // Stringify the value
      let stringValue: string;
      if (typeof value === 'string') {
        stringValue = value;
      } else if (typeof value === 'number' || typeof value === 'boolean') {
        stringValue = String(value);
      } else if (Array.isArray(value)) {
        stringValue = value.join(', ');
      } else if (typeof value === 'object') {
        try {
          stringValue = JSON.stringify(value);
        } catch {
          stringValue = String(value);
        }
      } else {
        stringValue = String(value);
      }

      if (stringValue.trim().length > 0) {
        parts.push(`${key}: ${stringValue}`);
      }
    }

    return parts.join('\n');
  }

  /**
   * Creates chunks for a node if the content is large enough to warrant chunking.
   * Returns chunk data, or null if the content is small enough for a single embedding.
   */
  private async createNodeChunks(
    nodeId: string,
    textContent: string,
    session: any
  ): Promise<{ chunkCount: number; totalChars: number } | null> {
    const chunkSize = parseInt(process.env.MIMIR_EMBEDDINGS_CHUNK_SIZE || '768', 10);

    // If the content is small, don't chunk - return null to signal a single embedding
    if (textContent.length <= chunkSize) {
      return null;
    }

    console.log(`📦 Creating chunks for node ${nodeId} (${textContent.length} chars)...`);

    try {
      // Generate chunk embeddings
      const chunks = await this.embeddingsService!.generateChunkEmbeddings(textContent);

      // Create chunk nodes and relationships
      for (const chunk of chunks) {
        const chunkId = `chunk-${nodeId}-${chunk.chunkIndex}`;
        await session.run(
          `
          MATCH (n:Node {id: $nodeId})
          MERGE (c:NodeChunk:Node {id: $chunkId})
          ON CREATE SET
            c.chunk_index = $chunkIndex,
            c.text = $text,
            c.start_offset = $startOffset,
            c.end_offset = $endOffset,
            c.embedding = $embedding,
            c.embedding_dimensions = $dimensions,
            c.embedding_model = $model,
            c.type = 'node_chunk',
            c.indexed_date = datetime(),
            c.parentNodeId = $nodeId,
            c.has_embedding = true
          ON MATCH SET
            c.chunk_index = $chunkIndex,
            c.text = $text,
            c.start_offset = $startOffset,
            c.end_offset = $endOffset,
            c.embedding = $embedding,
            c.embedding_dimensions = $dimensions,
            c.embedding_model = $model,
            c.indexed_date = datetime()
          MERGE (n)-[:HAS_CHUNK {index: $chunkIndex}]->(c)
          RETURN c.id AS chunk_id
          `,
          {
            nodeId,
            chunkId,
            chunkIndex: chunk.chunkIndex,
            text: chunk.text,
            startOffset: chunk.startOffset,
            endOffset: chunk.endOffset,
            embedding: chunk.embedding,
            dimensions: chunk.dimensions,
            model: chunk.model
          }
        );
      }

      console.log(`✅ Created ${chunks.length} chunks for node ${nodeId}`);
      return { chunkCount: chunks.length, totalChars: textContent.length };
    } catch (error: any) {
      console.error(`⚠️ Failed to create chunks for node ${nodeId}: ${error.message}`);
      throw error;
    }
  }
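  // Illustration (not part of the original file): what extractTextContent()
  // produces for a typical node. Priority fields come first, then the remaining
  // non-system properties are stringified, one per line; system fields such as
  // `embedding` are skipped.
  //
  //   extractTextContent({
  //     title: 'API Design',
  //     content: 'Use REST',
  //     tags: ['api', 'design'],
  //     confidence: 0.9,
  //     embedding: [0.1, 0.2] // skipped: system field
  //   });
  //   // =>
  //   // Title: API Design
  //   // Content: Use REST
  //   // tags: api, design
  //   // confidence: 0.9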
  /**
   * Add a new node to the knowledge graph with automatic embedding generation
   *
   * Creates a node with the specified type and properties. Automatically generates
   * vector embeddings from text content (title, description, content fields) for
   * semantic search. Supports chunking for large content (>768 chars by default).
   *
   * @param type - Node type (todo, file, concept, memory, etc.) or a properties object
   * @param properties - Node properties (title, description, content, status, etc.)
   * @returns Created node with a generated ID and embeddings
   * @throws {Error} If node creation fails
   *
   * @example
   * // Create a TODO task
   * const todo = await graphManager.addNode('todo', {
   *   title: 'Implement user authentication',
   *   description: 'Add JWT-based auth with refresh tokens and role-based access',
   *   status: 'pending',
   *   priority: 'high',
   *   assignee: 'worker-agent-1'
   * });
   * console.log('Created:', todo.id); // 'todo-1-1732456789'
   * console.log('Has embedding:', todo.properties.has_embedding); // true
   *
   * @example
   * // Create a memory node with an automatic embedding
   * const memory = await graphManager.addNode('memory', {
   *   title: 'API Design Pattern',
   *   content: 'Use RESTful conventions with versioned endpoints (/v1/users). ' +
   *            'Always return consistent error formats with status codes.',
   *   tags: ['api', 'architecture', 'best-practices'],
   *   source: 'team-discussion',
   *   confidence: 0.95
   * });
   * // Embedding generated from title + content for semantic search
   *
   * @example
   * // Create a file node during indexing
   * const file = await graphManager.addNode('file', {
   *   path: '/src/auth/login.ts',
   *   name: 'login.ts',
   *   language: 'typescript',
   *   size: 2048,
   *   lines: 87,
   *   lastModified: new Date().toISOString(),
   *   content: '// File content here...'
   * });
   * // Large files are automatically chunked for embeddings
   *
   * @example
   * // Create a concept node for the knowledge graph
   * const concept = await graphManager.addNode('concept', {
   *   title: 'Microservices Architecture',
   *   description: 'Architectural pattern where an application is composed of small, ' +
   *                'independent services that communicate via APIs',
   *   category: 'architecture',
   *   related_concepts: ['API Gateway', 'Service Discovery', 'Event-Driven']
   * });
   *
   * @example
   * // Flexible API: pass properties as the first argument
   * const node = await graphManager.addNode({
   *   type: 'memory',
   *   title: 'Quick note',
   *   content: 'Remember to update docs'
   * });
   */
  async addNode(type?: NodeType | Record<string, any>, properties?: Record<string, any>): Promise<Node> {
    const session = this.driver.session();
    try {
      // Handle flexible arguments: addNode(properties) or addNode(type, properties)
      let actualType: NodeType;
      let actualProperties: Record<string, any>;

      if (typeof type === 'object' && type !== null) {
        // First arg is properties, use the default type
        actualType = 'memory';
        actualProperties = type as Record<string, any>;
      } else {
        // Standard: type specified
        actualType = type || 'memory';
        actualProperties = properties || {};
      }

      // Use the provided ID if present, otherwise generate one
      const id = actualProperties.id || `${actualType}-${++this.nodeCounter}-${Date.now()}`;
      const now = new Date().toISOString();

      // Flatten properties into the node (Neo4j doesn't support nested objects)
      const flattenedProps = flattenForMCP(actualProperties || {});
      const nodeProps = {
        id,
        type: actualType,
        created: now,
        updated: now,
        ...flattenedProps,
        has_embedding: false // Will be updated after embedding generation
      };

      // Create the node first.
      // Note: Using a simple RETURN n instead of a map projection for NornicDB compatibility.
      const query = 'CREATE (n:Node $props) RETURN n';
      const createResult = await session.run(query, { props: nodeProps });

      // Now generate embeddings if enabled and not already provided.
      // Skip embedding generation for NornicDB (the database handles it natively).
      const hasExistingEmbedding = actualProperties.embedding || actualProperties.has_embedding === true;
      if (!this.isNornicDB && this.embeddingsService && !hasExistingEmbedding) {
        // Ensure the embeddings service is initialized
        if (!this.embeddingsService.isEnabled()) {
          await this.embeddingsService.initialize();
        }

        if (this.embeddingsService.isEnabled()) {
          // Extract text content for embedding generation
          const textContent = this.extractTextContent(actualProperties);
          if (textContent && textContent.trim().length > 0) {
            const chunkSize = parseInt(process.env.MIMIR_EMBEDDINGS_CHUNK_SIZE || '768', 10);
            try {
              // Check if the content needs chunking
              if (textContent.length > chunkSize) {
                // Large content - use chunking
                console.log(`📦 Node ${id} has large content (${textContent.length} chars), creating chunks...`);
                await this.createNodeChunks(id, textContent, session);

                // Update the node to mark that it has chunks (no single embedding on the parent node)
                await session.run(
                  `MATCH (n:Node {id: $id})
                   SET n.has_embedding = true, n.has_chunks = true`,
                  { id }
                );
              } else {
                // Small content - single embedding
                const result = await this.embeddingsService.generateEmbedding(textContent);
                await session.run(
                  `MATCH (n:Node {id: $id})
                   SET n.embedding = $embedding,
                       n.embedding_dimensions = $dimensions,
                       n.embedding_model = $model,
                       n.has_embedding = true`,
                  { id, embedding: result.embedding, dimensions: result.dimensions, model: result.model }
                );
                console.log(`✅ Generated single embedding for ${actualType} node: ${id} (${result.dimensions} dimensions)`);
              }
            } catch (error: any) {
              console.error(`⚠️ Failed to generate embedding for ${actualType} node: ${error.message}`);
            }
          }
        }
      }

      // Return the created node
      return this.nodeFromRecord(createResult.records[0].get('n'), undefined, false);
    } finally {
      await session.close();
    }
  }

  /**
   * Retrieve a node by its ID with full properties
   *
   * Fetches a single node from the graph database. Returns null if not found.
   * Includes all properties except embedding vectors (for performance).
   *
   * @param id - Unique node identifier
   * @returns Node object with all properties, or null if not found
   *
   * @example
   * // Get a TODO by ID
   * const todo = await graphManager.getNode('todo-1-1732456789');
   * if (todo) {
   *   console.log('Title:', todo.properties.title);
   *   console.log('Status:', todo.properties.status);
   *   console.log('Created:', todo.properties.created);
   * } else {
   *   console.log('TODO not found');
   * }
   *
   * @example
   * // Check whether a node exists before updating
   * const existing = await graphManager.getNode('memory-123');
   * if (!existing) {
   *   throw new Error('Memory node not found');
   * }
   * await graphManager.updateNode('memory-123', {
   *   content: 'Updated content'
   * });
   *
   * @example
   * // Get a file node and check its metadata
   * const file = await graphManager.getNode('file-src-auth-login-ts');
   * if (file && file.properties.lastModified) {
   *   const lastMod = new Date(file.properties.lastModified);
   *   const hoursSinceUpdate = (Date.now() - lastMod.getTime()) / (1000 * 60 * 60);
   *   console.log(`File last modified ${hoursSinceUpdate.toFixed(1)} hours ago`);
   * }
   */
  async getNode(id: string): Promise<Node | null> {
    const session = this.driver.session();
    try {
      const query = 'MATCH (n:Node {id: $id}) RETURN n';
      const result = await session.run(query, { id });
      if (result.records.length === 0) {
        return null;
      }
      // Single node query - return full content (don't strip)
      return this.nodeFromRecord(result.records[0].get('n'), undefined, false);
    } finally {
      await session.close();
    }
  }

  /**
   * Update an existing node's properties with automatic embedding regeneration
   *
   * Merges new properties into the existing node. Automatically regenerates
   * embeddings if content-related fields (content, title, description) are
   * modified. Updates the 'updated' timestamp automatically.
   *
   * @param id - Node ID to update
   * @param properties - Properties to update (partial update, merges with existing)
   * @returns Updated node with new properties
   * @throws {Error} If the node is not found or the update fails
   *
   * @example
   * // Update TODO status
   * const updated = await graphManager.updateNode('todo-1-1732456789', {
   *   status: 'in_progress',
   *   assignee: 'worker-agent-2',
   *   started_at: new Date().toISOString()
   * });
   * console.log('Status changed to:', updated.properties.status);
   *
   * @example
   * // Update memory content (triggers embedding regeneration)
   * const memory = await graphManager.updateNode('memory-123', {
   *   content: 'Updated API design: Use GraphQL instead of REST for complex queries',
   *   confidence: 0.98,
   *   last_verified: new Date().toISOString()
   * });
   * // Embedding automatically regenerated from the new content
   *
   * @example
   * // Add metadata without changing content
   * await graphManager.updateNode('file-src-utils-ts', {
   *   lastAccessed: new Date().toISOString(),
   *   accessCount: 42,
   *   tags: ['utility', 'helper', 'core']
   * });
   * // No embedding regeneration (content unchanged)
   *
   * @example
   * // Partial update - only the specified fields change
   * const todo = await graphManager.getNode('todo-1');
   * console.log('Before:', todo.properties); // { title: 'Task', status: 'pending', priority: 'high' }
   *
   * await graphManager.updateNode('todo-1', { status: 'completed' });
   *
   * const updated = await graphManager.getNode('todo-1');
   * console.log('After:', updated.properties); // { title: 'Task', status: 'completed', priority: 'high' }
   * // Only status changed, other fields preserved
   *
   * @example
   * // Error handling
   * try {
   *   await graphManager.updateNode('nonexistent-id', { status: 'done' });
   * } catch (error) {
   *   console.error('Update failed:', error.message); // 'Node not found: nonexistent-id'
   * }
   */
  async updateNode(id: string, properties: Partial<Record<string, any>>): Promise<Node> {
    const session = this.driver.session();
    try {
      const now = new Date().toISOString();

      // Build SET clauses for each property (flatten nested structures)
      const setProperties = {
        ...flattenForMCP(properties as Record<string, any>),
        updated: now
      };

      const query = `MATCH (n:Node {id: $id}) SET n += $properties RETURN n`;
      const result = await session.run(query, { id, properties: setProperties });

      if (result.records.length === 0) {
        throw new Error(`Node not found: ${id}`);
      }

      const updatedNode = result.records[0].get('n');

      // Regenerate embeddings if the content changed and the embeddings service is enabled.
      // Skip embedding regeneration for NornicDB (the database handles it natively).
      const contentChanged =
        properties.content !== undefined ||
        properties.text !== undefined ||
        properties.title !== undefined ||
        properties.description !== undefined;

      if (!this.isNornicDB && contentChanged && this.embeddingsService) {
        // Ensure the embeddings service is initialized
        if (!this.embeddingsService.isEnabled()) {
          await this.embeddingsService.initialize();
        }

        if (this.embeddingsService.isEnabled()) {
          // Extract text content for embedding generation.
          // updatedNode is a Neo4j node object, so pass its property map
          // (mirrors the record.properties || record handling in nodeFromRecord).
          const textContent = this.extractTextContent(updatedNode.properties || updatedNode);
          if (textContent && textContent.trim().length > 0) {
            const chunkSize = parseInt(process.env.MIMIR_EMBEDDINGS_CHUNK_SIZE || '768', 10);
            try {
              // Check if the content needs chunking
              if (textContent.length > chunkSize) {
                // Large content - use chunking
                console.log(`📦 Node ${id} has large content (${textContent.length} chars), regenerating chunks...`);

                // Delete existing chunks first
                await session.run(
                  `MATCH (n:Node {id: $id})
                   OPTIONAL MATCH (n)-[r:HAS_CHUNK]->(chunk:NodeChunk)
                   DELETE r, chunk`,
                  { id }
                );

                await this.createNodeChunks(id, textContent, session);

                // Update the node to mark that it has chunks (no single embedding on the parent node)
                await session.run(
                  `MATCH (n:Node {id: $id})
                   SET n.has_embedding = true, n.has_chunks = true
                   REMOVE n.embedding`,
                  { id }
                );
              } else {
                // Small content - single embedding
                const embeddingResult = await this.embeddingsService.generateEmbedding(textContent);

                // Delete existing chunks if any
                await session.run(
                  `MATCH (n:Node {id: $id})
                   SET n.embedding = $embedding,
                       n.embedding_dimensions = $dimensions,
                       n.embedding_model = $model,
                       n.has_embedding = true,
                       n.has_chunks = false
                   WITH n
                   OPTIONAL MATCH (n)-[r:HAS_CHUNK]->(chunk:NodeChunk)
                   DELETE r, chunk`,
                  {
                    id,
                    embedding: embeddingResult.embedding,
                    dimensions: embeddingResult.dimensions,
                    model: embeddingResult.model
                  }
                );
                console.log(`✅ Regenerated embedding for node ${id} (${embeddingResult.dimensions} dimensions)`);
              }
            } catch (error: any) {
              console.error(`⚠️ Failed to regenerate embedding for node ${id}: ${error.message}`);
            }
          }
        }
      }

      // Single node operation - return full content (don't strip)
      return this.nodeFromRecord(result.records[0].get('n'), undefined, false);
    } finally {
      await session.close();
    }
  }

  async deleteNode(id: string): Promise<boolean> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH (n:Node {id: $id})
        DETACH DELETE n
        RETURN count(n) as deleted
        `,
        { id }
      );
      const deleted = result.records[0]?.get('deleted').toNumber() || 0;
      return deleted > 0;
    } finally {
      await session.close();
    }
  }

  async addEdge(
    source: string,
    target: string,
    type: EdgeType,
    properties: Record<string, any> = {}
  ): Promise<Edge> {
    const session = this.driver.session();
    try {
      // Validate todoList relationships - they can only connect to todo nodes
      const validationResult = await session.run(
        `
        MATCH (s:Node {id: $source})
        MATCH (t:Node {id: $target})
        RETURN s.type AS sourceType, t.type AS targetType
        `,
        { source, target }
      );

      if (validationResult.records.length === 0) {
        throw new Error(`Failed to create edge: source or target not found`);
      }

      const sourceType = validationResult.records[0].get('sourceType');
      const targetType = validationResult.records[0].get('targetType');

      // Enforce the todoList constraint: it can only contain relationships to todo nodes
      if (sourceType === 'todoList' && targetType !== 'todo') {
        throw new Error(`todoList nodes can only have relationships to todo nodes. Target type: ${targetType}`);
      }

      const id = `edge-${++this.edgeCounter}-${Date.now()}`;
      const now = new Date().toISOString();

      // Flatten properties directly onto the edge (Neo4j doesn't support nested objects)
      const flattenedEdgeProps = flattenForMCP(properties || {});
      const edgeProps = {
        id,
        type,
        created: now,
        ...flattenedEdgeProps // User properties flattened at the same level
      };

      const result = await session.run(
        `
        MATCH (s:Node {id: $source})
        MATCH (t:Node {id: $target})
        CREATE (s)-[e:EDGE $edgeProps]->(t)
        RETURN e
        `,
        { source, target, edgeProps }
      );

      return this.edgeFromRecord(result.records[0].get('e'), source, target);
    } finally {
      await session.close();
    }
  }

  async deleteEdge(edgeId: string): Promise<boolean> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH ()-[e:EDGE {id: $edgeId}]->()
        DELETE e
        RETURN count(e) as deleted
        `,
        { edgeId }
      );
      const deleted = result.records[0]?.get('deleted').toNumber() || 0;
      return deleted > 0;
    } finally {
      await session.close();
    }
  }
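  // Usage sketch (illustrative, not part of the original file): linking nodes
  // with addEdge(). The node IDs and the 'contains' edge type are hypothetical;
  // note that edges from a todoList node are rejected unless the target is a todo.
  //
  //   const edge = await graphManager.addEdge('todoList-1', 'todo-1', 'contains', {
  //     position: 1
  //   });
  //
  //   // Throws: addEdge('todoList-1', 'memory-9', 'contains')
  //   //   -> "todoList nodes can only have relationships to todo nodes. ..."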
  // ============================================================================
  // BATCH OPERATIONS
  // ============================================================================

  async addNodes(nodes: Array<{ type: NodeType; properties: Record<string, any> }>): Promise<Node[]> {
    const session = this.driver.session();
    try {
      const now = new Date().toISOString();

      // First, prepare all nodes with IDs and base properties
      const preparedNodes = nodes.map((n) => {
        const flatProps = flattenForMCP(n.properties || {});
        return {
          id: `${n.type}-${++this.nodeCounter}-${Date.now()}`,
          type: n.type,
          created: now,
          updated: now,
          ...flatProps,
          has_embedding: false
        };
      });

      // Create all nodes in bulk
      const createResult = await session.run(
        `
        UNWIND $nodes as node
        CREATE (n:Node)
        SET n = node
        RETURN n
        `,
        { nodes: preparedNodes }
      );

      // Now generate embeddings for each node if the service is enabled
      if (this.embeddingsService) {
        if (!this.embeddingsService.isEnabled()) {
          await this.embeddingsService.initialize();
        }

        if (this.embeddingsService.isEnabled()) {
          const chunkSize = parseInt(process.env.MIMIR_EMBEDDINGS_CHUNK_SIZE || '768', 10);

          // Process each node's embeddings
          for (let i = 0; i < nodes.length; i++) {
            const originalNode = nodes[i];
            const nodeId = preparedNodes[i].id;
            const textContent = this.extractTextContent(originalNode.properties || {});

            if (textContent && textContent.trim().length > 0) {
              try {
                // Check if the content needs chunking
                if (textContent.length > chunkSize) {
                  // Large content - use chunking
                  console.log(`📦 Bulk node ${nodeId} has large content (${textContent.length} chars), creating chunks...`);
                  await this.createNodeChunks(nodeId, textContent, session);

                  // Update the node to mark that it has chunks
                  await session.run(
                    `MATCH (n:Node {id: $id})
                     SET n.has_embedding = true, n.has_chunks = true`,
                    { id: nodeId }
                  );
                } else {
                  // Small content - single embedding
                  const result = await this.embeddingsService.generateEmbedding(textContent);
                  await session.run(
                    `MATCH (n:Node {id: $id})
                     SET n.embedding = $embedding,
                         n.embedding_dimensions = $dimensions,
                         n.embedding_model = $model,
                         n.has_embedding = true`,
                    { id: nodeId, embedding: result.embedding, dimensions: result.dimensions, model: result.model }
                  );
                  console.log(`✅ Generated embedding for ${originalNode.type} node (${result.dimensions} dimensions)`);
                }
              } catch (error: any) {
                console.error(`⚠️ Failed to generate embedding for ${originalNode.type} node: ${error.message}`);
              }
            }
          }
        }
      }

      return createResult.records.map(r => this.nodeFromRecord(r.get('n')));
    } finally {
      await session.close();
    }
  }

  async updateNodes(updates: Array<{ id: string; properties: Partial<Record<string, any>> }>): Promise<Node[]> {
    const session = this.driver.session();
    try {
      const now = new Date().toISOString();

      // Add an updated timestamp to each update
      const updatesWithTimestamp = updates.map(u => ({
        id: u.id,
        properties: {
          ...flattenForMCP(u.properties || {}),
          updated: now
        }
      }));

      const result = await session.run(
        `
        UNWIND $updates as update
        MATCH (n:Node {id: update.id})
        SET n += update.properties
        RETURN n
        `,
        { updates: updatesWithTimestamp }
      );

      return result.records.map(r => this.nodeFromRecord(r.get('n')));
    } finally {
      await session.close();
    }
  }

  async deleteNodes(ids: string[]): Promise<BatchDeleteResult> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        UNWIND $ids as id
        MATCH (n:Node {id: id})
        DETACH DELETE n
        RETURN count(n) as deleted
        `,
        { ids }
      );
      const deleted = result.records[0]?.get('deleted').toNumber() || 0;
      return {
        deleted,
        errors: [] // Neo4j handles missing nodes gracefully
      };
    } finally {
      await session.close();
    }
  }

  async addEdges(edges: Array<{
    source: string;
    target: string;
    type: EdgeType;
    properties?: Record<string, any>;
  }>): Promise<Edge[]> {
    const session = this.driver.session();
    try {
      const now = new Date().toISOString();

      // Flatten properties at the top level (Neo4j doesn't support nested objects in relationships)
      const edgesWithIds = edges.map(e => ({
        id: `edge-${++this.edgeCounter}-${Date.now()}`,
        source: e.source,
        target: e.target,
        type: e.type,
        created: now,
        ...flattenForMCP(e.properties || {}) // Spread flattened user properties at the top level
      }));

      const result = await session.run(
        `
        UNWIND $edges as edge
        MATCH (s:Node {id: edge.source})
        MATCH (t:Node {id: edge.target})
        CREATE (s)-[e:EDGE]->(t)
        SET e = edge
        RETURN e, edge.source as source, edge.target as target
        `,
        { edges: edgesWithIds }
      );

      return result.records.map(r =>
        this.edgeFromRecord(r.get('e'), r.get('source'), r.get('target'))
      );
    } finally {
      await session.close();
    }
  }

  async deleteEdges(edgeIds: string[]): Promise<BatchDeleteResult> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        UNWIND $edgeIds as edgeId
        MATCH ()-[e:EDGE {id: edgeId}]->()
        DELETE e
        RETURN count(e) as deleted
        `,
        { edgeIds }
      );
      const deleted = result.records[0]?.get('deleted').toNumber() || 0;
      return { deleted, errors: [] };
    } finally {
      await session.close();
    }
  }
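  // Usage sketch (illustrative, not part of the original file): bulk-importing
  // nodes and wiring them together with addEdges(). The types and properties
  // shown are hypothetical example values.
  //
  //   const [list, task] = await graphManager.addNodes([
  //     { type: 'todoList', properties: { title: 'Sprint 12' } },
  //     { type: 'todo', properties: { title: 'Write tests', status: 'pending' } }
  //   ]);
  //   await graphManager.addEdges([
  //     { source: list.id, target: task.id, type: 'contains' }
  //   ]);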
  // ============================================================================
  // SEARCH & QUERY
  // ============================================================================

  async queryNodes(type?: NodeType, filters?: Record<string, any>): Promise<Node[]> {
    const session = this.driver.session();
    try {
      let query = 'MATCH (n)';
      const params: any = {};

      if (type) {
        // Match both the label (case-insensitive) and the type property.
        // This handles both unified nodes (Node label + type property) and
        // directly labeled nodes (File, WatchConfig, etc.).
        const capitalizedType = type.charAt(0).toUpperCase() + type.slice(1);
        query += ` WHERE (n.type = $type OR $capitalizedType IN labels(n))`;
        params.type = type;
        params.capitalizedType = capitalizedType;
      }

      if (filters && Object.keys(filters).length > 0) {
        const filterConditions = Object.entries(filters).map(([key, value], i) => {
          params[`filter${i}`] = value;
          return `(n.${key} = $filter${i} OR n.properties.${key} = $filter${i})`;
        });
        query += type ? ' AND ' : ' WHERE ';
        query += filterConditions.join(' AND ');
      }

      // Use a simple RETURN for NornicDB compatibility - content stripping is handled in nodeFromRecord
      query += ` RETURN n`;

      const result = await session.run(query, params);
      // Pass stripContent=true to handle large content in the application layer
      return result.records.map(r => this.nodeFromRecord(r.get('n'), undefined, true));
    } finally {
      await session.close();
    }
  }

  async searchNodes(query: string, options: SearchOptions = {}): Promise<Node[]> {
    // Use UnifiedSearchService for automatic semantic/fulltext fallback
    await this.unifiedSearchService.initialize();

    const searchResult = await this.unifiedSearchService.search(query, {
      types: options.types,
      limit: options.limit || 100,
      minSimilarity: options.minSimilarity || 0.75, // Default threshold
      offset: options.offset || 0
    });

    // Convert SearchResult[] to Node[] format.
    // Note: UnifiedSearchService returns formatted results, so we need to fetch the full nodes.
    if (searchResult.results.length === 0) {
      return [];
    }

    const session = this.driver.session();
    try {
      // Get full node data for the IDs returned by the search
      const ids = searchResult.results.map(r => r.id);
      const result = await session.run(
        `
        MATCH (n)
        WHERE (n.id IN $ids OR n.path IN $ids)
        RETURN n
        `,
        { ids, query }
      );
      // Handle content stripping in the application layer for NornicDB compatibility
      return result.records.map(r => this.nodeFromRecord(r.get('n'), undefined, true));
    } finally {
      await session.close();
    }
  }

  async getEdges(nodeId: string, direction: 'in' | 'out' | 'both' = 'both'): Promise<Edge[]> {
    const session = this.driver.session();
    try {
      let query = '';
      if (direction === 'out') {
        query = 'MATCH (n:Node {id: $nodeId})-[e:EDGE]->(t:Node) RETURN e, n.id as source, t.id as target';
      } else if (direction === 'in') {
        query = 'MATCH (s:Node)-[e:EDGE]->(n:Node {id: $nodeId}) RETURN e, s.id as source, n.id as target';
      } else {
        query = 'MATCH (n:Node {id: $nodeId})-[e:EDGE]-(o:Node) RETURN e, n.id as source, o.id as target';
      }

      const result = await session.run(query, { nodeId });
      return result.records.map(r =>
        this.edgeFromRecord(r.get('e'), r.get('source'), r.get('target'))
      );
    } finally {
      await session.close();
    }
  }
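  // Usage sketch (illustrative, not part of the original file): combining exact
  // property queries with semantic search. The query text, filters, and the 0.8
  // similarity threshold are arbitrary example values.
  //
  //   // Exact property match
  //   const pending = await graphManager.queryNodes('todo', { status: 'pending' });
  //
  //   // Meaning-based match via UnifiedSearchService
  //   const related = await graphManager.searchNodes('authentication decisions', {
  //     types: ['memory', 'concept'],
  //     limit: 20,
  //     minSimilarity: 0.8
  //   });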
  // ============================================================================
  // GRAPH OPERATIONS
  // ============================================================================

  async getNeighbors(nodeId: string, edgeType?: EdgeType, depth: number = 1): Promise<Node[]> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH (start:Node {id: $nodeId})-[e:EDGE*1..${depth}]-(neighbor:Node)
        ${edgeType ? 'WHERE ALL(rel IN e WHERE rel.type = $edgeType)' : ''}
        RETURN DISTINCT neighbor
        `,
        { nodeId, edgeType }
      );
      return result.records.map(r => this.nodeFromRecord(r.get('neighbor'), undefined, true));
    } finally {
      await session.close();
    }
  }

  async getSubgraph(nodeId: string, depth: number = 2): Promise<Subgraph> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH path = (start:Node {id: $nodeId})-[e:EDGE*0..${depth}]-(connected:Node)
        WITH nodes(path) as pathNodes, relationships(path) as pathEdges
        UNWIND pathNodes as node
        WITH collect(DISTINCT node) as allNodes, pathEdges
        UNWIND pathEdges as edge
        WITH allNodes, collect(DISTINCT edge) as allEdges
        RETURN allNodes, allEdges
        `,
        { nodeId }
      );

      if (result.records.length === 0) {
        return { nodes: [], edges: [] };
      }

      const record = result.records[0];
      // Handle content stripping in the application layer for NornicDB compatibility
      const nodes = record.get('allNodes').map((n: any) => this.nodeFromRecord(n, undefined, true));
      const edges = record.get('allEdges').map((e: any) =>
        this.edgeFromRecord(e, e.start, e.end)
      );

      return { nodes, edges };
    } finally {
      await session.close();
    }
  }
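  // Usage sketch (illustrative, not part of the original file): walking the
  // graph around a node. The node ID and the 'related_to' edge type are
  // hypothetical example values.
  //
  //   // Direct neighbors connected by a specific edge type
  //   const neighbors = await graphManager.getNeighbors('memory-123', 'related_to', 1);
  //
  //   // Everything within two hops, as a { nodes, edges } subgraph
  //   const subgraph = await graphManager.getSubgraph('memory-123', 2);
  //   console.log(`${subgraph.nodes.length} nodes, ${subgraph.edges.length} edges`);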
  // ============================================================================
  // UTILITY
  // ============================================================================

  async getStats(): Promise<GraphStats> {
    const session = this.driver.session();
    try {
      const result = await session.run(`
        MATCH (n:Node)
        RETURN count(n) as nodeCount, collect(DISTINCT n.type) as types
      `);
      const edgeResult = await session.run(`
        MATCH ()-[e:EDGE]->()
        RETURN count(e) as edgeCount
      `);

      const nodeCount = result.records[0]?.get('nodeCount').toNumber() || 0;
      const edgeCount = edgeResult.records[0]?.get('edgeCount').toNumber() || 0;
      const types = result.records[0]?.get('types') || [];

      // Get the count per type
      const typeCountResult = await session.run(`
        MATCH (n:Node)
        RETURN n.type as type, count(n) as count
      `);
      const typeCounts: Record<string, number> = {};
      typeCountResult.records.forEach(r => {
        typeCounts[r.get('type')] = r.get('count').toNumber();
      });

      return {
        nodeCount,
        edgeCount,
        types: typeCounts
      };
    } finally {
      await session.close();
    }
  }

  async clear(type?: ClearType): Promise<{ deletedNodes: number; deletedEdges: number }> {
    const session = this.driver.session();
    try {
      // Safety: prevent accidental full-database clears during automated tests unless explicitly allowed.
      const isTestEnv =
        process.env.NODE_ENV === 'test' ||
        process.env.VITEST === 'true' ||
        !!(globalThis as any).vitest;
      const allowClearInTest = process.env.ALLOW_CLEAR_ALL_IN_TEST === 'true';

      if (isTestEnv && type === 'ALL' && !allowClearInTest) {
        console.warn('⚠️ Skipping clear(\'ALL\') in test environment to avoid wiping real database. Set ALLOW_CLEAR_ALL_IN_TEST=true to override.');
        return { deletedNodes: 0, deletedEdges: 0 };
      }

      let query: string;
      let params: any = {};

      if (type === 'ALL') {
        // Clear ALL data - an explicit "ALL" is required for safety
        query = `
          MATCH (n)
          OPTIONAL MATCH (n)-[r]-()
          WITH count(DISTINCT n) as nodeCount, count(DISTINCT r) as edgeCount
          MATCH (n)
          DETACH DELETE n
          RETURN nodeCount as deletedNodes, edgeCount as deletedEdges
        `;
        const result = await session.run(query, params);
        const deletedNodes = result.records[0]?.get('deletedNodes').toNumber() || 0;
        const deletedEdges = result.records[0]?.get('deletedEdges').toNumber() || 0;

        // Reset counters
        this.nodeCounter = 0;
        this.edgeCounter = 0;

        console.log(`✅ Cleared ALL data from graph: ${deletedNodes} nodes, ${deletedEdges} edges`);
        return { deletedNodes, deletedEdges };
      } else if (type) {
        // Clear a specific type (check both the n.type property and Neo4j labels)
        const capitalizedType = type.charAt(0).toUpperCase() + type.slice(1);
        query = `
          MATCH (n)
          WHERE (n.type = $type OR $capitalizedType IN labels(n))
          WITH n, COUNT { (n)-[]-() } as edgeCount
          DETACH DELETE n
          RETURN count(n) as deletedNodes, sum(edgeCount) as deletedEdges
        `;
        params = { type, capitalizedType };

        const result = await session.run(query, params);
        const deletedNodes = result.records[0]?.get('deletedNodes').toNumber() || 0;
        const deletedEdges = result.records[0]?.get('deletedEdges').toNumber() || 0;

        console.log(`✅ Cleared ${deletedNodes} nodes of type '${type}' and ${deletedEdges} edges`);
        return { deletedNodes, deletedEdges };
      } else {
        // No type provided - return zero (safety default)
        console.log(`⚠️ No type provided to clear(). Use clear('ALL') to clear entire graph.`);
        return { deletedNodes: 0, deletedEdges: 0 };
      }
    } finally {
      await session.close();
    }
  }

  async close(): Promise<void> {
    await this.driver.close();
  }

  // ============================================================================
  // PRIVATE HELPERS
  // ============================================================================

  /**
   * Convert a Neo4j record to a Node object.
   * Content stripping is now handled at the Neo4j query level for efficiency.
   * Handles both node objects (with .properties) and map projections (plain objects).
   */
  private nodeFromRecord(record: any, searchQuery?: string, stripContent: boolean = true): Node {
    // Handle a map projection (plain object) vs a node object (with .properties)
    const props = record.properties || record;

    // Extract system properties
    const { id, type, created, updated, ...userProperties } = props;

    // Note: the embedding is already stripped at the database level in all queries.
    // Keep metadata about embeddings (has_embedding, embedding_dimensions, embedding_model).
    return {
      id,
      type,
      properties: userProperties, // All other properties (content already stripped at the query level if needed)
      created,
      updated
    };
  }

  private edgeFromRecord(record: any, source: string, target: string): Edge {
    const props = record.properties;

    // Extract system properties and treat the rest as user properties
    const { id, type, created, ...userProperties } = props;

    return {
      id,
      source,
      target,
      type,
      properties: userProperties,
      created
    };
  }
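  // Usage sketch (illustrative, not part of the original file): inspecting the
  // graph and clearing it by scope. The counts shown are made-up example output;
  // clear() with no argument is a safe no-op, and clear('ALL') is required to
  // wipe everything.
  //
  //   const stats = await graphManager.getStats();
  //   console.log(stats); // e.g. { nodeCount: 120, edgeCount: 85, types: { todo: 12, ... } }
  //
  //   await graphManager.clear('todo'); // delete only todo nodes (and their edges)
  //   await graphManager.clear();       // no-op, logs a warning
  //   await graphManager.clear('ALL');  // full wipe (blocked in test envs by default)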
  // ========================================================================
  // MULTI-AGENT LOCKING
  // ========================================================================

  /**
   * Acquire an exclusive lock on a node (typically a TODO) for multi-agent coordination.
   * Uses optimistic locking with automatic expiry.
   *
   * @param nodeId - Node ID to lock
   * @param agentId - Agent claiming the lock
   * @param timeoutMs - Lock expiry in milliseconds (default 300000 = 5 min)
   * @returns true if the lock was acquired, false if already locked by another agent
   */
  async lockNode(nodeId: string, agentId: string, timeoutMs: number = 300000): Promise<boolean> {
    const session = this.driver.session();
    try {
      const now = new Date();
      const lockExpiresAt = new Date(now.getTime() + timeoutMs);

      // Try to acquire the lock using a conditional update
      const result = await session.run(
        `
        MATCH (n:Node {id: $nodeId})
        WHERE n.lockedBy IS NULL
          OR n.lockedBy = $agentId
          OR (n.lockExpiresAt IS NOT NULL AND n.lockExpiresAt < $now)
        SET n.lockedBy = $agentId,
            n.lockedAt = $now,
            n.lockExpiresAt = $lockExpiresAt,
            n.version = COALESCE(n.version, 0) + 1
        RETURN n
        `,
        {
          nodeId,
          agentId,
          now: now.toISOString(),
          lockExpiresAt: lockExpiresAt.toISOString()
        }
      );

      // If no records were returned, the lock was held by another agent
      return result.records.length > 0;
    } finally {
      await session.close();
    }
  }

  /**
   * Release a lock on a node
   *
   * @param nodeId - Node ID to unlock
   * @param agentId - Agent releasing the lock (must match the lock owner)
   * @returns true if the lock was released, false if not locked or locked by a different agent
   */
  async unlockNode(nodeId: string, agentId: string): Promise<boolean> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH (n:Node {id: $nodeId})
        WHERE n.lockedBy = $agentId
        REMOVE n.lockedBy, n.lockedAt, n.lockExpiresAt
        RETURN n
        `,
        { nodeId, agentId }
      );
      return result.records.length > 0;
    } finally {
      await session.close();
    }
  }

  /**
   * Query nodes filtered by lock status
   *
   * @param type - Optional node type filter
   * @param filters - Additional property filters
   * @param includeAvailableOnly - If true, only return unlocked or expired-lock nodes
   * @returns Array of nodes
   */
  async queryNodesWithLockStatus(
    type?: NodeType,
    filters?: Record<string, any>,
    includeAvailableOnly?: boolean
  ): Promise<Node[]> {
    const session = this.driver.session();
    try {
      let cypher = 'MATCH (n:Node)';
      const params: any = {};

      // Type filter
      if (type) {
        cypher += ' WHERE n.type = $type';
        params.type = type;
      }

      // Property filters
      if (filters && Object.keys(filters).length > 0) {
        const whereClause = type ? ' AND ' : ' WHERE ';
        const conditions = Object.entries(filters).map(([key, value], index) => {
          params[`filter_${index}`] = value;
          return `n.${key} = $filter_${index}`;
        });
        cypher += whereClause + conditions.join(' AND ');
      }

      // Lock status filter
      if (includeAvailableOnly) {
        const lockClause = (type || filters) ? ' AND ' : ' WHERE ';
        cypher += lockClause + `(n.lockedBy IS NULL OR (n.lockExpiresAt IS NOT NULL AND n.lockExpiresAt < $now))`;
        params.now = new Date().toISOString();
      }

      // Use a simple RETURN for NornicDB compatibility - content stripping is handled in nodeFromRecord
      cypher += ` RETURN n`;

      const result = await session.run(cypher, params);
      return result.records.map(record => this.nodeFromRecord(record.get('n'), undefined, true));
    } finally {
      await session.close();
    }
  }

  /**
   * Clean up expired locks across all nodes.
   * Should be called periodically by the server.
   *
   * @returns Number of locks cleaned up
   */
  async cleanupExpiredLocks(): Promise<number> {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH (n:Node)
        WHERE n.lockedBy IS NOT NULL
          AND n.lockExpiresAt IS NOT NULL
          AND n.lockExpiresAt < $now
        REMOVE n.lockedBy, n.lockedAt, n.lockExpiresAt
        RETURN count(n) as cleaned
        `,
        { now: new Date().toISOString() }
      );
      return result.records[0]?.get('cleaned').toNumber() || 0;
    } finally {
      await session.close();
    }
  }
}
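// Usage sketch (illustrative, not part of the original file): a worker agent
// claiming an available TODO, and the server periodically sweeping expired
// locks. The IDs and the 60-second timeout/interval are hypothetical.
//
//   const [next] = await graphManager.queryNodesWithLockStatus('todo', { status: 'pending' }, true);
//   if (next && await graphManager.lockNode(next.id, 'worker-1', 60_000)) {
//     try {
//       // ... do the work ...
//       await graphManager.updateNode(next.id, { status: 'completed' });
//     } finally {
//       await graphManager.unlockNode(next.id, 'worker-1');
//     }
//   }
//
//   // Periodic lock sweep on the server
//   setInterval(() => graphManager.cleanupExpiredLocks(), 60_000);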
