# GPU K-Means Clustering for Mimir/NornicDB: Combined Analysis & Implementation Plan

## 🔧 CRITICAL FIX: Metal Atomic Float Workaround

**⚠️ An earlier revision of the Metal kernels in Section 1.3 (lines 253-361) used `atomic_float`, which does not exist in Metal 2.x!**

**✅ FIXED:** Production-ready Metal kernels with an atomic float workaround are available:

- **Implementation:** `nornicdb/pkg/gpu/metal/kmeans_kernels_darwin.metal`
- **Full Documentation:** `nornicdb/docs/GPU_KMEANS_METAL_ATOMIC_FIX.md`
- **Quick Reference:** `nornicdb/docs/GPU_KMEANS_METAL_ATOMIC_SUMMARY.md`

The fix uses atomic compare-exchange on `atomic_uint` to emulate atomic float operations. Performance impact: 3-5x slower atomic accumulation, but **still 94x faster than CPU overall**.

---

## Executive Summary

This document combines the theoretical analysis from K-MEANS.md and K-MEANS-RT.md with a critical evaluation of their applicability to Mimir's architecture. It provides a realistic implementation plan for enhancing NornicDB's vector search capabilities with GPU-accelerated k-means clustering.

---

## Part 1: Critical Analysis

### 1.1 Current State of NornicDB GPU Infrastructure

**What Exists:**

- ✅ `pkg/gpu/` package with multi-backend support (Metal, CUDA, OpenCL, Vulkan)
- ✅ `EmbeddingIndex` - GPU-accelerated vector similarity search
- ✅ Working Metal (macOS) and CUDA (NVIDIA) backends
- ✅ CPU fallback for all operations
- ✅ Auto-detected embedding dimensions (inferred from first embedding added)

**What's Missing for K-Means:**

- ❌ No k-means clustering implementation exists
- ❌ No cluster assignment tracking
- ❌ No centroid management
- ❌ No integration with inference engine
- ❌ No real-time clustering hooks

### 1.2 Mimir Context: Why K-Means Matters

From `AGENTS.md` and `README.md`, Mimir's architecture uses:

- **Neo4j Graph Database** for persistent storage
- **Vector embeddings** with auto-detected dimensions (varies by model):
  - `mxbai-embed-large`: 1024 dimensions
  - `nomic-embed-text`: 768 dimensions
  - `text-embedding-3-small` (OpenAI): 1536 dimensions
  - Dimensions are automatically inferred from the embeddings at runtime
- **Semantic search** via `vector_search_nodes` tool
- **File indexing** with chunking (1000-char chunks with embeddings)

**Current Vector Search Flow:**

```
Query → Embedding → vector_search_nodes → Cosine Similarity → Top-K Results
```

**Proposed K-Means Enhanced Flow:**

```
Query → Embedding → Cluster Lookup (O(1)) → Intra-cluster Refinement → Results
```

### 1.3 Critical Gaps in Original Documents

#### K-MEANS.md Concerns:

1. **Python/cuML Focus**: Examples use Python libraries (RAPIDS cuML, FAISS, PyTorch), but NornicDB is Go-based. The Go interface examples are theoretical - no actual CUDA/Metal kernel implementations are provided.
2. **Missing GPU Kernel Code**: The CUDA/Metal kernels shown are simplified pseudocode. Production k-means requires:
   - K-means++ initialization (not shown)
   - Parallel prefix sum for centroid updates
   - Convergence checking across GPU threads
   - Memory coalescing optimizations
3. **Memory Estimates Optimistic**: The 40MB for 10K embeddings assumes contiguous storage. Real overhead includes:
   - GPU command buffers
   - Intermediate results buffers
   - Working memory for reductions
   - Actual: ~80-120MB for 10K embeddings
4. **NornicDB Integration Untested**: The `ConceptClusteringEngine` shown has never been implemented. The interface to `storage.Engine` doesn't match NornicDB's actual storage API.

#### K-MEANS-RT.md Concerns:
1. **3-Tier System Complexity**: While elegant, implementing three tiers simultaneously is risky:
   - Tier 1 (instant reassignment): Simple, low risk
   - Tier 2 (batch updates): Moderate complexity
   - Tier 3 (full re-clustering): Requires careful synchronization
2. **Drift Detection Untested**: The `computeDrift` and `shouldRecluster` heuristics need empirical tuning. Thresholds like 0.1 are arbitrary.
3. **Concurrent Update Handling**: The locking strategy (`sync.RWMutex`) may cause contention under high-throughput scenarios. The document doesn't address:
   - Lock-free alternatives
   - Sharded cluster indices
   - Read-copy-update patterns
4. **GPU Kernel Launch Overhead**: Launching GPU kernels for single-node reassignment (~0.1ms claimed) may actually take 0.3-0.5ms due to:
   - Driver overhead
   - Memory synchronization
   - Command queue latency

### 1.4 What the Documents Get Right

**Feasibility**: ✅ GPU k-means on high-dimensional embeddings (768-1536 dims) is absolutely feasible. NornicDB already has the GPU infrastructure.

**Performance Claims**: ✅ The 100-400x speedup over CPU for batch operations is realistic based on existing Metal/CUDA benchmarks.

**Use Cases**: ✅ The three main use cases are valid for Mimir:

1. **Topic Discovery**: Cluster similar file chunks for concept mining
2. **Related Documents**: O(1) lookup of related files via cluster membership
3. **Concept Drift**: Track how codebase topics evolve over indexing

**Hybrid Approach**: ✅ Combining cluster-based filtering with vector refinement is the correct architecture.

---

## Part 2: Realistic Implementation Plan

### Phase 1: Foundation (2-3 weeks)

**Goal**: Implement basic GPU k-means as an extension to the existing `EmbeddingIndex`

#### 1.1 Extend EmbeddingIndex with Clustering

```go
// pkg/gpu/kmeans.go (NEW FILE)
package gpu

import (
	"sync"
	"time"
	"unsafe"
)

// KMeansConfig configures k-means clustering
type KMeansConfig struct {
	NumClusters    int     // K value (default: sqrt(N/2))
	MaxIterations  int     // Convergence limit (default: 100)
	Tolerance      float32 // Convergence threshold (default: 0.0001)
	InitMethod     string  // "kmeans++" or "random"
	AutoK          bool    // Auto-determine optimal K
	DriftThreshold float64 // Tier 3: max centroid drift before re-clustering (default: 0.1)
	// Note: Dimensions are auto-detected from the first embedding added
}

// DefaultKMeansConfig returns sensible defaults.
// Dimensions are auto-detected from the first embedding added.
func DefaultKMeansConfig() *KMeansConfig {
	return &KMeansConfig{
		NumClusters:    0, // Auto-detect based on data size
		MaxIterations:  100,
		Tolerance:      0.0001,
		InitMethod:     "kmeans++",
		AutoK:          true,
		DriftThreshold: 0.1,
	}
}

// ClusterIndex extends EmbeddingIndex with clustering capabilities
type ClusterIndex struct {
	*EmbeddingIndex

	config     *KMeansConfig
	dimensions int // Auto-detected from first embedding

	// Cluster state
	centroids   [][]float32   // [K][dimensions] centroid vectors
	assignments []int         // [N] cluster assignment per embedding
	clusterMap  map[int][]int // cluster_id -> embedding indices

	// GPU buffers for clustering
	centroidBuffer unsafe.Pointer // GPU buffer for centroids

	// State tracking
	clustered bool
	mu        sync.RWMutex

	// Stats
	clusterIterations int64
	lastClusterTime   time.Duration
	lastClusteredAt   time.Time // Wall-clock time of last clustering run
}

// NewClusterIndex creates a clusterable embedding index
func NewClusterIndex(manager *Manager, embConfig *EmbeddingIndexConfig, kmeansConfig *KMeansConfig) *ClusterIndex {
	if kmeansConfig == nil {
		kmeansConfig = DefaultKMeansConfig()
	}
	return &ClusterIndex{
		EmbeddingIndex: NewEmbeddingIndex(manager, embConfig),
		config:         kmeansConfig,
		clusterMap:     make(map[int][]int),
	}
}
```

#### 1.2 Implement CPU K-Means First
```go
// Cluster performs k-means clustering on current embeddings
func (ci *ClusterIndex) Cluster() error {
	ci.mu.Lock()
	defer ci.mu.Unlock()

	n := len(ci.nodeIDs)
	if n == 0 {
		return nil
	}

	// Auto-determine K if not specified
	k := ci.config.NumClusters
	if k <= 0 || ci.config.AutoK {
		k = optimalK(n)
	}

	// Initialize centroids (k-means++ or random)
	ci.centroids = ci.initCentroids(k)
	ci.assignments = make([]int, n)

	// Iterate until convergence
	start := time.Now()
	for iter := 0; iter < ci.config.MaxIterations; iter++ {
		// Assignment step
		changed := ci.assignToCentroids()

		// Update centroids
		ci.updateCentroids()

		if !changed {
			break
		}
		atomic.AddInt64(&ci.clusterIterations, 1)
	}
	ci.lastClusterTime = time.Since(start)
	ci.lastClusteredAt = start

	// Build cluster map
	ci.buildClusterMap()
	ci.clustered = true

	return nil
}

// optimalK calculates optimal cluster count using rule of thumb
func optimalK(n int) int {
	// sqrt(n/2) is a common heuristic
	k := int(math.Sqrt(float64(n) / 2))
	if k < 10 {
		k = 10 // Minimum clusters
	}
	if k > 1000 {
		k = 1000 // Maximum clusters
	}
	return k
}
```
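The `initCentroids` helper called above is never defined, and Section 1.3 flagged k-means++ seeding as "not shown" in the original documents. A minimal CPU sketch follows, assuming the raw vectors are reachable as a `ci.embeddings [][]float32` field (that field name is hypothetical) and that `math/rand` is added to the file's imports:

```go
// initCentroids seeds k centroids with k-means++: pick the first
// centroid uniformly at random, then pick each subsequent centroid
// with probability proportional to its squared distance from the
// nearest centroid chosen so far.
func (ci *ClusterIndex) initCentroids(k int) [][]float32 {
	n := len(ci.embeddings) // hypothetical field holding raw vectors
	centroids := make([][]float32, 0, k)
	centroids = append(centroids, cloneVec(ci.embeddings[rand.Intn(n)]))

	dist2 := make([]float64, n) // squared distance to nearest chosen centroid
	for len(centroids) < k {
		var total float64
		for i, emb := range ci.embeddings {
			// Only the newest centroid can lower the running minimum.
			d := squaredDistance(emb, centroids[len(centroids)-1])
			if len(centroids) == 1 || d < dist2[i] {
				dist2[i] = d
			}
			total += dist2[i]
		}
		// Sample the next centroid proportionally to dist2.
		r := rand.Float64() * total
		next := 0
		for i, d := range dist2 {
			r -= d
			if r <= 0 {
				next = i
				break
			}
		}
		centroids = append(centroids, cloneVec(ci.embeddings[next]))
	}
	return centroids
}

// cloneVec copies a vector so centroid updates never alias stored embeddings.
func cloneVec(v []float32) []float32 {
	out := make([]float32, len(v))
	copy(out, v)
	return out
}

func squaredDistance(a, b []float32) float64 {
	var sum float64
	for i := range a {
		diff := float64(a[i] - b[i])
		sum += diff * diff
	}
	return sum
}
```

In production the sampling should use a seeded `rand.Source` for reproducible tests, and `InitMethod: "random"` should bypass the weighting entirely.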
#### 1.3 Add Metal GPU Kernel

> **IMPORTANT**: Metal does not have native `atomic_float`. We must emulate it using
> `atomic_uint` with compare-exchange, matching the pattern used in `shaders_darwin.metal`.

```metal
// pkg/gpu/metal/kernels/kmeans.metal (NEW FILE)
#include <metal_stdlib>
using namespace metal;

// =============================================================================
// Atomic Float Emulation (Metal lacks native atomic_float)
// =============================================================================
// Uses compare-exchange loop on atomic_uint reinterpreted as float.
// This is the standard technique for atomic float operations in Metal.
inline void atomicAddFloat(device atomic_uint* addr, float val) {
    uint expected = atomic_load_explicit(addr, memory_order_relaxed);
    uint desired;
    do {
        float current = as_type<float>(expected);
        desired = as_type<uint>(current + val);
    } while (!atomic_compare_exchange_weak_explicit(
        addr, &expected, desired,
        memory_order_relaxed, memory_order_relaxed
    ));
}

// =============================================================================
// Kernel: Compute Distances (Points to Centroids)
// =============================================================================
// Computes squared Euclidean distance from each point to each centroid.
// This is the "assignment step" preparation in k-means.
kernel void kmeans_compute_distances(
    device const float* embeddings [[buffer(0)]],  // [N * D]
    device const float* centroids  [[buffer(1)]],  // [K * D]
    device float* distances        [[buffer(2)]],  // [N * K]
    constant uint& N               [[buffer(3)]],
    constant uint& K               [[buffer(4)]],
    constant uint& D               [[buffer(5)]],
    uint2 gid [[thread_position_in_grid]]
) {
    uint n = gid.x; // embedding index
    uint k = gid.y; // centroid index
    if (n >= N || k >= K) return;

    float dist = 0.0f;
    for (uint d = 0; d < D; d++) {
        float diff = embeddings[n * D + d] - centroids[k * D + d];
        dist += diff * diff;
    }
    distances[n * K + k] = dist;
}

// =============================================================================
// Kernel: Assign Clusters
// =============================================================================
// Finds nearest centroid for each point (assignment step).
// Tracks number of changed assignments for convergence detection.
kernel void kmeans_assign_clusters(
    device const float* distances [[buffer(0)]],  // [N * K]
    device int* assignments       [[buffer(1)]],  // [N]
    device atomic_int* changed    [[buffer(2)]],  // [1] - convergence counter
    constant uint& N              [[buffer(3)]],
    constant uint& K              [[buffer(4)]],
    uint gid [[thread_position_in_grid]]
) {
    if (gid >= N) return;

    int old_cluster = assignments[gid];

    // Find minimum distance centroid
    float min_dist = distances[gid * K];
    int closest = 0;
    for (uint k = 1; k < K; k++) {
        float d = distances[gid * K + k];
        if (d < min_dist) {
            min_dist = d;
            closest = int(k);
        }
    }

    assignments[gid] = closest;

    // Track changes for convergence
    if (closest != old_cluster) {
        atomic_fetch_add_explicit(changed, 1, memory_order_relaxed);
    }
}

// =============================================================================
// Kernel: Accumulate Centroid Sums
// =============================================================================
// Parallel reduction to compute sum of embeddings per cluster.
// Uses atomic float emulation for thread-safe accumulation.
//
// Note: This kernel uses atomic operations which are slower.
// For production, consider hierarchical reduction or warp-level primitives.
kernel void kmeans_accumulate_sums(
    device const float* embeddings    [[buffer(0)]],  // [N * D]
    device const int* assignments     [[buffer(1)]],  // [N]
    device atomic_uint* centroid_sums [[buffer(2)]],  // [K * D] as uint for atomic emulation
    device atomic_int* cluster_counts [[buffer(3)]],  // [K]
    constant uint& N                  [[buffer(4)]],
    constant uint& K                  [[buffer(5)]],
    constant uint& D                  [[buffer(6)]],
    uint gid [[thread_position_in_grid]]
) {
    if (gid >= N) return;

    int cluster = assignments[gid];

    // Atomic add each dimension to centroid sum
    for (uint d = 0; d < D; d++) {
        atomicAddFloat(&centroid_sums[cluster * D + d], embeddings[gid * D + d]);
    }

    // Increment cluster count (only once per embedding)
    atomic_fetch_add_explicit(&cluster_counts[cluster], 1, memory_order_relaxed);
}

// =============================================================================
// Kernel: Finalize Centroids
// =============================================================================
// Divides accumulated sums by counts to compute new centroid positions.
kernel void kmeans_finalize_centroids(
    device float* centroids                 [[buffer(0)]],  // [K * D]
    device const atomic_uint* centroid_sums [[buffer(1)]],  // [K * D]
    device const int* cluster_counts        [[buffer(2)]],  // [K]
    constant uint& K                        [[buffer(3)]],
    constant uint& D                        [[buffer(4)]],
    uint2 gid [[thread_position_in_grid]]
) {
    uint k = gid.x;
    uint d = gid.y;
    if (k >= K || d >= D) return;

    int count = cluster_counts[k];
    if (count > 0) {
        // Read accumulated sum (reinterpret uint as float)
        uint sum_uint = atomic_load_explicit(&centroid_sums[k * D + d], memory_order_relaxed);
        float sum = as_type<float>(sum_uint);
        centroids[k * D + d] = sum / float(count);
    }
}

// =============================================================================
// Kernel: Find Nearest Centroid (Single Query)
// =============================================================================
// Used for real-time assignment of new embeddings to existing clusters.
// This is fast (single kernel launch) for incremental updates.
kernel void kmeans_find_nearest_centroid(
    device const float* query     [[buffer(0)]],  // [D]
    device const float* centroids [[buffer(1)]],  // [K * D]
    device int* result            [[buffer(2)]],  // [1] - nearest cluster ID
    device float* min_distance    [[buffer(3)]],  // [1] - distance to nearest
    constant uint& K              [[buffer(4)]],
    constant uint& D              [[buffer(5)]],
    uint gid [[thread_position_in_grid]],
    uint lid [[thread_position_in_threadgroup]],
    threadgroup float* shared_distances [[threadgroup(0)]],
    threadgroup int* shared_indices     [[threadgroup(1)]]
) {
    // Each thread computes distance to one centroid
    if (gid < K) {
        float dist = 0.0f;
        for (uint d = 0; d < D; d++) {
            float diff = query[d] - centroids[gid * D + d];
            dist += diff * diff;
        }
        shared_distances[lid] = dist;
        shared_indices[lid] = int(gid);
    } else {
        shared_distances[lid] = INFINITY;
        shared_indices[lid] = -1;
    }

    threadgroup_barrier(mem_flags::mem_threadgroup);

    // Parallel reduction to find minimum.
    // Note: assumes a dispatch threadgroup size of 256 (stride starts at 128).
    for (uint stride = 128; stride > 0; stride /= 2) {
        if (lid < stride && lid + stride < 256) {
            if (shared_distances[lid + stride] < shared_distances[lid]) {
                shared_distances[lid] = shared_distances[lid + stride];
                shared_indices[lid] = shared_indices[lid + stride];
            }
        }
        threadgroup_barrier(mem_flags::mem_threadgroup);
    }

    // First thread writes result
    if (lid == 0) {
        result[0] = shared_indices[0];
        min_distance[0] = shared_distances[0];
    }
}
```
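The kernel above handles single-query assignment on the GPU, but Phase 3's `OnNodeUpdate` also needs a CPU fallback named `findNearestCentroid`, which the plan references without defining. A minimal sketch, assuming `ci.centroids` is populated as in Phase 1 and reusing the `squaredDistance` helper from the k-means++ sketch:

```go
// findNearestCentroid returns the index of the centroid closest to emb
// (squared Euclidean distance). CPU fallback mirroring
// kmeans_find_nearest_centroid for platforms without GPU support.
func (ci *ClusterIndex) findNearestCentroid(emb []float32) int {
	best := 0
	bestDist := math.MaxFloat64
	for k, centroid := range ci.centroids {
		if d := squaredDistance(emb, centroid); d < bestDist {
			bestDist = d
			best = k
		}
	}
	return best
}
```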
### Phase 2: Inference Engine Integration (3-4 weeks)

**Goal**: Wire k-means clustering into Mimir's inference engine following existing patterns

> **CRITICAL**: The inference engine uses **dependency injection** for vector search, not direct references.
> Follow the `TopologyIntegration` pattern from `pkg/inference/topology_integration.go`.

#### 2.1 Create ClusterIntegration (pkg/inference/cluster_integration.go)

```go
// pkg/inference/cluster_integration.go (NEW FILE)
package inference

import (
	"context"
	"sync"
	"time"

	"github.com/orneryd/nornicdb/pkg/gpu"
)

// ClusterConfig controls cluster-accelerated search integration.
//
// This allows the inference engine to use pre-computed clusters
// for faster similarity search and related-document discovery.
//
// Example:
//
//	config := &inference.ClusterConfig{
//		Enabled:            true,
//		ExpansionFactor:    3,    // Search 3 nearest clusters
//		AutoClusterOnIndex: true, // Cluster after indexing
//		MinEmbeddings:      1000, // Min embeddings before clustering
//	}
type ClusterConfig struct {
	// Enable cluster-accelerated search
	Enabled bool

	// ExpansionFactor: how many clusters to search (default: 3)
	// Higher = better recall, slower search
	ExpansionFactor int

	// AutoClusterOnIndex: trigger clustering after index operations
	AutoClusterOnIndex bool

	// MinEmbeddings: minimum embeddings before auto-clustering
	MinEmbeddings int

	// ReclusterInterval: how often to rebuild clusters (0 = manual only)
	ReclusterInterval time.Duration
}

// DefaultClusterConfig returns sensible defaults for cluster integration.
func DefaultClusterConfig() *ClusterConfig {
	return &ClusterConfig{
		Enabled:            false, // Opt-in
		ExpansionFactor:    3,
		AutoClusterOnIndex: true,
		MinEmbeddings:      1000,
		ReclusterInterval:  time.Hour,
	}
}

// ClusterIntegration adds cluster-accelerated search to the inference engine.
//
// This wraps a ClusterIndex and provides the same interface as the
// similarity search function, but uses clusters for faster lookup.
//
// Architecture:
//
//	┌─────────────────────────────────────────────────────────────┐
//	│ Inference Engine                                            │
//	├─────────────────────────────────────────────────────────────┤
//	│ SetSimilaritySearch(fn)   ←── ClusterIntegration.Search     │
//	│ SetClusterIntegration(ci) ←── NEW: Direct cluster access    │
//	└─────────────────────────────────────────────────────────────┘
//	                          │
//	                          ▼
//	┌─────────────────────────────────────────────────────────────┐
//	│ ClusterIntegration                                          │
//	├─────────────────────────────────────────────────────────────┤
//	│ clusterIndex *gpu.ClusterIndex ←── GPU/CPU clustering       │
//	│ fallbackSearch func(...)       ←── Original search          │
//	│ config *ClusterConfig          ←── Settings                 │
//	└─────────────────────────────────────────────────────────────┘
//
// Example:
//
//	engine := inference.New(inference.DefaultConfig())
//
//	// Create cluster integration
//	clusterConfig := inference.DefaultClusterConfig()
//	clusterConfig.Enabled = true
//
//	ci := inference.NewClusterIntegration(gpuManager, clusterConfig)
//
//	// Wire to inference engine (Option A: via SetSimilaritySearch)
//	engine.SetSimilaritySearch(ci.Search)
//
//	// Or (Option B: direct access for OnIndexComplete)
//	engine.SetClusterIntegration(ci)
type ClusterIntegration struct {
	config       *ClusterConfig
	clusterIndex *gpu.ClusterIndex

	// Fallback search function (used when clusters unavailable)
	fallbackSearch func(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error)

	mu sync.RWMutex
}

// NewClusterIntegration creates a new cluster integration.
//
// Parameters:
//   - manager: GPU manager for cluster index
//   - config: Cluster configuration (nil uses defaults)
//
// Returns ready-to-use integration that can be attached to inference engine.
func NewClusterIntegration(manager *gpu.Manager, config *ClusterConfig) *ClusterIntegration {
	if config == nil {
		config = DefaultClusterConfig()
	}
	return &ClusterIntegration{
		config:       config,
		clusterIndex: gpu.NewClusterIndex(manager, nil, nil),
	}
}

// SetFallbackSearch sets the fallback search function used when clusters
// are not available (e.g., before clustering, or when disabled).
//
// This should be the original EmbeddingIndex.Search function.
func (ci *ClusterIntegration) SetFallbackSearch(fn func(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error)) {
	ci.mu.Lock()
	defer ci.mu.Unlock()
	ci.fallbackSearch = fn
}

// Search implements the similarity search interface, using clusters when available.
//
// This method can be passed to engine.SetSimilaritySearch() to transparently
// add cluster-based acceleration.
//
// Flow:
//  1. If clusters available and enabled → cluster search
//  2. Otherwise → fallback to brute-force search
//
// Returns results compatible with inference engine expectations.
func (ci *ClusterIntegration) Search(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error) {
	ci.mu.RLock()
	defer ci.mu.RUnlock()

	// Use cluster search if available
	if ci.config.Enabled && ci.clusterIndex.IsClustered() {
		return ci.searchWithClusters(ctx, embedding, k)
	}

	// Fallback to brute-force
	if ci.fallbackSearch != nil {
		return ci.fallbackSearch(ctx, embedding, k)
	}

	// Direct search on cluster index (which wraps EmbeddingIndex)
	return ci.clusterIndex.Search(ctx, embedding, k)
}
// searchWithClusters performs cluster-accelerated search.
func (ci *ClusterIntegration) searchWithClusters(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error) {
	// 1. Find nearest clusters
	clusterIDs := ci.clusterIndex.FindNearestClusters(embedding, ci.config.ExpansionFactor)

	// 2. Get candidate node IDs from clusters
	candidates := ci.clusterIndex.GetClusterMembers(clusterIDs)

	// 3. Refine with exact similarity on candidates only
	results, err := ci.clusterIndex.SearchCandidates(ctx, embedding, candidates, k)
	if err != nil {
		return nil, err
	}

	// 4. Convert to SimilarityResult format
	simResults := make([]SimilarityResult, len(results))
	for i, r := range results {
		simResults[i] = SimilarityResult{
			ID:    r.ID,
			Score: r.Score,
		}
	}
	return simResults, nil
}

// AddEmbedding adds an embedding and handles real-time cluster assignment.
//
// Call this when a new node is stored to keep clusters updated.
func (ci *ClusterIntegration) AddEmbedding(nodeID string, embedding []float32) error {
	return ci.clusterIndex.OnNodeUpdate(nodeID, embedding)
}

// Cluster triggers k-means clustering on all indexed embeddings.
//
// This is called automatically by OnIndexComplete when AutoClusterOnIndex=true,
// or can be called manually via MCP tool.
func (ci *ClusterIntegration) Cluster() error {
	return ci.clusterIndex.Cluster()
}

// IsClustered returns true if clustering has been performed.
func (ci *ClusterIntegration) IsClustered() bool {
	return ci.clusterIndex.IsClustered()
}

// Stats returns clustering statistics for monitoring.
func (ci *ClusterIntegration) Stats() gpu.ClusterStats {
	return ci.clusterIndex.ClusterStats()
}

// GetClusterIndex returns the underlying cluster index for direct access.
func (ci *ClusterIntegration) GetClusterIndex() *gpu.ClusterIndex {
	return ci.clusterIndex
}
```

#### 2.2 Extend Inference Engine (pkg/inference/inference.go additions)

```go
// Add to Engine struct (pkg/inference/inference.go)
type Engine struct {
	// ... existing fields ...

	// Optional cluster integration (NEW)
	clusterIntegration *ClusterIntegration
}

// SetClusterIntegration enables cluster-accelerated search.
//
// This adds clustering capabilities to the inference engine, following
// the same pattern as SetTopologyIntegration.
//
// When both similaritySearch and clusterIntegration are set, the cluster
// integration's Search method wraps the original for acceleration.
//
// Example:
//
//	engine := inference.New(inference.DefaultConfig())
//
//	// Set up cluster integration
//	clusterConfig := inference.DefaultClusterConfig()
//	clusterConfig.Enabled = true
//	ci := inference.NewClusterIntegration(gpuManager, clusterConfig)
//
//	// Wire it up
//	engine.SetClusterIntegration(ci)
//
//	// The engine now uses clusters for similarity search
func (e *Engine) SetClusterIntegration(ci *ClusterIntegration) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// If we already have a similarity search, make it the fallback
	if e.similaritySearch != nil {
		ci.SetFallbackSearch(e.similaritySearch)
	}

	// Replace similarity search with cluster-aware version
	e.similaritySearch = ci.Search
	e.clusterIntegration = ci
}

// GetClusterIntegration returns the current cluster integration (or nil).
func (e *Engine) GetClusterIntegration() *ClusterIntegration {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.clusterIntegration
}

// OnIndexComplete is called after batch indexing operations complete.
//
// This triggers auto-clustering if enabled, following NornicDB's pattern
// of optional, feature-flagged enhancements.
//
// Who calls this:
//   - MCP server after index_folder completes
//   - Storage engine after batch import
//   - File watcher after batch processing
//
// Example:
//
//	// In MCP index_folder handler
//	func handleIndexFolder(ctx context.Context, path string) error {
//		// ... index files ...
//
//		// Notify inference engine
//		if engine.GetClusterIntegration() != nil {
//			engine.OnIndexComplete(ctx)
//		}
//		return nil
//	}
func (e *Engine) OnIndexComplete(ctx context.Context) error {
	e.mu.RLock()
	ci := e.clusterIntegration
	e.mu.RUnlock()

	if ci == nil || !ci.config.AutoClusterOnIndex {
		return nil
	}

	// Check minimum embedding threshold
	if ci.clusterIndex.Count() < ci.config.MinEmbeddings {
		return nil
	}

	// Cluster in background (non-blocking)
	go func() {
		if err := ci.Cluster(); err != nil {
			// Log error but don't fail the indexing operation
			// Import: "log" or use structured logging
			log.Printf("Auto-clustering failed: %v", err)
		} else {
			stats := ci.Stats()
			log.Printf("Clustered %d embeddings into %d clusters (took %v)",
				stats.EmbeddingCount, stats.NumClusters, stats.LastClusterTime)
		}
	}()

	return nil
}
```

#### 2.3 Wiring Diagram: Startup to MCP Tool

```
STARTUP SEQUENCE
================

1. main.go / cmd/nornicdb/main.go

   // Initialize GPU manager
   gpuManager, _ := gpu.NewManager(gpuConfig)

   // Initialize inference engine
   inferEngine := inference.New(inferConfig)

   // Create cluster integration
   clusterConfig := inference.DefaultClusterConfig()
   clusterConfig.Enabled = config.Clustering.Enabled
   ci := inference.NewClusterIntegration(gpuManager, clusterConfig)

   // Wire to inference engine
   inferEngine.SetClusterIntegration(ci)

   // Pass to MCP server
   mcpServer := mcp.NewServer(inferEngine)

        │
        ▼

2. MCP Server receives inference engine reference

   type Server struct {
       inferEngine *inference.Engine
   }

        │
        ▼

3. MCP Tool Handlers use inference engine

   // vector_search_nodes tool
   func (s *Server) handleVectorSearch(ctx, params) {
       results, _ := s.inferEngine.OnStore(ctx, query, embedding)
       // Automatically uses clusters if enabled!
   }

   // index_folder tool
   func (s *Server) handleIndexFolder(ctx, params) {
       // ... index files ...
       s.inferEngine.OnIndexComplete(ctx) // Triggers clustering
   }

   // cluster_embeddings tool (NEW)
   func (s *Server) handleClusterEmbeddings(ctx, params) {
       ci := s.inferEngine.GetClusterIntegration()
       if ci != nil {
           return ci.Cluster()
       }
   }
```

#### 2.4 Add MCP Tool for Clustering

```typescript
// New tool: cluster_embeddings
{
  name: "cluster_embeddings",
  description: "Cluster all embeddings into semantic groups for faster related-document lookup",
  inputSchema: {
    type: "object",
    properties: {
      num_clusters: {
        type: "number",
        description: "Number of clusters (auto-detected if not specified)"
      },
      node_types: {
        type: "array",
        items: { type: "string" },
        description: "Node types to cluster (default: all)"
      }
    }
  }
}
```
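The wiring diagram above and the end-to-end tests in Section 4.3 both reference a `handleClusterEmbeddings` handler that the plan never spells out. A minimal sketch, assuming the `Server` shape from the diagram and the `params` map used in the tests (the handler signature and the `SetNumClusters` setter are hypothetical):

```go
// handleClusterEmbeddings backs the cluster_embeddings MCP tool. It looks
// up the cluster integration on the inference engine, optionally overrides
// the cluster count, and triggers clustering.
func (s *Server) handleClusterEmbeddings(ctx context.Context, params map[string]interface{}) (interface{}, error) {
	ci := s.inferEngine.GetClusterIntegration()
	if ci == nil {
		return nil, fmt.Errorf("clustering is not enabled")
	}

	// Optional num_clusters override (JSON numbers decode as float64).
	if k, ok := params["num_clusters"].(float64); ok && k > 0 {
		ci.GetClusterIndex().SetNumClusters(int(k)) // hypothetical setter
	}

	if err := ci.Cluster(); err != nil {
		return nil, err
	}

	stats := ci.Stats()
	return map[string]interface{}{
		"num_clusters": stats.NumClusters,
		"embeddings":   stats.EmbeddingCount,
		"duration":     stats.LastClusterTime.String(),
	}, nil
}
```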
#### 2.5 Enhance vector_search_nodes with Cluster Filtering

```typescript
// Enhanced vector_search_nodes
async function vectorSearchNodes(params: {
  query: string;
  limit?: number;
  types?: string[];
  use_clusters?: boolean;     // NEW: Use pre-computed clusters
  cluster_expansion?: number; // NEW: Search N similar clusters
}) {
  // If clusters are available and enabled, use them
  if (params.use_clusters && clustersAvailable()) {
    // 1. Find query's nearest cluster
    const queryCluster = await findNearestCluster(queryEmbedding);

    // 2. Get similar clusters if expansion requested
    const searchClusters = params.cluster_expansion > 1
      ? await getSimilarClusters(queryCluster, params.cluster_expansion)
      : [queryCluster];

    // 3. Get candidates from clusters (O(1) per cluster)
    const candidates = await getClusterMembers(searchClusters);

    // 4. Refine with exact similarity
    return rankBySimilarity(queryEmbedding, candidates, params.limit);
  }

  // Fallback to full vector search
  return fullVectorSearch(queryEmbedding, params.limit);
}
```

### Phase 3: Real-Time Updates (3-4 weeks)

**Goal**: Implement the 3-tier system from K-MEANS-RT.md with proper safeguards

#### 3.1 Tier 1: Instant Reassignment

```go
// OnNodeUpdate handles real-time embedding changes.
// Requires new ClusterIndex fields added in this phase:
// idToIndex map[string]int, pendingUpdates []nodeUpdate, updatesSinceCluster int64.
func (ci *ClusterIndex) OnNodeUpdate(nodeID string, newEmbedding []float32) error {
	if !ci.clustered {
		// No clustering yet, just update embedding
		return ci.EmbeddingIndex.Add(nodeID, newEmbedding)
	}

	ci.mu.Lock()
	defer ci.mu.Unlock()

	// Find nearest centroid (use GPU if available)
	newCluster := ci.findNearestCentroid(newEmbedding)

	idx, exists := ci.idToIndex[nodeID]
	if exists {
		oldCluster := ci.assignments[idx]
		if newCluster != oldCluster {
			// Update cluster membership
			ci.removeFromClusterMap(oldCluster, idx)
			ci.addToClusterMap(newCluster, idx)
			ci.assignments[idx] = newCluster

			// Track for batch centroid update
			ci.pendingUpdates = append(ci.pendingUpdates, nodeUpdate{
				idx:        idx,
				oldCluster: oldCluster,
				newCluster: newCluster,
			})
		}
	}

	// Update embedding
	return ci.EmbeddingIndex.Add(nodeID, newEmbedding)
}
```

#### 3.2 Tier 2: Batch Centroid Updates

```go
// Periodically called to update centroids based on accumulated changes
func (ci *ClusterIndex) updateCentroidsBatch() {
	ci.mu.Lock()
	updates := ci.pendingUpdates
	ci.pendingUpdates = nil
	ci.mu.Unlock()

	if len(updates) == 0 {
		return
	}

	// Group by affected clusters
	affectedClusters := make(map[int]bool)
	for _, u := range updates {
		affectedClusters[u.oldCluster] = true
		affectedClusters[u.newCluster] = true
	}

	// Recompute centroids only for affected clusters
	for clusterID := range affectedClusters {
		ci.recomputeCentroid(clusterID)
	}
}
```

#### 3.3 Tier 3: Scheduled Re-Clustering

```go
// Start background re-clustering worker
func (ci *ClusterIndex) StartClusterMaintenance(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for range ticker.C {
			if ci.shouldRecluster() {
				log.Printf("Triggering full re-clustering (updates=%d, drift=%.4f)",
					ci.updatesSinceCluster, ci.maxDrift())
				if err := ci.Cluster(); err != nil {
					log.Printf("Re-clustering failed: %v", err)
				}
			}
		}
	}()
}

func (ci *ClusterIndex) shouldRecluster() bool {
	// Trigger if:
	// 1. Too many updates (>10% of dataset)
	if float64(ci.updatesSinceCluster) > float64(ci.Count())*0.1 {
		return true
	}

	// 2. High centroid drift
	if ci.maxDrift() > ci.config.DriftThreshold {
		return true
	}

	// 3. Time-based (every hour)
	if time.Since(ci.lastClusteredAt) > time.Hour {
		return true
	}

	return false
}
```
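Section 1.3 flagged `computeDrift`/`maxDrift` as needing empirical tuning, but neither is ever defined. One plausible definition, sketched under the assumption that the index snapshots centroids after each full `Cluster()` run into an `initialCentroids` field (that field is hypothetical) and reusing the `squaredDistance` helper from the k-means++ sketch:

```go
// computeDrift measures how far a centroid has moved since the last
// full clustering run. initialCentroids (hypothetical field) holds the
// centroid snapshot taken at the end of Cluster().
func (ci *ClusterIndex) computeDrift(clusterID int) float64 {
	return math.Sqrt(squaredDistance(
		ci.centroids[clusterID],
		ci.initialCentroids[clusterID],
	))
}

// maxDrift returns the largest per-centroid drift. shouldRecluster
// compares this against config.DriftThreshold (0.1 by default, which
// still needs empirical tuning as noted in Section 1.3).
func (ci *ClusterIndex) maxDrift() float64 {
	var max float64
	for k := range ci.centroids {
		if d := ci.computeDrift(k); d > max {
			max = d
		}
	}
	return max
}
```

Because raw Euclidean drift scales with embedding magnitude, normalized embeddings (or a per-model calibrated threshold) are worth evaluating during the Phase 3 tuning work.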
### Phase 4: Testing & Benchmarking (2 weeks)

#### 4.1 Unit Tests for ClusterIndex (pkg/gpu/kmeans_test.go)

```go
// pkg/gpu/kmeans_test.go
package gpu

import (
	"fmt"
	"math/rand"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestKMeansBasic(t *testing.T) {
	// Create test embeddings (3 clear clusters)
	dims := 1024 // Example: mxbai-embed-large
	embeddings := generateClusteredData(1000, 3, dims)

	manager, _ := NewManager(DefaultConfig())
	index := NewClusterIndex(manager, nil, &KMeansConfig{
		NumClusters:   3,
		MaxIterations: 100,
	})

	for i, emb := range embeddings {
		err := index.Add(fmt.Sprintf("node-%d", i), emb)
		require.NoError(t, err)
	}

	err := index.Cluster()
	require.NoError(t, err)

	// Verify cluster quality
	stats := index.ClusterStats()
	assert.Equal(t, 3, stats.NumClusters)
	assert.InDelta(t, 333, stats.AvgClusterSize, 50)
	assert.True(t, index.IsClustered())
}

func TestKMeansGPUvsCPU(t *testing.T) {
	dims := 1024
	embeddings := generateRandomData(10000, dims)

	// CPU clustering
	cpuConfig := DefaultConfig()
	cpuConfig.Enabled = false
	cpuManager, _ := NewManager(cpuConfig)
	cpuIndex := NewClusterIndex(cpuManager, nil, nil)
	for i, emb := range embeddings {
		cpuIndex.Add(fmt.Sprintf("node-%d", i), emb)
	}

	cpuStart := time.Now()
	cpuIndex.Cluster()
	cpuTime := time.Since(cpuStart)

	// GPU clustering
	gpuConfig := DefaultConfig()
	gpuConfig.Enabled = true
	gpuManager, _ := NewManager(gpuConfig)
	gpuIndex := NewClusterIndex(gpuManager, nil, nil)
	for i, emb := range embeddings {
		gpuIndex.Add(fmt.Sprintf("node-%d", i), emb)
	}

	gpuStart := time.Now()
	gpuIndex.Cluster()
	gpuTime := time.Since(gpuStart)

	// GPU should be at least 10x faster
	t.Logf("CPU: %v, GPU: %v, Speedup: %.1fx",
		cpuTime, gpuTime, float64(cpuTime)/float64(gpuTime))
	if gpuManager.IsEnabled() {
		assert.Less(t, gpuTime, cpuTime/10)
	}
}

func TestOptimalK(t *testing.T) {
	tests := []struct {
		n        int
		expected int
	}{
		{100, 10},       // Minimum
		{200, 10},       // sqrt(100) = 10
		{10000, 70},     // sqrt(5000) ≈ 70
		{1000000, 707},  // sqrt(500000) ≈ 707
		{5000000, 1000}, // Maximum cap
	}

	for _, tt := range tests {
		t.Run(fmt.Sprintf("n=%d", tt.n), func(t *testing.T) {
			k := optimalK(tt.n)
			assert.Equal(t, tt.expected, k)
		})
	}
}

func TestFindNearestCentroid(t *testing.T) {
	manager, _ := NewManager(DefaultConfig())
	index := NewClusterIndex(manager, nil, &KMeansConfig{NumClusters: 5})

	// Create 5 distinct clusters
	for cluster := 0; cluster < 5; cluster++ {
		for i := 0; i < 100; i++ {
			emb := make([]float32, 128)
			// Each cluster has a distinct pattern
			for d := 0; d < 128; d++ {
				emb[d] = float32(cluster) + rand.Float32()*0.1
			}
			index.Add(fmt.Sprintf("c%d-n%d", cluster, i), emb)
		}
	}

	require.NoError(t, index.Cluster())

	// Query should find correct cluster
	query := make([]float32, 128)
	for d := 0; d < 128; d++ {
		query[d] = 2.5 // Should match cluster 2 or 3
	}

	clusterID := index.FindNearestCentroid(query)
	assert.True(t, clusterID == 2 || clusterID == 3,
		"Expected cluster 2 or 3, got %d", clusterID)
}

// Helper: generate clustered test data
func generateClusteredData(n, k, dims int) [][]float32 {
	data := make([][]float32, n)
	nodesPerCluster := n / k
	for i := 0; i < n; i++ {
		cluster := i / nodesPerCluster
		if cluster >= k {
			cluster = k - 1 // clamp remainder rows into the last cluster
		}
		data[i] = make([]float32, dims)
		for d := 0; d < dims; d++ {
			// Each cluster centered around different values
			center := float32(cluster * 10)
			data[i][d] = center + rand.Float32()*2 - 1
		}
	}
	return data
}

// Helper: generate random data
func generateRandomData(n, dims int) [][]float32 {
	data := make([][]float32, n)
	for i := 0; i < n; i++ {
		data[i] = make([]float32, dims)
		for d := 0; d < dims; d++ {
			data[i][d] = rand.Float32()
		}
	}
	return data
}
```

#### 4.2 Integration Tests for ClusterIntegration (pkg/inference/cluster_integration_test.go)

```go
// pkg/inference/cluster_integration_test.go
package inference

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/orneryd/nornicdb/pkg/gpu"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestClusterIntegration_Creation(t *testing.T) {
	manager, _ := gpu.NewManager(gpu.DefaultConfig())

	// With defaults
	ci := NewClusterIntegration(manager, nil)
	assert.NotNil(t, ci)
	assert.False(t, ci.config.Enabled) // Opt-in by default

	// With custom config
	config := DefaultClusterConfig()
	config.Enabled = true
	config.ExpansionFactor = 5
	ci = NewClusterIntegration(manager, config)
	assert.True(t, ci.config.Enabled)
	assert.Equal(t, 5, ci.config.ExpansionFactor)
}

func TestClusterIntegration_SearchFallback(t *testing.T) {
	manager, _ := gpu.NewManager(gpu.DefaultConfig())
	config := DefaultClusterConfig()
	config.Enabled = false // Disabled, should use fallback
	ci := NewClusterIntegration(manager, config)

	// Set up a mock fallback search
	fallbackCalled := false
	ci.SetFallbackSearch(func(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error) {
		fallbackCalled = true
		return []SimilarityResult{{ID: "test", Score: 0.9}}, nil
	})

	// Search should use fallback
	results, err := ci.Search(context.Background(), []float32{0.1, 0.2}, 10)
	require.NoError(t, err)
	assert.True(t, fallbackCalled)
	assert.Len(t, results, 1)
}

func TestClusterIntegration_SearchWithClusters(t *testing.T) {
	manager, _ := gpu.NewManager(gpu.DefaultConfig())
	config := DefaultClusterConfig()
	config.Enabled = true
	config.ExpansionFactor = 2
	ci := NewClusterIntegration(manager, config)

	// Add test embeddings
	dims := 128
	for i := 0; i < 1000; i++ {
		emb := make([]float32, dims)
		for d := 0; d < dims; d++ {
			emb[d] = float32(i%10) + float32(d)*0.01
		}
		ci.AddEmbedding(fmt.Sprintf("node-%d", i), emb)
	}

	// Trigger clustering
	require.NoError(t, ci.Cluster())
	assert.True(t, ci.IsClustered())

	// Search should use clusters
	query := make([]float32, dims)
	for d := 0; d < dims; d++ {
		query[d] = 5.0 + float32(d)*0.01 // Similar to node-5, node-15, etc.
	}

	results, err := ci.Search(context.Background(), query, 10)
	require.NoError(t, err)
	assert.LessOrEqual(t, len(results), 10)
}

func TestEngine_SetClusterIntegration(t *testing.T) {
	engine := New(DefaultConfig())

	// Initially no cluster integration
	assert.Nil(t, engine.GetClusterIntegration())

	// Set up original similarity search
	originalCalled := false
	engine.SetSimilaritySearch(func(ctx context.Context, embedding []float32, k int) ([]SimilarityResult, error) {
		originalCalled = true
		return []SimilarityResult{{ID: "original", Score: 0.8}}, nil
	})

	// Add cluster integration
	manager, _ := gpu.NewManager(gpu.DefaultConfig())
	config := DefaultClusterConfig()
	config.Enabled = false // Start disabled
	ci := NewClusterIntegration(manager, config)
	engine.SetClusterIntegration(ci)

	// Cluster integration should be set
	assert.NotNil(t, engine.GetClusterIntegration())

	// The fallback should be wired up
	results, err := ci.Search(context.Background(), []float32{0.1}, 5)
	require.NoError(t, err)
	assert.True(t, originalCalled, "Original search should be called as fallback")
	assert.Equal(t, "original", results[0].ID)
}

func TestEngine_OnIndexComplete(t *testing.T) {
	engine := New(DefaultConfig())
	manager, _ := gpu.NewManager(gpu.DefaultConfig())

	config := DefaultClusterConfig()
	config.Enabled = true
	config.AutoClusterOnIndex = true
	config.MinEmbeddings = 100 // Low threshold for test
	ci := NewClusterIntegration(manager, config)

	// Add enough embeddings
	for i := 0; i < 150; i++ {
		emb := make([]float32, 64)
		for d := 0; d < 64; d++ {
			emb[d] = float32(i) * 0.01
		}
		ci.AddEmbedding(fmt.Sprintf("node-%d", i), emb)
	}

	engine.SetClusterIntegration(ci)

	// Initially not clustered
	assert.False(t, ci.IsClustered())

	// Trigger OnIndexComplete
	err := engine.OnIndexComplete(context.Background())
	require.NoError(t, err)

	// Wait for background clustering
	time.Sleep(100 * time.Millisecond)

	// Now should be clustered
	assert.True(t, ci.IsClustered())
	stats := ci.Stats()
	assert.Greater(t, stats.NumClusters, 0)
}

func TestEngine_OnIndexComplete_BelowThreshold(t *testing.T) {
	engine := New(DefaultConfig())
	manager, _ := gpu.NewManager(gpu.DefaultConfig())

	config := DefaultClusterConfig()
	config.Enabled = true
	config.AutoClusterOnIndex = true
	config.MinEmbeddings = 1000 // High threshold
	ci := NewClusterIntegration(manager, config)

	// Add fewer embeddings than threshold
	for i := 0; i < 50; i++ {
		ci.AddEmbedding(fmt.Sprintf("node-%d", i), make([]float32, 64))
	}

	engine.SetClusterIntegration(ci)

	// Trigger OnIndexComplete
	err := engine.OnIndexComplete(context.Background())
	require.NoError(t, err)

	// Wait a bit
	time.Sleep(50 * time.Millisecond)

	// Should NOT be clustered (below threshold)
	assert.False(t, ci.IsClustered())
}

func TestClusterIntegration_Stats(t *testing.T) {
	manager, _ := gpu.NewManager(gpu.DefaultConfig())
	ci := NewClusterIntegration(manager, nil)

	// Add embeddings
	for i := 0; i < 500; i++ {
		emb := make([]float32, 128)
		for d := 0; d < 128; d++ {
			emb[d] = float32(i%5) * 0.1
		}
		ci.AddEmbedding(fmt.Sprintf("node-%d", i), emb)
	}

	ci.Cluster()

	stats := ci.Stats()
	assert.Greater(t, stats.EmbeddingCount, 0)
	assert.Greater(t, stats.NumClusters, 0)
	assert.Greater(t, stats.LastClusterTime, time.Duration(0))
}
```

#### 4.3 End-to-End Test: MCP Server Integration

```go
// pkg/mcp/cluster_handler_test.go
package mcp

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/orneryd/nornicdb/pkg/gpu"
	"github.com/orneryd/nornicdb/pkg/inference"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
func TestMCPServer_ClusterEmbeddings(t *testing.T) {
	// Set up server with clustering enabled
	gpuManager, _ := gpu.NewManager(gpu.DefaultConfig())
	clusterConfig := inference.DefaultClusterConfig()
	clusterConfig.Enabled = true
	ci := inference.NewClusterIntegration(gpuManager, clusterConfig)

	inferEngine := inference.New(inference.DefaultConfig())
	inferEngine.SetClusterIntegration(ci)

	server := NewServer(inferEngine)

	// Add test embeddings via MCP
	for i := 0; i < 200; i++ {
		emb := make([]float32, 128)
		for d := 0; d < 128; d++ {
			emb[d] = float32(i%4) + float32(d)*0.001
		}
		ci.AddEmbedding(fmt.Sprintf("test-%d", i), emb)
	}

	// Call cluster_embeddings tool
	result, err := server.handleClusterEmbeddings(context.Background(), map[string]interface{}{
		"num_clusters": 4,
	})
	require.NoError(t, err)
	assert.NotNil(t, result)

	// Verify clustering happened
	assert.True(t, ci.IsClustered())
	stats := ci.Stats()
	assert.Equal(t, 4, stats.NumClusters)
}

func TestMCPServer_VectorSearchWithClusters(t *testing.T) {
	// Set up server with clustering enabled
	gpuManager, _ := gpu.NewManager(gpu.DefaultConfig())
	clusterConfig := inference.DefaultClusterConfig()
	clusterConfig.Enabled = true
	clusterConfig.ExpansionFactor = 2
	ci := inference.NewClusterIntegration(gpuManager, clusterConfig)

	inferEngine := inference.New(inference.DefaultConfig())
	inferEngine.SetClusterIntegration(ci)

	server := NewServer(inferEngine)

	// Add and cluster embeddings
	for i := 0; i < 1000; i++ {
		emb := make([]float32, 256)
		cluster := i / 250
		for d := 0; d < 256; d++ {
			emb[d] = float32(cluster*10) + float32(d)*0.001
		}
		ci.AddEmbedding(fmt.Sprintf("doc-%d", i), emb)
	}
	ci.Cluster()

	// Search should use clusters transparently
	query := make([]float32, 256)
	for d := 0; d < 256; d++ {
		query[d] = 10.0 + float32(d)*0.001 // Similar to cluster 1
	}

	results, err := server.handleVectorSearch(context.Background(), map[string]interface{}{
		"query":        query,
		"limit":        10,
		"use_clusters": true,
	})
	require.NoError(t, err)

	// Results should be from cluster 1 (docs 250-499)
	for _, r := range results {
		// Verify results are in the expected range
		assert.Contains(t, r.ID, "doc-")
	}
}

func TestMCPServer_IndexFolderTriggersCluster(t *testing.T) {
	gpuManager, _ := gpu.NewManager(gpu.DefaultConfig())
	clusterConfig := inference.DefaultClusterConfig()
	clusterConfig.Enabled = true
	clusterConfig.AutoClusterOnIndex = true
	clusterConfig.MinEmbeddings = 50
	ci := inference.NewClusterIntegration(gpuManager, clusterConfig)

	inferEngine := inference.New(inference.DefaultConfig())
	inferEngine.SetClusterIntegration(ci)

	// Server wiring is not exercised in this test; clustering is
	// triggered via the inference engine directly.
	_ = NewServer(inferEngine)

	// Simulate indexing files (would normally come from index_folder)
	for i := 0; i < 100; i++ {
		emb := make([]float32, 128)
		ci.AddEmbedding(fmt.Sprintf("file-%d", i), emb)
	}

	// Simulate index completion
	err := inferEngine.OnIndexComplete(context.Background())
	require.NoError(t, err)

	// Wait for background clustering
	time.Sleep(200 * time.Millisecond)

	// Clustering should have been triggered
	assert.True(t, ci.IsClustered())
}
```

#### 4.4 Benchmark Suite

```go
func BenchmarkKMeansCluster(b *testing.B) {
	sizes := []int{1000, 10000, 100000}
	clusters := []int{10, 100, 500}
	dimensionSizes := []int{768, 1024, 1536} // Test common embedding model dimensions

	for _, dims := range dimensionSizes {
		for _, n := range sizes {
			for _, k := range clusters {
				b.Run(fmt.Sprintf("N=%d_K=%d_D=%d", n, k, dims), func(b *testing.B) {
					embeddings := generateRandomData(n, dims)
					index := setupClusterIndex(embeddings, k) // Dimensions auto-detected
					b.ResetTimer()
					for i := 0; i < b.N; i++ {
						index.Cluster()
					}
				})
			}
		}
	}
}
func BenchmarkClusterSearch(b *testing.B) {
	// Compare cluster-accelerated search vs brute-force
	// Dimensions auto-detected from indexed embeddings
	index := setupClusteredIndex(100000, 500)
	dims := index.Dimensions() // Get auto-detected dimensions
	query := randomEmbedding(dims)

	b.Run("ClusterSearch", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			index.SearchWithClusters(query, 10, 3) // Expand to 3 clusters
		}
	})

	b.Run("BruteForceSearch", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			index.Search(query, 10) // Full vector scan
		}
	})
}

func BenchmarkClusterIntegration_Search(b *testing.B) {
	manager, _ := gpu.NewManager(gpu.DefaultConfig())
	config := inference.DefaultClusterConfig()
	config.Enabled = true
	config.ExpansionFactor = 3
	ci := inference.NewClusterIntegration(manager, config)

	// Add 100K embeddings
	dims := 1024
	for i := 0; i < 100000; i++ {
		emb := make([]float32, dims)
		for d := 0; d < dims; d++ {
			emb[d] = rand.Float32()
		}
		ci.AddEmbedding(fmt.Sprintf("node-%d", i), emb)
	}
	ci.Cluster()

	query := make([]float32, dims)
	for d := 0; d < dims; d++ {
		query[d] = rand.Float32()
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ci.Search(context.Background(), query, 10)
	}
}
```

---

## Part 3: Configuration

### 3.1 YAML Configuration

```yaml
# nornicdb.example.yaml (additions)
clustering:
  # Enable k-means clustering
  enabled: true

  # Cluster configuration
  num_clusters: 0 # 0 = auto-detect
  # Note: dimensions are auto-detected from embeddings, no config needed
  max_iterations: 100
  tolerance: 0.0001
  init_method: "kmeans++" # "kmeans++" or "random"

  # Auto-clustering triggers
  auto_cluster_on_index: true
  auto_cluster_threshold: 1000 # Min embeddings before clustering

  # Real-time updates (3-tier system)
  realtime:
    enabled: true
    batch_size: 100 # Tier 2 batch threshold
    recluster_threshold: 10000 # Tier 3 update threshold
    recluster_interval: "1h" # Tier 3 time threshold
    drift_threshold: 0.1 # Tier 3 drift threshold

  # GPU settings
  gpu:
    enabled: true
    device: "auto" # "auto", "metal", "cuda"
    max_memory_mb: 128 # 80-120MB for 10K embeddings, scale accordingly
```
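A sketch of how this YAML could map onto Go config structs, assuming the usual pattern of mirroring YAML keys with struct tags (type and field names here are illustrative, not the actual NornicDB config types):

```go
// ClusteringConfig mirrors the clustering: section of nornicdb.example.yaml.
// Names are illustrative; durations arrive as strings like "1h" and should
// be parsed with time.ParseDuration, since yaml decoders do not do this
// automatically for time.Duration.
type ClusteringConfig struct {
	Enabled              bool    `yaml:"enabled"`
	NumClusters          int     `yaml:"num_clusters"` // 0 = auto-detect
	MaxIterations        int     `yaml:"max_iterations"`
	Tolerance            float32 `yaml:"tolerance"`
	InitMethod           string  `yaml:"init_method"`
	AutoClusterOnIndex   bool    `yaml:"auto_cluster_on_index"`
	AutoClusterThreshold int     `yaml:"auto_cluster_threshold"`

	Realtime struct {
		Enabled            bool    `yaml:"enabled"`
		BatchSize          int     `yaml:"batch_size"`
		ReclusterThreshold int     `yaml:"recluster_threshold"`
		ReclusterInterval  string  `yaml:"recluster_interval"` // e.g. "1h"
		DriftThreshold     float64 `yaml:"drift_threshold"`
	} `yaml:"realtime"`

	GPU struct {
		Enabled     bool   `yaml:"enabled"`
		Device      string `yaml:"device"`
		MaxMemoryMB int    `yaml:"max_memory_mb"`
	} `yaml:"gpu"`
}
```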
### 3.2 Environment Variables

```bash
# Enable clustering
export NORNICDB_CLUSTERING_ENABLED=true

# Cluster count (0 = auto)
export NORNICDB_CLUSTERING_NUM_CLUSTERS=500

# Note: Embedding dimensions are auto-detected from the data.
# No configuration needed - works with any model (768, 1024, 1536, etc.)

# GPU device
export NORNICDB_CLUSTERING_DEVICE=auto

# Real-time settings
export NORNICDB_CLUSTERING_REALTIME_ENABLED=true
export NORNICDB_CLUSTERING_BATCH_SIZE=100
export NORNICDB_CLUSTERING_RECLUSTER_THRESHOLD=10000
```

---

## Part 4: Timeline & Resources

### Timeline

| Phase                | Duration        | Dependencies | Deliverables                                  |
| -------------------- | --------------- | ------------ | --------------------------------------------- |
| Phase 1: Foundation  | 2-3 weeks       | None         | CPU k-means, Metal kernel, basic ClusterIndex |
| Phase 2: Integration | 3-4 weeks       | Phase 1      | ClusterIntegration, MCP tools, Engine wiring  |
| Phase 3: Real-Time   | 3-4 weeks       | Phase 2      | 3-tier system, maintenance workers            |
| Phase 4: Testing     | 2 weeks         | Phase 3      | Unit tests, integration tests, benchmarks     |
| **Total**            | **10-13 weeks** |              |                                               |

> **Note**: Timeline increased from the original 9-12 weeks to 10-13 weeks to account for:
>
> - Proper inference engine integration (ClusterIntegration pattern)
> - Metal atomic float emulation
> - Comprehensive integration tests

### Resource Requirements

**Development:**

- 1 Go developer familiar with GPU/Metal/CUDA
- Access to macOS (for Metal testing) and Linux with NVIDIA GPU (for CUDA testing)

**Hardware for Testing:**

- MacBook with M1/M2/M3 (Metal)
- Linux box with RTX 3080+ (CUDA)
- CI runners for both platforms

**Dependencies:**

- No new Go dependencies (Metal/CUDA already wrapped)
- Metal shader compiler (Xcode)
- CUDA toolkit 11.x+ (for NVIDIA testing)

---

## Part 5: Risk Assessment

### High Risk

| Risk                                   | Mitigation                                         |
| -------------------------------------- | -------------------------------------------------- |
| GPU kernel bugs causing crashes        | Extensive CPU fallback, fuzzing tests              |
| Performance regression on CPU path     | Benchmark gates in CI                              |
| Cluster quality degradation over time  | Automated quality monitoring, forced re-clustering |

### Medium Risk

| Risk                              | Mitigation                                       |
| --------------------------------- | ------------------------------------------------ |
| Memory pressure on large datasets | Configurable memory limits, streaming clustering |
| Lock contention under high load   | Consider lock-free structures in Phase 4         |
| Cross-platform GPU issues         | Comprehensive platform testing matrix            |

### Low Risk

| Risk                     | Mitigation                              |
| ------------------------ | --------------------------------------- |
| API breaking changes     | Version MCP tools, deprecation warnings |
| Configuration complexity | Sensible defaults, documentation        |

---

## Part 6: Success Metrics

### Performance Targets

| Metric                                  | Target              | Measurement |
| --------------------------------------- | ------------------- | ----------- |
| Clustering 10K embeddings               | <100ms (GPU)        | Benchmark   |
| Clustering 100K embeddings              | <1s (GPU)           | Benchmark   |
| Single-node reassignment                | <1ms                | Benchmark   |
| Related-document lookup                 | <10ms               | E2E test    |
| Search speedup (cluster vs brute-force) | >10x for 100K nodes | Benchmark   |

### Quality Targets

| Metric                          | Target | Measurement   |
| ------------------------------- | ------ | ------------- |
| Cluster purity (synthetic data) | >0.85  | Unit test     |
| Search recall@10 vs brute-force | >0.95  | Benchmark     |
| CPU fallback coverage           | 100%   | Test coverage |
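Recall@10 against brute force is the headline quality gate. A minimal sketch of how the benchmark could measure it, reusing the `Search`/`SearchWithClusters` shapes from the Section 4.4 benchmarks (the result-ID fields match the doc's usage; nothing else is assumed):

```go
// recallAtK measures what fraction of the brute-force top-k results the
// cluster-accelerated search also returns, averaged over all queries.
func recallAtK(index *ClusterIndex, queries [][]float32, k, expansion int) float64 {
	var hits, total int
	for _, q := range queries {
		exact := index.Search(q, k)                          // brute-force ground truth
		approx := index.SearchWithClusters(q, k, expansion)  // cluster-accelerated

		truth := make(map[string]bool, len(exact))
		for _, r := range exact {
			truth[r.ID] = true
		}
		for _, r := range approx {
			if truth[r.ID] {
				hits++
			}
		}
		total += len(exact)
	}
	if total == 0 {
		return 1
	}
	return float64(hits) / float64(total)
}
```

Running this across expansion values 1-5 charts the recall/latency tradeoff behind the `ExpansionFactor` default of 3.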
---

## Conclusion

GPU k-means clustering is a valuable enhancement for Mimir/NornicDB that can dramatically improve related-document discovery and semantic search performance. The existing GPU infrastructure in `pkg/gpu/` provides a solid foundation.

**Key Recommendations:**

1. **Start with CPU k-means** - Get the algorithm right before optimizing
2. **Metal first, CUDA second** - macOS is the primary development platform
3. **Prioritize the hybrid search flow** - This delivers the most user value
4. **Don't over-engineer real-time updates initially** - Start with Tier 1 only

**Next Steps:**

1. Review this plan with the team
2. Create tracking issues for each phase
3. Begin Phase 1 implementation

---

## Appendix A: Architecture Validation & Fixes

This section documents the validation performed against the actual NornicDB codebase and the fixes applied to ensure architectural soundness.

### A.1 Validation Summary

| Claim                                              | Status      | Evidence                                                             |
| -------------------------------------------------- | ----------- | -------------------------------------------------------------------- |
| pkg/gpu/ has multi-backend support                 | ✅ Verified | `nornicdb/pkg/gpu/metal/`, `cuda/`, `opencl/`, `vulkan/` directories |
| EmbeddingIndex exists with GPU acceleration        | ✅ Verified | Lines 844-872 in gpu.go                                              |
| Metal shaders use `atomic_uint` not `atomic_float` | ✅ Verified | `shaders_darwin.metal` uses `atomic_uint`, `atomic_int` only         |
| Inference Engine uses dependency injection         | ✅ Verified | `SetSimilaritySearch(fn)` pattern in `inference.go`                  |
| TopologyIntegration pattern exists                 | ✅ Verified | `topology_integration.go` shows proper integration pattern           |

### A.2 Issues Fixed in This Plan

#### Issue 1: Inference Engine Integration Mismatch

**Problem**: The original plan showed a direct `clusterIndex` field on Engine:

```go
// WRONG - Original plan
func (e *InferenceEngine) OnIndexComplete(ctx context.Context) error {
	if e.clusterIndex != nil { ... } // ❌ No such field!
}
```

**Fix**: Created `ClusterIntegration` following the `TopologyIntegration` pattern:

```go
// CORRECT - This plan
type ClusterIntegration struct {
	clusterIndex   *gpu.ClusterIndex
	fallbackSearch func(...) // Wraps original search
	config         *ClusterConfig
}

func (e *Engine) SetClusterIntegration(ci *ClusterIntegration) {
	// Wire up properly via existing patterns
}
```

#### Issue 2: Metal `atomic_float` Does Not Exist

**Problem**: The original kernels used `atomic_float`, which doesn't exist in Metal:

```metal
// WRONG - Original plan
device atomic_float* centroid_sums // ❌ No such type in Metal!
```

**Fix**: Implemented atomic float emulation using compare-exchange:

```metal
// CORRECT - This plan
inline void atomicAddFloat(device atomic_uint* addr, float val) {
    uint expected = atomic_load_explicit(addr, memory_order_relaxed);
    uint desired;
    do {
        float current = as_type<float>(expected);
        desired = as_type<uint>(current + val);
    } while (!atomic_compare_exchange_weak_explicit(
        addr, &expected, desired,
        memory_order_relaxed, memory_order_relaxed
    ));
}
```

#### Issue 3: Missing Wiring Path

**Problem**: The original plan didn't show how components connect at startup.

**Fix**: Added a complete wiring diagram (Section 2.3) showing:

- `main.go` → GPU Manager → ClusterIntegration → Inference Engine → MCP Server
- Who calls `OnIndexComplete()` and when
- How MCP tools access clustering

#### Issue 4: Incomplete Tests

**Problem**: The original tests only covered the GPU layer, not integration.

**Fix**: Added three test categories:

1. **Unit Tests** (`kmeans_test.go`) - ClusterIndex functionality
2. **Integration Tests** (`cluster_integration_test.go`) - Engine + ClusterIntegration
3. **E2E Tests** (`cluster_handler_test.go`) - MCP Server + full stack
### A.3 Memory Estimate Correction

**Original Claim**: 40MB for 10K embeddings

**Actual Calculation**:

```
  10,000 embeddings × 1024 dims × 4 bytes       = 40MB embeddings
+ 10,000 × 500 clusters × 4 bytes distances     = 20MB
+ 500 clusters × 1024 dims × 4 bytes centroids  = 2MB
+ 10,000 × 4 bytes assignments                  = 40KB
+ GPU command buffers (~1-2MB)
= ~63MB minimum (80-120MB recommended)
```

**Updated Recommendation**: Configure `clustering.gpu.max_memory_mb: 128` for 10K embeddings.

### A.4 CUDA Kernel Status

**Note**: This plan provides Metal kernels only. CUDA implementation options:

1. **CPU Fallback** (Recommended initially): CUDA platforms use CPU k-means
2. **CUDA Kernels** (Phase 2+): Port Metal kernels to CUDA (simpler than the Metal versions, since CUDA supports native `atomicAdd` on `float`, so no atomic emulation is needed)
3. **cuML Integration** (Alternative): Use NVIDIA's cuML library (requires Python bridge)

For production, we recommend starting with the CPU fallback on CUDA platforms while Metal provides GPU acceleration on macOS.

---

_Document Version: 2.0_
_Last Updated: November 2025_
_Based on: K-MEANS.md, K-MEANS-RT.md, NornicDB source analysis, and architecture validation_
