M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
local_gguf.go
//go:build localllm

// LocalGGUFEmbedder provides embedding generation using local GGUF models.
//
// This embedder runs models directly within NornicDB using llama.cpp,
// eliminating the need for external services like Ollama.
//
// Features:
//   - GPU acceleration (CUDA/Metal) with CPU fallback
//   - Memory-mapped model loading for low memory footprint
//   - Thread-safe concurrent embedding generation
//   - Crash resilience with panic recovery
//   - Model warmup to prevent GPU memory eviction
//
// Example:
//
//	config := &embed.Config{
//		Provider:   "local",
//		Model:      "bge-m3", // Resolves to /data/models/bge-m3.gguf
//		Dimensions: 1024,
//	}
//	embedder, err := embed.NewLocalGGUF(config)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer embedder.Close()
//
//	vec, err := embedder.Embed(ctx, "hello world")
package embed

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/orneryd/nornicdb/pkg/localllm"
)

// LocalGGUFEmbedder implements Embedder using a local GGUF model via llama.cpp.
//
// This embedder provides GPU-accelerated embedding generation without
// external dependencies. Models are loaded from the configured models
// directory (default: /data/models/).
//
// Thread-safe: Can be used concurrently from multiple goroutines.
//
// Crash Resilience:
//   - Panics in CGO are caught and converted to errors
//   - Model warmup keeps GPU memory active to prevent eviction
//   - Detailed logging helps diagnose CUDA/Metal issues
type LocalGGUFEmbedder struct {
	model     *localllm.Model
	modelName string
	modelPath string

	// Crash resilience
	mu            sync.RWMutex
	closed        bool
	stopWarmup    chan struct{}
	warmupStopped chan struct{}

	// Stats for monitoring
	embedCount    atomic.Int64
	errorCount    atomic.Int64
	panicCount    atomic.Int64
	lastEmbedTime atomic.Int64 // Unix timestamp
}
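
// Concurrency note: the RWMutex guards the closed flag and model pointer,
// while the per-call counters are lock-free atomics, so Embed can be called
// from many goroutines at once. A minimal fan-out sketch follows; this is
// hypothetical caller code (including the process callback), not part of
// this file:
//
//	var wg sync.WaitGroup
//	for _, text := range texts {
//		wg.Add(1)
//		go func(t string) {
//			defer wg.Done()
//			if vec, err := embedder.Embed(ctx, t); err == nil {
//				process(vec) // process is a hypothetical consumer
//			}
//		}(text)
//	}
//	wg.Wait()
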
// NewLocalGGUF creates an embedder using the existing Config pattern.
//
// Model resolution: config.Model → {NORNICDB_MODELS_DIR}/{model}.gguf
//
// Environment variables:
//   - NORNICDB_MODELS_DIR: Directory for .gguf files (default: /data/models)
//   - NORNICDB_EMBEDDING_GPU_LAYERS: GPU layer offload (-1=auto, 0=CPU, N=N layers)
//   - NORNICDB_EMBEDDING_WARMUP_INTERVAL: Warmup interval (default: 5m, 0=disabled)
//
// Example:
//
//	config := &embed.Config{
//		Provider:   "local",
//		Model:      "bge-m3",
//		Dimensions: 1024,
//	}
//	embedder, err := embed.NewLocalGGUF(config)
//	if err != nil {
//		// Model not found or failed to load
//		log.Fatal(err)
//	}
//	defer embedder.Close()
//
//	vec, _ := embedder.Embed(ctx, "semantic search")
//	fmt.Printf("Dimensions: %d\n", len(vec)) // 1024
func NewLocalGGUF(config *Config) (*LocalGGUFEmbedder, error) {
	// Resolve model path: model name → /data/models/{name}.gguf
	modelsDir := os.Getenv("NORNICDB_MODELS_DIR")
	if modelsDir == "" {
		modelsDir = "/data/models"
	}
	modelPath := filepath.Join(modelsDir, config.Model+".gguf")

	// Check if file exists
	if _, err := os.Stat(modelPath); os.IsNotExist(err) {
		return nil, fmt.Errorf("model not found: %s (expected at %s)\n"+
			"  → Download a GGUF model (e.g., bge-m3) and place it in the models directory\n"+
			"  → Or set NORNICDB_MODELS_DIR to point to your models directory",
			config.Model, modelPath)
	}

	opts := localllm.DefaultOptions(modelPath)

	// Configure GPU layers from environment
	if gpuLayersStr := os.Getenv("NORNICDB_EMBEDDING_GPU_LAYERS"); gpuLayersStr != "" {
		if gpuLayers, err := strconv.Atoi(gpuLayersStr); err == nil {
			opts.GPULayers = gpuLayers
		}
	}

	fmt.Printf("🧠 Loading local embedding model: %s\n", modelPath)
	fmt.Printf("   GPU layers: %d (-1 = auto/all)\n", opts.GPULayers)

	model, err := localllm.LoadModel(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to load model %s: %w", modelPath, err)
	}

	// Verify dimensions match if specified
	if config.Dimensions > 0 && model.Dimensions() != config.Dimensions {
		model.Close()
		return nil, fmt.Errorf("dimension mismatch: model has %d, config expects %d",
			model.Dimensions(), config.Dimensions)
	}

	fmt.Printf("✅ Model loaded: %s (%d dimensions)\n", config.Model, model.Dimensions())

	embedder := &LocalGGUFEmbedder{
		model:         model,
		modelName:     config.Model,
		modelPath:     modelPath,
		stopWarmup:    make(chan struct{}),
		warmupStopped: make(chan struct{}),
	}

	// Start warmup goroutine to keep model in GPU memory
	warmupInterval := 5 * time.Minute // Default: warmup every 5 minutes
	if intervalStr := os.Getenv("NORNICDB_EMBEDDING_WARMUP_INTERVAL"); intervalStr != "" {
		if d, err := time.ParseDuration(intervalStr); err == nil {
			warmupInterval = d
		}
	}
	if warmupInterval > 0 {
		go embedder.warmupLoop(warmupInterval)
		fmt.Printf("🔥 Model warmup enabled: every %v\n", warmupInterval)
	} else {
		// Warmup disabled: close warmupStopped now so Close does not block
		// waiting for a goroutine that was never started.
		close(embedder.warmupStopped)
	}

	return embedder, nil
}
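
// The environment variables above are read once, at construction time. A
// minimal sketch of CPU-only wiring (hypothetical setup code, not part of
// this file) that pins GPU offload to zero layers and stretches the warmup
// interval to 10 minutes:
//
//	os.Setenv("NORNICDB_EMBEDDING_GPU_LAYERS", "0") // 0 = CPU only
//	os.Setenv("NORNICDB_EMBEDDING_WARMUP_INTERVAL", "10m")
//	embedder, err := embed.NewLocalGGUF(&embed.Config{
//		Provider:   "local",
//		Model:      "bge-m3",
//		Dimensions: 1024,
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer embedder.Close()
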
// warmupLoop periodically generates a dummy embedding to keep the model in GPU memory.
// This prevents GPU memory eviction which would require reloading from disk.
func (e *LocalGGUFEmbedder) warmupLoop(interval time.Duration) {
	defer close(e.warmupStopped)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-e.stopWarmup:
			return
		case <-ticker.C:
			// Check if we've had recent activity (no need to warmup if actively used)
			lastEmbed := time.Unix(e.lastEmbedTime.Load(), 0)
			if time.Since(lastEmbed) < interval/2 {
				continue // Model was recently used, skip warmup
			}

			// Generate a dummy embedding to keep model warm
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			_, err := e.embedWithRecovery(ctx, "warmup")
			cancel()
			if err != nil {
				fmt.Printf("⚠️ Model warmup failed: %v\n", err)
			}
		}
	}
}

// embedWithRecovery wraps the embed call with panic recovery.
// CGO calls can panic on segfaults or other C-level errors.
// This prevents the entire process from crashing.
func (e *LocalGGUFEmbedder) embedWithRecovery(ctx context.Context, text string) (embedding []float32, err error) {
	// Recover from panics in CGO
	defer func() {
		if r := recover(); r != nil {
			e.panicCount.Add(1)

			// Capture stack trace for debugging
			buf := make([]byte, 4096)
			n := runtime.Stack(buf, false)
			stackTrace := string(buf[:n])

			err = fmt.Errorf("PANIC in llama.cpp embedding (recovered): %v\nStack trace:\n%s", r, stackTrace)

			// Log detailed error for diagnostics
			fmt.Printf("🔴 EMBEDDING PANIC RECOVERED:\n")
			fmt.Printf("   Error: %v\n", r)
			fmt.Printf("   Text length: %d\n", len(text))
			fmt.Printf("   Model: %s\n", e.modelName)
			fmt.Printf("   Total panics: %d\n", e.panicCount.Load())
			fmt.Printf("   Stack trace:\n%s\n", stackTrace)
		}
	}()

	// Check if embedder is closed
	e.mu.RLock()
	if e.closed {
		e.mu.RUnlock()
		return nil, fmt.Errorf("embedder is closed")
	}
	model := e.model
	e.mu.RUnlock()

	// Perform the actual embedding
	embedding, err = model.Embed(ctx, text)
	if err != nil {
		e.errorCount.Add(1)

		// Log CUDA/Metal specific errors for diagnostics
		errStr := err.Error()
		if containsGPUError(errStr) {
			fmt.Printf("🔴 GPU ERROR detected:\n")
			fmt.Printf("   Error: %v\n", err)
			fmt.Printf("   Text length: %d\n", len(text))
			fmt.Printf("   Model: %s\n", e.modelName)
			fmt.Printf("   Total errors: %d\n", e.errorCount.Load())
		}
	}

	return embedding, err
}

// containsGPUError checks if an error message indicates a GPU-related issue.
func containsGPUError(errStr string) bool {
	gpuKeywords := []string{
		"CUDA", "cuda", "GPU", "gpu", "Metal", "metal",
		"out of memory", "OOM", "device", "driver",
		"cublas", "cudnn", "allocation failed",
	}
	for _, kw := range gpuKeywords {
		if contains(errStr, kw) {
			return true
		}
	}
	return false
}

// contains is a simple string contains check (avoiding strings import for build tag compat)
func contains(s, substr string) bool {
	for i := 0; i <= len(s)-len(substr); i++ {
		if s[i:i+len(substr)] == substr {
			return true
		}
	}
	return false
}

// Embed generates a normalized embedding for the given text.
//
// The returned vector is L2-normalized, suitable for cosine similarity.
//
// Crash Resilience:
//   - Panics in llama.cpp are caught and converted to errors
//   - Errors are logged with diagnostics for debugging
//
// Example:
//
//	vec, err := embedder.Embed(ctx, "graph database")
//	if err != nil {
//		return err
//	}
//	// Use vec for similarity search
func (e *LocalGGUFEmbedder) Embed(ctx context.Context, text string) ([]float32, error) {
	e.embedCount.Add(1)
	e.lastEmbedTime.Store(time.Now().Unix())
	return e.embedWithRecovery(ctx, text)
}
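
// Because Embed returns L2-normalized vectors, cosine similarity reduces to
// a plain dot product. A minimal sketch of a similarity check (the dot
// helper is hypothetical, not part of this file):
//
//	// dot returns the cosine similarity of two L2-normalized vectors.
//	func dot(a, b []float32) float32 {
//		var s float32
//		for i := range a {
//			s += a[i] * b[i]
//		}
//		return s
//	}
//
//	va, _ := embedder.Embed(ctx, "graph database")
//	vb, _ := embedder.Embed(ctx, "vector database")
//	fmt.Printf("similarity: %.3f\n", dot(va, vb))
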
// EmbedBatch generates embeddings for multiple texts.
//
// Each text is processed sequentially with crash recovery.
// If one text fails, processing continues with the others.
//
// Example:
//
//	texts := []string{"query 1", "query 2", "query 3"}
//	vecs, err := embedder.EmbedBatch(ctx, texts)
func (e *LocalGGUFEmbedder) EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) {
	results := make([][]float32, len(texts))
	var firstErr error

	for i, t := range texts {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		emb, err := e.Embed(ctx, t)
		if err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("text %d: %w", i, err)
			}
			// Continue processing other texts even if one fails
			fmt.Printf("⚠️ Embedding failed for text %d (len=%d): %v\n", i, len(t), err)
			continue
		}
		results[i] = emb
	}

	// Return the first error if any, but still return partial results
	return results, firstErr
}

// Dimensions returns the embedding vector dimension.
//
// Common values:
//   - BGE-M3: 1024
//   - E5-large: 1024
//   - Jina-v2-base-code: 768
func (e *LocalGGUFEmbedder) Dimensions() int {
	e.mu.RLock()
	defer e.mu.RUnlock()
	if e.model == nil {
		return 0
	}
	return e.model.Dimensions()
}

// Model returns the model name (without path or extension).
func (e *LocalGGUFEmbedder) Model() string {
	return e.modelName
}

// EmbedderStats holds embedding statistics for monitoring.
type EmbedderStats struct {
	EmbedCount    int64     `json:"embed_count"`
	ErrorCount    int64     `json:"error_count"`
	PanicCount    int64     `json:"panic_count"`
	LastEmbedTime time.Time `json:"last_embed_time"`
	ModelName     string    `json:"model_name"`
	ModelPath     string    `json:"model_path"`
}

// Stats returns current embedder statistics for monitoring.
func (e *LocalGGUFEmbedder) Stats() EmbedderStats {
	lastEmbed := time.Unix(e.lastEmbedTime.Load(), 0)
	if e.lastEmbedTime.Load() == 0 {
		lastEmbed = time.Time{} // Zero time if never used
	}
	return EmbedderStats{
		EmbedCount:    e.embedCount.Load(),
		ErrorCount:    e.errorCount.Load(),
		PanicCount:    e.panicCount.Load(),
		LastEmbedTime: lastEmbed,
		ModelName:     e.modelName,
		ModelPath:     e.modelPath,
	}
}

// Close releases all resources associated with the embedder.
//
// After Close is called, the embedder must not be used.
func (e *LocalGGUFEmbedder) Close() error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.closed {
		return nil
	}
	e.closed = true

	// Stop warmup goroutine
	close(e.stopWarmup)

	// Wait for warmup to finish (with timeout)
	select {
	case <-e.warmupStopped:
	case <-time.After(5 * time.Second):
		fmt.Printf("⚠️ Warmup goroutine did not stop in time\n")
	}

	// Close the model
	if e.model != nil {
		fmt.Printf("🧠 Closing embedding model: %s\n", e.modelName)
		fmt.Printf("   Total embeddings: %d\n", e.embedCount.Load())
		fmt.Printf("   Total errors: %d\n", e.errorCount.Load())
		fmt.Printf("   Total panics recovered: %d\n", e.panicCount.Load())
		return e.model.Close()
	}

	return nil
}
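
// The json tags on EmbedderStats make Stats easy to surface over HTTP: a
// rising PanicCount or ErrorCount alongside a stale LastEmbedTime usually
// points at GPU trouble. A minimal sketch of a health handler (hypothetical
// wiring, not part of this file; assumes net/http and encoding/json):
//
//	http.HandleFunc("/embedder/stats", func(w http.ResponseWriter, r *http.Request) {
//		w.Header().Set("Content-Type", "application/json")
//		_ = json.NewEncoder(w).Encode(embedder.Stats())
//	})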
