package cypher
import (
"context"
"encoding/json"
"fmt"
"math"
"testing"
"github.com/orneryd/nornicdb/pkg/storage"
)
func TestKalmanInit_Default(t *testing.T) {
stateJSON := kalmanInit(nil)
var state KalmanState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("Failed to parse state JSON: %v", err)
}
if state.X != 0 {
t.Errorf("Expected initial X=0, got %f", state.X)
}
if state.P != 30.0 {
t.Errorf("Expected initial P=30.0, got %f", state.P)
}
if state.R != 88.0 {
t.Errorf("Expected initial R=88.0, got %f", state.R)
}
}
func TestKalmanInit_WithConfig(t *testing.T) {
config := map[string]interface{}{
"processNoise": 0.2,
"measurementNoise": 100.0,
}
stateJSON := kalmanInit(config)
var state KalmanState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("Failed to parse state JSON: %v", err)
}
if state.Q != 0.0002 { // 0.2 * 0.001
t.Errorf("Expected Q=0.0002, got %f", state.Q)
}
if state.R != 100.0 {
t.Errorf("Expected R=100.0, got %f", state.R)
}
}
func TestKalmanProcess_ConvergesToValue(t *testing.T) {
stateJSON := kalmanInit(nil)
// Feed constant value, should converge
target := 10.0
for i := 0; i < 20; i++ {
result := kalmanProcess(target, stateJSON, 0)
stateJSON = result["state"].(string)
}
var state KalmanState
json.Unmarshal([]byte(stateJSON), &state)
if math.Abs(state.X-target) > 0.5 {
t.Errorf("Filter should converge to ~10.0, got %f", state.X)
}
}
func TestKalmanProcess_SmoothsNoise(t *testing.T) {
stateJSON := kalmanInit(nil)
// Feed noisy values around 5.0
values := []float64{5.5, 4.5, 5.2, 4.8, 5.1, 4.9, 5.3, 4.7, 5.0, 5.0}
var lastFiltered float64
for _, v := range values {
result := kalmanProcess(v, stateJSON, 0)
stateJSON = result["state"].(string)
lastFiltered = result["value"].(float64)
}
// Should converge near 5.0
if math.Abs(lastFiltered-5.0) > 1.0 {
t.Errorf("Expected filtered value near 5.0, got %f", lastFiltered)
}
}
func TestKalmanPredict(t *testing.T) {
stateJSON := kalmanInit(nil)
// Process some values with upward trend
for i := 1; i <= 10; i++ {
result := kalmanProcess(float64(i), stateJSON, 0)
stateJSON = result["state"].(string)
}
// Get current state
currentState := kalmanStateValue(stateJSON)
// Predict forward - should be at or above current state for upward trend
// Note: basic Kalman doesn't track velocity as well as velocity Kalman
prediction := kalmanPredict(stateJSON, 3)
// Just verify prediction is reasonable (not wildly off)
if prediction < currentState-5.0 {
t.Errorf("Expected prediction >= %f-5, got %f", currentState, prediction)
}
}
func TestKalmanVelocityInit_Default(t *testing.T) {
stateJSON := kalmanVelocityInit(0, 0, false)
var state KalmanVelocityState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("Failed to parse state JSON: %v", err)
}
if state.Pos != 0 {
t.Errorf("Expected initial Pos=0, got %f", state.Pos)
}
if state.Vel != 0 {
t.Errorf("Expected initial Vel=0, got %f", state.Vel)
}
if state.Dt != 1.0 {
t.Errorf("Expected Dt=1.0, got %f", state.Dt)
}
}
func TestKalmanVelocityInit_WithInitial(t *testing.T) {
stateJSON := kalmanVelocityInit(10.0, 0.5, true)
var state KalmanVelocityState
json.Unmarshal([]byte(stateJSON), &state)
if state.Pos != 10.0 {
t.Errorf("Expected Pos=10.0, got %f", state.Pos)
}
if state.Vel != 0.5 {
t.Errorf("Expected Vel=0.5, got %f", state.Vel)
}
}
func TestKalmanVelocityProcess_TracksVelocity(t *testing.T) {
stateJSON := kalmanVelocityInit(0, 0, false)
// Linear trend: 0, 1, 2, 3, 4, 5
for i := 0; i <= 5; i++ {
result := kalmanVelocityProcess(float64(i), stateJSON)
stateJSON = result["state"].(string)
// After a few iterations, velocity should stabilize near 1.0
if i >= 3 {
velocity := result["velocity"].(float64)
if math.Abs(velocity-1.0) > 0.5 {
t.Errorf("At i=%d, expected velocity near 1.0, got %f", i, velocity)
}
}
}
}
func TestKalmanVelocityPredict(t *testing.T) {
stateJSON := kalmanVelocityInit(10.0, 2.0, true)
// With pos=10, vel=2, dt=1, predict 5 steps should give ~20
prediction := kalmanVelocityPredict(stateJSON, 5)
if math.Abs(prediction-20.0) > 0.1 {
t.Errorf("Expected prediction ~20.0, got %f", prediction)
}
}
func TestKalmanAdaptiveInit_Default(t *testing.T) {
stateJSON := kalmanAdaptiveInit(nil)
var state KalmanAdaptiveState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("Failed to parse state JSON: %v", err)
}
if state.Mode != "basic" {
t.Errorf("Expected initial mode='basic', got %s", state.Mode)
}
if state.TrendThreshold != 0.1 {
t.Errorf("Expected TrendThreshold=0.1, got %f", state.TrendThreshold)
}
}
func TestKalmanAdaptiveInit_WithConfig(t *testing.T) {
config := map[string]interface{}{
"trendThreshold": 0.2,
"initialMode": "velocity",
}
stateJSON := kalmanAdaptiveInit(config)
var state KalmanAdaptiveState
json.Unmarshal([]byte(stateJSON), &state)
if state.TrendThreshold != 0.2 {
t.Errorf("Expected TrendThreshold=0.2, got %f", state.TrendThreshold)
}
if state.Mode != "velocity" {
t.Errorf("Expected mode='velocity', got %s", state.Mode)
}
}
func TestKalmanAdaptiveProcess_SwitchesToVelocityOnTrend(t *testing.T) {
config := map[string]interface{}{
"hysteresis": float64(3), // Quick switch for testing
}
stateJSON := kalmanAdaptiveInit(config)
// Feed strong upward trend
for i := 0; i < 20; i++ {
result := kalmanAdaptiveProcess(float64(i)*2.0, stateJSON)
stateJSON = result["state"].(string)
// Eventually should switch to velocity mode
if i >= 15 && result["mode"].(string) != "velocity" {
// May or may not switch depending on hysteresis
}
}
// Just verify it doesn't crash
var state KalmanAdaptiveState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("State should be valid JSON: %v", err)
}
}
func TestKalmanReset_Basic(t *testing.T) {
stateJSON := kalmanInit(nil)
// Process some data
for i := 0; i < 5; i++ {
result := kalmanProcess(float64(i*10), stateJSON, 0)
stateJSON = result["state"].(string)
}
// Reset
resetJSON := kalmanReset(stateJSON)
var state KalmanState
json.Unmarshal([]byte(resetJSON), &state)
if state.X != 0 {
t.Errorf("Expected reset X=0, got %f", state.X)
}
if state.Observations != 0 {
t.Errorf("Expected reset observations=0, got %d", state.Observations)
}
}
func TestKalmanReset_Velocity(t *testing.T) {
stateJSON := kalmanVelocityInit(50, 5, true)
// Reset
resetJSON := kalmanReset(stateJSON)
var state KalmanVelocityState
json.Unmarshal([]byte(resetJSON), &state)
if state.Pos != 0 {
t.Errorf("Expected reset Pos=0, got %f", state.Pos)
}
}
func TestKalmanReset_Adaptive(t *testing.T) {
stateJSON := kalmanAdaptiveInit(nil)
// Reset
resetJSON := kalmanReset(stateJSON)
var state KalmanAdaptiveState
json.Unmarshal([]byte(resetJSON), &state)
if state.Mode != "basic" {
t.Errorf("Expected reset mode='basic', got %s", state.Mode)
}
}
func TestKalmanProcess_InvalidState(t *testing.T) {
result := kalmanProcess(5.0, "invalid json", 0)
// Should return original measurement on error
if result["value"].(float64) != 5.0 {
t.Errorf("Expected original measurement on error, got %v", result["value"])
}
if result["error"] == nil {
t.Error("Expected error field to be set")
}
}
func TestKalmanVelocityProcess_InvalidState(t *testing.T) {
result := kalmanVelocityProcess(5.0, "invalid json")
if result["value"].(float64) != 5.0 {
t.Errorf("Expected original measurement on error, got %v", result["value"])
}
if result["error"] == nil {
t.Error("Expected error field to be set")
}
}
// ============================================================================
// Cypher Query Integration Tests
// These tests execute actual Cypher queries to verify Kalman functions work
// end-to-end through the query parser and executor.
// ============================================================================
func TestCypherKalmanInit(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
tests := []struct {
name string
query string
}{
{"Default init", "RETURN kalman.init()"},
{"Init with config", "RETURN kalman.init({processNoise: 0.5, measurementNoise: 50.0})"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := exec.Execute(ctx, tt.query, nil)
if err != nil {
t.Fatalf("Query failed: %v", err)
}
if len(result.Rows) != 1 {
t.Fatalf("Expected 1 row, got %d", len(result.Rows))
}
stateJSON, ok := result.Rows[0][0].(string)
if !ok {
t.Fatalf("Expected string result, got %T", result.Rows[0][0])
}
// Verify it's valid JSON
var state KalmanState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Errorf("Result is not valid KalmanState JSON: %v", err)
}
})
}
}
func TestCypherKalmanProcess(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// First get initial state
initResult, _ := exec.Execute(ctx, "RETURN kalman.init()", nil)
initialState := initResult.Rows[0][0].(string)
// Process a measurement
query := fmt.Sprintf("RETURN kalman.process(100.0, '%s')", initialState)
result, err := exec.Execute(ctx, query, nil)
if err != nil {
t.Fatalf("Query failed: %v", err)
}
// Result should be a map with 'value' and 'state'
resultMap, ok := result.Rows[0][0].(map[string]interface{})
if !ok {
t.Fatalf("Expected map result, got %T: %v", result.Rows[0][0], result.Rows[0][0])
}
// Check 'value' field exists and is numeric
if _, hasValue := resultMap["value"]; !hasValue {
t.Error("Result missing 'value' field")
}
// Check 'state' field exists and is valid JSON
stateStr, hasState := resultMap["state"].(string)
if !hasState {
t.Error("Result missing 'state' field")
}
var state KalmanState
if err := json.Unmarshal([]byte(stateStr), &state); err != nil {
t.Errorf("State is not valid JSON: %v", err)
}
}
func TestCypherKalmanPredict(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Get initial state and process some values
initResult, _ := exec.Execute(ctx, "RETURN kalman.init()", nil)
state := initResult.Rows[0][0].(string)
// Process several measurements to build up state
for i := 1; i <= 5; i++ {
query := fmt.Sprintf("RETURN kalman.process(%d.0, '%s')", i*10, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
}
// Now predict
query := fmt.Sprintf("RETURN kalman.predict('%s', 3)", state)
result, err := exec.Execute(ctx, query, nil)
if err != nil {
t.Fatalf("Predict query failed: %v", err)
}
prediction, ok := toFloat64(result.Rows[0][0])
if !ok {
t.Fatalf("Expected numeric prediction, got %T", result.Rows[0][0])
}
// After processing 10, 20, 30, 40, 50 - prediction should be reasonable
if prediction < 30.0 || prediction > 70.0 {
t.Errorf("Prediction %f seems unreasonable after 10-50 series", prediction)
}
t.Logf("✓ Prediction after 10-50 series: %.2f", prediction)
}
func TestCypherKalmanVelocityPrediction(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Initialize velocity filter
initResult, _ := exec.Execute(ctx, "RETURN kalman.velocity.init()", nil)
state := initResult.Rows[0][0].(string)
// Process linear trend: 0, 10, 20, 30, 40, 50
for i := 0; i <= 5; i++ {
query := fmt.Sprintf("RETURN kalman.velocity.process(%d.0, '%s')", i*10, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
if i >= 2 {
velocity := resultMap["velocity"].(float64)
t.Logf(" Step %d: velocity=%.2f", i, velocity)
}
}
// Predict 5 steps ahead - should be around 100 (50 + 5*10)
query := fmt.Sprintf("RETURN kalman.velocity.predict('%s', 5)", state)
result, err := exec.Execute(ctx, query, nil)
if err != nil {
t.Fatalf("Predict query failed: %v", err)
}
prediction, _ := toFloat64(result.Rows[0][0])
t.Logf("✓ After 0-50 linear trend, predict +5 steps: %.2f (expected ~100)", prediction)
// Should be in ballpark of 100 (allow some filter lag)
if prediction < 70.0 || prediction > 130.0 {
t.Errorf("Prediction %f not in expected range [70, 130]", prediction)
}
}
func TestCypherKalmanAdaptiveModeSwitching(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Initialize adaptive filter with low hysteresis for quick switching
initResult, _ := exec.Execute(ctx, "RETURN kalman.adaptive.init({hysteresis: 3})", nil)
state := initResult.Rows[0][0].(string)
var lastMode string
// Process strong trend - should eventually switch to velocity mode
for i := 0; i < 15; i++ {
query := fmt.Sprintf("RETURN kalman.adaptive.process(%d.0, '%s')", i*5, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
lastMode = resultMap["mode"].(string)
if i%5 == 0 {
t.Logf(" Step %d: mode=%s, value=%.2f", i, lastMode, resultMap["value"].(float64))
}
}
t.Logf("✓ Final mode after strong trend: %s", lastMode)
}
func TestCypherKalmanReset(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Initialize and process some data
initResult, _ := exec.Execute(ctx, "RETURN kalman.init()", nil)
state := initResult.Rows[0][0].(string)
for i := 1; i <= 5; i++ {
query := fmt.Sprintf("RETURN kalman.process(%d.0, '%s')", i*100, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
}
// Get current estimate before reset
beforeQuery := fmt.Sprintf("RETURN kalman.state('%s')", state)
beforeResult, _ := exec.Execute(ctx, beforeQuery, nil)
beforeValue, _ := toFloat64(beforeResult.Rows[0][0])
t.Logf(" Before reset: estimate=%.2f", beforeValue)
// Reset the filter
resetQuery := fmt.Sprintf("RETURN kalman.reset('%s')", state)
resetResult, err := exec.Execute(ctx, resetQuery, nil)
if err != nil {
t.Fatalf("Reset query failed: %v", err)
}
resetState := resetResult.Rows[0][0].(string)
// Get estimate after reset - should be 0
afterQuery := fmt.Sprintf("RETURN kalman.state('%s')", resetState)
afterResult, _ := exec.Execute(ctx, afterQuery, nil)
afterValue, _ := toFloat64(afterResult.Rows[0][0])
t.Logf(" After reset: estimate=%.2f", afterValue)
if afterValue != 0 {
t.Errorf("Expected estimate=0 after reset, got %.2f", afterValue)
}
}
func TestCypherKalmanNoiseSmoothingDemo(t *testing.T) {
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Simulate noisy sensor readings around 50.0
noisyReadings := []float64{52.3, 48.1, 51.5, 47.8, 50.2, 53.1, 49.5, 50.8, 48.9, 51.2}
initResult, _ := exec.Execute(ctx, "RETURN kalman.init({measurementNoise: 10.0})", nil)
state := initResult.Rows[0][0].(string)
t.Log("Noise smoothing demonstration:")
t.Log(" Raw → Filtered")
var lastFiltered float64
for _, raw := range noisyReadings {
query := fmt.Sprintf("RETURN kalman.process(%.1f, '%s')", raw, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
lastFiltered = resultMap["value"].(float64)
t.Logf(" %.1f → %.2f", raw, lastFiltered)
}
// Final filtered value should be close to 50
if math.Abs(lastFiltered-50.0) > 3.0 {
t.Errorf("Expected filtered value near 50.0, got %.2f", lastFiltered)
}
t.Logf("✓ Converged to %.2f (true value: 50.0)", lastFiltered)
}
func TestCypherKalmanStockPricePrediction(t *testing.T) {
// Simulates the AP News → Stock prediction use case from docs
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Simulated stock prices with upward trend + noise
// Represents sentiment-adjusted prices from news analysis
stockPrices := []float64{100.0, 102.5, 101.8, 104.2, 103.5, 106.1, 105.8, 108.3, 107.9, 110.5}
initResult, _ := exec.Execute(ctx, "RETURN kalman.velocity.init()", nil)
state := initResult.Rows[0][0].(string)
t.Log("Stock price trend analysis (AP News → Stock scenario):")
t.Log(" Price → Smoothed (Velocity)")
for i, price := range stockPrices {
query := fmt.Sprintf("RETURN kalman.velocity.process(%.1f, '%s')", price, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
smoothed := resultMap["value"].(float64)
velocity := resultMap["velocity"].(float64)
t.Logf(" Day %d: $%.2f → $%.2f (trend: %+.2f/day)", i+1, price, smoothed, velocity)
}
// Predict next 3 days
query := fmt.Sprintf("RETURN kalman.velocity.predict('%s', 3)", state)
result, _ := exec.Execute(ctx, query, nil)
prediction, _ := toFloat64(result.Rows[0][0])
t.Logf("✓ Predicted price in 3 days: $%.2f", prediction)
// Should predict higher than last price given upward trend
lastPrice := stockPrices[len(stockPrices)-1]
if prediction <= lastPrice {
t.Logf("Note: Prediction %.2f not higher than last price %.2f (filter may be conservative)", prediction, lastPrice)
}
}
func TestCypherKalmanWithNodeProperty(t *testing.T) {
// Tests storing/retrieving Kalman state in node properties
store := setupKalmanTestStorage(t)
exec := NewStorageExecutor(store)
ctx := context.Background()
// Create a sensor node with initial Kalman state
_, err := exec.Execute(ctx, `
CREATE (s:Sensor {
name: 'Temperature',
kalmanState: kalman.init({measurementNoise: 5.0})
})
`, nil)
if err != nil {
t.Fatalf("Failed to create sensor node: %v", err)
}
// Read back the state
result, err := exec.Execute(ctx, `
MATCH (s:Sensor {name: 'Temperature'})
RETURN s.kalmanState
`, nil)
if err != nil {
t.Fatalf("Failed to read sensor: %v", err)
}
stateJSON := result.Rows[0][0].(string)
var state KalmanState
if err := json.Unmarshal([]byte(stateJSON), &state); err != nil {
t.Fatalf("Invalid Kalman state in node property: %v", err)
}
t.Logf("✓ Kalman state stored in node property: R=%.1f", state.R)
// Now update the node with processed state using direct function calls
// (avoiding the complexity of embedding JSON in queries)
currentState := stateJSON
for _, temp := range []float64{20.5, 21.2, 20.8} {
// Process new reading using direct function call
resultMap := kalmanProcess(temp, currentState, 0)
newState := resultMap["state"].(string)
filtered := resultMap["value"].(float64)
// Update in-memory state for next iteration
currentState = newState
t.Logf(" Updated sensor: raw=%.1f, filtered=%.2f", temp, filtered)
}
// Verify state is valid JSON after multiple updates
var finalState KalmanState
if err := json.Unmarshal([]byte(currentState), &finalState); err != nil {
t.Fatalf("Final state is not valid JSON: %v", err)
}
if finalState.Observations != 3 {
t.Errorf("Expected 3 observations, got %d", finalState.Observations)
}
t.Log("✓ Successfully processed multiple readings through Kalman filter")
}
// Helper for Kalman test setup
func setupKalmanTestStorage(t *testing.T) *storage.MemoryEngine {
t.Helper()
return storage.NewMemoryEngine()
}
// ============================================================================
// Benchmark tests
// ============================================================================
func BenchmarkKalmanProcess(b *testing.B) {
stateJSON := kalmanInit(nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
result := kalmanProcess(float64(i%100), stateJSON, 0)
stateJSON = result["state"].(string)
}
}
func BenchmarkKalmanVelocityProcess(b *testing.B) {
stateJSON := kalmanVelocityInit(0, 0, false)
b.ResetTimer()
for i := 0; i < b.N; i++ {
result := kalmanVelocityProcess(float64(i%100), stateJSON)
stateJSON = result["state"].(string)
}
}
func BenchmarkKalmanAdaptiveProcess(b *testing.B) {
stateJSON := kalmanAdaptiveInit(nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
result := kalmanAdaptiveProcess(float64(i%100), stateJSON)
stateJSON = result["state"].(string)
}
}
func BenchmarkCypherKalmanProcess(b *testing.B) {
store := storage.NewMemoryEngine()
exec := NewStorageExecutor(store)
ctx := context.Background()
// Get initial state
initResult, _ := exec.Execute(ctx, "RETURN kalman.init()", nil)
state := initResult.Rows[0][0].(string)
b.ResetTimer()
for i := 0; i < b.N; i++ {
query := fmt.Sprintf("RETURN kalman.process(%d.0, '%s')", i%100, state)
result, _ := exec.Execute(ctx, query, nil)
resultMap := result.Rows[0][0].(map[string]interface{})
state = resultMap["state"].(string)
}
}