Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
graph_builder.go19.9 kB
// graph_builder.go - Optimized parallel graph construction with caching and persistence // // This file provides high-performance graph construction for link prediction: // - Streaming construction with chunked processing (fixes memory spikes) // - Parallel edge fetching with worker pool (4-8x speedup) // - Context cancellation and progress callbacks // - Disk-based graph caching (gob serialization) // - Incremental graph updates (delta changes) // // Memory Optimization: // - Processes nodes in chunks to avoid loading all at once // - Explicit GC hints between chunks // - Pre-allocated maps with capacity hints // // Performance: // - Parallel edge fetching with configurable worker count // - Lock-free score accumulation for algorithms // - Cached graph persistence to avoid rebuilds package linkpredict import ( "context" "encoding/gob" "errors" "fmt" "io" "os" "path/filepath" "runtime" "sync" "sync/atomic" "time" "github.com/orneryd/nornicdb/pkg/storage" ) // ============================================================================= // CONFIGURATION // ============================================================================= // BuildConfig controls graph construction behavior. type BuildConfig struct { // ChunkSize controls how many nodes are processed at once. // Smaller = less memory, larger = faster. // Default: 1000 ChunkSize int // WorkerCount controls parallel edge fetching. // Default: runtime.NumCPU() WorkerCount int // Undirected treats edges as bidirectional. // Default: true Undirected bool // GCAfterChunk triggers garbage collection after each chunk. // Reduces peak memory at cost of some speed. // Default: true GCAfterChunk bool // ProgressCallback is called after each chunk with progress info. // Optional - set to nil to disable. ProgressCallback func(processed, total int, elapsed time.Duration) // CachePath is the directory for persisting cached graphs. // Empty string disables caching. CachePath string // CacheTTL is how long a cached graph is valid. // Default: 1 hour CacheTTL time.Duration } // DefaultBuildConfig returns sensible defaults. func DefaultBuildConfig() *BuildConfig { return &BuildConfig{ ChunkSize: 1000, WorkerCount: runtime.NumCPU(), Undirected: true, GCAfterChunk: true, ProgressCallback: nil, CachePath: "", CacheTTL: time.Hour, } } // ============================================================================= // GRAPH BUILDER // ============================================================================= // GraphBuilder provides optimized graph construction with caching. type GraphBuilder struct { config *BuildConfig storage storage.Engine // Stats lastBuildTime time.Duration lastBuildNodes int lastBuildEdges int buildsCompleted int64 cacheHits int64 cacheMisses int64 } // NewGraphBuilder creates a new optimized graph builder. func NewGraphBuilder(engine storage.Engine, config *BuildConfig) *GraphBuilder { if config == nil { config = DefaultBuildConfig() } if config.ChunkSize <= 0 { config.ChunkSize = 1000 } if config.WorkerCount <= 0 { config.WorkerCount = runtime.NumCPU() } if config.CacheTTL <= 0 { config.CacheTTL = time.Hour } return &GraphBuilder{ config: config, storage: engine, } } // Build constructs a graph with all optimizations. // // This method: // 1. Checks for valid cached graph (if caching enabled) // 2. Streams nodes in chunks to avoid memory spikes // 3. Fetches edges in parallel with worker pool // 4. Respects context cancellation // 5. Saves graph to cache (if caching enabled) // // Returns the constructed graph or error if cancelled/failed. func (b *GraphBuilder) Build(ctx context.Context) (Graph, error) { // Handle nil storage if b.storage == nil { return nil, errors.New("storage engine is nil") } startTime := time.Now() // Check cache first if b.config.CachePath != "" { if cached, err := b.loadFromCache(); err == nil && cached != nil { atomic.AddInt64(&b.cacheHits, 1) return cached, nil } atomic.AddInt64(&b.cacheMisses, 1) } // Build graph with streaming + parallelization graph, nodeCount, edgeCount, err := b.buildStreaming(ctx) if err != nil { return nil, err } // Update stats b.lastBuildTime = time.Since(startTime) b.lastBuildNodes = nodeCount b.lastBuildEdges = edgeCount atomic.AddInt64(&b.buildsCompleted, 1) // Save to cache if b.config.CachePath != "" { if err := b.saveToCache(graph); err != nil { // Log but don't fail - cache is optional // TODO: Add proper logging } } return graph, nil } // buildStreaming constructs graph with chunked processing and parallel edge fetching. func (b *GraphBuilder) buildStreaming(ctx context.Context) (Graph, int, int, error) { // Get all nodes (we need IDs, but won't hold all data at once) nodes, err := b.storage.AllNodes() if err != nil { return nil, 0, 0, fmt.Errorf("failed to get nodes: %w", err) } totalNodes := len(nodes) if totalNodes == 0 { return make(Graph), 0, 0, nil } // Pre-allocate graph with capacity hints graph := make(Graph, totalNodes) var graphMu sync.RWMutex var totalEdges int64 // Initialize all node entries (fast, minimal memory per entry) for _, node := range nodes { graph[node.ID] = make(NodeSet) } // Process nodes in chunks startTime := time.Now() processed := 0 for start := 0; start < totalNodes; start += b.config.ChunkSize { // Check for cancellation select { case <-ctx.Done(): return nil, 0, 0, ctx.Err() default: } end := start + b.config.ChunkSize if end > totalNodes { end = totalNodes } chunk := nodes[start:end] // Process chunk with parallel edge fetching edgesInChunk := b.processChunkParallel(ctx, graph, &graphMu, chunk) atomic.AddInt64(&totalEdges, int64(edgesInChunk)) processed = end // Progress callback if b.config.ProgressCallback != nil { b.config.ProgressCallback(processed, totalNodes, time.Since(startTime)) } // GC hint to reduce peak memory if b.config.GCAfterChunk { runtime.GC() } } return graph, totalNodes, int(totalEdges), nil } // processChunkParallel fetches edges for a chunk of nodes in parallel. func (b *GraphBuilder) processChunkParallel( ctx context.Context, graph Graph, graphMu *sync.RWMutex, chunk []*storage.Node, ) int { var wg sync.WaitGroup var edgeCount int64 // Semaphore to limit concurrent workers sem := make(chan struct{}, b.config.WorkerCount) for _, node := range chunk { // Check cancellation select { case <-ctx.Done(): return int(edgeCount) default: } wg.Add(1) sem <- struct{}{} // Acquire semaphore go func(nodeID storage.NodeID) { defer wg.Done() defer func() { <-sem }() // Release semaphore edges, err := b.storage.GetOutgoingEdges(nodeID) if err != nil { return } // Lock graph for writing graphMu.Lock() for _, edge := range edges { graph[edge.StartNode][edge.EndNode] = struct{}{} if b.config.Undirected { // Ensure target node exists in graph if _, exists := graph[edge.EndNode]; exists { graph[edge.EndNode][edge.StartNode] = struct{}{} } } atomic.AddInt64(&edgeCount, 1) } graphMu.Unlock() }(node.ID) } wg.Wait() return int(edgeCount) } // ============================================================================= // CACHING (GOB SERIALIZATION) // ============================================================================= // CachedGraph is the serialized format for graph caching. type CachedGraph struct { Graph map[string][]string // Simplified format for gob Timestamp time.Time NodeCount int EdgeCount int } func init() { // Register types for gob encoding gob.Register(&CachedGraph{}) } // cacheFilePath returns the path to the cache file. func (b *GraphBuilder) cacheFilePath() string { return filepath.Join(b.config.CachePath, "graph_cache.gob") } // loadFromCache attempts to load a cached graph. func (b *GraphBuilder) loadFromCache() (Graph, error) { if b.config.CachePath == "" { return nil, errors.New("caching disabled") } path := b.cacheFilePath() file, err := os.Open(path) if err != nil { return nil, err } defer file.Close() var cached CachedGraph decoder := gob.NewDecoder(file) if err := decoder.Decode(&cached); err != nil { return nil, err } // Check TTL if time.Since(cached.Timestamp) > b.config.CacheTTL { return nil, errors.New("cache expired") } // Convert back to Graph format graph := make(Graph, len(cached.Graph)) for nodeID, neighbors := range cached.Graph { nodeSet := make(NodeSet, len(neighbors)) for _, neighbor := range neighbors { nodeSet[storage.NodeID(neighbor)] = struct{}{} } graph[storage.NodeID(nodeID)] = nodeSet } return graph, nil } // saveToCache persists the graph to disk. func (b *GraphBuilder) saveToCache(graph Graph) error { if b.config.CachePath == "" { return errors.New("caching disabled") } // Ensure directory exists if err := os.MkdirAll(b.config.CachePath, 0755); err != nil { return err } // Convert to serializable format cached := CachedGraph{ Graph: make(map[string][]string, len(graph)), Timestamp: time.Now(), NodeCount: len(graph), } edgeCount := 0 for nodeID, neighbors := range graph { neighborList := make([]string, 0, len(neighbors)) for neighbor := range neighbors { neighborList = append(neighborList, string(neighbor)) } cached.Graph[string(nodeID)] = neighborList edgeCount += len(neighbors) } cached.EdgeCount = edgeCount // Write to temp file first, then rename (atomic) path := b.cacheFilePath() tempPath := path + ".tmp" file, err := os.Create(tempPath) if err != nil { return err } encoder := gob.NewEncoder(file) if err := encoder.Encode(&cached); err != nil { file.Close() os.Remove(tempPath) return err } if err := file.Close(); err != nil { os.Remove(tempPath) return err } // Atomic rename return os.Rename(tempPath, path) } // InvalidateCache removes the cached graph. func (b *GraphBuilder) InvalidateCache() error { if b.config.CachePath == "" { return nil } path := b.cacheFilePath() if err := os.Remove(path); err != nil && !os.IsNotExist(err) { return err } return nil } // ============================================================================= // INCREMENTAL UPDATES // ============================================================================= // GraphDelta represents changes to apply to an existing graph. type GraphDelta struct { AddedNodes []storage.NodeID RemovedNodes []storage.NodeID AddedEdges []EdgeChange RemovedEdges []EdgeChange } // EdgeChange represents an edge addition or removal. type EdgeChange struct { From storage.NodeID To storage.NodeID } // ApplyDelta updates an existing graph with changes. // // This is much faster than rebuilding when only a few changes occurred. // For large deltas (>10% of graph), consider rebuilding instead. func (b *GraphBuilder) ApplyDelta(graph Graph, delta *GraphDelta) Graph { if graph == nil { graph = make(Graph) } // Remove nodes first (also removes their edges) for _, nodeID := range delta.RemovedNodes { // Remove all edges to this node neighbors := graph[nodeID] for neighbor := range neighbors { delete(graph[neighbor], nodeID) } // Remove the node delete(graph, nodeID) } // Add new nodes for _, nodeID := range delta.AddedNodes { if _, exists := graph[nodeID]; !exists { graph[nodeID] = make(NodeSet) } } // Remove edges for _, edge := range delta.RemovedEdges { if neighbors, exists := graph[edge.From]; exists { delete(neighbors, edge.To) } if b.config.Undirected { if neighbors, exists := graph[edge.To]; exists { delete(neighbors, edge.From) } } } // Add edges for _, edge := range delta.AddedEdges { if _, exists := graph[edge.From]; !exists { graph[edge.From] = make(NodeSet) } graph[edge.From][edge.To] = struct{}{} if b.config.Undirected { if _, exists := graph[edge.To]; !exists { graph[edge.To] = make(NodeSet) } graph[edge.To][edge.From] = struct{}{} } } return graph } // ============================================================================= // STATS // ============================================================================= // BuildStats contains statistics about graph building. type BuildStats struct { LastBuildTime time.Duration LastBuildNodes int LastBuildEdges int BuildsCompleted int64 CacheHits int64 CacheMisses int64 } // Stats returns build statistics. func (b *GraphBuilder) Stats() BuildStats { return BuildStats{ LastBuildTime: b.lastBuildTime, LastBuildNodes: b.lastBuildNodes, LastBuildEdges: b.lastBuildEdges, BuildsCompleted: atomic.LoadInt64(&b.buildsCompleted), CacheHits: atomic.LoadInt64(&b.cacheHits), CacheMisses: atomic.LoadInt64(&b.cacheMisses), } } // ============================================================================= // LEGACY WRAPPER // ============================================================================= // BuildGraphFromEngineOptimized is the optimized replacement for BuildGraphFromEngine. // // This version provides: // - Streaming construction (fixes memory spikes) // - Parallel edge fetching (4-8x speedup) // - Context cancellation support // - Optional progress callbacks // // Parameters: // - ctx: Context for cancellation // - engine: Storage engine // - config: Build configuration (nil uses defaults) // // Example: // // config := &linkpredict.BuildConfig{ // ChunkSize: 500, // WorkerCount: 4, // Undirected: true, // ProgressCallback: func(p, t int, e time.Duration) { // fmt.Printf("Progress: %d/%d (%.1fs)\n", p, t, e.Seconds()) // }, // } // graph, err := linkpredict.BuildGraphFromEngineOptimized(ctx, engine, config) func BuildGraphFromEngineOptimized( ctx context.Context, engine storage.Engine, config *BuildConfig, ) (Graph, error) { builder := NewGraphBuilder(engine, config) return builder.Build(ctx) } // ============================================================================= // PARALLEL ALGORITHM HELPERS // ============================================================================= // ParallelScoreConfig controls parallel score computation. type ParallelScoreConfig struct { WorkerCount int BatchSize int } // DefaultParallelScoreConfig returns defaults for parallel scoring. func DefaultParallelScoreConfig() *ParallelScoreConfig { return &ParallelScoreConfig{ WorkerCount: runtime.NumCPU(), BatchSize: 100, } } // ParallelCommonNeighbors computes common neighbors scores in parallel. // // This is useful when computing scores for many source nodes at once. func ParallelCommonNeighbors( ctx context.Context, graph Graph, sources []storage.NodeID, topK int, config *ParallelScoreConfig, ) map[storage.NodeID][]Prediction { if config == nil { config = DefaultParallelScoreConfig() } results := make(map[storage.NodeID][]Prediction) var resultsMu sync.Mutex var wg sync.WaitGroup sem := make(chan struct{}, config.WorkerCount) for _, source := range sources { select { case <-ctx.Done(): return results default: } wg.Add(1) sem <- struct{}{} go func(src storage.NodeID) { defer wg.Done() defer func() { <-sem }() preds := CommonNeighbors(graph, src, topK) resultsMu.Lock() results[src] = preds resultsMu.Unlock() }(source) } wg.Wait() return results } // ParallelAdamicAdar computes Adamic-Adar scores in parallel. func ParallelAdamicAdar( ctx context.Context, graph Graph, sources []storage.NodeID, topK int, config *ParallelScoreConfig, ) map[storage.NodeID][]Prediction { if config == nil { config = DefaultParallelScoreConfig() } results := make(map[storage.NodeID][]Prediction) var resultsMu sync.Mutex var wg sync.WaitGroup sem := make(chan struct{}, config.WorkerCount) for _, source := range sources { select { case <-ctx.Done(): return results default: } wg.Add(1) sem <- struct{}{} go func(src storage.NodeID) { defer wg.Done() defer func() { <-sem }() preds := AdamicAdar(graph, src, topK) resultsMu.Lock() results[src] = preds resultsMu.Unlock() }(source) } wg.Wait() return results } // ParallelJaccard computes Jaccard scores in parallel. func ParallelJaccard( ctx context.Context, graph Graph, sources []storage.NodeID, topK int, config *ParallelScoreConfig, ) map[storage.NodeID][]Prediction { if config == nil { config = DefaultParallelScoreConfig() } results := make(map[storage.NodeID][]Prediction) var resultsMu sync.Mutex var wg sync.WaitGroup sem := make(chan struct{}, config.WorkerCount) for _, source := range sources { select { case <-ctx.Done(): return results default: } wg.Add(1) sem <- struct{}{} go func(src storage.NodeID) { defer wg.Done() defer func() { <-sem }() preds := Jaccard(graph, src, topK) resultsMu.Lock() results[src] = preds resultsMu.Unlock() }(source) } wg.Wait() return results } // ============================================================================= // STREAMING GRAPH READER (for very large graphs) // ============================================================================= // GraphStreamer provides node-by-node graph iteration without loading all into memory. type GraphStreamer struct { engine storage.Engine config *BuildConfig } // NewGraphStreamer creates a streamer for very large graphs. func NewGraphStreamer(engine storage.Engine, config *BuildConfig) *GraphStreamer { if config == nil { config = DefaultBuildConfig() } return &GraphStreamer{ engine: engine, config: config, } } // StreamNodes iterates over all nodes without loading all into memory. func (s *GraphStreamer) StreamNodes(ctx context.Context, fn func(node *storage.Node) error) error { nodes, err := s.engine.AllNodes() if err != nil { return err } for i, node := range nodes { select { case <-ctx.Done(): return ctx.Err() default: } if err := fn(node); err != nil { return err } // GC hint every chunk if s.config.GCAfterChunk && (i+1)%s.config.ChunkSize == 0 { runtime.GC() } } return nil } // StreamEdges iterates over all edges for a node. func (s *GraphStreamer) StreamEdges(ctx context.Context, nodeID storage.NodeID, fn func(edge *storage.Edge) error) error { edges, err := s.engine.GetOutgoingEdges(nodeID) if err != nil { return err } for _, edge := range edges { select { case <-ctx.Done(): return ctx.Err() default: } if err := fn(edge); err != nil { return err } } return nil } // ============================================================================= // EXPORT HELPERS // ============================================================================= // ExportToWriter writes the graph in a simple text format. func ExportToWriter(graph Graph, w io.Writer) error { for nodeID, neighbors := range graph { for neighbor := range neighbors { if _, err := fmt.Fprintf(w, "%s\t%s\n", nodeID, neighbor); err != nil { return err } } } return nil } // ImportFromReader reads a graph from simple text format (tab-separated node pairs). func ImportFromReader(r io.Reader) (Graph, error) { graph := make(Graph) var from, to string for { _, err := fmt.Fscanf(r, "%s\t%s\n", &from, &to) if err == io.EOF { break } if err != nil { return nil, err } fromID := storage.NodeID(from) toID := storage.NodeID(to) if _, exists := graph[fromID]; !exists { graph[fromID] = make(NodeSet) } if _, exists := graph[toID]; !exists { graph[toID] = make(NodeSet) } graph[fromID][toID] = struct{}{} } return graph, nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server