// Package storage provides storage engine implementations for NornicDB.
//
// BadgerEngine provides persistent disk-based storage using BadgerDB.
// It implements the Engine interface with full ACID transaction support.
package storage
import (
"bufio"
"bytes"
"context"
"encoding/gob"
"fmt"
"log"
"os"
"strings"
"sync"
"sync/atomic"
"github.com/dgraph-io/badger/v4"
)
// Key prefixes for BadgerDB storage organization
// Using single-byte prefixes for efficiency
const (
prefixNode = byte(0x01) // nodes:nodeID -> Node
prefixEdge = byte(0x02) // edges:edgeID -> Edge
prefixLabelIndex = byte(0x03) // label:labelName:nodeID -> []byte{}
prefixOutgoingIndex = byte(0x04) // outgoing:nodeID:edgeID -> []byte{}
prefixIncomingIndex = byte(0x05) // incoming:nodeID:edgeID -> []byte{}
	prefixEdgeTypeIndex = byte(0x06) // edgetype:type:edgeID -> []byte{} (for fast type lookups)
)
// BadgerEngine provides persistent storage using BadgerDB.
//
// Features:
// - ACID transactions for all operations
// - Persistent storage to disk
// - Secondary indexes for efficient queries
// - Thread-safe concurrent access
// - Automatic crash recovery
//
// Key Structure:
// - Nodes: 0x01 + nodeID -> gob(Node)
// - Edges: 0x02 + edgeID -> gob(Edge)
// - Label Index: 0x03 + lowercase(label) + 0x00 + nodeID -> empty
// - Outgoing Index: 0x04 + nodeID + 0x00 + edgeID -> empty
// - Incoming Index: 0x05 + nodeID + 0x00 + edgeID -> empty
// - Edge Type Index: 0x06 + lowercase(type) + 0x00 + edgeID -> empty
//
// Example:
//
// engine, err := storage.NewBadgerEngine("/path/to/data")
// if err != nil {
// log.Fatal(err)
// }
// defer engine.Close()
//
// node := &storage.Node{
// ID: "user-123",
// Labels: []string{"User"},
// Properties: map[string]any{"name": "Alice"},
// }
// engine.CreateNode(node)
type BadgerEngine struct {
db *badger.DB
schema *SchemaManager
mu sync.RWMutex // Protects schema operations
closed bool
inMemory bool // True if running in memory-only mode (testing)
// Hot node cache for frequently accessed nodes
// Dramatically speeds up repeated MATCH lookups
nodeCache map[NodeID]*Node
nodeCacheMu sync.RWMutex
cacheHits int64
cacheMisses int64
// Edge type cache for mutual relationship queries
// Caches edges by type for O(1) lookup
edgeTypeCache map[string][]*Edge // edgeType -> edges of that type
edgeTypeCacheMu sync.RWMutex
}
// IsInMemory returns true if the engine is running in memory-only mode.
// In-memory mode is used for testing - there's no disk to fsync to.
func (b *BadgerEngine) IsInMemory() bool {
return b.inMemory
}
// BadgerOptions configures the BadgerDB engine.
type BadgerOptions struct {
// DataDir is the directory for storing data files.
// Required.
DataDir string
// InMemory runs BadgerDB in memory-only mode.
// Useful for testing. Data is not persisted.
InMemory bool
// SyncWrites forces fsync after each write.
// Slower but more durable.
SyncWrites bool
// Logger for BadgerDB internal logging.
// If nil, BadgerDB's default logger is used.
Logger badger.Logger
// LowMemory enables memory-constrained settings.
// Reduces MemTableSize and other buffers to use less RAM.
LowMemory bool
// HighPerformance enables aggressive caching and larger buffers.
// Uses more RAM but significantly faster writes/reads.
HighPerformance bool
}
// NewBadgerEngine creates a new persistent storage engine with default settings.
//
// This is the simplest way to create a storage engine. The engine uses BadgerDB
// for persistent disk storage with ACID transaction guarantees. All data is
// stored in the specified directory and persists across restarts.
//
// Parameters:
// - dataDir: Directory path for storing data files. Created if it doesn't exist.
//
// Returns:
// - *BadgerEngine on success
// - error if database cannot be opened (e.g., permissions, disk space)
//
// Example 1 - Basic Usage:
//
// engine, err := storage.NewBadgerEngine("./data/nornicdb")
// if err != nil {
// log.Fatal(err)
// }
// defer engine.Close()
//
// // Engine is ready - create nodes
// node := &storage.Node{
// ID: "user-1",
// Labels: []string{"User"},
// Properties: map[string]any{"name": "Alice"},
// }
// engine.CreateNode(node)
//
// Example 2 - Production Application:
//
// // Use absolute path for production
// dataDir := filepath.Join(os.Getenv("APP_HOME"), "data", "nornicdb")
// engine, err := storage.NewBadgerEngine(dataDir)
// if err != nil {
// return fmt.Errorf("failed to open database: %w", err)
// }
// defer engine.Close()
//
// Example 3 - Multiple Databases:
//
// // Main application database
// mainDB, _ := storage.NewBadgerEngine("./data/main")
// defer mainDB.Close()
//
// // Test database
// testDB, _ := storage.NewBadgerEngine("./data/test")
// defer testDB.Close()
//
// // Cache database
// cacheDB, _ := storage.NewBadgerEngine("./data/cache")
// defer cacheDB.Close()
//
// ELI12:
//
// Think of NewBadgerEngine like setting up a filing cabinet in your room.
// You tell it "put the cabinet here" (the dataDir), and it creates folders
// and organizes everything. Even if you turn off your computer, the cabinet
// stays there with all your files inside. Next time you start up, all your
// data is still there!
//
// Disk Usage:
// - Approximately 2-3x the size of your actual data
// - Includes write-ahead log and compaction overhead
//
// Thread Safety:
//
// Safe for concurrent use from multiple goroutines.
func NewBadgerEngine(dataDir string) (*BadgerEngine, error) {
return NewBadgerEngineWithOptions(BadgerOptions{
DataDir: dataDir,
})
}
// NewBadgerEngineWithOptions creates a BadgerEngine with custom configuration.
//
// Use this function when you need fine-grained control over the storage engine
// behavior, such as enabling in-memory mode for testing, forcing synchronous
// writes for maximum durability, or reducing memory usage.
//
// Parameters:
// - opts: BadgerOptions struct with configuration settings
//
// Returns:
// - *BadgerEngine on success
// - error if database cannot be opened
//
// Example 1 - In-Memory Database for Testing:
//
// engine, err := storage.NewBadgerEngineWithOptions(storage.BadgerOptions{
//	DataDir:  "./test", // Ignored when InMemory is true
// InMemory: true, // All data in RAM, lost on shutdown
// })
// defer engine.Close()
//
// // Perfect for unit tests - fast and clean
// testCreateNodes(engine)
//
// Example 2 - Maximum Durability for Financial Data:
//
// engine, err := storage.NewBadgerEngineWithOptions(storage.BadgerOptions{
// DataDir: "./data/transactions",
// SyncWrites: true, // Force fsync after each write (slower but safer)
// })
// // Guaranteed data persistence even if power fails
//
// Example 3 - Low Memory Mode for Embedded Devices:
//
// engine, err := storage.NewBadgerEngineWithOptions(storage.BadgerOptions{
// DataDir: "./data/nornicdb",
// LowMemory: true, // Reduces RAM usage by 50-70%
// })
// // Uses ~50MB instead of ~150MB for typical workloads
//
// Example 4 - Custom Logger Integration:
//
// logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
// engine, err := storage.NewBadgerEngineWithOptions(storage.BadgerOptions{
// DataDir: "./data/nornicdb",
// Logger: &BadgerLogger{zlog: logger}, // Custom logging
// })
//
// ELI12:
//
// NewBadgerEngine is like getting a basic backpack for school.
// NewBadgerEngineWithOptions is like customizing your backpack - you can:
// - Make it waterproof (SyncWrites = true)
// - Make it lighter but less storage (LowMemory = true)
// - Use it as a temporary bag (InMemory = true)
// - Add custom labels (Logger)
//
// Configuration Trade-offs:
// - SyncWrites=true: Slower writes (2-5x) but maximum safety
// - LowMemory=true: Less RAM but slightly slower
// - InMemory=true: Fastest but data lost on shutdown
//
// Thread Safety:
//
// Safe for concurrent use from multiple goroutines.
func NewBadgerEngineWithOptions(opts BadgerOptions) (*BadgerEngine, error) {
badgerOpts := badger.DefaultOptions(opts.DataDir)
if opts.InMemory {
badgerOpts = badgerOpts.WithInMemory(true)
}
if opts.SyncWrites {
badgerOpts = badgerOpts.WithSyncWrites(true)
}
if opts.Logger != nil {
badgerOpts = badgerOpts.WithLogger(opts.Logger)
} else {
		// Disable BadgerDB's internal logging by default
badgerOpts = badgerOpts.WithLogger(nil)
}
if opts.HighPerformance {
// HIGH PERFORMANCE MODE: Maximize speed, use more RAM
// Target: Get close to in-memory performance for small-medium datasets
badgerOpts = badgerOpts.
WithMemTableSize(128 << 20). // 128MB memtable (8x default) - fewer flushes
WithValueLogFileSize(256 << 20). // 256MB value log files
WithNumMemtables(5). // 5 memtables for write buffering
WithNumLevelZeroTables(10). // More L0 tables before compaction
WithNumLevelZeroTablesStall(20). // Higher stall threshold
WithValueThreshold(1 << 20). // 1MB - keep most values in LSM tree
WithBlockCacheSize(256 << 20). // 256MB block cache
WithIndexCacheSize(128 << 20). // 128MB index cache
WithNumCompactors(4). // More parallel compaction
WithCompactL0OnClose(false). // Don't compact on close (faster shutdown)
WithDetectConflicts(false) // Skip conflict detection (we handle it)
} else if opts.LowMemory {
// LOW MEMORY MODE: Minimize RAM usage
badgerOpts = badgerOpts.
WithMemTableSize(8 << 20). // 8MB memtable
WithValueLogFileSize(32 << 20). // 32MB value log
WithNumMemtables(1). // Single memtable
WithNumLevelZeroTables(1). // Aggressive compaction
WithNumLevelZeroTablesStall(2).
WithValueThreshold(512). // Small values in LSM
WithBlockCacheSize(8 << 20). // 8MB block cache
WithIndexCacheSize(4 << 20) // 4MB index cache
} else {
// DEFAULT: Balanced settings
badgerOpts = badgerOpts.
WithMemTableSize(64 << 20). // 64MB memtable (default)
WithValueLogFileSize(128 << 20). // 128MB value log
WithNumMemtables(3). // 3 memtables
WithNumLevelZeroTables(5). // Default L0 tables
WithNumLevelZeroTablesStall(10).
WithValueThreshold(64 << 10). // 64KB threshold - allows larger property values
WithBlockCacheSize(64 << 20). // 64MB block cache
WithIndexCacheSize(32 << 20) // 32MB index cache
}
db, err := badger.Open(badgerOpts)
if err != nil {
return nil, fmt.Errorf("failed to open BadgerDB: %w", err)
}
return &BadgerEngine{
db: db,
schema: NewSchemaManager(),
inMemory: opts.InMemory,
nodeCache: make(map[NodeID]*Node, 10000), // Cache up to 10K hot nodes
edgeTypeCache: make(map[string][]*Edge, 100), // Cache edges by type for mutual queries
}, nil
}
// NewBadgerEngineInMemory creates an in-memory BadgerDB for testing.
//
// Data is not persisted and is lost when the engine is closed.
// Useful for unit tests that need persistent storage semantics
// without actual disk I/O.
//
// Example:
//
// engine, err := storage.NewBadgerEngineInMemory()
// if err != nil {
// t.Fatal(err)
// }
// defer engine.Close()
//
// // Use engine for testing...
func NewBadgerEngineInMemory() (*BadgerEngine, error) {
return NewBadgerEngineWithOptions(BadgerOptions{
InMemory: true,
})
}
// ============================================================================
// Key encoding helpers
// ============================================================================
// nodeKey creates a key for storing a node.
func nodeKey(id NodeID) []byte {
return append([]byte{prefixNode}, []byte(id)...)
}
// edgeKey creates a key for storing an edge.
func edgeKey(id EdgeID) []byte {
return append([]byte{prefixEdge}, []byte(id)...)
}
// labelIndexKey creates a key for the label index.
// Format: prefix + label (lowercase) + 0x00 + nodeID
// Labels are normalized to lowercase for case-insensitive matching (Neo4j compatible)
func labelIndexKey(label string, nodeID NodeID) []byte {
normalizedLabel := strings.ToLower(label)
key := make([]byte, 0, 1+len(normalizedLabel)+1+len(nodeID))
key = append(key, prefixLabelIndex)
key = append(key, []byte(normalizedLabel)...)
key = append(key, 0x00) // Separator
key = append(key, []byte(nodeID)...)
return key
}
// labelIndexPrefix returns the prefix for scanning all nodes with a label.
// Labels are normalized to lowercase for case-insensitive matching (Neo4j compatible)
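//
// For illustration (a sketch of the byte layout, derived from labelIndexKey
// above): the full label index key for label "User" and node "n42" is
//
//	0x03 | "user" | 0x00 | "n42"
//
// so labelIndexPrefix("User") yields 0x03 | "user" | 0x00, and a prefix
// scan over it visits exactly the index entries for that label.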
func labelIndexPrefix(label string) []byte {
normalizedLabel := strings.ToLower(label)
key := make([]byte, 0, 1+len(normalizedLabel)+1)
key = append(key, prefixLabelIndex)
key = append(key, []byte(normalizedLabel)...)
key = append(key, 0x00)
return key
}
// outgoingIndexKey creates a key for the outgoing edge index.
func outgoingIndexKey(nodeID NodeID, edgeID EdgeID) []byte {
key := make([]byte, 0, 1+len(nodeID)+1+len(edgeID))
key = append(key, prefixOutgoingIndex)
key = append(key, []byte(nodeID)...)
key = append(key, 0x00)
key = append(key, []byte(edgeID)...)
return key
}
// outgoingIndexPrefix returns the prefix for scanning outgoing edges.
func outgoingIndexPrefix(nodeID NodeID) []byte {
key := make([]byte, 0, 1+len(nodeID)+1)
key = append(key, prefixOutgoingIndex)
key = append(key, []byte(nodeID)...)
key = append(key, 0x00)
return key
}
// incomingIndexKey creates a key for the incoming edge index.
func incomingIndexKey(nodeID NodeID, edgeID EdgeID) []byte {
key := make([]byte, 0, 1+len(nodeID)+1+len(edgeID))
key = append(key, prefixIncomingIndex)
key = append(key, []byte(nodeID)...)
key = append(key, 0x00)
key = append(key, []byte(edgeID)...)
return key
}
// incomingIndexPrefix returns the prefix for scanning incoming edges.
func incomingIndexPrefix(nodeID NodeID) []byte {
key := make([]byte, 0, 1+len(nodeID)+1)
key = append(key, prefixIncomingIndex)
key = append(key, []byte(nodeID)...)
key = append(key, 0x00)
return key
}
// edgeTypeIndexKey creates a key for the edge type index.
// Format: prefix + edgeType (lowercase) + 0x00 + edgeID
func edgeTypeIndexKey(edgeType string, edgeID EdgeID) []byte {
normalizedType := strings.ToLower(edgeType)
key := make([]byte, 0, 1+len(normalizedType)+1+len(edgeID))
key = append(key, prefixEdgeTypeIndex)
key = append(key, []byte(normalizedType)...)
key = append(key, 0x00) // Separator
key = append(key, []byte(edgeID)...)
return key
}
// edgeTypeIndexPrefix returns the prefix for scanning all edges of a type.
// Edge types are normalized to lowercase for case-insensitive matching (Neo4j compatible)
func edgeTypeIndexPrefix(edgeType string) []byte {
normalizedType := strings.ToLower(edgeType)
key := make([]byte, 0, 1+len(normalizedType)+1)
key = append(key, prefixEdgeTypeIndex)
key = append(key, []byte(normalizedType)...)
key = append(key, 0x00)
return key
}
// extractEdgeIDFromIndexKey extracts the edgeID from an index key.
// Format: prefix + nodeID + 0x00 + edgeID
func extractEdgeIDFromIndexKey(key []byte) EdgeID {
// Find the separator (0x00)
for i := 1; i < len(key); i++ {
if key[i] == 0x00 {
return EdgeID(key[i+1:])
}
}
return ""
}
// extractNodeIDFromLabelIndex extracts the nodeID from a label index key.
// Format: prefix + label + 0x00 + nodeID
func extractNodeIDFromLabelIndex(key []byte, labelLen int) NodeID {
// Skip prefix (1) + label (labelLen) + separator (1)
offset := 1 + labelLen + 1
if offset >= len(key) {
return ""
}
return NodeID(key[offset:])
}
// ============================================================================
// Serialization helpers
// ============================================================================
// encodeNode serializes a Node using gob (preserves Go types like int64).
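//
// A minimal round-trip sketch (assumes a populated *Node named n):
//
//	data, err := encodeNode(n)
//	if err != nil {
//		return err
//	}
//	restored, err := decodeNode(data)
//	// restored.Properties keeps Go types such as int64 intact; JSON
//	// would decode those numbers as float64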
func encodeNode(n *Node) ([]byte, error) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(n); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// decodeNode deserializes a Node from gob.
func decodeNode(data []byte) (*Node, error) {
var node Node
if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&node); err != nil {
return nil, err
}
return &node, nil
}
// encodeEdge serializes an Edge using gob (preserves Go types).
func encodeEdge(e *Edge) ([]byte, error) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// decodeEdge deserializes an Edge from gob.
func decodeEdge(data []byte) (*Edge, error) {
var edge Edge
if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&edge); err != nil {
return nil, err
}
return &edge, nil
}
// ============================================================================
// Node Operations
// ============================================================================
// CreateNode creates a new node in persistent storage.
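//
// Example (a minimal sketch, assuming an open engine):
//
//	err := engine.CreateNode(&Node{
//		ID:     "user-1",
//		Labels: []string{"User"},
//	})
//	if err == ErrAlreadyExists {
//		// a node with this ID is already stored
//	}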
func (b *BadgerEngine) CreateNode(node *Node) error {
if node == nil {
return ErrInvalidData
}
if node.ID == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
// Check unique constraints for all labels and properties
for _, label := range node.Labels {
for propName, propValue := range node.Properties {
if err := b.schema.CheckUniqueConstraint(label, propName, propValue, ""); err != nil {
return fmt.Errorf("constraint violation: %w", err)
}
}
}
err := b.db.Update(func(txn *badger.Txn) error {
// Check if node already exists
key := nodeKey(node.ID)
_, err := txn.Get(key)
if err == nil {
return ErrAlreadyExists
}
if err != badger.ErrKeyNotFound {
return err
}
// Serialize node
data, err := encodeNode(node)
if err != nil {
return fmt.Errorf("failed to encode node: %w", err)
}
// Store node
if err := txn.Set(key, data); err != nil {
return err
}
// Create label indexes
for _, label := range node.Labels {
labelKey := labelIndexKey(label, node.ID)
if err := txn.Set(labelKey, []byte{}); err != nil {
return err
}
}
return nil
})
// On successful create, update cache and register unique constraint values
if err == nil {
// Store deep copy in cache to prevent external mutation
b.nodeCacheMu.Lock()
b.nodeCache[node.ID] = copyNode(node)
b.nodeCacheMu.Unlock()
// Register unique constraint values
for _, label := range node.Labels {
for propName, propValue := range node.Properties {
b.schema.RegisterUniqueValue(label, propName, propValue, node.ID)
}
}
}
return err
}
// GetNode retrieves a node by ID.
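//
// Repeated lookups for the same ID are served from the hot-node cache.
//
// Example (sketch):
//
//	node, err := engine.GetNode("user-1")
//	if err == ErrNotFound {
//		// no such node
//	}
//	// node is a copy; mutating it does not affect the cached entry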
func (b *BadgerEngine) GetNode(id NodeID) (*Node, error) {
if id == "" {
return nil, ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
// Check cache first
b.nodeCacheMu.RLock()
if cached, ok := b.nodeCache[id]; ok {
b.nodeCacheMu.RUnlock()
atomic.AddInt64(&b.cacheHits, 1)
// Return copy to prevent external mutation of cache
return copyNode(cached), nil
}
b.nodeCacheMu.RUnlock()
atomic.AddInt64(&b.cacheMisses, 1)
var node *Node
err := b.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(nodeKey(id))
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
})
})
// Cache the result on successful fetch
if err == nil && node != nil {
b.nodeCacheMu.Lock()
// Simple eviction: if cache is too large, clear it
if len(b.nodeCache) > 10000 {
b.nodeCache = make(map[NodeID]*Node, 10000)
}
b.nodeCache[id] = copyNode(node)
b.nodeCacheMu.Unlock()
}
return node, err
}
// UpdateNode updates an existing node.
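// If the node does not exist, it is inserted instead (upsert semantics).
//
// Example (sketch):
//
//	node.Properties["age"] = int64(30)
//	if err := engine.UpdateNode(node); err != nil {
//		return err
//	}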
func (b *BadgerEngine) UpdateNode(node *Node) error {
if node == nil {
return ErrInvalidData
}
if node.ID == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
err := b.db.Update(func(txn *badger.Txn) error {
key := nodeKey(node.ID)
// Get existing node for label index updates (if exists)
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
// Node doesn't exist - do an insert (upsert behavior)
data, err := encodeNode(node)
if err != nil {
return fmt.Errorf("failed to encode node: %w", err)
}
if err := txn.Set(key, data); err != nil {
return err
}
// Create label indexes
for _, label := range node.Labels {
if err := txn.Set(labelIndexKey(label, node.ID), []byte{}); err != nil {
return err
}
}
return nil
}
if err != nil {
return err
}
// Node exists - update it
var existing *Node
if err := item.Value(func(val []byte) error {
var decodeErr error
existing, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
return err
}
// Remove old label indexes
for _, label := range existing.Labels {
if err := txn.Delete(labelIndexKey(label, node.ID)); err != nil {
return err
}
}
// Serialize and store updated node
data, err := encodeNode(node)
if err != nil {
return fmt.Errorf("failed to encode node: %w", err)
}
if err := txn.Set(key, data); err != nil {
return err
}
// Create new label indexes
for _, label := range node.Labels {
if err := txn.Set(labelIndexKey(label, node.ID), []byte{}); err != nil {
return err
}
}
return nil
})
// Update cache on successful update
if err == nil {
b.nodeCacheMu.Lock()
		// Store a deep copy, as CreateNode and GetNode do, so later mutation
		// of the caller's node cannot corrupt the cached entry
		b.nodeCache[node.ID] = copyNode(node)
b.nodeCacheMu.Unlock()
}
return err
}
// DeleteNode removes a node and all its edges.
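// Connected edges are removed in the same transaction.
//
// Example (sketch):
//
//	if err := engine.DeleteNode("user-1"); err == ErrNotFound {
//		// nothing was deleted
//	}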
func (b *BadgerEngine) DeleteNode(id NodeID) error {
if id == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
err := b.db.Update(func(txn *badger.Txn) error {
key := nodeKey(id)
// Get node for label cleanup
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
var node *Node
if err := item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
return err
}
// Delete label indexes
for _, label := range node.Labels {
if err := txn.Delete(labelIndexKey(label, id)); err != nil {
return err
}
}
// Delete outgoing edges
outPrefix := outgoingIndexPrefix(id)
if err := b.deleteEdgesWithPrefix(txn, outPrefix); err != nil {
return err
}
// Delete incoming edges
inPrefix := incomingIndexPrefix(id)
if err := b.deleteEdgesWithPrefix(txn, inPrefix); err != nil {
return err
}
// Delete the node
return txn.Delete(key)
})
	// Invalidate caches on successful delete
	if err == nil {
		b.nodeCacheMu.Lock()
		delete(b.nodeCache, id)
		b.nodeCacheMu.Unlock()
		// Cascaded edge deletions may touch any edge type
		b.InvalidateEdgeTypeCache()
	}
return err
}
// deleteEdgesWithPrefix deletes all edges matching a prefix (helper for DeleteNode).
func (b *BadgerEngine) deleteEdgesWithPrefix(txn *badger.Txn, prefix []byte) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
var edgeIDs []EdgeID
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
edgeID := extractEdgeIDFromIndexKey(it.Item().Key())
edgeIDs = append(edgeIDs, edgeID)
}
for _, edgeID := range edgeIDs {
if err := b.deleteEdgeInTxn(txn, edgeID); err != nil && err != ErrNotFound {
return err
}
}
return nil
}
// ============================================================================
// Edge Operations
// ============================================================================
// CreateEdge creates a new edge between two nodes.
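// Both endpoint nodes must already exist; otherwise ErrNotFound is returned.
//
// Example (sketch):
//
//	err := engine.CreateEdge(&Edge{
//		ID:        "follows-1",
//		Type:      "FOLLOWS",
//		StartNode: "user-1",
//		EndNode:   "user-2",
//	})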
func (b *BadgerEngine) CreateEdge(edge *Edge) error {
if edge == nil {
return ErrInvalidData
}
if edge.ID == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
err := b.db.Update(func(txn *badger.Txn) error {
// Check if edge already exists
key := edgeKey(edge.ID)
_, err := txn.Get(key)
if err == nil {
return ErrAlreadyExists
}
if err != badger.ErrKeyNotFound {
return err
}
// Verify start node exists
_, err = txn.Get(nodeKey(edge.StartNode))
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
// Verify end node exists
_, err = txn.Get(nodeKey(edge.EndNode))
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
// Serialize edge
data, err := encodeEdge(edge)
if err != nil {
return fmt.Errorf("failed to encode edge: %w", err)
}
// Store edge
if err := txn.Set(key, data); err != nil {
return err
}
// Create outgoing index
outKey := outgoingIndexKey(edge.StartNode, edge.ID)
if err := txn.Set(outKey, []byte{}); err != nil {
return err
}
// Create incoming index
inKey := incomingIndexKey(edge.EndNode, edge.ID)
if err := txn.Set(inKey, []byte{}); err != nil {
return err
}
// Create edge type index
edgeTypeKey := edgeTypeIndexKey(edge.Type, edge.ID)
if err := txn.Set(edgeTypeKey, []byte{}); err != nil {
return err
}
return nil
})
// Invalidate only this edge type (not entire cache)
if err == nil {
b.InvalidateEdgeTypeCacheForType(edge.Type)
}
return err
}
// GetEdge retrieves an edge by ID.
func (b *BadgerEngine) GetEdge(id EdgeID) (*Edge, error) {
if id == "" {
return nil, ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var edge *Edge
err := b.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(edgeKey(id))
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
})
})
return edge, err
}
// UpdateEdge updates an existing edge.
func (b *BadgerEngine) UpdateEdge(edge *Edge) error {
if edge == nil {
return ErrInvalidData
}
if edge.ID == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
	var oldType string
	err := b.db.Update(func(txn *badger.Txn) error {
key := edgeKey(edge.ID)
// Get existing edge
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
var existing *Edge
if err := item.Value(func(val []byte) error {
var decodeErr error
existing, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
return err
		}
		oldType = existing.Type
// If endpoints changed, update indexes
if existing.StartNode != edge.StartNode || existing.EndNode != edge.EndNode {
// Verify new endpoints exist
if _, err := txn.Get(nodeKey(edge.StartNode)); err == badger.ErrKeyNotFound {
return ErrNotFound
}
if _, err := txn.Get(nodeKey(edge.EndNode)); err == badger.ErrKeyNotFound {
return ErrNotFound
}
// Remove old indexes
if err := txn.Delete(outgoingIndexKey(existing.StartNode, edge.ID)); err != nil {
return err
}
if err := txn.Delete(incomingIndexKey(existing.EndNode, edge.ID)); err != nil {
return err
}
// Add new indexes
if err := txn.Set(outgoingIndexKey(edge.StartNode, edge.ID), []byte{}); err != nil {
return err
}
if err := txn.Set(incomingIndexKey(edge.EndNode, edge.ID), []byte{}); err != nil {
return err
}
		}
		// If the type changed, move the edge type index entry so
		// GetEdgesByType stays consistent
		if existing.Type != edge.Type {
			if err := txn.Delete(edgeTypeIndexKey(existing.Type, edge.ID)); err != nil {
				return err
			}
			if err := txn.Set(edgeTypeIndexKey(edge.Type, edge.ID), []byte{}); err != nil {
				return err
			}
		}
// Store updated edge
data, err := encodeEdge(edge)
if err != nil {
return fmt.Errorf("failed to encode edge: %w", err)
}
		return txn.Set(key, data)
	})
	// Invalidate cached edge lists for both the old and new type
	if err == nil {
		b.InvalidateEdgeTypeCacheForType(oldType)
		if edge.Type != oldType {
			b.InvalidateEdgeTypeCacheForType(edge.Type)
		}
	}
	return err
}
// DeleteEdge removes an edge.
func (b *BadgerEngine) DeleteEdge(id EdgeID) error {
if id == "" {
return ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
// Get edge type before deletion for selective cache invalidation
edge, _ := b.GetEdge(id)
var edgeType string
if edge != nil {
edgeType = edge.Type
}
err := b.db.Update(func(txn *badger.Txn) error {
return b.deleteEdgeInTxn(txn, id)
})
// Invalidate only this edge type (not entire cache)
if err == nil && edgeType != "" {
b.InvalidateEdgeTypeCacheForType(edgeType)
}
return err
}
// deleteEdgeInTxn is the internal helper for deleting an edge within a transaction.
func (b *BadgerEngine) deleteEdgeInTxn(txn *badger.Txn, id EdgeID) error {
key := edgeKey(id)
// Get edge for index cleanup
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
var edge *Edge
if err := item.Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
return err
}
// Delete indexes
if err := txn.Delete(outgoingIndexKey(edge.StartNode, id)); err != nil {
return err
}
if err := txn.Delete(incomingIndexKey(edge.EndNode, id)); err != nil {
return err
}
if err := txn.Delete(edgeTypeIndexKey(edge.Type, id)); err != nil {
return err
}
// Delete edge
return txn.Delete(key)
}
// deleteNodeInTxn is the internal helper for deleting a node within a transaction.
func (b *BadgerEngine) deleteNodeInTxn(txn *badger.Txn, id NodeID) error {
key := nodeKey(id)
// Get node for label cleanup
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return ErrNotFound
}
if err != nil {
return err
}
var node *Node
if err := item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
return err
}
// Delete label indexes
for _, label := range node.Labels {
if err := txn.Delete(labelIndexKey(label, id)); err != nil {
return err
}
}
// Delete outgoing edges
outPrefix := outgoingIndexPrefix(id)
if err := b.deleteEdgesWithPrefix(txn, outPrefix); err != nil {
return err
}
// Delete incoming edges
inPrefix := incomingIndexPrefix(id)
if err := b.deleteEdgesWithPrefix(txn, inPrefix); err != nil {
return err
}
// Delete the node
return txn.Delete(key)
}
// BulkDeleteNodes removes multiple nodes in a single transaction.
// This is much faster than calling DeleteNode repeatedly.
func (b *BadgerEngine) BulkDeleteNodes(ids []NodeID) error {
if len(ids) == 0 {
return nil
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
err := b.db.Update(func(txn *badger.Txn) error {
for _, id := range ids {
if id == "" {
continue // Skip invalid IDs
}
// Best effort - continue on ErrNotFound
if err := b.deleteNodeInTxn(txn, id); err != nil && err != ErrNotFound {
return err
}
}
return nil
})
	// Invalidate caches for deleted nodes
	if err == nil {
		b.nodeCacheMu.Lock()
		for _, id := range ids {
			delete(b.nodeCache, id)
		}
		b.nodeCacheMu.Unlock()
		// Cascaded edge deletions may touch any edge type
		b.InvalidateEdgeTypeCache()
	}
return err
}
// BulkDeleteEdges removes multiple edges in a single transaction.
// This is much faster than calling DeleteEdge repeatedly.
func (b *BadgerEngine) BulkDeleteEdges(ids []EdgeID) error {
if len(ids) == 0 {
return nil
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
err := b.db.Update(func(txn *badger.Txn) error {
for _, id := range ids {
if id == "" {
continue // Skip invalid IDs
}
// Best effort - continue on ErrNotFound
if err := b.deleteEdgeInTxn(txn, id); err != nil && err != ErrNotFound {
return err
}
}
return nil
})
// Invalidate edge type cache on successful bulk delete
if err == nil && len(ids) > 0 {
b.InvalidateEdgeTypeCache()
}
return err
}
// ============================================================================
// Query Operations
// ============================================================================
// GetFirstNodeByLabel returns the first node with the specified label.
// This is optimized for MATCH...LIMIT 1 patterns - stops after first match.
func (b *BadgerEngine) GetFirstNodeByLabel(label string) (*Node, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var node *Node
err := b.db.View(func(txn *badger.Txn) error {
		prefix := labelIndexPrefix(label)
		// The index stores labels lowercased; ToLower can change byte length
		// for some Unicode input, so use the normalized length when extracting IDs
		normalizedLen := len(strings.ToLower(label))
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			nodeID := extractNodeIDFromLabelIndex(it.Item().Key(), normalizedLen)
if nodeID == "" {
continue
}
item, err := txn.Get(nodeKey(nodeID))
if err != nil {
continue
}
if err := item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
continue
}
return nil // Found first node, stop
}
return nil
})
return node, err
}
// GetNodesByLabel returns all nodes with the specified label.
func (b *BadgerEngine) GetNodesByLabel(label string) ([]*Node, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
// Single-pass: iterate label index and fetch nodes in same transaction
// This reduces transaction overhead compared to two-phase approach
var nodes []*Node
err := b.db.View(func(txn *badger.Txn) error {
		prefix := labelIndexPrefix(label)
		// Use the normalized (lowercased) label length; see GetFirstNodeByLabel
		normalizedLen := len(strings.ToLower(label))
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // Don't need values from index keys
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			nodeID := extractNodeIDFromLabelIndex(it.Item().Key(), normalizedLen)
if nodeID == "" {
continue
}
// Fetch node data in same transaction
item, err := txn.Get(nodeKey(nodeID))
if err != nil {
continue // Skip if node was deleted
}
var node *Node
if err := item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
continue
}
nodes = append(nodes, node)
}
return nil
})
if err != nil {
return nil, err
}
return nodes, nil
}
// GetAllNodes returns all nodes in the storage.
func (b *BadgerEngine) GetAllNodes() []*Node {
nodes, _ := b.AllNodes()
return nodes
}
// AllNodes returns all nodes (implements Engine interface).
func (b *BadgerEngine) AllNodes() ([]*Node, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var nodes []*Node
err := b.db.View(func(txn *badger.Txn) error {
prefix := []byte{prefixNode}
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
var node *Node
if err := it.Item().Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
continue
}
nodes = append(nodes, node)
}
return nil
})
return nodes, err
}
// AllEdges returns all edges (implements Engine interface).
func (b *BadgerEngine) AllEdges() ([]*Edge, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var edges []*Edge
err := b.db.View(func(txn *badger.Txn) error {
prefix := []byte{prefixEdge}
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
var edge *Edge
if err := it.Item().Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
continue
}
edges = append(edges, edge)
}
return nil
})
return edges, err
}
// GetEdgesByType returns all edges of a specific type using the edge type index.
// This is MUCH faster than AllEdges() for queries like mutual follows.
// Edge types are matched case-insensitively (Neo4j compatible).
// Results are cached per type to speed up repeated queries.
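//
// Example (a sketch of a mutual-follows check built on this index):
//
//	follows, err := engine.GetEdgesByType("FOLLOWS")
//	if err != nil {
//		return err
//	}
//	seen := make(map[[2]NodeID]bool, len(follows))
//	for _, e := range follows {
//		seen[[2]NodeID{e.StartNode, e.EndNode}] = true
//	}
//	for _, e := range follows {
//		if seen[[2]NodeID{e.EndNode, e.StartNode}] {
//			// e.StartNode and e.EndNode follow each other
//		}
//	}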
func (b *BadgerEngine) GetEdgesByType(edgeType string) ([]*Edge, error) {
if edgeType == "" {
return b.AllEdges() // No type filter = all edges
}
normalizedType := strings.ToLower(edgeType)
// Check cache first
b.edgeTypeCacheMu.RLock()
if cached, ok := b.edgeTypeCache[normalizedType]; ok {
b.edgeTypeCacheMu.RUnlock()
return cached, nil
}
b.edgeTypeCacheMu.RUnlock()
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var edges []*Edge
err := b.db.View(func(txn *badger.Txn) error {
prefix := edgeTypeIndexPrefix(edgeType)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // We only need the key to get edgeID
it := txn.NewIterator(opts)
defer it.Close()
// Collect edge IDs from index
var edgeIDs []EdgeID
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
key := it.Item().Key()
// Extract edgeID from key: prefix + type + 0x00 + edgeID
sepIdx := bytes.LastIndexByte(key, 0x00)
if sepIdx >= 0 && sepIdx < len(key)-1 {
edgeIDs = append(edgeIDs, EdgeID(key[sepIdx+1:]))
}
}
// Batch fetch edges
edges = make([]*Edge, 0, len(edgeIDs))
for _, edgeID := range edgeIDs {
item, err := txn.Get(edgeKey(edgeID))
if err != nil {
continue
}
var edge *Edge
if err := item.Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
continue
}
edges = append(edges, edge)
}
return nil
})
if err != nil {
return nil, err
}
// Cache the result (simple LRU-style: clear if too many types)
b.edgeTypeCacheMu.Lock()
if len(b.edgeTypeCache) > 50 {
b.edgeTypeCache = make(map[string][]*Edge, 50)
}
b.edgeTypeCache[normalizedType] = edges
b.edgeTypeCacheMu.Unlock()
return edges, nil
}
// InvalidateEdgeTypeCache clears the entire edge type cache.
// Called after bulk edge mutations to ensure cache consistency.
func (b *BadgerEngine) InvalidateEdgeTypeCache() {
b.edgeTypeCacheMu.Lock()
b.edgeTypeCache = make(map[string][]*Edge, 50)
b.edgeTypeCacheMu.Unlock()
}
// InvalidateEdgeTypeCacheForType removes only the specified edge type from cache.
// Much faster than full invalidation for single-edge operations.
func (b *BadgerEngine) InvalidateEdgeTypeCacheForType(edgeType string) {
if edgeType == "" {
return
}
normalizedType := strings.ToLower(edgeType)
b.edgeTypeCacheMu.Lock()
delete(b.edgeTypeCache, normalizedType)
b.edgeTypeCacheMu.Unlock()
}
// BatchGetNodes fetches multiple nodes in a single transaction.
// Returns a map for O(1) lookup by ID. Missing nodes are not included in the result.
// This is optimized for traversal operations that need to fetch many nodes.
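//
// Example (sketch):
//
//	nodes, err := engine.BatchGetNodes([]NodeID{"a", "b", "c"})
//	if err != nil {
//		return err
//	}
//	if n, ok := nodes["b"]; ok {
//		_ = n // present; IDs that don't exist are simply absent
//	}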
func (b *BadgerEngine) BatchGetNodes(ids []NodeID) (map[NodeID]*Node, error) {
if len(ids) == 0 {
return make(map[NodeID]*Node), nil
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
result := make(map[NodeID]*Node, len(ids))
err := b.db.View(func(txn *badger.Txn) error {
for _, id := range ids {
if id == "" {
continue
}
item, err := txn.Get(nodeKey(id))
if err != nil {
continue // Skip missing nodes
}
var node *Node
if err := item.Value(func(val []byte) error {
var decodeErr error
node, decodeErr = decodeNode(val)
return decodeErr
}); err != nil {
continue
}
result[id] = node
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}
// GetOutgoingEdges returns all edges where the given node is the source.
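//
// Example (a one-hop traversal sketch, combined with BatchGetNodes):
//
//	edges, _ := engine.GetOutgoingEdges("user-1")
//	ids := make([]NodeID, 0, len(edges))
//	for _, e := range edges {
//		ids = append(ids, e.EndNode)
//	}
//	neighbors, _ := engine.BatchGetNodes(ids)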
func (b *BadgerEngine) GetOutgoingEdges(nodeID NodeID) ([]*Edge, error) {
if nodeID == "" {
return nil, ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var edges []*Edge
err := b.db.View(func(txn *badger.Txn) error {
prefix := outgoingIndexPrefix(nodeID)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
edgeID := extractEdgeIDFromIndexKey(it.Item().Key())
if edgeID == "" {
continue
}
// Get the edge
item, err := txn.Get(edgeKey(edgeID))
if err != nil {
continue
}
var edge *Edge
if err := item.Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
continue
}
edges = append(edges, edge)
}
return nil
})
if err != nil {
return nil, err
}
return edges, nil
}
// GetIncomingEdges returns all edges where the given node is the target.
func (b *BadgerEngine) GetIncomingEdges(nodeID NodeID) ([]*Edge, error) {
if nodeID == "" {
return nil, ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
var edges []*Edge
err := b.db.View(func(txn *badger.Txn) error {
prefix := incomingIndexPrefix(nodeID)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
edgeID := extractEdgeIDFromIndexKey(it.Item().Key())
if edgeID == "" {
continue
}
// Get the edge
item, err := txn.Get(edgeKey(edgeID))
if err != nil {
continue
}
var edge *Edge
if err := item.Value(func(val []byte) error {
var decodeErr error
edge, decodeErr = decodeEdge(val)
return decodeErr
}); err != nil {
continue
}
edges = append(edges, edge)
}
return nil
})
if err != nil {
return nil, err
}
return edges, nil
}
// GetEdgesBetween returns all edges between two nodes.
func (b *BadgerEngine) GetEdgesBetween(startID, endID NodeID) ([]*Edge, error) {
if startID == "" || endID == "" {
return nil, ErrInvalidID
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil, ErrStorageClosed
}
b.mu.RUnlock()
outgoing, err := b.GetOutgoingEdges(startID)
if err != nil {
return nil, err
}
var result []*Edge
for _, edge := range outgoing {
if edge.EndNode == endID {
result = append(result, edge)
}
}
return result, nil
}
// GetEdgeBetween returns an edge between two nodes with the given type.
func (b *BadgerEngine) GetEdgeBetween(source, target NodeID, edgeType string) *Edge {
edges, err := b.GetEdgesBetween(source, target)
if err != nil {
return nil
}
for _, edge := range edges {
if edgeType == "" || edge.Type == edgeType {
return edge
}
}
return nil
}
// ============================================================================
// Bulk Operations
// ============================================================================
// BulkCreateNodes creates multiple nodes in a single transaction.
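// All nodes are validated and constraint-checked up front; if any node
// already exists, the whole batch fails with ErrAlreadyExists.
//
// Example (sketch):
//
//	users := []*Node{
//		{ID: "a", Labels: []string{"User"}},
//		{ID: "b", Labels: []string{"User"}},
//	}
//	if err := engine.BulkCreateNodes(users); err != nil {
//		return err
//	}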
func (b *BadgerEngine) BulkCreateNodes(nodes []*Node) error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
// Validate all nodes first
for _, node := range nodes {
if node == nil {
return ErrInvalidData
}
if node.ID == "" {
return ErrInvalidID
}
}
// Check unique constraints for all nodes BEFORE inserting any
for _, node := range nodes {
for _, label := range node.Labels {
for propName, propValue := range node.Properties {
if err := b.schema.CheckUniqueConstraint(label, propName, propValue, ""); err != nil {
return fmt.Errorf("constraint violation: %w", err)
}
}
}
}
err := b.db.Update(func(txn *badger.Txn) error {
// Check for duplicates
for _, node := range nodes {
_, err := txn.Get(nodeKey(node.ID))
if err == nil {
return ErrAlreadyExists
}
if err != badger.ErrKeyNotFound {
return err
}
}
// Insert all nodes
for _, node := range nodes {
data, err := encodeNode(node)
if err != nil {
return fmt.Errorf("failed to encode node: %w", err)
}
if err := txn.Set(nodeKey(node.ID), data); err != nil {
return err
}
for _, label := range node.Labels {
if err := txn.Set(labelIndexKey(label, node.ID), []byte{}); err != nil {
return err
}
}
}
return nil
})
// Register unique constraint values after successful bulk insert
if err == nil {
for _, node := range nodes {
for _, label := range node.Labels {
for propName, propValue := range node.Properties {
b.schema.RegisterUniqueValue(label, propName, propValue, node.ID)
}
}
}
}
return err
}
// BulkCreateEdges creates multiple edges in a single transaction.
func (b *BadgerEngine) BulkCreateEdges(edges []*Edge) error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
// Validate all edges first
for _, edge := range edges {
if edge == nil {
return ErrInvalidData
}
if edge.ID == "" {
return ErrInvalidID
}
}
err := b.db.Update(func(txn *badger.Txn) error {
// Validate all edges
for _, edge := range edges {
// Check edge doesn't exist
_, err := txn.Get(edgeKey(edge.ID))
if err == nil {
return ErrAlreadyExists
}
if err != badger.ErrKeyNotFound {
return err
}
// Verify nodes exist
if _, err := txn.Get(nodeKey(edge.StartNode)); err == badger.ErrKeyNotFound {
return ErrNotFound
}
if _, err := txn.Get(nodeKey(edge.EndNode)); err == badger.ErrKeyNotFound {
return ErrNotFound
}
}
// Insert all edges
for _, edge := range edges {
data, err := encodeEdge(edge)
if err != nil {
return fmt.Errorf("failed to encode edge: %w", err)
}
if err := txn.Set(edgeKey(edge.ID), data); err != nil {
return err
}
if err := txn.Set(outgoingIndexKey(edge.StartNode, edge.ID), []byte{}); err != nil {
return err
}
if err := txn.Set(incomingIndexKey(edge.EndNode, edge.ID), []byte{}); err != nil {
return err
}
if err := txn.Set(edgeTypeIndexKey(edge.Type, edge.ID), []byte{}); err != nil {
return err
}
}
return nil
})
// Invalidate edge type cache on successful bulk create
if err == nil && len(edges) > 0 {
b.InvalidateEdgeTypeCache()
}
return err
}
// ============================================================================
// Degree Functions
// ============================================================================
// GetInDegree returns the number of incoming edges to a node.
func (b *BadgerEngine) GetInDegree(nodeID NodeID) int {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return 0
}
b.mu.RUnlock()
count := 0
_ = b.db.View(func(txn *badger.Txn) error {
prefix := incomingIndexPrefix(nodeID)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
count++
}
return nil
})
return count
}
// GetOutDegree returns the number of outgoing edges from a node.
func (b *BadgerEngine) GetOutDegree(nodeID NodeID) int {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return 0
}
b.mu.RUnlock()
count := 0
_ = b.db.View(func(txn *badger.Txn) error {
prefix := outgoingIndexPrefix(nodeID)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
count++
}
return nil
})
return count
}
// ============================================================================
// Stats and Lifecycle
// ============================================================================
// NodeCount returns the total number of nodes.
func (b *BadgerEngine) NodeCount() (int64, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return 0, ErrStorageClosed
}
b.mu.RUnlock()
var count int64
err := b.db.View(func(txn *badger.Txn) error {
prefix := []byte{prefixNode}
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
count++
}
return nil
})
return count, err
}
// EdgeCount returns the total number of edges.
func (b *BadgerEngine) EdgeCount() (int64, error) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return 0, ErrStorageClosed
}
b.mu.RUnlock()
var count int64
err := b.db.View(func(txn *badger.Txn) error {
prefix := []byte{prefixEdge}
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
count++
}
return nil
})
return count, err
}
// GetSchema returns the schema manager.
func (b *BadgerEngine) GetSchema() *SchemaManager {
return b.schema
}
// Close closes the BadgerDB database.
func (b *BadgerEngine) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return nil
}
b.closed = true
return b.db.Close()
}
// Sync forces a sync of all data to disk.
// Useful for guaranteeing durability at critical points, such as before shutdown.
func (b *BadgerEngine) Sync() error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.Sync()
}
// RunGC runs garbage collection on the BadgerDB value log.
// Should be called periodically for long-running applications.
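// The underlying RunValueLogGC returns badger.ErrNoRewrite when there is
// nothing to collect; callers can usually ignore that error.
//
// Example (a periodic GC loop sketch; assumes "time" is imported):
//
//	go func() {
//		ticker := time.NewTicker(10 * time.Minute)
//		defer ticker.Stop()
//		for range ticker.C {
//			_ = engine.RunGC()
//		}
//	}()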
func (b *BadgerEngine) RunGC() error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.RunValueLogGC(0.5)
}
// Size returns the approximate size of the database in bytes.
func (b *BadgerEngine) Size() (lsm, vlog int64) {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return 0, 0
}
b.mu.RUnlock()
return b.db.Size()
}
// FindNodeNeedingEmbedding iterates through nodes and returns the first one
// without an embedding. Uses Badger's iterator to avoid loading all nodes.
func (b *BadgerEngine) FindNodeNeedingEmbedding() *Node {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return nil
}
b.mu.RUnlock()
var result *Node
scanned := 0
withEmbed := 0
internal := 0
	_ = b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 10 // Small batch
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte{prefixNode}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
scanned++
item := it.Item()
err := item.Value(func(val []byte) error {
node, err := decodeNode(val)
if err != nil {
return nil // Skip invalid nodes
}
// Skip internal nodes for stats
for _, label := range node.Labels {
if len(label) > 0 && label[0] == '_' {
internal++
return nil
}
}
// Track nodes with embeddings for stats
if len(node.Embedding) > 0 {
withEmbed++
return nil
}
// Use helper to check if node needs embedding
if !NodeNeedsEmbedding(node) {
return nil
}
// Found one that needs embedding
result = node
return ErrIterationStopped // Custom error to break iteration
})
if err == ErrIterationStopped {
break
}
}
return nil
})
	// Only log when we find a node needing embedding (reduce log spam)
	if result != nil {
		log.Printf("🔍 Found node needing embedding (scanned %d, %d already embedded, %d internal)",
			scanned, withEmbed, internal)
	}
return result
}
// IterateNodes iterates through all nodes one at a time without loading all into memory.
// The callback returns true to continue, false to stop.
func (b *BadgerEngine) IterateNodes(fn func(*Node) bool) error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte{prefixNode}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
var node *Node
err := item.Value(func(val []byte) error {
var decErr error
node, decErr = decodeNode(val)
return decErr
})
if err != nil {
continue // Skip invalid nodes
}
if !fn(node) {
break // Callback requested stop
}
}
return nil
})
}
// StreamNodes implements StreamingEngine.StreamNodes for memory-efficient iteration.
// Iterates through all nodes one at a time without loading all into memory.
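//
// Example (sketch; return ErrIterationStopped to stop early without error):
//
//	err := engine.StreamNodes(ctx, func(n *Node) error {
//		if len(n.Embedding) == 0 {
//			return ErrIterationStopped // treated as a normal stop
//		}
//		return nil
//	})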
func (b *BadgerEngine) StreamNodes(ctx context.Context, fn func(node *Node) error) error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte{prefixNode}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
item := it.Item()
var node *Node
err := item.Value(func(val []byte) error {
var decErr error
node, decErr = decodeNode(val)
return decErr
})
if err != nil {
continue // Skip invalid nodes
}
if err := fn(node); err != nil {
if err == ErrIterationStopped {
return nil // Normal stop
}
return err
}
}
return nil
})
}
// StreamEdges implements StreamingEngine.StreamEdges for memory-efficient iteration.
// Iterates through all edges one at a time without loading all into memory.
func (b *BadgerEngine) StreamEdges(ctx context.Context, fn func(edge *Edge) error) error {
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte{prefixEdge}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
item := it.Item()
var edge *Edge
err := item.Value(func(val []byte) error {
var decErr error
edge, decErr = decodeEdge(val)
return decErr
})
if err != nil {
continue // Skip invalid edges
}
if err := fn(edge); err != nil {
if err == ErrIterationStopped {
return nil // Normal stop
}
return err
}
}
return nil
})
}
// StreamNodeChunks implements StreamingEngine.StreamNodeChunks for batch processing.
// Iterates through nodes in chunks, more efficient for batch operations.
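//
// Example (sketch; the chunk slice is reused between callbacks, so copy it
// if you need to keep it; indexBatch here is hypothetical):
//
//	err := engine.StreamNodeChunks(ctx, 500, func(chunk []*Node) error {
//		return indexBatch(chunk)
//	})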
func (b *BadgerEngine) StreamNodeChunks(ctx context.Context, chunkSize int, fn func(nodes []*Node) error) error {
if chunkSize <= 0 {
chunkSize = 1000
}
b.mu.RLock()
if b.closed {
b.mu.RUnlock()
return ErrStorageClosed
}
b.mu.RUnlock()
return b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = min(chunkSize, 100)
it := txn.NewIterator(opts)
defer it.Close()
chunk := make([]*Node, 0, chunkSize)
prefix := []byte{prefixNode}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
item := it.Item()
var node *Node
err := item.Value(func(val []byte) error {
var decErr error
node, decErr = decodeNode(val)
return decErr
})
if err != nil {
continue // Skip invalid nodes
}
chunk = append(chunk, node)
if len(chunk) >= chunkSize {
if err := fn(chunk); err != nil {
return err
}
// Reset chunk, reuse capacity
chunk = chunk[:0]
}
}
// Process remaining nodes
if len(chunk) > 0 {
if err := fn(chunk); err != nil {
return err
}
}
return nil
})
}
// ============================================================================
// Utility functions for compatibility
// ============================================================================
// hasPrefix reports whether s begins with prefix.
func hasPrefix(s, prefix []byte) bool {
	return bytes.HasPrefix(s, prefix)
}
// ClearAllEmbeddings removes embeddings from all nodes, allowing them to be regenerated.
// Returns the number of nodes that had their embeddings cleared.
func (b *BadgerEngine) ClearAllEmbeddings() (int, error) {
	b.mu.RLock()
	if b.closed {
		b.mu.RUnlock()
		return 0, ErrStorageClosed
	}
	b.mu.RUnlock()
cleared := 0
// First, collect all node IDs that have embeddings
var nodeIDs []NodeID
err := b.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 100
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte{prefixNode}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
err := item.Value(func(val []byte) error {
node, err := decodeNode(val)
if err != nil {
return nil // Skip invalid nodes
}
if len(node.Embedding) > 0 {
nodeIDs = append(nodeIDs, node.ID)
}
return nil
})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return 0, fmt.Errorf("error scanning nodes: %w", err)
}
// Now update each node to clear its embedding
for _, id := range nodeIDs {
node, err := b.GetNode(id)
if err != nil {
continue // Skip if node no longer exists
}
node.Embedding = nil
if err := b.UpdateNode(node); err != nil {
log.Printf("Warning: failed to clear embedding for node %s: %v", id, err)
continue
}
cleared++
}
log.Printf("✓ Cleared embeddings from %d nodes", cleared)
return cleared, nil
}
// Backup creates a backup of the database to the specified file path.
// Uses BadgerDB's streaming backup which creates a consistent snapshot.
// The backup file is a self-contained, portable copy of the database.
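//
// Example (sketch):
//
//	if err := engine.Backup("/backups/nornicdb.bak"); err != nil {
//		log.Printf("backup failed: %v", err)
//	}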
func (b *BadgerEngine) Backup(path string) error {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return ErrStorageClosed
}
// Create backup file
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create backup file: %w", err)
}
defer f.Close()
// Use BufferedWriter for better performance
buf := bufio.NewWriterSize(f, 16*1024*1024) // 16MB buffer
// Stream backup (since=0 means full backup)
_, err = b.db.Backup(buf, 0)
if err != nil {
return fmt.Errorf("backup failed: %w", err)
}
// Flush buffer
if err := buf.Flush(); err != nil {
return fmt.Errorf("failed to flush backup: %w", err)
}
// Sync to disk
if err := f.Sync(); err != nil {
return fmt.Errorf("failed to sync backup: %w", err)
}
return nil
}
// Verify BadgerEngine implements Engine interface
var _ Engine = (*BadgerEngine)(nil)