Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
topology_integration.go15.9 kB
// Integration between topological link prediction and semantic inference. // // This file bridges the gap between: // - pkg/linkpredict (topological algorithms) // - pkg/inference (semantic/behavioral inference) // // Provides unified edge suggestion API that combines both approaches. // // OPTIMIZATIONS (v2): // - Uses streaming graph construction (fixes memory spikes) // - Parallel edge fetching (4-8x speedup) // - Disk-based graph caching (avoids rebuilds) // - Incremental updates (delta changes) // - Context cancellation support // - Progress callbacks package inference import ( "context" "sync" "sync/atomic" "time" "github.com/orneryd/nornicdb/pkg/linkpredict" "github.com/orneryd/nornicdb/pkg/storage" ) // TopologyConfig controls topological link prediction integration. // // This allows the inference engine to incorporate graph structure signals // alongside semantic similarity, co-access, and temporal patterns. // // Example: // // config := &inference.TopologyConfig{ // Enabled: true, // Algorithm: "adamic_adar", // TopK: 10, // MinScore: 0.3, // Weight: 0.4, // 40% weight vs 60% semantic // } type TopologyConfig struct { // Enable topological link prediction Enabled bool // Algorithm to use: "adamic_adar", "jaccard", "common_neighbors", // "resource_allocation", "preferential_attachment", or "ensemble" Algorithm string // TopK results to consider from topological algorithm TopK int // Minimum score threshold for topology predictions MinScore float64 // Weight for topology score in hybrid mode (0.0-1.0) // Semantic weight is (1.0 - Weight) Weight float64 // GraphRefreshInterval: how often to rebuild graph from storage // Zero means rebuild on every prediction (safe but slow) GraphRefreshInterval int // number of predictions before refresh // CachePath is the directory for persisting cached graphs. // Empty string disables disk caching. CachePath string // CacheTTL is how long a cached graph is valid. // Default: 1 hour CacheTTL time.Duration // BuildTimeout is the maximum time for graph building. // Default: 5 minutes BuildTimeout time.Duration // WorkerCount controls parallel edge fetching. // Default: runtime.NumCPU() WorkerCount int // ChunkSize controls streaming construction. // Default: 1000 ChunkSize int // ProgressCallback is called during graph building. // Optional. ProgressCallback func(processed, total int, elapsed time.Duration) } // DefaultTopologyConfig returns sensible defaults for topology integration. func DefaultTopologyConfig() *TopologyConfig { return &TopologyConfig{ Enabled: false, // Opt-in Algorithm: "adamic_adar", TopK: 10, MinScore: 0.3, Weight: 0.4, // 40% topology, 60% semantic GraphRefreshInterval: 100, // Rebuild every 100 predictions CachePath: "", // Disabled by default CacheTTL: time.Hour, BuildTimeout: 5 * time.Minute, WorkerCount: 0, // Use default (NumCPU) ChunkSize: 1000, ProgressCallback: nil, } } // TopologyIntegration adds topological link prediction to the inference engine. // // This is an optional extension that can be enabled to incorporate graph // structure signals into edge suggestions. When enabled, suggestions combine: // - Semantic similarity (embeddings) // - Co-access patterns // - Temporal proximity // - Graph topology (NEW) // // OPTIMIZATIONS: // - Streaming graph construction with chunked processing // - Parallel edge fetching with worker pool // - Disk-based caching (gob serialization) // - Incremental updates via delta changes // - Context cancellation and timeout support // // Example: // // engine := inference.New(inference.DefaultConfig()) // // // Enable topology integration with caching // topoConfig := inference.DefaultTopologyConfig() // topoConfig.Enabled = true // topoConfig.CachePath = "/tmp/nornicdb/cache" // topoConfig.Weight = 0.5 // Equal weight // // topo := inference.NewTopologyIntegration(storageEngine, topoConfig) // engine.SetTopologyIntegration(topo) // // // Now OnStore() suggestions include topology signals // suggestions, _ := engine.OnStore(ctx, nodeID, embedding) type TopologyIntegration struct { config *TopologyConfig storage storage.Engine // Graph builder with optimizations builder *linkpredict.GraphBuilder // Cached graph for performance cachedGraph linkpredict.Graph predictionCount int64 graphMu sync.RWMutex // Stats lastBuildTime time.Duration totalBuildTime time.Duration buildsCompleted int64 predictionsRun int64 // Delta tracking for incremental updates pendingDelta *linkpredict.GraphDelta deltaMu sync.Mutex } // NewTopologyIntegration creates a new topology integration. // // Parameters: // - storage: Storage engine to build graph from // - config: Topology configuration (nil uses defaults) // // Returns ready-to-use integration that can be attached to inference engine. func NewTopologyIntegration(storage storage.Engine, config *TopologyConfig) *TopologyIntegration { if config == nil { config = DefaultTopologyConfig() } // Create optimized graph builder buildConfig := &linkpredict.BuildConfig{ ChunkSize: config.ChunkSize, WorkerCount: config.WorkerCount, Undirected: true, // Most graphs are undirected for link prediction GCAfterChunk: true, ProgressCallback: config.ProgressCallback, CachePath: config.CachePath, CacheTTL: config.CacheTTL, } return &TopologyIntegration{ config: config, storage: storage, builder: linkpredict.NewGraphBuilder(storage, buildConfig), pendingDelta: &linkpredict.GraphDelta{}, } } // SuggestTopological generates edge suggestions using graph topology. // // This method: // 1. Builds/refreshes graph from storage if needed (with caching) // 2. Runs configured topological algorithm // 3. Converts results to EdgeSuggestion format // 4. Returns suggestions compatible with inference engine // // Parameters: // - ctx: Context for cancellation // - sourceID: Node to predict edges from // // Returns: // - Slice of EdgeSuggestion with Method="topology_*" // - Error if graph building or prediction fails // // Example: // // topo := NewTopologyIntegration(storage, config) // suggestions, err := topo.SuggestTopological(ctx, "node-123") // for _, sug := range suggestions { // fmt.Printf("%s: %.3f (%s)\n", sug.TargetID, sug.Confidence, sug.Method) // } func (t *TopologyIntegration) SuggestTopological(ctx context.Context, sourceID string) ([]EdgeSuggestion, error) { // Check both config and feature flag if !t.config.Enabled { return nil, nil } // Check for nil storage early if t.storage == nil { return nil, nil } // Apply timeout if configured if t.config.BuildTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, t.config.BuildTimeout) defer cancel() } // Rebuild graph if needed if err := t.ensureGraph(ctx); err != nil { return nil, err } // Increment prediction counters (atomic) atomic.AddInt64(&t.predictionsRun, 1) atomic.AddInt64(&t.predictionCount, 1) // Get read lock for algorithm execution t.graphMu.RLock() defer t.graphMu.RUnlock() if t.cachedGraph == nil { return nil, nil } // Run topological algorithm var predictions []linkpredict.Prediction sourceNodeID := storage.NodeID(sourceID) switch t.config.Algorithm { case "adamic_adar": predictions = linkpredict.AdamicAdar(t.cachedGraph, sourceNodeID, t.config.TopK) case "jaccard": predictions = linkpredict.Jaccard(t.cachedGraph, sourceNodeID, t.config.TopK) case "common_neighbors": predictions = linkpredict.CommonNeighbors(t.cachedGraph, sourceNodeID, t.config.TopK) case "resource_allocation": predictions = linkpredict.ResourceAllocation(t.cachedGraph, sourceNodeID, t.config.TopK) case "preferential_attachment": predictions = linkpredict.PreferentialAttachment(t.cachedGraph, sourceNodeID, t.config.TopK) default: // Default to Adamic-Adar (best all-around) predictions = linkpredict.AdamicAdar(t.cachedGraph, sourceNodeID, t.config.TopK) } // Convert to EdgeSuggestion format // Scores are already normalized to [0, 1] by the algorithm implementations suggestions := make([]EdgeSuggestion, 0, len(predictions)) for _, pred := range predictions { if pred.Score < t.config.MinScore { continue } suggestions = append(suggestions, EdgeSuggestion{ SourceID: sourceID, TargetID: string(pred.TargetID), Type: "RELATES_TO", Confidence: pred.Score, Reason: pred.Reason, Method: "topology_" + pred.Algorithm, }) } return suggestions, nil } // ensureGraph builds or refreshes the cached graph if needed. func (t *TopologyIntegration) ensureGraph(ctx context.Context) error { t.graphMu.RLock() needsBuild := t.cachedGraph == nil || atomic.LoadInt64(&t.predictionCount) >= int64(t.config.GraphRefreshInterval) t.graphMu.RUnlock() if !needsBuild { // Check if there are pending deltas to apply t.deltaMu.Lock() hasDelta := len(t.pendingDelta.AddedNodes) > 0 || len(t.pendingDelta.RemovedNodes) > 0 || len(t.pendingDelta.AddedEdges) > 0 || len(t.pendingDelta.RemovedEdges) > 0 t.deltaMu.Unlock() if hasDelta { return t.applyPendingDelta() } return nil } // Need to rebuild - acquire write lock t.graphMu.Lock() defer t.graphMu.Unlock() // Double-check after acquiring lock if t.cachedGraph != nil && atomic.LoadInt64(&t.predictionCount) < int64(t.config.GraphRefreshInterval) { return nil } startTime := time.Now() // Use optimized builder graph, err := t.builder.Build(ctx) if err != nil { return err } t.cachedGraph = graph atomic.StoreInt64(&t.predictionCount, 0) // Clear pending delta since we just rebuilt t.deltaMu.Lock() t.pendingDelta = &linkpredict.GraphDelta{} t.deltaMu.Unlock() // Update stats buildTime := time.Since(startTime) t.lastBuildTime = buildTime t.totalBuildTime += buildTime atomic.AddInt64(&t.buildsCompleted, 1) return nil } // applyPendingDelta applies incremental updates to the cached graph. func (t *TopologyIntegration) applyPendingDelta() error { t.graphMu.Lock() defer t.graphMu.Unlock() t.deltaMu.Lock() delta := t.pendingDelta t.pendingDelta = &linkpredict.GraphDelta{} t.deltaMu.Unlock() if t.cachedGraph == nil { return nil // No graph to update } // Apply delta t.cachedGraph = t.builder.ApplyDelta(t.cachedGraph, delta) return nil } // OnNodeAdded notifies the integration of a new node. // Call this when a node is created to enable incremental updates. func (t *TopologyIntegration) OnNodeAdded(nodeID storage.NodeID) { t.deltaMu.Lock() defer t.deltaMu.Unlock() t.pendingDelta.AddedNodes = append(t.pendingDelta.AddedNodes, nodeID) } // OnNodeRemoved notifies the integration of a removed node. func (t *TopologyIntegration) OnNodeRemoved(nodeID storage.NodeID) { t.deltaMu.Lock() defer t.deltaMu.Unlock() t.pendingDelta.RemovedNodes = append(t.pendingDelta.RemovedNodes, nodeID) } // OnEdgeAdded notifies the integration of a new edge. func (t *TopologyIntegration) OnEdgeAdded(from, to storage.NodeID) { t.deltaMu.Lock() defer t.deltaMu.Unlock() t.pendingDelta.AddedEdges = append(t.pendingDelta.AddedEdges, linkpredict.EdgeChange{ From: from, To: to, }) } // OnEdgeRemoved notifies the integration of a removed edge. func (t *TopologyIntegration) OnEdgeRemoved(from, to storage.NodeID) { t.deltaMu.Lock() defer t.deltaMu.Unlock() t.pendingDelta.RemovedEdges = append(t.pendingDelta.RemovedEdges, linkpredict.EdgeChange{ From: from, To: to, }) } // CombinedSuggestions blends semantic and topological suggestions. // // This method: // 1. Gets semantic suggestions (from existing inference engine) // 2. Gets topological suggestions (from this integration) // 3. Merges and ranks by weighted score // 4. Removes duplicates (keeping highest scored) // // Parameters: // - semantic: Suggestions from semantic inference // - topological: Suggestions from topology integration // // Returns merged and ranked suggestions. // // Example: // // semantic := engine.OnStore(ctx, nodeID, embedding) // existing // topological, _ := topo.SuggestTopological(ctx, nodeID) // new // // combined := topo.CombinedSuggestions(semantic, topological) // for _, sug := range combined { // if sug.Confidence >= 0.7 { // createEdge(sug) // } // } func (t *TopologyIntegration) CombinedSuggestions(semantic, topological []EdgeSuggestion) []EdgeSuggestion { if !t.config.Enabled || len(topological) == 0 { return semantic } // Build map of all suggestions by target suggestionMap := make(map[string]EdgeSuggestion) // Add semantic suggestions with semantic weight semanticWeight := 1.0 - t.config.Weight for _, sug := range semantic { sug.Confidence *= semanticWeight suggestionMap[sug.TargetID] = sug } // Add or merge topological suggestions for _, sug := range topological { sug.Confidence *= t.config.Weight if existing, exists := suggestionMap[sug.TargetID]; exists { // Merge: combine scores and reasons existing.Confidence += sug.Confidence existing.Reason = existing.Reason + " + " + sug.Reason existing.Method = existing.Method + ",topology" suggestionMap[sug.TargetID] = existing } else { suggestionMap[sug.TargetID] = sug } } // Convert back to slice combined := make([]EdgeSuggestion, 0, len(suggestionMap)) for _, sug := range suggestionMap { combined = append(combined, sug) } // Sort by confidence descending sortByConfidence(combined) return combined } // InvalidateCache forces graph rebuild on next prediction. // // Call this when the graph structure changes significantly (e.g., batch import, // node/edge deletion, schema changes). func (t *TopologyIntegration) InvalidateCache() { t.graphMu.Lock() t.cachedGraph = nil atomic.StoreInt64(&t.predictionCount, 0) t.graphMu.Unlock() // Clear pending delta t.deltaMu.Lock() t.pendingDelta = &linkpredict.GraphDelta{} t.deltaMu.Unlock() // Also invalidate disk cache if t.builder != nil { t.builder.InvalidateCache() } } // Stats returns topology integration statistics. func (t *TopologyIntegration) Stats() TopologyStats { t.graphMu.RLock() nodeCount := 0 edgeCount := 0 if t.cachedGraph != nil { nodeCount = len(t.cachedGraph) for _, neighbors := range t.cachedGraph { edgeCount += len(neighbors) } } t.graphMu.RUnlock() t.deltaMu.Lock() pendingChanges := len(t.pendingDelta.AddedNodes) + len(t.pendingDelta.RemovedNodes) + len(t.pendingDelta.AddedEdges) + len(t.pendingDelta.RemovedEdges) t.deltaMu.Unlock() builderStats := t.builder.Stats() return TopologyStats{ GraphNodeCount: nodeCount, GraphEdgeCount: edgeCount, PredictionsRun: atomic.LoadInt64(&t.predictionsRun), BuildsCompleted: atomic.LoadInt64(&t.buildsCompleted), LastBuildTime: t.lastBuildTime, TotalBuildTime: t.totalBuildTime, PendingChanges: pendingChanges, CacheHits: builderStats.CacheHits, CacheMisses: builderStats.CacheMisses, } } // TopologyStats contains statistics about the topology integration. type TopologyStats struct { GraphNodeCount int GraphEdgeCount int PredictionsRun int64 BuildsCompleted int64 LastBuildTime time.Duration TotalBuildTime time.Duration PendingChanges int CacheHits int64 CacheMisses int64 } func sortByConfidence(suggestions []EdgeSuggestion) { // Simple bubble sort for small slices for i := 0; i < len(suggestions); i++ { for j := i + 1; j < len(suggestions); j++ { if suggestions[j].Confidence > suggestions[i].Confidence { suggestions[i], suggestions[j] = suggestions[j], suggestions[i] } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server