Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
warmup.go (17.9 kB)
// Package warmup provides APOC database warmup functions. // // This package implements all apoc.warmup.* functions for preloading // data into memory and optimizing database performance. package warmup import ( "sync" "time" "github.com/orneryd/nornicdb/apoc/storage" ) // Node represents a graph node. type Node = storage.Node // Relationship represents a graph relationship. type Relationship = storage.Relationship // Storage is the interface for database operations. var Storage storage.Storage = storage.NewInMemoryStorage() // WarmupCache holds cached data for quick access. type WarmupCache struct { mu sync.RWMutex nodes map[int64]*Node relationships map[int64]*Relationship nodesByLabel map[string][]*Node relsByType map[string][]*Relationship properties map[string][]interface{} queries map[string]interface{} stats CacheStats lastRun *time.Time nextRun *time.Time running bool scheduledCron string } // CacheStats tracks cache statistics. type CacheStats struct { Hits int64 Misses int64 MemoryUsed int64 } // Global cache instance var cache = &WarmupCache{ nodes: make(map[int64]*Node), relationships: make(map[int64]*Relationship), nodesByLabel: make(map[string][]*Node), relsByType: make(map[string][]*Relationship), properties: make(map[string][]interface{}), queries: make(map[string]interface{}), } // Run performs a full database warmup. 
// // Example: // // apoc.warmup.run() => {nodesLoaded: 1000, relsLoaded: 5000, time: 150} func Run() map[string]interface{} { start := time.Now() cache.mu.Lock() cache.running = true cache.mu.Unlock() defer func() { cache.mu.Lock() cache.running = false now := time.Now() cache.lastRun = &now cache.mu.Unlock() }() nodesLoaded := 0 relsLoaded := 0 propertiesLoaded := 0 // Load all nodes nodes, err := Storage.AllNodes() if err == nil { cache.mu.Lock() for _, node := range nodes { cache.nodes[node.ID] = node nodesLoaded++ propertiesLoaded += len(node.Properties) // Index by label for _, label := range node.Labels { cache.nodesByLabel[label] = append(cache.nodesByLabel[label], node) } } cache.mu.Unlock() } // Load all relationships rels, err := Storage.AllRelationships() if err == nil { cache.mu.Lock() for _, rel := range rels { cache.relationships[rel.ID] = rel relsLoaded++ propertiesLoaded += len(rel.Properties) // Index by type cache.relsByType[rel.Type] = append(cache.relsByType[rel.Type], rel) } cache.mu.Unlock() } elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "nodesLoaded": nodesLoaded, "relationshipsLoaded": relsLoaded, "propertiesLoaded": propertiesLoaded, "indexesLoaded": len(cache.nodesByLabel) + len(cache.relsByType), "timeTaken": elapsed, } } // RunWithParams performs warmup with parameters. 
// // Example: // // apoc.warmup.run({labels: ['Person'], loadIndexes: true}) func RunWithParams(params map[string]interface{}) map[string]interface{} { start := time.Now() labels := []string{} if l, ok := params["labels"].([]string); ok { labels = l } types := []string{} if t, ok := params["types"].([]string); ok { types = t } loadIndexes := true if li, ok := params["loadIndexes"].(bool); ok { loadIndexes = li } nodesLoaded := 0 relsLoaded := 0 indexesLoaded := 0 // Load nodes by specific labels if len(labels) > 0 { result := Nodes(labels) nodesLoaded = result["nodesLoaded"].(int) if loadIndexes { indexesLoaded += len(labels) } } // Load relationships by specific types if len(types) > 0 { result := Relationships(types) relsLoaded = result["relationshipsLoaded"].(int) if loadIndexes { indexesLoaded += len(types) } } // If no specific labels/types, load all if len(labels) == 0 && len(types) == 0 { fullResult := Run() return fullResult } elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "nodesLoaded": nodesLoaded, "relationshipsLoaded": relsLoaded, "indexesLoaded": indexesLoaded, "timeTaken": elapsed, } } // Nodes warms up specific nodes by label. 
// // Example: // // apoc.warmup.nodes(['Person', 'Company']) => nodes loaded func Nodes(labels []string) map[string]interface{} { start := time.Now() nodesLoaded := 0 nodes, err := Storage.AllNodes() if err != nil { return map[string]interface{}{ "nodesLoaded": 0, "labels": labels, "timeTaken": time.Since(start).Milliseconds(), "error": err.Error(), } } labelSet := make(map[string]bool) for _, label := range labels { labelSet[label] = true } cache.mu.Lock() for _, node := range nodes { for _, label := range node.Labels { if labelSet[label] { cache.nodes[node.ID] = node cache.nodesByLabel[label] = append(cache.nodesByLabel[label], node) nodesLoaded++ break } } } cache.mu.Unlock() elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "nodesLoaded": nodesLoaded, "labels": labels, "timeTaken": elapsed, } } // Relationships warms up specific relationships by type. // // Example: // // apoc.warmup.relationships(['KNOWS', 'WORKS_AT']) => rels loaded func Relationships(types []string) map[string]interface{} { start := time.Now() relsLoaded := 0 rels, err := Storage.AllRelationships() if err != nil { return map[string]interface{}{ "relationshipsLoaded": 0, "types": types, "timeTaken": time.Since(start).Milliseconds(), "error": err.Error(), } } typeSet := make(map[string]bool) for _, t := range types { typeSet[t] = true } cache.mu.Lock() for _, rel := range rels { if typeSet[rel.Type] { cache.relationships[rel.ID] = rel cache.relsByType[rel.Type] = append(cache.relsByType[rel.Type], rel) relsLoaded++ } } cache.mu.Unlock() elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "relationshipsLoaded": relsLoaded, "types": types, "timeTaken": elapsed, } } // Indexes warms up indexes (pre-builds label and type indexes). 
// // Example: // // apoc.warmup.indexes() => indexes loaded func Indexes() map[string]interface{} { start := time.Now() indexesLoaded := 0 // Build node label index nodes, err := Storage.AllNodes() if err == nil { cache.mu.Lock() // Clear existing indexes cache.nodesByLabel = make(map[string][]*Node) for _, node := range nodes { for _, label := range node.Labels { cache.nodesByLabel[label] = append(cache.nodesByLabel[label], node) } } indexesLoaded += len(cache.nodesByLabel) cache.mu.Unlock() } // Build relationship type index rels, err := Storage.AllRelationships() if err == nil { cache.mu.Lock() cache.relsByType = make(map[string][]*Relationship) for _, rel := range rels { cache.relsByType[rel.Type] = append(cache.relsByType[rel.Type], rel) } indexesLoaded += len(cache.relsByType) cache.mu.Unlock() } elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "indexesLoaded": indexesLoaded, "labelIndexes": len(cache.nodesByLabel), "typeIndexes": len(cache.relsByType), "timeTaken": elapsed, } } // Properties warms up property data by collecting unique values. 
// // Example: // // apoc.warmup.properties(['name', 'email']) => properties loaded func Properties(keys []string) map[string]interface{} { start := time.Now() propertiesLoaded := 0 keySet := make(map[string]bool) for _, key := range keys { keySet[key] = true } nodes, err := Storage.AllNodes() if err == nil { cache.mu.Lock() for _, node := range nodes { for key, value := range node.Properties { if keySet[key] { cache.properties[key] = append(cache.properties[key], value) propertiesLoaded++ } } } cache.mu.Unlock() } rels, err := Storage.AllRelationships() if err == nil { cache.mu.Lock() for _, rel := range rels { for key, value := range rel.Properties { if keySet[key] { cache.properties[key] = append(cache.properties[key], value) propertiesLoaded++ } } } cache.mu.Unlock() } elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "propertiesLoaded": propertiesLoaded, "keys": keys, "timeTaken": elapsed, } } // Subgraph warms up a subgraph starting from a node. // // Example: // // apoc.warmup.subgraph(startNode, 3) => subgraph loaded func Subgraph(startNode *Node, depth int) map[string]interface{} { startTime := time.Now() nodesLoaded := 0 relsLoaded := 0 if startNode == nil { return map[string]interface{}{ "nodesLoaded": 0, "relationshipsLoaded": 0, "depth": depth, "timeTaken": time.Since(startTime).Milliseconds(), "error": "start node is nil", } } // BFS to load subgraph visited := make(map[int64]bool) queue := []*Node{startNode} visited[startNode.ID] = true currentDepth := 0 cache.mu.Lock() cache.nodes[startNode.ID] = startNode nodesLoaded++ for len(queue) > 0 && currentDepth < depth { levelSize := len(queue) for i := 0; i < levelSize; i++ { current := queue[0] queue = queue[1:] neighbors, err := Storage.GetNodeNeighbors(current.ID, "", storage.DirectionBoth) if err == nil { for _, neighbor := range neighbors { if !visited[neighbor.ID] { visited[neighbor.ID] = true queue = append(queue, neighbor) cache.nodes[neighbor.ID] = neighbor nodesLoaded++ 
} } } // Also cache the relationships rels, err := Storage.GetNodeRelationships(current.ID, "", storage.DirectionBoth) if err == nil { for _, rel := range rels { if _, exists := cache.relationships[rel.ID]; !exists { cache.relationships[rel.ID] = rel relsLoaded++ } } } } currentDepth++ } cache.mu.Unlock() elapsed := time.Since(startTime).Milliseconds() return map[string]interface{}{ "nodesLoaded": nodesLoaded, "relationshipsLoaded": relsLoaded, "depth": depth, "timeTaken": elapsed, } } // Path warms up a specific path. // // Example: // // apoc.warmup.path(nodes, rels) => path loaded func Path(nodes []*Node, rels []*Relationship) map[string]interface{} { start := time.Now() cache.mu.Lock() for _, node := range nodes { cache.nodes[node.ID] = node } for _, rel := range rels { cache.relationships[rel.ID] = rel } cache.mu.Unlock() nodesLoaded := len(nodes) relsLoaded := len(rels) elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "nodesLoaded": nodesLoaded, "relationshipsLoaded": relsLoaded, "timeTaken": elapsed, } } // Cache warms up cache for specific queries (stores query results). // // Example: // // apoc.warmup.cache(['MATCH (n:Person) RETURN n']) => cache warmed func Cache(queries []string) map[string]interface{} { start := time.Now() queriesWarmed := 0 cache.mu.Lock() for _, query := range queries { // Store a placeholder for the query result // In a real implementation, this would execute the query and cache results cache.queries[query] = map[string]interface{}{ "cached": true, "cachedAt": time.Now(), } queriesWarmed++ } cache.mu.Unlock() elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "queriesWarmed": queriesWarmed, "timeTaken": elapsed, } } // Stats returns warmup statistics. 
// // Example: // // apoc.warmup.stats() => {cacheHitRate: 0.95, ...} func Stats() map[string]interface{} { cache.mu.RLock() defer cache.mu.RUnlock() totalRequests := cache.stats.Hits + cache.stats.Misses hitRate := 0.0 missRate := 0.0 if totalRequests > 0 { hitRate = float64(cache.stats.Hits) / float64(totalRequests) missRate = float64(cache.stats.Misses) / float64(totalRequests) } itemsCached := len(cache.nodes) + len(cache.relationships) + len(cache.queries) return map[string]interface{}{ "cacheHitRate": hitRate, "cacheMissRate": missRate, "cacheHits": cache.stats.Hits, "cacheMisses": cache.stats.Misses, "memoryUsed": cache.stats.MemoryUsed, "itemsCached": itemsCached, "nodesCached": len(cache.nodes), "relationshipsCached": len(cache.relationships), "queriesCached": len(cache.queries), "labelIndexes": len(cache.nodesByLabel), "typeIndexes": len(cache.relsByType), } } // Clear clears warmed up data. // // Example: // // apoc.warmup.clear() => cache cleared func Clear() map[string]interface{} { cache.mu.Lock() defer cache.mu.Unlock() itemsCleared := len(cache.nodes) + len(cache.relationships) + len(cache.queries) cache.nodes = make(map[int64]*Node) cache.relationships = make(map[int64]*Relationship) cache.nodesByLabel = make(map[string][]*Node) cache.relsByType = make(map[string][]*Relationship) cache.properties = make(map[string][]interface{}) cache.queries = make(map[string]interface{}) cache.stats = CacheStats{} return map[string]interface{}{ "cleared": true, "itemsCleared": itemsCleared, } } // Optimize analyzes cache usage and provides recommendations. 
// // Example: // // apoc.warmup.optimize() => optimization results func Optimize() map[string]interface{} { start := time.Now() cache.mu.RLock() recommendations := []string{} // Analyze cache and provide recommendations if len(cache.nodes) == 0 { recommendations = append(recommendations, "Consider running warmup.Run() to cache nodes") } if len(cache.nodesByLabel) == 0 { recommendations = append(recommendations, "Consider running warmup.Indexes() to build label indexes") } totalRequests := cache.stats.Hits + cache.stats.Misses if totalRequests > 0 { hitRate := float64(cache.stats.Hits) / float64(totalRequests) if hitRate < 0.5 { recommendations = append(recommendations, "Low cache hit rate - consider warming up more frequently accessed data") } } // Find most used labels topLabels := []string{} for label, nodes := range cache.nodesByLabel { if len(nodes) > 10 { topLabels = append(topLabels, label) } } if len(topLabels) > 0 { recommendations = append(recommendations, "Frequently used labels detected - consider prioritizing: "+joinStrings(topLabels, ", ")) } cache.mu.RUnlock() if len(recommendations) == 0 { recommendations = append(recommendations, "Cache is well optimized") } elapsed := time.Since(start).Milliseconds() return map[string]interface{}{ "optimized": true, "timeTaken": elapsed, "recommendations": recommendations, } } // Schedule schedules periodic warmup (stores cron expression). // // Example: // // apoc.warmup.schedule('0 0 * * *') => scheduled func Schedule(cron string) map[string]interface{} { cache.mu.Lock() cache.scheduledCron = cron // In a real implementation, this would set up a cron job cache.mu.Unlock() return map[string]interface{}{ "scheduled": true, "cron": cron, } } // Status returns warmup status. 
// // Example: // // apoc.warmup.status() => {running: false, lastRun: ...} func Status() map[string]interface{} { cache.mu.RLock() defer cache.mu.RUnlock() var lastRunStr, nextRunStr interface{} if cache.lastRun != nil { lastRunStr = cache.lastRun.Format(time.RFC3339) } if cache.nextRun != nil { nextRunStr = cache.nextRun.Format(time.RFC3339) } return map[string]interface{}{ "running": cache.running, "lastRun": lastRunStr, "nextRun": nextRunStr, "scheduledCron": cache.scheduledCron, "itemsCached": len(cache.nodes) + len(cache.relationships) + len(cache.queries), } } // Progress returns warmup progress. // // Example: // // apoc.warmup.progress() => {percentage: 75, ...} func Progress() map[string]interface{} { cache.mu.RLock() defer cache.mu.RUnlock() // Get totals from storage allNodes, _ := Storage.AllNodes() allRels, _ := Storage.AllRelationships() totalNodes := len(allNodes) totalRels := len(allRels) cachedNodes := len(cache.nodes) cachedRels := len(cache.relationships) percentage := 0.0 if totalNodes+totalRels > 0 { percentage = float64(cachedNodes+cachedRels) / float64(totalNodes+totalRels) * 100 } return map[string]interface{}{ "percentage": percentage, "nodesLoaded": cachedNodes, "totalNodes": totalNodes, "relsLoaded": cachedRels, "totalRels": totalRels, } } // GetCachedNode retrieves a node from cache (increments hit/miss stats). func GetCachedNode(id int64) (*Node, bool) { cache.mu.Lock() defer cache.mu.Unlock() if node, exists := cache.nodes[id]; exists { cache.stats.Hits++ return node, true } cache.stats.Misses++ return nil, false } // GetCachedRelationship retrieves a relationship from cache. func GetCachedRelationship(id int64) (*Relationship, bool) { cache.mu.Lock() defer cache.mu.Unlock() if rel, exists := cache.relationships[id]; exists { cache.stats.Hits++ return rel, true } cache.stats.Misses++ return nil, false } // GetNodesByLabel retrieves nodes by label from cache. 
func GetNodesByLabel(label string) ([]*Node, bool) { cache.mu.RLock() defer cache.mu.RUnlock() if nodes, exists := cache.nodesByLabel[label]; exists { return nodes, true } return nil, false } // GetRelationshipsByType retrieves relationships by type from cache. func GetRelationshipsByType(relType string) ([]*Relationship, bool) { cache.mu.RLock() defer cache.mu.RUnlock() if rels, exists := cache.relsByType[relType]; exists { return rels, true } return nil, false } // Helper function to join strings func joinStrings(strs []string, sep string) string { if len(strs) == 0 { return "" } result := strs[0] for i := 1; i < len(strs); i++ { result += sep + strs[i] } return result }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.