Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
gpu.go44.1 kB
// Package gpu provides optional GPU acceleration for NornicDB vector operations. // // This package implements GPU-accelerated vector similarity search using OpenCL, // CUDA, Metal, and Vulkan compute backends. The design is optimized for the // common case: fast vector search with minimal memory overhead. // // Architecture (Simplified & Focused): // - GPU VRAM stores ONLY embeddings as contiguous float32 arrays // - CPU RAM stores nodeID mappings and all other graph data // - Vector queries are offloaded to GPU for parallel computation // - Results (nodeID indices) are returned to CPU for graph operations // - No complex graph algorithms on GPU (CPU is better for traversal) // // Performance Benefits: // - 10-100x speedup for vector similarity search // - Parallel cosine similarity computation // - Efficient batch operations // - Reduced CPU load for embedding-heavy workloads // // Memory Usage (1024-dim float32 embeddings): // - 100K nodes ≈ 400MB VRAM (100K × 1024 × 4 bytes) // - 500K nodes ≈ 2GB VRAM // - 1M nodes ≈ 4GB VRAM // - 10M nodes ≈ 40GB VRAM (requires high-end GPU) // // Example Usage: // // // Initialize GPU manager // config := gpu.DefaultConfig() // config.Enabled = true // config.PreferredBackend = gpu.BackendOpenCL // config.MaxMemoryMB = 8192 // 8GB limit // // manager, err := gpu.NewManager(config) // if err != nil { // log.Printf("GPU not available: %v", err) // // Fall back to CPU-only mode // } // // // Create embedding index // indexConfig := gpu.DefaultEmbeddingIndexConfig(1024) // 1024 dimensions // index := gpu.NewEmbeddingIndex(manager, indexConfig) // // // Add embeddings // embedding := make([]float32, 1024) // // ... populate embedding ... // index.Add("node-123", embedding) // // // Batch add for efficiency // nodeIDs := []string{"node-1", "node-2", "node-3"} // embeddings := [][]float32{emb1, emb2, emb3} // index.AddBatch(nodeIDs, embeddings) // // // Sync to GPU for acceleration // if err := index.SyncToGPU(); err != nil { // log.Printf("GPU sync failed: %v", err) // } // // // Perform similarity search // query := make([]float32, 1024) // // ... populate query embedding ... // results, err := index.Search(query, 10) // Top 10 similar // if err != nil { // log.Fatal(err) // } // // for _, result := range results { // fmt.Printf("Node %s: similarity %.3f\n", result.ID, result.Score) // } // // // Check performance stats // stats := index.Stats() // fmt.Printf("GPU searches: %d, CPU fallbacks: %d\n", // stats.SearchesGPU, stats.SearchesCPU) // // Supported Backends: // // 1. **OpenCL** (Cross-platform): // - Works with NVIDIA, AMD, Intel GPUs // - Best compatibility across hardware // - Good performance for most workloads // // 2. **CUDA** (NVIDIA only): // - Highest performance on NVIDIA GPUs // - Requires CUDA toolkit installation // - Best for production NVIDIA deployments // // 3. **Metal** (Apple Silicon): // - Native acceleration on M1/M2/M3 Macs // - Excellent performance and power efficiency // - Automatic on macOS with Apple Silicon // // 4. **Vulkan** (Cross-platform): // - Modern compute API // - Good performance across vendors // - Future-proof choice // // Performance Characteristics: // // Vector Search (1024-dim, cosine similarity): // - CPU (single-thread): ~1K vectors/sec // - CPU (multi-thread): ~10K vectors/sec // - GPU (mid-range): ~100K-1M vectors/sec // - GPU (high-end): ~1M-10M vectors/sec // // Memory Bandwidth: // - System RAM: ~50-100 GB/s // - GPU VRAM: ~500-1000 GB/s (10x faster) // - This is why GPU excels at vector operations // // When to Use GPU: // // ✅ Large embedding collections (>10K vectors) // ✅ Frequent similarity searches // ✅ Batch processing workloads // ✅ Real-time recommendation systems // ❌ Small datasets (<1K vectors) // ❌ Infrequent searches // ❌ Memory-constrained environments // // ELI12 (Explain Like I'm 12): // // Think of your computer like a kitchen: // // 1. **CPU = Chef**: Really smart, can do complex recipes (graph traversal), // but can only work on one thing at a time. // // 2. **GPU = Assembly line**: Not as smart as the chef, but can do simple // tasks (vector math) REALLY fast with hundreds of workers in parallel. // // 3. **Vector search**: Like comparing the "taste" of 1 million dishes to // find the 10 most similar. The chef would take forever doing this one by // one, but the assembly line can compare them all at the same time! // // 4. **Memory**: The assembly line has its own super-fast ingredients storage // (VRAM) that's much faster than the main kitchen storage (RAM). // // So we use the assembly line for the repetitive math work, then send the // results back to the chef for the complex decision-making! package gpu import ( "encoding/binary" "errors" "runtime" "sync" "sync/atomic" "time" "unsafe" "github.com/orneryd/nornicdb/pkg/gpu/cuda" "github.com/orneryd/nornicdb/pkg/gpu/metal" ) // Errors var ( ErrGPUNotAvailable = errors.New("gpu: no compatible GPU found") ErrGPUDisabled = errors.New("gpu: acceleration disabled") ErrOutOfMemory = errors.New("gpu: out of GPU memory") ErrKernelFailed = errors.New("gpu: kernel execution failed") ErrDataTooLarge = errors.New("gpu: data exceeds GPU memory") ErrInvalidDimensions = errors.New("gpu: vector dimension mismatch") ) // Backend represents the GPU compute backend. type Backend string const ( BackendNone Backend = "none" // CPU fallback BackendOpenCL Backend = "opencl" // Cross-platform (AMD + NVIDIA) BackendCUDA Backend = "cuda" // NVIDIA only BackendMetal Backend = "metal" // Apple Silicon BackendVulkan Backend = "vulkan" // Cross-platform compute ) // Config holds GPU acceleration configuration options. // // The configuration allows fine-tuning of GPU usage, memory limits, // and fallback behavior. All settings have sensible defaults. // // Example: // // // Production configuration // config := &gpu.Config{ // Enabled: true, // PreferredBackend: gpu.BackendOpenCL, // MaxMemoryMB: 8192, // 8GB limit // BatchSize: 50000, // Larger batches for throughput // SyncInterval: 50 * time.Millisecond, // Faster sync // FallbackOnError: true, // Always fall back to CPU // DeviceID: 0, // Use first GPU // } // // // Development configuration // config = gpu.DefaultConfig() // config.Enabled = false // Disable for development type Config struct { // Enabled toggles GPU acceleration on/off Enabled bool // PreferredBackend selects compute backend (auto-detected if empty) PreferredBackend Backend // MaxMemoryMB limits GPU memory usage (0 = use 80% of available) MaxMemoryMB int // BatchSize for bulk operations BatchSize int // SyncInterval for async GPU->CPU sync SyncInterval time.Duration // FallbackOnError falls back to CPU on GPU errors FallbackOnError bool // DeviceID selects specific GPU (for multi-GPU systems) DeviceID int } // DefaultConfig returns sensible defaults for GPU acceleration. // // The defaults are conservative and prioritize stability over performance: // - GPU disabled by default (must opt-in) // - Automatic backend detection // - 80% of available GPU memory // - Medium batch sizes // - CPU fallback enabled // // Example: // // config := gpu.DefaultConfig() // config.Enabled = true // Enable GPU acceleration // manager, err := gpu.NewManager(config) func DefaultConfig() *Config { return &Config{ Enabled: false, // Disabled by default, must opt-in PreferredBackend: BackendNone, MaxMemoryMB: 0, // Auto-detect BatchSize: 10000, SyncInterval: 100 * time.Millisecond, FallbackOnError: true, DeviceID: 0, } } // DeviceInfo contains information about a GPU device. type DeviceInfo struct { ID int Name string Vendor string Backend Backend MemoryMB int ComputeUnits int MaxWorkGroup int Available bool } // Manager handles GPU resources and operations for vector acceleration. // // The Manager provides a simplified interface focused on vector similarity // search. It handles device detection, memory management, and fallback to // CPU when GPU is unavailable or encounters errors. // // Key responsibilities: // - GPU device detection and initialization // - Memory allocation and tracking // - Performance statistics // - Graceful fallback to CPU operations // // Example: // // config := gpu.DefaultConfig() // config.Enabled = true // // manager, err := gpu.NewManager(config) // if err != nil { // log.Printf("GPU unavailable: %v", err) // return // Use CPU-only mode // } // // if manager.IsEnabled() { // device := manager.Device() // fmt.Printf("Using GPU: %s (%s)\n", device.Name, device.Backend) // fmt.Printf("Memory: %d MB\n", device.MemoryMB) // } // // // Check usage periodically // stats := manager.Stats() // fmt.Printf("GPU operations: %d, CPU fallbacks: %d\n", // stats.OperationsGPU, stats.FallbackCount) // // Thread Safety: // // All methods are thread-safe and can be called concurrently. type Manager struct { config *Config device *DeviceInfo enabled atomic.Bool mu sync.RWMutex // Memory management (simplified) allocatedMB int // Stats stats Stats } // Stats tracks GPU usage statistics. type Stats struct { OperationsGPU int64 OperationsCPU int64 BytesTransferred int64 KernelExecutions int64 FallbackCount int64 AverageKernelTimeNs int64 } // NewManager creates a new GPU manager with the given configuration. // // The manager attempts to detect and initialize a compatible GPU device. // If GPU is disabled in config or no compatible device is found, the // manager operates in CPU-only mode. // // Parameters: // - config: GPU configuration (uses DefaultConfig() if nil) // // Returns: // - Manager instance (always succeeds if FallbackOnError=true) // - Error if GPU required but unavailable // // Example: // // // Try to use GPU, fall back to CPU // config := gpu.DefaultConfig() // config.Enabled = true // config.FallbackOnError = true // // manager, err := gpu.NewManager(config) // if err != nil { // log.Fatal(err) // Should not happen with fallback enabled // } // // if manager.IsEnabled() { // fmt.Println("GPU acceleration active") // } else { // fmt.Println("Using CPU-only mode") // } // // Device Detection: // // The manager tries backends in order: Preferred -> OpenCL -> CUDA -> Vulkan -> Metal func NewManager(config *Config) (*Manager, error) { if config == nil { config = DefaultConfig() } m := &Manager{ config: config, } if config.Enabled { device, err := detectGPU(config) if err != nil { if config.FallbackOnError { // Fall back to CPU mode m.enabled.Store(false) return m, nil } return nil, err } m.device = device m.enabled.Store(true) } return m, nil } // detectGPU attempts to find a compatible GPU. func detectGPU(config *Config) (*DeviceInfo, error) { // Build list of backends to try based on platform and preference var backends []Backend // Add preferred backend first if config.PreferredBackend != BackendNone { backends = append(backends, config.PreferredBackend) } // Add platform-appropriate backends switch runtime.GOOS { case "darwin": // Metal is the best choice on macOS/iOS backends = append(backends, BackendMetal) case "linux", "windows": // Try OpenCL and CUDA on Linux/Windows backends = append(backends, BackendOpenCL, BackendCUDA, BackendVulkan) default: backends = append(backends, BackendOpenCL, BackendVulkan) } for _, backend := range backends { if backend == BackendNone { continue } device, err := probeBackend(backend, config.DeviceID) if err == nil && device != nil { return device, nil } } return nil, ErrGPUNotAvailable } // probeBackend checks if a specific backend is available. // Implements actual GPU detection for supported backends. func probeBackend(backend Backend, deviceID int) (*DeviceInfo, error) { switch backend { case BackendMetal: return probeMetal(deviceID) case BackendCUDA: return probeCUDA(deviceID) case BackendOpenCL: // TODO: Implement OpenCL detection return nil, ErrGPUNotAvailable case BackendVulkan: // TODO: Implement Vulkan detection return nil, ErrGPUNotAvailable default: return nil, ErrGPUNotAvailable } } // probeCUDA checks for NVIDIA CUDA GPU availability. func probeCUDA(deviceID int) (*DeviceInfo, error) { if runtime.GOOS != "linux" && runtime.GOOS != "windows" { return nil, ErrGPUNotAvailable } // Check if GPU hardware is present (via nvidia-smi) if cuda.HasGPUHardware() && !cuda.IsCUDACapable() { // GPU detected but binary not built with CUDA support return &DeviceInfo{ ID: 0, Name: cuda.GPUName() + " (CUDA disabled - CPU fallback)", Vendor: "NVIDIA", Backend: BackendNone, // Can't use CUDA ops, but acknowledge GPU exists MemoryMB: cuda.GPUMemoryMB(), ComputeUnits: 0, MaxWorkGroup: 0, Available: false, // Not available for GPU compute }, nil } if !cuda.IsAvailable() { return nil, ErrGPUNotAvailable } deviceCount := cuda.DeviceCount() if deviceCount == 0 { return nil, ErrGPUNotAvailable } // Use specified device or first available if deviceID < 0 || deviceID >= deviceCount { deviceID = 0 } device, err := cuda.NewDevice(deviceID) if err != nil { return nil, err } defer device.Release() ccMajor, ccMinor := device.ComputeCapability() return &DeviceInfo{ ID: deviceID, Name: device.Name(), Vendor: "NVIDIA", Backend: BackendCUDA, MemoryMB: device.MemoryMB(), ComputeUnits: ccMajor*10 + ccMinor, // Store compute capability MaxWorkGroup: 1024, // Typical CUDA max threads per block Available: true, }, nil } // probeMetal checks for Metal GPU availability (macOS/iOS only). func probeMetal(deviceID int) (*DeviceInfo, error) { if runtime.GOOS != "darwin" { return nil, ErrGPUNotAvailable } if !metal.IsAvailable() { return nil, ErrGPUNotAvailable } device, err := metal.NewDevice() if err != nil { return nil, err } // Get device info info := &DeviceInfo{ ID: deviceID, Name: device.Name(), Vendor: "Apple", Backend: BackendMetal, MemoryMB: device.MemoryMB(), ComputeUnits: 0, // Not exposed by Metal API MaxWorkGroup: 256, Available: true, } // Release the test device (will be recreated when needed) device.Release() return info, nil } // IsEnabled returns whether GPU acceleration is active. func (m *Manager) IsEnabled() bool { return m.enabled.Load() } // Enable activates GPU acceleration. func (m *Manager) Enable() error { if m.device == nil { device, err := detectGPU(m.config) if err != nil { return err } m.device = device } m.enabled.Store(true) return nil } // Disable deactivates GPU acceleration. func (m *Manager) Disable() { m.enabled.Store(false) } // Device returns current GPU device info. func (m *Manager) Device() *DeviceInfo { return m.device } // Stats returns GPU usage statistics. func (m *Manager) Stats() Stats { m.mu.RLock() defer m.mu.RUnlock() return m.stats } // AllocatedMemoryMB returns current GPU memory usage. func (m *Manager) AllocatedMemoryMB() int { m.mu.RLock() defer m.mu.RUnlock() return m.allocatedMB } // VectorIndex provides GPU-accelerated vector operations. // Legacy implementation - use EmbeddingIndex for production. type VectorIndex struct { manager *Manager dimensions int vectors [][]float32 // CPU fallback storage ids []string mu sync.RWMutex gpuBuffer unsafe.Pointer // Native GPU buffer handle } // NewVectorIndex creates a GPU-accelerated vector index. func NewVectorIndex(manager *Manager, dimensions int) *VectorIndex { return &VectorIndex{ manager: manager, dimensions: dimensions, vectors: make([][]float32, 0), ids: make([]string, 0), } } // Add inserts a vector into the index. func (vi *VectorIndex) Add(id string, vector []float32) error { if len(vector) != vi.dimensions { return ErrInvalidDimensions } vi.mu.Lock() defer vi.mu.Unlock() vi.ids = append(vi.ids, id) vi.vectors = append(vi.vectors, vector) // TODO: Upload to GPU if enabled return nil } // Search finds the k nearest neighbors. func (vi *VectorIndex) Search(query []float32, k int) ([]SearchResult, error) { if len(query) != vi.dimensions { return nil, ErrInvalidDimensions } vi.mu.RLock() defer vi.mu.RUnlock() if vi.manager.IsEnabled() { return vi.searchGPU(query, k) } return vi.searchCPU(query, k) } // SearchResult holds a search result. type SearchResult struct { ID string Score float32 Distance float32 } // searchGPU performs GPU-accelerated similarity search. func (vi *VectorIndex) searchGPU(query []float32, k int) ([]SearchResult, error) { // TODO: Implement actual GPU kernel execution // For now, fall back to CPU atomic.AddInt64(&vi.manager.stats.FallbackCount, 1) return vi.searchCPU(query, k) } // searchCPU performs CPU-based similarity search. func (vi *VectorIndex) searchCPU(query []float32, k int) ([]SearchResult, error) { atomic.AddInt64(&vi.manager.stats.OperationsCPU, 1) if len(vi.vectors) == 0 { return nil, nil } // Calculate all similarities type scored struct { id string score float32 } scores := make([]scored, len(vi.vectors)) for i, vec := range vi.vectors { scores[i] = scored{ id: vi.ids[i], score: cosineSimilarity(query, vec), } } // Sort by score (descending) for i := 0; i < len(scores)-1; i++ { for j := i + 1; j < len(scores); j++ { if scores[j].score > scores[i].score { scores[i], scores[j] = scores[j], scores[i] } } } // Take top k if k > len(scores) { k = len(scores) } results := make([]SearchResult, k) for i := 0; i < k; i++ { results[i] = SearchResult{ ID: scores[i].id, Score: scores[i].score, Distance: 1 - scores[i].score, } } return results, nil } // cosineSimilarity calculates cosine similarity between two vectors. func cosineSimilarity(a, b []float32) float32 { if len(a) != len(b) { return 0 } var dot, normA, normB float32 for i := range a { dot += a[i] * b[i] normA += a[i] * a[i] normB += b[i] * b[i] } if normA == 0 || normB == 0 { return 0 } return dot / (sqrt32(normA) * sqrt32(normB)) } // sqrt32 computes square root for float32. func sqrt32(x float32) float32 { if x <= 0 { return 0 } // Newton's method z := x for i := 0; i < 10; i++ { z = (z + x/z) / 2 } return z } // ============================================================================= // REMOVED: TransactionBuffer and GraphAccelerator // ============================================================================= // These were removed because they provided no actual GPU benefit: // // TransactionBuffer: Just a map wrapper with no GPU usage // - Could be replaced with simple buffering in application layer // - No GPU computation or memory transfer benefit // // GraphAccelerator: All methods were TODOs with CPU fallbacks // - BFS and PageRank unimplemented on GPU // - Complex to implement, low ROI vs EmbeddingIndex // - Graph traversal benefits more from CPU cache locality // // Future: If GPU graph algorithms are needed, implement as separate package // focused specifically on that use case with proper OpenCL/CUDA kernels. // ListDevices returns all available GPU devices. func ListDevices() ([]DeviceInfo, error) { // TODO: Implement actual device enumeration return nil, ErrGPUNotAvailable } // BenchmarkDevice runs a simple benchmark on a GPU. func BenchmarkDevice(deviceID int) (*BenchmarkResult, error) { // TODO: Implement benchmark return nil, ErrGPUNotAvailable } // BenchmarkResult holds GPU benchmark results. type BenchmarkResult struct { DeviceID int VectorOpsPerSec int64 MemoryBandwidthGB float64 LatencyUs int64 } // ============================================================================= // EmbeddingIndex - Optimized GPU Vector Search // ============================================================================= // This is the core GPU acceleration feature. It stores ONLY embeddings in GPU // VRAM, with nodeID mapping on CPU side. This is the minimal optimal design. // // MEMORY LAYOUT (Optimized for GPU efficiency): // // GPU VRAM (contiguous float32 array - NO STRINGS): // [vec0[0], vec0[1], ..., vec0[D-1], vec1[0], vec1[1], ..., vec1[D-1], ...] // Pure float32 data, optimal for parallel computation // // CPU RAM (nodeID mapping): // nodeIDs[0] = "node-123" -> corresponds to vec0 in GPU // nodeIDs[1] = "node-456" -> corresponds to vec1 in GPU // nodeIDs[i] = "node-XXX" -> corresponds to vec_i in GPU // // SEARCH FLOW: // 1. Upload query vector to GPU (single float32 array) // 2. GPU computes cosine similarity for ALL embeddings in parallel // 3. GPU returns top-k indices: [5, 12, 3, ...] // 4. CPU maps indices to nodeIDs: ["node-456", "node-789", "node-234", ...] // // MEMORY EFFICIENCY: // - GPU: Only stores float32 vectors (4 bytes × dimensions × count) // - CPU: Only stores string references (minimal overhead ~32 bytes/node) // - NO redundant data in GPU (no node properties, labels, edges) // - Total: ~4GB GPU for 1M nodes @ 1024 dims // EmbeddingIndex provides GPU-accelerated vector similarity search. // // This is the core GPU acceleration feature. It stores embeddings in GPU VRAM // as contiguous float32 arrays for optimal parallel processing, while keeping // nodeID mappings and metadata on CPU. // // Memory Layout (Optimized for GPU): // // GPU VRAM (contiguous float32 array): // // [vec0[0], vec0[1], ..., vec0[D-1], vec1[0], vec1[1], ..., vec1[D-1], ...] // Pure numerical data, perfect for SIMD/parallel computation // // CPU RAM (nodeID mapping): // // nodeIDs[0] = "node-123" -> corresponds to vec0 in GPU // nodeIDs[1] = "node-456" -> corresponds to vec1 in GPU // idToIndex["node-123"] = 0 -> fast lookup // // Search Flow: // 1. Upload query vector to GPU (single float32 array) // 2. GPU computes cosine similarity for ALL embeddings in parallel // 3. GPU performs parallel reduction to find top-k indices // 4. CPU maps indices back to nodeIDs: [5, 12, 3] -> ["node-456", "node-789", "node-234"] // // Performance: // - CPU (1M vectors): ~1-10 seconds // - GPU (1M vectors): ~10-100 milliseconds (10-100x speedup) // - Memory bandwidth: GPU VRAM ~10x faster than system RAM // // Example: // // // Create index // config := gpu.DefaultEmbeddingIndexConfig(1024) // index := gpu.NewEmbeddingIndex(manager, config) // // // Add embeddings (CPU side) // for i, nodeID := range nodeIDs { // index.Add(nodeID, embeddings[i]) // } // // // Sync to GPU for acceleration // if err := index.SyncToGPU(); err != nil { // log.Printf("GPU sync failed, using CPU: %v", err) // } // // // Fast similarity search // results, err := index.Search(queryEmbedding, 10) // if err != nil { // log.Fatal(err) // } // // // Results are automatically sorted by similarity (descending) // for i, result := range results { // fmt.Printf("%d. %s (%.3f similarity)\n", // i+1, result.ID, result.Score) // } // // Memory Efficiency: // - Only embeddings stored in GPU (no strings, metadata, properties) // - Contiguous layout maximizes memory bandwidth // - CPU overhead: ~32 bytes per nodeID (string + map entry) // - GPU overhead: dimensions × 4 bytes per embedding // // Thread Safety: // // All methods are thread-safe. Concurrent searches are supported. type EmbeddingIndex struct { manager *Manager dimensions int // CPU-side index mapping (NEVER transferred to GPU) nodeIDs []string // nodeIDs[i] corresponds to embedding at GPU position i idToIndex map[string]int // Fast lookup: nodeID -> GPU position // CPU fallback storage (used when GPU disabled or for reference) cpuVectors []float32 // Flat array: [vec0..., vec1..., vec2...] // GPU storage (ONLY embeddings, no strings or metadata) gpuBuffer unsafe.Pointer // Native GPU buffer handle (legacy) metalBuffer *metal.Buffer // Metal GPU buffer (macOS) metalDevice *metal.Device // Metal device reference cudaBuffer *cuda.Buffer // CUDA GPU buffer (NVIDIA) cudaDevice *cuda.Device // CUDA device reference gpuAllocated int // Bytes allocated on GPU (dimensions × count × 4) gpuCapacity int // Max embeddings before realloc needed gpuSynced bool // Is GPU in sync with CPU? // Stats searchesGPU int64 searchesCPU int64 uploadsCount int64 uploadBytes int64 mu sync.RWMutex } // EmbeddingIndexConfig configures the embedding index. type EmbeddingIndexConfig struct { Dimensions int // Embedding dimensions (e.g., 1024) InitialCap int // Initial capacity (number of embeddings) GPUEnabled bool // Use GPU if available AutoSync bool // Auto-sync to GPU on Add BatchThreshold int // Batch size before GPU sync } // DefaultEmbeddingIndexConfig returns sensible defaults. func DefaultEmbeddingIndexConfig(dimensions int) *EmbeddingIndexConfig { return &EmbeddingIndexConfig{ Dimensions: dimensions, InitialCap: 10000, GPUEnabled: true, AutoSync: true, BatchThreshold: 1000, } } // NewEmbeddingIndex creates a new GPU-accelerated embedding index. // // The index is created in CPU memory initially. Call SyncToGPU() to upload // embeddings to GPU for acceleration. The index gracefully falls back to // CPU computation when GPU is unavailable. // // Parameters: // - manager: GPU manager (can be nil for CPU-only mode) // - config: Index configuration (uses defaults if nil) // // Returns: // - EmbeddingIndex ready for use // // Example: // // // Create with custom config // config := &gpu.EmbeddingIndexConfig{ // Dimensions: 1024, // InitialCap: 100000, // Pre-allocate for 100K embeddings // GPUEnabled: true, // AutoSync: false, // Manual sync control // BatchThreshold: 5000, // Sync every 5K additions // } // index := gpu.NewEmbeddingIndex(manager, config) // // // Or use defaults // index = gpu.NewEmbeddingIndex(manager, nil) // // Memory Pre-allocation: // // Setting InitialCap avoids repeated memory allocations during bulk loading. func NewEmbeddingIndex(manager *Manager, config *EmbeddingIndexConfig) *EmbeddingIndex { if config == nil { config = DefaultEmbeddingIndexConfig(1024) } return &EmbeddingIndex{ manager: manager, dimensions: config.Dimensions, nodeIDs: make([]string, 0, config.InitialCap), idToIndex: make(map[string]int, config.InitialCap), cpuVectors: make([]float32, 0, config.InitialCap*config.Dimensions), gpuCapacity: config.InitialCap, } } // Add inserts or updates an embedding for a node. // // The embedding is stored in CPU memory and the GPU sync flag is cleared. // Call SyncToGPU() to upload changes to GPU for acceleration. // // Parameters: // - nodeID: Unique identifier for the node // - embedding: Vector embedding (must match index dimensions) // // Returns: // - ErrInvalidDimensions if embedding size doesn't match // // Example: // // // Add single embedding // embedding := make([]float32, 1024) // // ... populate embedding from model ... // err := index.Add("user-123", embedding) // if err != nil { // log.Fatal(err) // } // // // Update existing embedding // newEmbedding := make([]float32, 1024) // // ... compute updated embedding ... // index.Add("user-123", newEmbedding) // Overwrites previous // // Performance: // - O(1) for new insertions // - O(1) for updates (overwrites in-place) // - Thread-safe (uses mutex) // // Memory: // - Embedding is copied (safe to modify original after Add) // - GPU sync is deferred until SyncToGPU() is called func (ei *EmbeddingIndex) Add(nodeID string, embedding []float32) error { if len(embedding) != ei.dimensions { return ErrInvalidDimensions } ei.mu.Lock() defer ei.mu.Unlock() if idx, exists := ei.idToIndex[nodeID]; exists { // Update existing embedding copy(ei.cpuVectors[idx*ei.dimensions:], embedding) } else { // Add new embedding ei.nodeIDs = append(ei.nodeIDs, nodeID) ei.idToIndex[nodeID] = len(ei.nodeIDs) - 1 ei.cpuVectors = append(ei.cpuVectors, embedding...) } ei.gpuSynced = false return nil } // AddBatch inserts multiple embeddings efficiently. func (ei *EmbeddingIndex) AddBatch(nodeIDs []string, embeddings [][]float32) error { if len(nodeIDs) != len(embeddings) { return errors.New("gpu: nodeIDs and embeddings length mismatch") } ei.mu.Lock() defer ei.mu.Unlock() for i, nodeID := range nodeIDs { if len(embeddings[i]) != ei.dimensions { return ErrInvalidDimensions } if idx, exists := ei.idToIndex[nodeID]; exists { copy(ei.cpuVectors[idx*ei.dimensions:], embeddings[i]) } else { ei.nodeIDs = append(ei.nodeIDs, nodeID) ei.idToIndex[nodeID] = len(ei.nodeIDs) - 1 ei.cpuVectors = append(ei.cpuVectors, embeddings[i]...) } } ei.gpuSynced = false return nil } // Remove deletes an embedding from the index. func (ei *EmbeddingIndex) Remove(nodeID string) bool { ei.mu.Lock() defer ei.mu.Unlock() idx, exists := ei.idToIndex[nodeID] if !exists { return false } // Swap with last element for O(1) removal lastIdx := len(ei.nodeIDs) - 1 if idx != lastIdx { lastNodeID := ei.nodeIDs[lastIdx] ei.nodeIDs[idx] = lastNodeID ei.idToIndex[lastNodeID] = idx // Copy last embedding to removed position srcStart := lastIdx * ei.dimensions dstStart := idx * ei.dimensions copy(ei.cpuVectors[dstStart:dstStart+ei.dimensions], ei.cpuVectors[srcStart:srcStart+ei.dimensions]) } // Truncate ei.nodeIDs = ei.nodeIDs[:lastIdx] ei.cpuVectors = ei.cpuVectors[:lastIdx*ei.dimensions] delete(ei.idToIndex, nodeID) ei.gpuSynced = false return true } // Search finds the k most similar embeddings to the query vector. // // The search automatically uses GPU acceleration if available and synced, // otherwise falls back to optimized CPU computation. Results are sorted // by similarity score in descending order. // // Parameters: // - query: Query embedding vector (must match index dimensions) // - k: Number of most similar results to return // // Returns: // - SearchResult slice with nodeIDs and similarity scores // - ErrInvalidDimensions if query size doesn't match // // Example: // // // Search for similar items // queryEmbedding := getEmbedding("search query") // results, err := index.Search(queryEmbedding, 10) // if err != nil { // log.Fatal(err) // } // // // Process results (sorted by similarity) // for i, result := range results { // fmt.Printf("%d. %s (similarity: %.3f, distance: %.3f)\n", // i+1, result.ID, result.Score, result.Distance) // } // // // Check if GPU was used // stats := index.Stats() // if stats.SearchesGPU > stats.SearchesCPU { // fmt.Println("GPU acceleration is working!") // } // // Performance: // - CPU: O(n×d) where n=embeddings, d=dimensions // - GPU: O(d) with massive parallelization // - Typical speedup: 10-100x for large datasets // // Similarity Metric: // // Uses cosine similarity: score = dot(a,b) / (||a|| × ||b||) // Range: [-1, 1] where 1 = identical, 0 = orthogonal, -1 = opposite func (ei *EmbeddingIndex) Search(query []float32, k int) ([]SearchResult, error) { if len(query) != ei.dimensions { return nil, ErrInvalidDimensions } ei.mu.RLock() defer ei.mu.RUnlock() if len(ei.nodeIDs) == 0 { return nil, nil } // Use GPU if enabled and synced if ei.manager.IsEnabled() && ei.gpuSynced { return ei.searchGPU(query, k) } return ei.searchCPU(query, k) } // searchGPU performs similarity search on GPU. func (ei *EmbeddingIndex) searchGPU(query []float32, k int) ([]SearchResult, error) { atomic.AddInt64(&ei.searchesGPU, 1) // Determine which backend to use if ei.manager.device != nil { switch ei.manager.device.Backend { case BackendMetal: return ei.searchMetal(query, k) case BackendCUDA: return ei.searchCUDA(query, k) } } // Fallback to CPU if no GPU backend available return ei.searchCPU(query, k) } // searchCUDA performs similarity search using NVIDIA CUDA GPU. func (ei *EmbeddingIndex) searchCUDA(query []float32, k int) ([]SearchResult, error) { if ei.cudaBuffer == nil || ei.cudaDevice == nil { // Fall back to CPU if CUDA not initialized return ei.searchCPU(query, k) } n := uint32(len(ei.nodeIDs)) if k > int(n) { k = int(n) } // Perform GPU search using CUDA results, err := ei.cudaDevice.Search( ei.cudaBuffer, query, n, uint32(ei.dimensions), k, true, // vectors are normalized ) if err != nil { // Fall back to CPU on GPU error atomic.AddInt64(&ei.manager.stats.FallbackCount, 1) return ei.searchCPU(query, k) } // Update stats atomic.AddInt64(&ei.manager.stats.OperationsGPU, 1) atomic.AddInt64(&ei.manager.stats.KernelExecutions, 2) // similarity + topk // Convert CUDA results to SearchResult with nodeIDs output := make([]SearchResult, len(results)) for i, r := range results { if int(r.Index) < len(ei.nodeIDs) { output[i] = SearchResult{ ID: ei.nodeIDs[r.Index], Score: r.Score, Distance: 1 - r.Score, } } } return output, nil } // searchMetal performs similarity search using Metal GPU. func (ei *EmbeddingIndex) searchMetal(query []float32, k int) ([]SearchResult, error) { if ei.metalBuffer == nil || ei.metalDevice == nil { // Fall back to CPU if Metal not initialized return ei.searchCPU(query, k) } n := uint32(len(ei.nodeIDs)) if k > int(n) { k = int(n) } // Perform GPU search results, err := ei.metalDevice.Search( ei.metalBuffer, query, n, uint32(ei.dimensions), k, true, // vectors are normalized ) if err != nil { // Fall back to CPU on GPU error atomic.AddInt64(&ei.manager.stats.FallbackCount, 1) return ei.searchCPU(query, k) } // Update stats atomic.AddInt64(&ei.manager.stats.OperationsGPU, 1) atomic.AddInt64(&ei.manager.stats.KernelExecutions, 2) // similarity + topk // Convert Metal results to SearchResult with nodeIDs output := make([]SearchResult, len(results)) for i, r := range results { if int(r.Index) < len(ei.nodeIDs) { output[i] = SearchResult{ ID: ei.nodeIDs[r.Index], Score: r.Score, Distance: 1 - r.Score, } } } return output, nil } // searchCPU performs similarity search on CPU (fallback). func (ei *EmbeddingIndex) searchCPU(query []float32, k int) ([]SearchResult, error) { atomic.AddInt64(&ei.searchesCPU, 1) n := len(ei.nodeIDs) if k > n { k = n } // Compute all similarities scores := make([]float32, n) for i := 0; i < n; i++ { start := i * ei.dimensions end := start + ei.dimensions scores[i] = cosineSimilarityFlat(query, ei.cpuVectors[start:end]) } // Find top-k using partial sort indices := make([]int, n) for i := range indices { indices[i] = i } // Partial quickselect for top-k partialSort(indices, scores, k) // Build results results := make([]SearchResult, k) for i := 0; i < k; i++ { idx := indices[i] results[i] = SearchResult{ ID: ei.nodeIDs[idx], Score: scores[idx], Distance: 1 - scores[idx], } } return results, nil } // SyncToGPU uploads the current embeddings to GPU memory. func (ei *EmbeddingIndex) SyncToGPU() error { if !ei.manager.IsEnabled() { return ErrGPUDisabled } ei.mu.Lock() defer ei.mu.Unlock() if len(ei.cpuVectors) == 0 { ei.gpuSynced = true return nil } // Determine which backend to use if ei.manager.device != nil { switch ei.manager.device.Backend { case BackendMetal: return ei.syncToMetal() case BackendCUDA: return ei.syncToCUDA() } } // No backend available return ErrGPUNotAvailable } // syncToCUDA uploads embeddings to NVIDIA CUDA GPU buffer. func (ei *EmbeddingIndex) syncToCUDA() error { // Initialize CUDA device if needed if ei.cudaDevice == nil { deviceID := 0 if ei.manager.config != nil { deviceID = ei.manager.config.DeviceID } device, err := cuda.NewDevice(deviceID) if err != nil { return err } ei.cudaDevice = device } // Release old buffer if ei.cudaBuffer != nil { ei.cudaBuffer.Release() ei.cudaBuffer = nil } // Create new buffer with embeddings buffer, err := ei.cudaDevice.NewBuffer(ei.cpuVectors, cuda.MemoryDevice) if err != nil { return err } ei.cudaBuffer = buffer // Normalize vectors on GPU for faster cosine similarity n := uint32(len(ei.nodeIDs)) dims := uint32(ei.dimensions) if err := ei.cudaDevice.NormalizeVectors(ei.cudaBuffer, n, dims); err != nil { // Non-fatal: we can still search with unnormalized vectors // (but slower since we need to normalize each query) } // Update stats ei.gpuAllocated = len(ei.cpuVectors) * 4 ei.gpuSynced = true atomic.AddInt64(&ei.uploadsCount, 1) atomic.AddInt64(&ei.uploadBytes, int64(ei.gpuAllocated)) atomic.AddInt64(&ei.manager.stats.BytesTransferred, int64(ei.gpuAllocated)) return nil } // syncToMetal uploads embeddings to Metal GPU buffer. func (ei *EmbeddingIndex) syncToMetal() error { // Initialize Metal device if needed if ei.metalDevice == nil { device, err := metal.NewDevice() if err != nil { return err } ei.metalDevice = device } // Release old buffer if ei.metalBuffer != nil { ei.metalBuffer.Release() ei.metalBuffer = nil } // Create new buffer with embeddings buffer, err := ei.metalDevice.NewBuffer(ei.cpuVectors, metal.StorageShared) if err != nil { return err } ei.metalBuffer = buffer ei.gpuAllocated = len(ei.cpuVectors) * 4 ei.gpuSynced = true ei.uploadsCount++ ei.uploadBytes += int64(len(ei.cpuVectors) * 4) // Update manager stats atomic.AddInt64(&ei.manager.stats.BytesTransferred, int64(len(ei.cpuVectors)*4)) return nil } // Count returns the number of embeddings in the index. func (ei *EmbeddingIndex) Count() int { ei.mu.RLock() defer ei.mu.RUnlock() return len(ei.nodeIDs) } // MemoryUsageMB returns estimated memory usage. func (ei *EmbeddingIndex) MemoryUsageMB() float64 { ei.mu.RLock() defer ei.mu.RUnlock() // Each embedding: dimensions * 4 bytes (float32) // Plus nodeID overhead (~32 bytes average) bytesPerEmbed := ei.dimensions*4 + 32 totalBytes := len(ei.nodeIDs) * bytesPerEmbed return float64(totalBytes) / (1024 * 1024) } // GPUMemoryUsageMB returns GPU memory usage. func (ei *EmbeddingIndex) GPUMemoryUsageMB() float64 { ei.mu.RLock() defer ei.mu.RUnlock() return float64(ei.gpuAllocated) / (1024 * 1024) } // Stats returns index statistics. func (ei *EmbeddingIndex) Stats() EmbeddingIndexStats { ei.mu.RLock() defer ei.mu.RUnlock() return EmbeddingIndexStats{ Count: len(ei.nodeIDs), Dimensions: ei.dimensions, GPUSynced: ei.gpuSynced, SearchesGPU: atomic.LoadInt64(&ei.searchesGPU), SearchesCPU: atomic.LoadInt64(&ei.searchesCPU), UploadsCount: ei.uploadsCount, UploadBytes: ei.uploadBytes, } } // EmbeddingIndexStats holds embedding index statistics. type EmbeddingIndexStats struct { Count int Dimensions int GPUSynced bool SearchesGPU int64 SearchesCPU int64 UploadsCount int64 UploadBytes int64 } // Has checks if a nodeID exists in the index. func (ei *EmbeddingIndex) Has(nodeID string) bool { ei.mu.RLock() defer ei.mu.RUnlock() _, exists := ei.idToIndex[nodeID] return exists } // Get retrieves the embedding for a nodeID. func (ei *EmbeddingIndex) Get(nodeID string) ([]float32, bool) { ei.mu.RLock() defer ei.mu.RUnlock() idx, exists := ei.idToIndex[nodeID] if !exists { return nil, false } start := idx * ei.dimensions result := make([]float32, ei.dimensions) copy(result, ei.cpuVectors[start:start+ei.dimensions]) return result, true } // Clear removes all embeddings from the index. func (ei *EmbeddingIndex) Clear() { ei.mu.Lock() defer ei.mu.Unlock() ei.nodeIDs = ei.nodeIDs[:0] ei.idToIndex = make(map[string]int) ei.cpuVectors = ei.cpuVectors[:0] ei.gpuSynced = false // Release GPU resources if ei.metalBuffer != nil { ei.metalBuffer.Release() ei.metalBuffer = nil } ei.gpuAllocated = 0 } // Release frees all GPU resources associated with this index. // Call this when the index is no longer needed to free GPU memory. func (ei *EmbeddingIndex) Release() { ei.mu.Lock() defer ei.mu.Unlock() // Release Metal resources if ei.metalBuffer != nil { ei.metalBuffer.Release() ei.metalBuffer = nil } if ei.metalDevice != nil { ei.metalDevice.Release() ei.metalDevice = nil } // Release CUDA resources if ei.cudaBuffer != nil { ei.cudaBuffer.Release() ei.cudaBuffer = nil } if ei.cudaDevice != nil { ei.cudaDevice.Release() ei.cudaDevice = nil } ei.gpuAllocated = 0 ei.gpuSynced = false } // Serialize exports the index to bytes for persistence. func (ei *EmbeddingIndex) Serialize() ([]byte, error) { ei.mu.RLock() defer ei.mu.RUnlock() // Format: [dims:4][count:4][nodeIDs...][vectors...] n := len(ei.nodeIDs) // Calculate size size := 8 // header for _, id := range ei.nodeIDs { size += 4 + len(id) // length prefix + string } size += n * ei.dimensions * 4 // vectors buf := make([]byte, size) offset := 0 // Write header binary.LittleEndian.PutUint32(buf[offset:], uint32(ei.dimensions)) offset += 4 binary.LittleEndian.PutUint32(buf[offset:], uint32(n)) offset += 4 // Write nodeIDs for _, id := range ei.nodeIDs { binary.LittleEndian.PutUint32(buf[offset:], uint32(len(id))) offset += 4 copy(buf[offset:], id) offset += len(id) } // Write vectors for _, v := range ei.cpuVectors { binary.LittleEndian.PutUint32(buf[offset:], floatToUint32(v)) offset += 4 } return buf, nil } // Deserialize loads the index from bytes. func (ei *EmbeddingIndex) Deserialize(data []byte) error { ei.mu.Lock() defer ei.mu.Unlock() if len(data) < 8 { return errors.New("gpu: invalid serialized data") } offset := 0 // Read header dims := int(binary.LittleEndian.Uint32(data[offset:])) offset += 4 count := int(binary.LittleEndian.Uint32(data[offset:])) offset += 4 if dims != ei.dimensions { return ErrInvalidDimensions } // Read nodeIDs ei.nodeIDs = make([]string, count) ei.idToIndex = make(map[string]int, count) for i := 0; i < count; i++ { length := int(binary.LittleEndian.Uint32(data[offset:])) offset += 4 ei.nodeIDs[i] = string(data[offset : offset+length]) ei.idToIndex[ei.nodeIDs[i]] = i offset += length } // Read vectors ei.cpuVectors = make([]float32, count*dims) for i := range ei.cpuVectors { ei.cpuVectors[i] = uint32ToFloat(binary.LittleEndian.Uint32(data[offset:])) offset += 4 } ei.gpuSynced = false return nil } // Helper functions func floatToUint32(f float32) uint32 { return *(*uint32)(unsafe.Pointer(&f)) } func uint32ToFloat(u uint32) float32 { return *(*float32)(unsafe.Pointer(&u)) } // cosineSimilarityFlat computes cosine similarity for flat arrays. func cosineSimilarityFlat(a, b []float32) float32 { if len(a) != len(b) { return 0 } var dot, normA, normB float32 for i := range a { dot += a[i] * b[i] normA += a[i] * a[i] normB += b[i] * b[i] } if normA == 0 || normB == 0 { return 0 } return dot / (sqrt32(normA) * sqrt32(normB)) } // partialSort performs partial quicksort to get top-k elements. func partialSort(indices []int, scores []float32, k int) { if k >= len(indices) { // Full sort needed for i := 0; i < len(indices)-1; i++ { for j := i + 1; j < len(indices); j++ { if scores[indices[j]] > scores[indices[i]] { indices[i], indices[j] = indices[j], indices[i] } } } return } // Simple partial sort: just get top-k for i := 0; i < k; i++ { maxIdx := i for j := i + 1; j < len(indices); j++ { if scores[indices[j]] > scores[indices[maxIdx]] { maxIdx = j } } indices[i], indices[maxIdx] = indices[maxIdx], indices[i] } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server