Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
graph_builder_test.go (22.2 kB)
package linkpredict import ( "context" "fmt" "os" "path/filepath" "runtime" "sync" "sync/atomic" "testing" "time" "github.com/orneryd/nornicdb/pkg/storage" ) // ============================================================================= // GRAPH BUILDER TESTS // ============================================================================= func TestGraphBuilder_Basic(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Build failed: %v", err) } if len(graph) == 0 { t.Error("Expected non-empty graph") } // Verify stats stats := builder.Stats() if stats.BuildsCompleted != 1 { t.Errorf("Expected 1 build, got %d", stats.BuildsCompleted) } if stats.LastBuildNodes == 0 { t.Error("Expected node count in stats") } t.Logf("Built graph: %d nodes, %d edges in %v", stats.LastBuildNodes, stats.LastBuildEdges, stats.LastBuildTime) } func TestGraphBuilder_Parallel(t *testing.T) { engine := storage.NewMemoryEngine() // Create larger graph for parallelization test nodeCount := 100 for i := 0; i < nodeCount; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("node-%d", i)), Labels: []string{"Test"}, }) } // Create random edges edgeCount := 0 for i := 0; i < nodeCount; i++ { for j := i + 1; j < nodeCount; j++ { if (i+j)%7 == 0 { // Sparse edges engine.CreateEdge(&storage.Edge{ ID: storage.EdgeID(fmt.Sprintf("e-%d", edgeCount)), StartNode: storage.NodeID(fmt.Sprintf("node-%d", i)), EndNode: storage.NodeID(fmt.Sprintf("node-%d", j)), Type: "CONNECTS", }) edgeCount++ } } } // Test with different worker counts workerCounts := []int{1, 2, 4, runtime.NumCPU()} for _, workers := range workerCounts { t.Run(fmt.Sprintf("workers_%d", workers), func(t *testing.T) { config := DefaultBuildConfig() config.WorkerCount = workers config.ChunkSize = 25 builder := NewGraphBuilder(engine, config) start 
:= time.Now() graph, err := builder.Build(context.Background()) elapsed := time.Since(start) if err != nil { t.Fatalf("Build failed: %v", err) } if len(graph) != nodeCount { t.Errorf("Expected %d nodes, got %d", nodeCount, len(graph)) } t.Logf("Workers=%d: %v", workers, elapsed) }) } } func TestGraphBuilder_ChunkedProcessing(t *testing.T) { engine := storage.NewMemoryEngine() // Create graph nodeCount := 50 for i := 0; i < nodeCount; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("n%d", i)), }) } chunksProcessed := int32(0) config := DefaultBuildConfig() config.ChunkSize = 10 config.ProgressCallback = func(processed, total int, elapsed time.Duration) { atomic.AddInt32(&chunksProcessed, 1) t.Logf("Progress: %d/%d (%.2fs)", processed, total, elapsed.Seconds()) } builder := NewGraphBuilder(engine, config) _, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Build failed: %v", err) } // Should have processed 5 chunks (50 nodes / 10 per chunk) if chunksProcessed < 5 { t.Errorf("Expected at least 5 chunks, got %d", chunksProcessed) } } func TestGraphBuilder_ContextCancellation(t *testing.T) { engine := storage.NewMemoryEngine() // Create graph for i := 0; i < 100; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("n%d", i)), }) } config := DefaultBuildConfig() config.ChunkSize = 10 builder := NewGraphBuilder(engine, config) // Cancel context immediately ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := builder.Build(ctx) if err == nil { t.Error("Expected error for cancelled context") } if err != context.Canceled { t.Errorf("Expected context.Canceled, got %v", err) } } func TestGraphBuilder_GCAfterChunk(t *testing.T) { engine := storage.NewMemoryEngine() // Create graph for i := 0; i < 30; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("n%d", i)), }) } // Test with GC enabled config := DefaultBuildConfig() config.ChunkSize = 10 config.GCAfterChunk = true builder 
:= NewGraphBuilder(engine, config) _, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Build with GC failed: %v", err) } // Test with GC disabled config.GCAfterChunk = false builder = NewGraphBuilder(engine, config) _, err = builder.Build(context.Background()) if err != nil { t.Fatalf("Build without GC failed: %v", err) } } // ============================================================================= // CACHING TESTS // ============================================================================= func TestGraphBuilder_DiskCache(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) // Create temp directory for cache cacheDir := t.TempDir() config := DefaultBuildConfig() config.CachePath = cacheDir config.CacheTTL = time.Hour builder := NewGraphBuilder(engine, config) // First build - should miss cache graph1, err := builder.Build(context.Background()) if err != nil { t.Fatalf("First build failed: %v", err) } stats := builder.Stats() if stats.CacheHits != 0 { t.Error("Expected 0 cache hits on first build") } if stats.CacheMisses != 1 { t.Errorf("Expected 1 cache miss, got %d", stats.CacheMisses) } // Verify cache file exists cachePath := filepath.Join(cacheDir, "graph_cache.gob") if _, err := os.Stat(cachePath); os.IsNotExist(err) { t.Error("Cache file not created") } // Second build - should hit cache graph2, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Second build failed: %v", err) } stats = builder.Stats() if stats.CacheHits != 1 { t.Errorf("Expected 1 cache hit, got %d", stats.CacheHits) } // Verify graphs are equivalent if len(graph1) != len(graph2) { t.Errorf("Graph sizes differ: %d vs %d", len(graph1), len(graph2)) } t.Logf("Cache test: hits=%d, misses=%d", stats.CacheHits, stats.CacheMisses) } func TestGraphBuilder_CacheInvalidation(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) cacheDir := t.TempDir() config := DefaultBuildConfig() 
config.CachePath = cacheDir builder := NewGraphBuilder(engine, config) // Build and cache _, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Build failed: %v", err) } // Invalidate cache err = builder.InvalidateCache() if err != nil { t.Fatalf("InvalidateCache failed: %v", err) } // Verify cache file is gone cachePath := filepath.Join(cacheDir, "graph_cache.gob") if _, err := os.Stat(cachePath); !os.IsNotExist(err) { t.Error("Cache file should be deleted") } // Next build should miss cache _, err = builder.Build(context.Background()) if err != nil { t.Fatalf("Rebuild failed: %v", err) } stats := builder.Stats() if stats.CacheMisses != 2 { t.Errorf("Expected 2 cache misses, got %d", stats.CacheMisses) } } func TestGraphBuilder_CacheExpiration(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) cacheDir := t.TempDir() config := DefaultBuildConfig() config.CachePath = cacheDir config.CacheTTL = 1 * time.Millisecond // Very short TTL builder := NewGraphBuilder(engine, config) // Build and cache _, err := builder.Build(context.Background()) if err != nil { t.Fatalf("Build failed: %v", err) } // Wait for cache to expire time.Sleep(10 * time.Millisecond) // Build again - should miss due to expiration _, err = builder.Build(context.Background()) if err != nil { t.Fatalf("Second build failed: %v", err) } stats := builder.Stats() // Should have 2 misses (first + expired) if stats.CacheMisses != 2 { t.Errorf("Expected 2 cache misses (expired), got %d", stats.CacheMisses) } } // ============================================================================= // INCREMENTAL UPDATE TESTS // ============================================================================= func TestGraphBuilder_ApplyDelta_AddNodes(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) // Start with empty graph graph := make(Graph) // Add nodes via delta delta := &GraphDelta{ 
AddedNodes: []storage.NodeID{"a", "b", "c"}, } graph = builder.ApplyDelta(graph, delta) if len(graph) != 3 { t.Errorf("Expected 3 nodes, got %d", len(graph)) } for _, nodeID := range delta.AddedNodes { if _, exists := graph[nodeID]; !exists { t.Errorf("Node %s not found", nodeID) } } } func TestGraphBuilder_ApplyDelta_AddEdges(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() config.Undirected = true builder := NewGraphBuilder(engine, config) // Start with nodes graph := Graph{ "a": make(NodeSet), "b": make(NodeSet), "c": make(NodeSet), } // Add edges via delta delta := &GraphDelta{ AddedEdges: []EdgeChange{ {From: "a", To: "b"}, {From: "b", To: "c"}, }, } graph = builder.ApplyDelta(graph, delta) // Check edges if !graph["a"].Contains("b") { t.Error("Edge a->b not found") } if !graph["b"].Contains("a") { t.Error("Edge b->a not found (undirected)") } if !graph["b"].Contains("c") { t.Error("Edge b->c not found") } } func TestGraphBuilder_ApplyDelta_RemoveNodes(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) // Start with connected graph graph := Graph{ "a": NodeSet{"b": {}, "c": {}}, "b": NodeSet{"a": {}, "c": {}}, "c": NodeSet{"a": {}, "b": {}}, } // Remove node b delta := &GraphDelta{ RemovedNodes: []storage.NodeID{"b"}, } graph = builder.ApplyDelta(graph, delta) // b should be gone if _, exists := graph["b"]; exists { t.Error("Node b should be removed") } // Edges to b should be gone if graph["a"].Contains("b") { t.Error("Edge a->b should be removed") } if graph["c"].Contains("b") { t.Error("Edge c->b should be removed") } // a and c should still be connected if len(graph) != 2 { t.Errorf("Expected 2 nodes, got %d", len(graph)) } } func TestGraphBuilder_ApplyDelta_RemoveEdges(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() config.Undirected = true builder := NewGraphBuilder(engine, config) // Start with connected graph 
graph := Graph{ "a": NodeSet{"b": {}, "c": {}}, "b": NodeSet{"a": {}, "c": {}}, "c": NodeSet{"a": {}, "b": {}}, } // Remove edge a-b delta := &GraphDelta{ RemovedEdges: []EdgeChange{ {From: "a", To: "b"}, }, } graph = builder.ApplyDelta(graph, delta) // a-b edge should be gone (both directions for undirected) if graph["a"].Contains("b") { t.Error("Edge a->b should be removed") } if graph["b"].Contains("a") { t.Error("Edge b->a should be removed") } // Other edges should remain if !graph["a"].Contains("c") { t.Error("Edge a->c should remain") } if !graph["b"].Contains("c") { t.Error("Edge b->c should remain") } } func TestGraphBuilder_ApplyDelta_Complex(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() config.Undirected = true builder := NewGraphBuilder(engine, config) // Initial graph graph := Graph{ "a": NodeSet{"b": {}}, "b": NodeSet{"a": {}}, } // Complex delta: add nodes, add edges, remove edges delta := &GraphDelta{ AddedNodes: []storage.NodeID{"c", "d"}, AddedEdges: []EdgeChange{ {From: "a", To: "c"}, {From: "c", To: "d"}, }, RemovedEdges: []EdgeChange{ {From: "a", To: "b"}, }, } graph = builder.ApplyDelta(graph, delta) // Verify structure if len(graph) != 4 { t.Errorf("Expected 4 nodes, got %d", len(graph)) } if graph["a"].Contains("b") { t.Error("Edge a-b should be removed") } if !graph["a"].Contains("c") { t.Error("Edge a-c should exist") } if !graph["c"].Contains("d") { t.Error("Edge c-d should exist") } } // ============================================================================= // PARALLEL ALGORITHM TESTS // ============================================================================= func TestParallelCommonNeighbors(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, _ := builder.Build(context.Background()) sources := []storage.NodeID{"alice", "bob", "charlie"} results := 
ParallelCommonNeighbors(context.Background(), graph, sources, 5, nil) if len(results) != len(sources) { t.Errorf("Expected %d results, got %d", len(sources), len(results)) } for _, source := range sources { if _, exists := results[source]; !exists { t.Errorf("Missing results for %s", source) } } } func TestParallelAdamicAdar(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, _ := builder.Build(context.Background()) sources := []storage.NodeID{"alice", "bob"} results := ParallelAdamicAdar(context.Background(), graph, sources, 5, nil) if len(results) != len(sources) { t.Errorf("Expected %d results, got %d", len(sources), len(results)) } } func TestParallelJaccard(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, _ := builder.Build(context.Background()) sources := []storage.NodeID{"alice", "bob", "charlie"} results := ParallelJaccard(context.Background(), graph, sources, 5, nil) if len(results) != len(sources) { t.Errorf("Expected %d results, got %d", len(sources), len(results)) } } func TestParallelAlgorithms_Cancellation(t *testing.T) { engine := storage.NewMemoryEngine() // Create larger graph for i := 0; i < 100; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("n%d", i)), }) } config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, _ := builder.Build(context.Background()) sources := make([]storage.NodeID, 50) for i := range sources { sources[i] = storage.NodeID(fmt.Sprintf("n%d", i)) } // Cancel immediately ctx, cancel := context.WithCancel(context.Background()) cancel() results := ParallelCommonNeighbors(ctx, graph, sources, 5, nil) // Should return early with partial results if len(results) >= len(sources) { t.Log("Cancellation may not have stopped all workers") } } // 
============================================================================= // STREAMING TESTS // ============================================================================= func TestGraphStreamer(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) config := DefaultBuildConfig() streamer := NewGraphStreamer(engine, config) nodeCount := 0 err := streamer.StreamNodes(context.Background(), func(node *storage.Node) error { nodeCount++ return nil }) if err != nil { t.Fatalf("StreamNodes failed: %v", err) } if nodeCount == 0 { t.Error("Expected to stream some nodes") } t.Logf("Streamed %d nodes", nodeCount) } func TestGraphStreamer_Cancellation(t *testing.T) { engine := storage.NewMemoryEngine() for i := 0; i < 50; i++ { engine.CreateNode(&storage.Node{ ID: storage.NodeID(fmt.Sprintf("n%d", i)), }) } config := DefaultBuildConfig() streamer := NewGraphStreamer(engine, config) ctx, cancel := context.WithCancel(context.Background()) processed := 0 err := streamer.StreamNodes(ctx, func(node *storage.Node) error { processed++ if processed >= 10 { cancel() } return nil }) if err != context.Canceled { t.Errorf("Expected context.Canceled, got %v", err) } } // ============================================================================= // EXPORT/IMPORT TESTS // ============================================================================= func TestExportImport(t *testing.T) { // Create graph original := Graph{ "a": NodeSet{"b": {}, "c": {}}, "b": NodeSet{"a": {}, "c": {}}, "c": NodeSet{"a": {}, "b": {}}, } // Export to temp file tmpFile, err := os.CreateTemp("", "graph_*.tsv") if err != nil { t.Fatalf("Failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) err = ExportToWriter(original, tmpFile) if err != nil { t.Fatalf("Export failed: %v", err) } tmpFile.Close() // Import file, err := os.Open(tmpFile.Name()) if err != nil { t.Fatalf("Failed to open file: %v", err) } defer file.Close() imported, err := 
ImportFromReader(file) if err != nil { t.Fatalf("Import failed: %v", err) } // Verify if len(imported) != len(original) { t.Errorf("Node count mismatch: %d vs %d", len(imported), len(original)) } for nodeID, neighbors := range original { importedNeighbors, exists := imported[nodeID] if !exists { t.Errorf("Node %s missing from import", nodeID) continue } for neighbor := range neighbors { if !importedNeighbors.Contains(neighbor) { t.Errorf("Edge %s->%s missing from import", nodeID, neighbor) } } } } // ============================================================================= // BENCHMARK TESTS // ============================================================================= func BenchmarkGraphBuilder_Small(b *testing.B) { engine := storage.NewMemoryEngine() for i := 0; i < 100; i++ { engine.CreateNode(&storage.Node{ID: storage.NodeID(fmt.Sprintf("n%d", i))}) } config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { builder.InvalidateCache() builder.Build(ctx) } } func BenchmarkGraphBuilder_Parallel(b *testing.B) { engine := storage.NewMemoryEngine() for i := 0; i < 500; i++ { engine.CreateNode(&storage.Node{ID: storage.NodeID(fmt.Sprintf("n%d", i))}) } // Add edges for i := 0; i < 500; i++ { for j := i + 1; j < 500; j++ { if (i+j)%10 == 0 { engine.CreateEdge(&storage.Edge{ ID: storage.EdgeID(fmt.Sprintf("e%d-%d", i, j)), StartNode: storage.NodeID(fmt.Sprintf("n%d", i)), EndNode: storage.NodeID(fmt.Sprintf("n%d", j)), Type: "CONNECTS", }) } } } config := DefaultBuildConfig() config.WorkerCount = runtime.NumCPU() builder := NewGraphBuilder(engine, config) ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { builder.InvalidateCache() builder.Build(ctx) } } func BenchmarkParallelAdamicAdar(b *testing.B) { engine := storage.NewMemoryEngine() nodeCount := 200 for i := 0; i < nodeCount; i++ { engine.CreateNode(&storage.Node{ID: storage.NodeID(fmt.Sprintf("n%d", i))}) 
} for i := 0; i < nodeCount; i++ { for j := i + 1; j < nodeCount; j++ { if (i+j)%5 == 0 { engine.CreateEdge(&storage.Edge{ ID: storage.EdgeID(fmt.Sprintf("e%d-%d", i, j)), StartNode: storage.NodeID(fmt.Sprintf("n%d", i)), EndNode: storage.NodeID(fmt.Sprintf("n%d", j)), Type: "CONNECTS", }) } } } config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph, _ := builder.Build(context.Background()) sources := make([]storage.NodeID, 50) for i := range sources { sources[i] = storage.NodeID(fmt.Sprintf("n%d", i)) } ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { ParallelAdamicAdar(ctx, graph, sources, 10, nil) } } // ============================================================================= // HELPER FUNCTIONS // ============================================================================= func setupTestGraphForBuilder(t *testing.T, engine storage.Engine) { t.Helper() nodes := []string{"alice", "bob", "charlie", "diana", "eve"} for _, name := range nodes { engine.CreateNode(&storage.Node{ ID: storage.NodeID(name), Labels: []string{"Person"}, }) } edges := []struct{ from, to string }{ {"alice", "bob"}, {"alice", "charlie"}, {"bob", "charlie"}, {"bob", "diana"}, {"charlie", "diana"}, {"diana", "eve"}, } for i, e := range edges { engine.CreateEdge(&storage.Edge{ ID: storage.EdgeID(fmt.Sprintf("e%d", i)), StartNode: storage.NodeID(e.from), EndNode: storage.NodeID(e.to), Type: "KNOWS", }) } } // ============================================================================= // CONCURRENT SAFETY TESTS // ============================================================================= func TestGraphBuilder_ConcurrentBuilds(t *testing.T) { engine := storage.NewMemoryEngine() setupTestGraphForBuilder(t, engine) cacheDir := t.TempDir() config := DefaultBuildConfig() config.CachePath = cacheDir builder := NewGraphBuilder(engine, config) var wg sync.WaitGroup errors := make(chan error, 10) // Run 10 concurrent builds for i := 0; i < 10; i++ { 
wg.Add(1) go func() { defer wg.Done() _, err := builder.Build(context.Background()) if err != nil { errors <- err } }() } wg.Wait() close(errors) for err := range errors { t.Errorf("Concurrent build error: %v", err) } } func TestGraphBuilder_ApplyDelta_Concurrent(t *testing.T) { engine := storage.NewMemoryEngine() config := DefaultBuildConfig() builder := NewGraphBuilder(engine, config) graph := Graph{ "a": make(NodeSet), "b": make(NodeSet), } var wg sync.WaitGroup var mu sync.Mutex // Protect graph access // Concurrent delta applications with proper synchronization for i := 0; i < 10; i++ { wg.Add(1) go func(idx int) { defer wg.Done() delta := &GraphDelta{ AddedNodes: []storage.NodeID{storage.NodeID(fmt.Sprintf("node-%d", idx))}, AddedEdges: []EdgeChange{ {From: "a", To: storage.NodeID(fmt.Sprintf("node-%d", idx))}, }, } // Protect graph access with mutex mu.Lock() graph = builder.ApplyDelta(graph, delta) mu.Unlock() }(i) } wg.Wait() // Verify all nodes were added if len(graph) < 12 { // a, b, + 10 new nodes t.Errorf("Expected at least 12 nodes, got %d", len(graph)) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.