Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
query_load.go14.4 kB
// Package temporal - Query load prediction for resource scaling. // // QueryLoadPredictor tracks query volume trends and predicts future load // using KalmanVelocity filters. This enables: // - Predicting upcoming query spikes // - Detecting load trends (increasing/decreasing) // - Triggering pre-emptive resource scaling // - Identifying query patterns (peak hours, burst events) // // Use cases: // - Auto-scale database connections // - Pre-warm caches before predicted spikes // - Alert on unusual load patterns // - Capacity planning // // Example usage: // // predictor := temporal.NewQueryLoadPredictor(temporal.DefaultLoadConfig()) // // // Record each query // predictor.RecordQuery() // // // Get current load prediction // prediction := predictor.GetPrediction() // fmt.Printf("Current QPS: %.1f, Predicted (5min): %.1f\n", // prediction.CurrentQPS, prediction.PredictedQPS) // // // Check if scaling needed // if predictor.ShouldScaleUp(100) { // threshold QPS // triggerScaleUp() // } // // # ELI12 (Explain Like I'm 12) // // Imagine you're running a lemonade stand. You want to know: // // 🍋 "How many customers are coming right now?" (Current QPS) // 🍋 "Will it get busier or slower?" (Trend) // 🍋 "Should I make more lemonade NOW?" (Scale up prediction) // // The QueryLoadPredictor counts how many "questions" (queries) the database // gets every second. It's like counting customers: // // Second 1: 10 queries // Second 2: 12 queries // Second 3: 15 queries // Second 4: 20 queries // → "Whoa, we're getting BUSIER! Velocity is positive!" 📈 // // The Kalman filter smooths out the bumps: // // Raw counts: 10, 12, 50, 11, 13 (that 50 was a weird spike!) // Filtered: 10, 11, 15, 13, 13 (smoothed - ignores the spike) // // Why filter? Because ONE busy second doesn't mean you need to panic! // The filter asks: "Is this a REAL trend or just random noise?" // // Predictions: // // Current: 50 QPS (queries per second) // Velocity: +5 QPS/second (getting busier) // Predicted in 5 min: 50 + (5 × 300) = 1550 QPS! 😱 // → "Better scale up NOW before we're overwhelmed!" // // Anomaly detection: // // Normal: 50 QPS // Suddenly: 500 QPS ← "SPIKE! Something's happening!" // Suddenly: 5 QPS ← "DROP! Did something break?" // // Peak hour detection: // // Hour 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 // Count 2 1 1 1 2 5 15 30 50 45 40 30 35 40 50 45 40 35 25 20 15 10 5 3 // ^^^^^^^^^^^^ // "Peak hours are 8-10am and 2-4pm" // // This lets us PREPARE before the rush! Pre-warm caches at 7:30am! 🚀 package temporal import ( "math" "sync" "time" "github.com/orneryd/nornicdb/pkg/filter" ) // LoadPrediction represents a query load prediction. type LoadPrediction struct { // Current metrics CurrentQPS float64 // Queries per second (smoothed) CurrentQPM float64 // Queries per minute (smoothed) RawQPS float64 // Unfiltered QPS TotalQueries int64 // Trend Velocity float64 // Rate of change (positive = increasing load) Trend string // "increasing", "decreasing", "stable" // Predictions PredictedQPS5m float64 // Predicted QPS in 5 minutes PredictedQPS15m float64 // Predicted QPS in 15 minutes PredictedQPS1h float64 // Predicted QPS in 1 hour // Confidence Confidence float64 // Time-of-day pattern PeakHour int IsNearPeak bool // Anomaly detection IsAnomaly bool AnomalyType string // "spike", "drop", "sustained_high", "sustained_low" // Timestamp Timestamp time.Time } // LoadConfig holds configuration for query load prediction. type LoadConfig struct { // FilterConfig for the underlying Kalman velocity filter FilterConfig filter.VelocityConfig // BucketDurationSeconds - duration of each measurement bucket BucketDurationSeconds float64 // SpikeThreshold - QPS velocity above which is a "spike" SpikeThreshold float64 // DropThreshold - QPS velocity below which is a "drop" DropThreshold float64 // AnomalyStdDevs - number of standard deviations for anomaly detection AnomalyStdDevs float64 // ScaleUpThreshold - relative increase that suggests scaling up ScaleUpThreshold float64 // ScaleDownThreshold - relative decrease that suggests scaling down ScaleDownThreshold float64 // PeakDetectionWindow - hours to track for peak detection PeakDetectionWindow int } // DefaultLoadConfig returns sensible defaults. func DefaultLoadConfig() LoadConfig { return LoadConfig{ FilterConfig: filter.VelocityConfig{ ProcessNoisePos: 0.5, // QPS can change quickly ProcessNoiseVel: 0.1, MeasurementNoise: 2.0, // Measurement has noise InitialPosVariance: 100.0, InitialVelVariance: 10.0, Dt: 1.0, }, BucketDurationSeconds: 1.0, // 1-second buckets SpikeThreshold: 5.0, // 5 QPS/sec increase DropThreshold: -5.0, AnomalyStdDevs: 3.0, ScaleUpThreshold: 0.5, // 50% increase ScaleDownThreshold: -0.3, // 30% decrease PeakDetectionWindow: 24, } } // HighSensitivityLoadConfig returns config for high-sensitivity detection. func HighSensitivityLoadConfig() LoadConfig { cfg := DefaultLoadConfig() cfg.SpikeThreshold = 2.0 cfg.AnomalyStdDevs = 2.0 cfg.ScaleUpThreshold = 0.3 return cfg } // QueryLoadPredictor predicts query load using velocity tracking. type QueryLoadPredictor struct { mu sync.RWMutex config LoadConfig // Kalman filter for QPS tracking qpsFilter *filter.KalmanVelocity // Current bucket currentBucket time.Time bucketCount int64 // Statistics totalQueries int64 startTime time.Time // Rolling window for variance calculation recentQPS []float64 recentQPSIdx int windowSize int // Hour-of-day tracking hourCounts [24]int64 hourSums [24]float64 // Baseline for anomaly detection baselineQPS float64 baselineStdDev float64 } // NewQueryLoadPredictor creates a new query load predictor. func NewQueryLoadPredictor(cfg LoadConfig) *QueryLoadPredictor { return &QueryLoadPredictor{ config: cfg, qpsFilter: filter.NewKalmanVelocity(cfg.FilterConfig), startTime: time.Now(), currentBucket: time.Now().Truncate(time.Duration(cfg.BucketDurationSeconds) * time.Second), recentQPS: make([]float64, 60), // Track last 60 measurements windowSize: 60, } } // RecordQuery records a query event. func (qlp *QueryLoadPredictor) RecordQuery() { qlp.RecordQueryAt(time.Now()) } // RecordQueryAt records a query at a specific time. func (qlp *QueryLoadPredictor) RecordQueryAt(timestamp time.Time) { qlp.mu.Lock() defer qlp.mu.Unlock() qlp.totalQueries++ // Track by hour hour := timestamp.Hour() qlp.hourCounts[hour]++ // Check if we've moved to a new bucket bucket := timestamp.Truncate(time.Duration(qlp.config.BucketDurationSeconds) * time.Second) if bucket.After(qlp.currentBucket) { // Flush the old bucket qlp.flushBucket() qlp.currentBucket = bucket qlp.bucketCount = 0 } qlp.bucketCount++ } // RecordQueries records multiple query events (batch recording). func (qlp *QueryLoadPredictor) RecordQueries(count int) { qlp.mu.Lock() defer qlp.mu.Unlock() qlp.totalQueries += int64(count) qlp.bucketCount += int64(count) hour := time.Now().Hour() qlp.hourCounts[hour] += int64(count) } // flushBucket processes the completed bucket. func (qlp *QueryLoadPredictor) flushBucket() { if qlp.bucketCount == 0 { return } // Calculate QPS for this bucket qps := float64(qlp.bucketCount) / qlp.config.BucketDurationSeconds // Feed to Kalman filter qlp.qpsFilter.Process(qps) // Track in rolling window qlp.recentQPS[qlp.recentQPSIdx] = qps qlp.recentQPSIdx = (qlp.recentQPSIdx + 1) % qlp.windowSize // Update hour tracking hour := qlp.currentBucket.Hour() qlp.hourSums[hour] += qps // Update baseline qlp.updateBaseline() } // updateBaseline updates the baseline QPS statistics. func (qlp *QueryLoadPredictor) updateBaseline() { // Calculate mean and std dev from recent window var sum, sumSq float64 count := 0 for _, qps := range qlp.recentQPS { if qps > 0 { sum += qps sumSq += qps * qps count++ } } if count > 5 { mean := sum / float64(count) variance := sumSq/float64(count) - mean*mean if variance < 0 { variance = 0 } qlp.baselineQPS = mean qlp.baselineStdDev = math.Sqrt(variance) } } // GetPrediction returns the current load prediction. func (qlp *QueryLoadPredictor) GetPrediction() LoadPrediction { qlp.mu.RLock() defer qlp.mu.RUnlock() now := time.Now() // Get filtered QPS and velocity currentQPS := qlp.qpsFilter.State() velocity := qlp.qpsFilter.Velocity() // Calculate raw QPS from current bucket elapsed := now.Sub(qlp.currentBucket).Seconds() rawQPS := 0.0 if elapsed > 0 { rawQPS = float64(qlp.bucketCount) / elapsed } // Determine trend var trend string if velocity > qlp.config.SpikeThreshold/10 { trend = "increasing" } else if velocity < qlp.config.DropThreshold/10 { trend = "decreasing" } else { trend = "stable" } // Predictions pred5m := qlp.qpsFilter.Predict(300) // 5 minutes pred15m := qlp.qpsFilter.Predict(900) // 15 minutes pred1h := qlp.qpsFilter.Predict(3600) // 1 hour // Clamp predictions to reasonable values if pred5m < 0 { pred5m = 0 } if pred15m < 0 { pred15m = 0 } if pred1h < 0 { pred1h = 0 } // Find peak hour peakHour := 0 maxCount := int64(0) for h, count := range qlp.hourCounts { if count > maxCount { maxCount = count peakHour = h } } // Check if near peak (within 2 hours) currentHour := now.Hour() isNearPeak := abs(currentHour-peakHour) <= 2 || abs(currentHour-peakHour) >= 22 // Handle wrap-around // Anomaly detection isAnomaly := false anomalyType := "" if qlp.baselineStdDev > 0 { deviation := (currentQPS - qlp.baselineQPS) / qlp.baselineStdDev if deviation > qlp.config.AnomalyStdDevs { isAnomaly = true if velocity > qlp.config.SpikeThreshold { anomalyType = "spike" } else { anomalyType = "sustained_high" } } else if deviation < -qlp.config.AnomalyStdDevs { isAnomaly = true if velocity < qlp.config.DropThreshold { anomalyType = "drop" } else { anomalyType = "sustained_low" } } } // Calculate confidence confidence := float64(qlp.totalQueries) / float64(qlp.totalQueries+1000) return LoadPrediction{ CurrentQPS: currentQPS, CurrentQPM: currentQPS * 60, RawQPS: rawQPS, TotalQueries: qlp.totalQueries, Velocity: velocity, Trend: trend, PredictedQPS5m: pred5m, PredictedQPS15m: pred15m, PredictedQPS1h: pred1h, Confidence: confidence, PeakHour: peakHour, IsNearPeak: isNearPeak, IsAnomaly: isAnomaly, AnomalyType: anomalyType, Timestamp: now, } } // abs returns the absolute value of an int. func abs(x int) int { if x < 0 { return -x } return x } // ShouldScaleUp checks if load is increasing and above threshold. func (qlp *QueryLoadPredictor) ShouldScaleUp(thresholdQPS float64) bool { pred := qlp.GetPrediction() // Scale up if: // 1. Current QPS is near threshold // 2. Trend is increasing // 3. Predicted QPS exceeds threshold if pred.CurrentQPS > thresholdQPS*0.8 && pred.Trend == "increasing" { return true } if pred.PredictedQPS5m > thresholdQPS { return true } return false } // ShouldScaleDown checks if load is decreasing and below threshold. func (qlp *QueryLoadPredictor) ShouldScaleDown(thresholdQPS float64, minQPS float64) bool { pred := qlp.GetPrediction() // Scale down if: // 1. Current QPS is well below threshold // 2. Trend is decreasing or stable // 3. Above minimum QPS if pred.CurrentQPS < thresholdQPS*0.5 && pred.Trend != "increasing" && pred.CurrentQPS > minQPS { return true } return false } // GetLoadLevel returns a simplified load level (0-5). func (qlp *QueryLoadPredictor) GetLoadLevel(maxQPS float64) int { pred := qlp.GetPrediction() ratio := pred.CurrentQPS / maxQPS switch { case ratio < 0.1: return 0 // Idle case ratio < 0.3: return 1 // Low case ratio < 0.5: return 2 // Medium case ratio < 0.7: return 3 // High case ratio < 0.9: return 4 // Very High default: return 5 // Critical } } // PredictPeakTime predicts when the next peak will occur. func (qlp *QueryLoadPredictor) PredictPeakTime() time.Time { qlp.mu.RLock() defer qlp.mu.RUnlock() now := time.Now() currentHour := now.Hour() // Find peak hour peakHour := 0 maxCount := int64(0) for h, count := range qlp.hourCounts { if count > maxCount { maxCount = count peakHour = h } } // Calculate next occurrence of peak hour hoursUntilPeak := peakHour - currentHour if hoursUntilPeak <= 0 { hoursUntilPeak += 24 } return now.Add(time.Duration(hoursUntilPeak) * time.Hour).Truncate(time.Hour) } // LoadStats holds statistics about query load. type LoadStats struct { TotalQueries int64 UptimeSeconds float64 AverageQPS float64 CurrentQPS float64 PeakQPS float64 PeakHour int } // GetStats returns query load statistics. func (qlp *QueryLoadPredictor) GetStats() LoadStats { qlp.mu.RLock() defer qlp.mu.RUnlock() uptime := time.Since(qlp.startTime).Seconds() // Find peak QPS from recent window peakQPS := 0.0 for _, qps := range qlp.recentQPS { if qps > peakQPS { peakQPS = qps } } // Find peak hour peakHour := 0 maxCount := int64(0) for h, count := range qlp.hourCounts { if count > maxCount { maxCount = count peakHour = h } } return LoadStats{ TotalQueries: qlp.totalQueries, UptimeSeconds: uptime, AverageQPS: float64(qlp.totalQueries) / uptime, CurrentQPS: qlp.qpsFilter.State(), PeakQPS: peakQPS, PeakHour: peakHour, } } // Reset clears all load data. func (qlp *QueryLoadPredictor) Reset() { qlp.mu.Lock() defer qlp.mu.Unlock() qlp.qpsFilter = filter.NewKalmanVelocity(qlp.config.FilterConfig) qlp.totalQueries = 0 qlp.startTime = time.Now() qlp.currentBucket = time.Now().Truncate(time.Duration(qlp.config.BucketDurationSeconds) * time.Second) qlp.bucketCount = 0 qlp.recentQPS = make([]float64, qlp.windowSize) qlp.recentQPSIdx = 0 qlp.hourCounts = [24]int64{} qlp.hourSums = [24]float64{} qlp.baselineQPS = 0 qlp.baselineStdDev = 0 }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server