M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
parallel.go (13 kB)
// Package cypher provides parallel query execution support for NornicDB.
//
// Parallel execution significantly improves query performance on large datasets
// by distributing work across multiple CPU cores. This is especially beneficial for:
//   - Filtering large node sets (WHERE clauses)
//   - Aggregation operations (COUNT, SUM, AVG, COLLECT)
//   - Relationship traversal
//   - UNION queries (parallel branch execution)
//
// # Neo4j Compatibility
//
// Neo4j Enterprise uses parallel execution for query processing. This implementation
// brings similar capabilities to NornicDB, enabling comparable performance for
// analytical workloads.
//
// # Configuration
//
// Parallel execution can be configured via ParallelConfig:
//
//	config := cypher.ParallelConfig{
//		Enabled:      true,
//		MaxWorkers:   8,   // Use 8 cores max
//		MinBatchSize: 500, // Parallelize when >500 items
//	}
//
// # ELI12 (Explain Like I'm 12)
//
// Imagine you have 1000 books to check for a specific word. Instead of checking
// one by one, you get 4 friends to help. Each friend takes 250 books and checks
// them at the same time. That's parallel execution - doing work simultaneously!
package cypher

import (
	"runtime"
	"sync"

	"github.com/orneryd/nornicdb/pkg/storage"
)

// ParallelConfig controls parallel execution behavior.
type ParallelConfig struct {
	// Enabled enables/disables parallel execution globally
	Enabled bool

	// MaxWorkers is the maximum number of goroutines to use
	// Default: runtime.NumCPU()
	MaxWorkers int

	// MinBatchSize is the minimum number of items before parallelizing
	// Below this threshold, sequential execution is used (overhead not worth it)
	// Default: 1000
	MinBatchSize int
}

// DefaultParallelConfig returns the default parallel execution configuration.
func DefaultParallelConfig() ParallelConfig {
	return ParallelConfig{
		Enabled:      true,
		MaxWorkers:   runtime.NumCPU(),
		MinBatchSize: 1000, // Reverted: 200 caused contention on Mac M-series
	}
}

// parallelConfig is the active configuration
var parallelConfig = DefaultParallelConfig()

// SetParallelConfig updates the parallel execution configuration.
func SetParallelConfig(config ParallelConfig) {
	if config.MaxWorkers <= 0 {
		config.MaxWorkers = runtime.NumCPU()
	}
	if config.MinBatchSize <= 0 {
		config.MinBatchSize = 1000
	}
	parallelConfig = config
}

// GetParallelConfig returns the current parallel execution configuration.
func GetParallelConfig() ParallelConfig {
	return parallelConfig
}

// =============================================================================
// Parallel Node Filtering
// =============================================================================

// FilterFunc is a function that tests whether a node matches filter criteria.
type FilterFunc func(node *storage.Node) bool
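// Example (illustrative sketch, not part of the original file): a FilterFunc
// that keeps nodes whose "status" property equals "active". The property key
// and expected value are hypothetical; any entry in node.Properties can be
// tested the same way.
var exampleActiveFilter FilterFunc = func(node *storage.Node) bool {
	v, ok := node.Properties["status"]
	return ok && v == "active"
}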
// parallelFilterNodes filters nodes in parallel using multiple workers.
// Falls back to sequential filtering if the dataset is small.
//
// Parameters:
//   - nodes: The slice of nodes to filter
//   - filterFn: Function that returns true if node should be included
//
// Returns:
//   - Slice of nodes that passed the filter
func parallelFilterNodes(nodes []*storage.Node, filterFn FilterFunc) []*storage.Node {
	if !parallelConfig.Enabled || len(nodes) < parallelConfig.MinBatchSize {
		// Sequential fallback for small datasets
		return sequentialFilterNodes(nodes, filterFn)
	}

	numWorkers := parallelConfig.MaxWorkers
	if numWorkers > len(nodes) {
		numWorkers = len(nodes)
	}
	chunkSize := (len(nodes) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	results := make([][]*storage.Node, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(nodes) {
			break
		}
		if end > len(nodes) {
			end = len(nodes)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Node) {
			defer wg.Done()
			filtered := make([]*storage.Node, 0, len(chunk)/4) // Estimate 25% pass rate
			for _, node := range chunk {
				if filterFn(node) {
					filtered = append(filtered, node)
				}
			}
			results[workerID] = filtered
		}(i, nodes[start:end])
	}

	wg.Wait()

	// Merge results
	totalSize := 0
	for _, r := range results {
		totalSize += len(r)
	}
	merged := make([]*storage.Node, 0, totalSize)
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged
}

// sequentialFilterNodes filters nodes sequentially (fallback for small datasets).
func sequentialFilterNodes(nodes []*storage.Node, filterFn FilterFunc) []*storage.Node {
	result := make([]*storage.Node, 0, len(nodes)/4)
	for _, node := range nodes {
		if filterFn(node) {
			result = append(result, node)
		}
	}
	return result
}

// =============================================================================
// Parallel Aggregation
// =============================================================================

// AggregateResult holds partial aggregation results from a worker.
type AggregateResult struct {
	Count   int64
	Sum     float64
	Values  []interface{} // For COLLECT
	Min     interface{}
	Max     interface{}
	HasData bool
}

// parallelCount counts nodes matching a filter in parallel.
func parallelCount(nodes []*storage.Node, filterFn FilterFunc) int64 {
	if !parallelConfig.Enabled || len(nodes) < parallelConfig.MinBatchSize {
		var count int64
		for _, node := range nodes {
			if filterFn == nil || filterFn(node) {
				count++
			}
		}
		return count
	}

	numWorkers := parallelConfig.MaxWorkers
	chunkSize := (len(nodes) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	counts := make([]int64, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(nodes) {
			break
		}
		if end > len(nodes) {
			end = len(nodes)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Node) {
			defer wg.Done()
			var localCount int64
			for _, node := range chunk {
				if filterFn == nil || filterFn(node) {
					localCount++
				}
			}
			counts[workerID] = localCount
		}(i, nodes[start:end])
	}

	wg.Wait()

	var total int64
	for _, c := range counts {
		total += c
	}
	return total
}
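// Example (illustrative sketch, not part of the original file): counting
// matching nodes with parallelCount. A nil filter counts every node. The
// "age" property is hypothetical; toFloat64 is this package's numeric
// conversion helper, used the same way as in parallelSum below.
func exampleCountAdults(people []*storage.Node) int64 {
	return parallelCount(people, func(n *storage.Node) bool {
		if v, ok := n.Properties["age"]; ok {
			if f, ok := toFloat64(v); ok {
				return f >= 18
			}
		}
		return false
	})
}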
// parallelSum computes the sum of a property across nodes in parallel.
func parallelSum(nodes []*storage.Node, property string) float64 {
	if !parallelConfig.Enabled || len(nodes) < parallelConfig.MinBatchSize {
		var sum float64
		for _, node := range nodes {
			if val, ok := node.Properties[property]; ok {
				if f, ok := toFloat64(val); ok {
					sum += f
				}
			}
		}
		return sum
	}

	numWorkers := parallelConfig.MaxWorkers
	chunkSize := (len(nodes) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	sums := make([]float64, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(nodes) {
			break
		}
		if end > len(nodes) {
			end = len(nodes)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Node) {
			defer wg.Done()
			var localSum float64
			for _, node := range chunk {
				if val, ok := node.Properties[property]; ok {
					if f, ok := toFloat64(val); ok {
						localSum += f
					}
				}
			}
			sums[workerID] = localSum
		}(i, nodes[start:end])
	}

	wg.Wait()

	var total float64
	for _, s := range sums {
		total += s
	}
	return total
}

// parallelCollect collects property values from nodes in parallel.
func parallelCollect(nodes []*storage.Node, property string) []interface{} {
	if !parallelConfig.Enabled || len(nodes) < parallelConfig.MinBatchSize {
		result := make([]interface{}, 0, len(nodes))
		for _, node := range nodes {
			if val, ok := node.Properties[property]; ok {
				result = append(result, val)
			}
		}
		return result
	}

	numWorkers := parallelConfig.MaxWorkers
	chunkSize := (len(nodes) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	results := make([][]interface{}, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(nodes) {
			break
		}
		if end > len(nodes) {
			end = len(nodes)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Node) {
			defer wg.Done()
			localResult := make([]interface{}, 0, len(chunk))
			for _, node := range chunk {
				if val, ok := node.Properties[property]; ok {
					localResult = append(localResult, val)
				}
			}
			results[workerID] = localResult
		}(i, nodes[start:end])
	}

	wg.Wait()

	// Merge results
	totalSize := 0
	for _, r := range results {
		totalSize += len(r)
	}
	merged := make([]interface{}, 0, totalSize)
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged
}

// =============================================================================
// Parallel Map Operations
// =============================================================================

// MapFunc transforms a node into a result value.
type MapFunc func(node *storage.Node) interface{}

// parallelMap applies a function to all nodes in parallel.
func parallelMap(nodes []*storage.Node, mapFn MapFunc) []interface{} {
	if !parallelConfig.Enabled || len(nodes) < parallelConfig.MinBatchSize {
		result := make([]interface{}, len(nodes))
		for i, node := range nodes {
			result[i] = mapFn(node)
		}
		return result
	}

	numWorkers := parallelConfig.MaxWorkers
	chunkSize := (len(nodes) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	results := make([][]interface{}, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(nodes) {
			break
		}
		if end > len(nodes) {
			end = len(nodes)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Node) {
			defer wg.Done()
			localResult := make([]interface{}, len(chunk))
			for j, node := range chunk {
				localResult[j] = mapFn(node)
			}
			results[workerID] = localResult
		}(i, nodes[start:end])
	}

	wg.Wait()

	// Merge results maintaining order
	merged := make([]interface{}, 0, len(nodes))
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged
}
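// Example (illustrative sketch, not part of the original file): projecting a
// property with parallelMap. Chunks are contiguous slices merged in worker
// order, so the output preserves the input order. The "name" property is
// hypothetical.
func exampleProjectNames(people []*storage.Node) []interface{} {
	return parallelMap(people, func(n *storage.Node) interface{} {
		return n.Properties["name"]
	})
}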
// =============================================================================
// Parallel Edge Operations
// =============================================================================

// EdgeFilterFunc tests whether an edge matches filter criteria.
type EdgeFilterFunc func(edge *storage.Edge) bool

// parallelFilterEdges filters edges in parallel.
func parallelFilterEdges(edges []*storage.Edge, filterFn EdgeFilterFunc) []*storage.Edge {
	if !parallelConfig.Enabled || len(edges) < parallelConfig.MinBatchSize {
		result := make([]*storage.Edge, 0, len(edges)/4)
		for _, edge := range edges {
			if filterFn(edge) {
				result = append(result, edge)
			}
		}
		return result
	}

	numWorkers := parallelConfig.MaxWorkers
	chunkSize := (len(edges) + numWorkers - 1) / numWorkers

	var wg sync.WaitGroup
	results := make([][]*storage.Edge, numWorkers)

	for i := 0; i < numWorkers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(edges) {
			break
		}
		if end > len(edges) {
			end = len(edges)
		}

		wg.Add(1)
		go func(workerID int, chunk []*storage.Edge) {
			defer wg.Done()
			filtered := make([]*storage.Edge, 0, len(chunk)/4)
			for _, edge := range chunk {
				if filterFn(edge) {
					filtered = append(filtered, edge)
				}
			}
			results[workerID] = filtered
		}(i, edges[start:end])
	}

	wg.Wait()

	// Merge results
	totalSize := 0
	for _, r := range results {
		totalSize += len(r)
	}
	merged := make([]*storage.Edge, 0, totalSize)
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged
}

// =============================================================================
// Worker Pool for Complex Operations
// =============================================================================

// WorkerPool manages a pool of worker goroutines for parallel execution.
type WorkerPool struct {
	numWorkers int
	jobs       chan func()
	wg         sync.WaitGroup
	started    bool
	mu         sync.Mutex
}

// NewWorkerPool creates a new worker pool with the specified number of workers.
func NewWorkerPool(numWorkers int) *WorkerPool {
	if numWorkers <= 0 {
		numWorkers = runtime.NumCPU()
	}
	return &WorkerPool{
		numWorkers: numWorkers,
		jobs:       make(chan func(), numWorkers*2),
	}
}

// Start starts the worker goroutines.
func (p *WorkerPool) Start() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.started {
		return
	}
	for i := 0; i < p.numWorkers; i++ {
		go func() {
			for job := range p.jobs {
				job()
				p.wg.Done()
			}
		}()
	}
	p.started = true
}

// Submit submits a job to the worker pool.
func (p *WorkerPool) Submit(job func()) {
	p.wg.Add(1)
	p.jobs <- job
}

// Wait waits for all submitted jobs to complete.
func (p *WorkerPool) Wait() {
	p.wg.Wait()
}

// Stop stops the worker pool and waits for all jobs to complete.
func (p *WorkerPool) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.started {
		return
	}
	close(p.jobs)
	p.wg.Wait()
	p.started = false
}
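// Example (illustrative sketch, not part of the original file): running a
// batch of independent jobs on a WorkerPool. Start must be called before
// Submit, and no job may be submitted after Stop, which closes the queue.
func exampleRunJobs(jobs []func()) {
	pool := NewWorkerPool(4)
	pool.Start()
	for _, job := range jobs {
		pool.Submit(job)
	}
	pool.Wait() // blocks until every submitted job has finished
	pool.Stop()
}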
