Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
scenario_test.go70.4 kB
package replication import ( "context" "fmt" "sync" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // ============================================================================= // SCENARIO TEST INFRASTRUCTURE // ============================================================================= // TestCluster represents a test cluster with multiple nodes. type TestCluster struct { t *testing.T nodes map[string]*TestNode transports map[string]*MockTransport connections map[string]map[string]*MockPeerConn // from -> to -> conn mu sync.RWMutex } // TestNode represents a node in the test cluster. type TestNode struct { ID string Storage *MockStorage Replicator Replicator Transport *MockTransport Config *Config Role string // "leader", "follower", "primary", "standby" } // NewTestCluster creates a new test cluster. func NewTestCluster(t *testing.T) *TestCluster { return &TestCluster{ t: t, nodes: make(map[string]*TestNode), transports: make(map[string]*MockTransport), connections: make(map[string]map[string]*MockPeerConn), } } // AddNode adds a node to the cluster. func (c *TestCluster) AddNode(id string, config *Config) *TestNode { c.mu.Lock() defer c.mu.Unlock() storage := NewMockStorage() transport := NewMockTransport() node := &TestNode{ ID: id, Storage: storage, Transport: transport, Config: config, } c.nodes[id] = node c.transports[id] = transport c.connections[id] = make(map[string]*MockPeerConn) return node } // ConnectNodes creates mock connections between nodes. func (c *TestCluster) ConnectNodes(from, to string) *MockPeerConn { c.mu.Lock() defer c.mu.Unlock() conn := &MockPeerConn{ addr: to, connected: true, } if c.connections[from] == nil { c.connections[from] = make(map[string]*MockPeerConn) } c.connections[from][to] = conn return conn } // GetConnection returns the connection between two nodes. func (c *TestCluster) GetConnection(from, to string) *MockPeerConn { c.mu.RLock() defer c.mu.RUnlock() if conns, ok := c.connections[from]; ok { return conns[to] } return nil } // Shutdown shuts down all nodes. func (c *TestCluster) Shutdown() { c.mu.Lock() defer c.mu.Unlock() for _, node := range c.nodes { if node.Replicator != nil { node.Replicator.Shutdown() } if node.Transport != nil { node.Transport.Close() } } } // ============================================================================= // A. STANDALONE (SINGLE NODE) TESTS // ============================================================================= func TestScenario_Standalone_A_BasicOperations(t *testing.T) { storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "standalone-1" replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) // Test A1: Write operations should succeed t.Run("A1_WriteOperations", func(t *testing.T) { for i := 0; i < 10; i++ { cmd := &Command{ Type: CmdCreateNode, Data: []byte(fmt.Sprintf("node-%d", i)), Timestamp: time.Now(), } err := replicator.Apply(cmd, time.Second) require.NoError(t, err) } assert.Equal(t, 10, storage.GetApplyCount()) }) // Test A2: Status should always report healthy t.Run("A2_HealthStatus", func(t *testing.T) { status := replicator.Health() assert.True(t, status.Healthy) assert.Equal(t, "standalone", status.Role) // Standalone reports as "standalone", not "leader" }) // Test A3: Should report as leader t.Run("A3_LeaderStatus", func(t *testing.T) { assert.True(t, replicator.IsLeader()) }) // Test A4: No peers should be returned t.Run("A4_NoPeers", func(t *testing.T) { // Standalone has no peers status := replicator.Health() assert.Empty(t, status.Peers) }) replicator.Shutdown() } func TestScenario_Standalone_B_Resilience(t *testing.T) { storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "standalone-1" // Test B1: Recovery after restart t.Run("B1_RecoveryAfterRestart", func(t *testing.T) { replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) // Write data cmd := &Command{Type: CmdCreateNode, Data: []byte("before-restart")} require.NoError(t, replicator.Apply(cmd, time.Second)) replicator.Shutdown() // Restart with same storage replicator2 := NewStandaloneReplicator(config, storage) require.NoError(t, replicator2.Start(ctx)) // Should be able to continue writing cmd2 := &Command{Type: CmdCreateNode, Data: []byte("after-restart")} require.NoError(t, replicator2.Apply(cmd2, time.Second)) assert.Equal(t, 2, storage.GetApplyCount()) replicator2.Shutdown() }) // Test B2: Concurrent writes t.Run("B2_ConcurrentWrites", func(t *testing.T) { storage := NewMockStorage() replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) var wg sync.WaitGroup errors := make(chan error, 100) for i := 0; i < 100; i++ { wg.Add(1) go func(n int) { defer wg.Done() cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(n)}} if err := replicator.Apply(cmd, time.Second); err != nil { errors <- err } }(i) } wg.Wait() close(errors) var errCount int for range errors { errCount++ } assert.Equal(t, 0, errCount) assert.Equal(t, 100, storage.GetApplyCount()) replicator.Shutdown() }) } func TestScenario_Standalone_C_EdgeCases(t *testing.T) { config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "standalone-1" // Test C1: Empty command t.Run("C1_EmptyCommand", func(t *testing.T) { storage := NewMockStorage() replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) cmd := &Command{Type: CmdCreateNode, Data: nil} err := replicator.Apply(cmd, time.Second) assert.NoError(t, err) // Should handle gracefully replicator.Shutdown() }) // Test C2: Large payload t.Run("C2_LargePayload", func(t *testing.T) { storage := NewMockStorage() replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) largeData := make([]byte, 1024*1024) // 1MB cmd := &Command{Type: CmdCreateNode, Data: largeData} err := replicator.Apply(cmd, time.Second) assert.NoError(t, err) replicator.Shutdown() }) // Test C3: Zero timeout t.Run("C3_ZeroTimeout", func(t *testing.T) { storage := NewMockStorage() replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) cmd := &Command{Type: CmdCreateNode, Data: []byte("test")} // Zero timeout should still work for standalone err := replicator.Apply(cmd, 0) assert.NoError(t, err) replicator.Shutdown() }) } func TestScenario_Standalone_D_Performance(t *testing.T) { if testing.Short() { t.Skip("Skipping performance test in short mode") } config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "standalone-1" // Test D1: High throughput t.Run("D1_HighThroughput", func(t *testing.T) { storage := NewMockStorage() replicator := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) start := time.Now() count := 10000 for i := 0; i < count; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i % 256)}} require.NoError(t, replicator.Apply(cmd, time.Second)) } elapsed := time.Since(start) opsPerSec := float64(count) / elapsed.Seconds() t.Logf("Standalone throughput: %.0f ops/sec", opsPerSec) assert.Greater(t, opsPerSec, 1000.0, "Should achieve >1000 ops/sec") replicator.Shutdown() }) } // ============================================================================= // B. HOT STANDBY TESTS - PRIMARY PERSPECTIVE // ============================================================================= func TestScenario_HAStandby_Primary_A_BasicOperations(t *testing.T) { cluster := NewTestCluster(t) defer cluster.Shutdown() // Setup primary primaryConfig := DefaultConfig() primaryConfig.Mode = ModeHAStandby primaryConfig.NodeID = "primary-1" primaryConfig.HAStandby.Role = "primary" primaryConfig.HAStandby.PeerAddr = "standby:7688" primaryNode := cluster.AddNode("primary-1", primaryConfig) primary, err := NewHAStandbyReplicator(primaryConfig, primaryNode.Storage) require.NoError(t, err) primary.SetTransport(primaryNode.Transport) primaryNode.Replicator = primary ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, primary.Start(ctx)) // Test A1: Primary should accept writes t.Run("A1_AcceptWrites", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("from-primary")} err := primary.Apply(cmd, time.Second) assert.NoError(t, err) }) // Test A2: Primary should report as leader t.Run("A2_ReportAsLeader", func(t *testing.T) { assert.True(t, primary.IsLeader()) status := primary.Health() assert.Equal(t, "primary", status.Role) }) // Test A3: Primary should track WAL position t.Run("A3_TrackWALPosition", func(t *testing.T) { // Apply more commands for i := 0; i < 5; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} require.NoError(t, primary.Apply(cmd, time.Second)) } // WAL position should advance pos, err := primaryNode.Storage.GetWALPosition() require.NoError(t, err) assert.Greater(t, pos, uint64(0)) }) // Test A4: Primary health check t.Run("A4_HealthCheck", func(t *testing.T) { status := primary.Health() assert.True(t, status.Healthy) }) cancel() primaryNode.Transport.Close() } func TestScenario_HAStandby_Primary_B_StandbyConnection(t *testing.T) { cluster := NewTestCluster(t) defer cluster.Shutdown() primaryConfig := DefaultConfig() primaryConfig.Mode = ModeHAStandby primaryConfig.NodeID = "primary-1" primaryConfig.HAStandby.Role = "primary" primaryConfig.HAStandby.PeerAddr = "standby:7688" primaryNode := cluster.AddNode("primary-1", primaryConfig) primary, err := NewHAStandbyReplicator(primaryConfig, primaryNode.Storage) require.NoError(t, err) primary.SetTransport(primaryNode.Transport) primaryNode.Replicator = primary ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, primary.Start(ctx)) // Test B1: Handle standby disconnect gracefully t.Run("B1_StandbyDisconnect", func(t *testing.T) { // Even with disconnected standby, primary should accept writes cmd := &Command{Type: CmdCreateNode, Data: []byte("during-disconnect")} err := primary.Apply(cmd, time.Second) assert.NoError(t, err) }) // Test B2: Handle reconnection t.Run("B2_StandbyReconnect", func(t *testing.T) { // Simulate standby reconnection by creating connection conn := cluster.ConnectNodes("primary-1", "standby") assert.NotNil(t, conn) }) cancel() primaryNode.Transport.Close() } func TestScenario_HAStandby_Primary_C_Failover(t *testing.T) { cluster := NewTestCluster(t) defer cluster.Shutdown() primaryConfig := DefaultConfig() primaryConfig.Mode = ModeHAStandby primaryConfig.NodeID = "primary-1" primaryConfig.HAStandby.Role = "primary" primaryConfig.HAStandby.PeerAddr = "standby:7688" primaryConfig.HAStandby.AutoFailover = true primaryNode := cluster.AddNode("primary-1", primaryConfig) primary, err := NewHAStandbyReplicator(primaryConfig, primaryNode.Storage) require.NoError(t, err) primary.SetTransport(primaryNode.Transport) primaryNode.Replicator = primary ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, primary.Start(ctx)) // Test C1: Primary should detect standby health t.Run("C1_DetectStandbyHealth", func(t *testing.T) { status := primary.Health() // With no standby connected, health reporting should indicate this assert.NotNil(t, status) }) // Test C2: Primary should continue operating if standby fails t.Run("C2_ContinueWithoutStandby", func(t *testing.T) { for i := 0; i < 10; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} err := primary.Apply(cmd, time.Second) assert.NoError(t, err) } }) cancel() primaryNode.Transport.Close() } func TestScenario_HAStandby_Primary_D_HighLatency(t *testing.T) { if testing.Short() { t.Skip("Skipping high-latency test in short mode") } cluster := NewTestCluster(t) defer cluster.Shutdown() primaryConfig := DefaultConfig() primaryConfig.Mode = ModeHAStandby primaryConfig.NodeID = "primary-1" primaryConfig.HAStandby.Role = "primary" primaryConfig.HAStandby.PeerAddr = "standby:7688" primaryConfig.HAStandby.HeartbeatInterval = 5 * time.Second primaryConfig.HAStandby.FailoverTimeout = 30 * time.Second primaryNode := cluster.AddNode("primary-1", primaryConfig) // Use chaos transport with high latency chaosConfig := CrossRegionChaosConfig() chaosConfig.CrossRegionLatency = 2000 * time.Millisecond // 2 second latency chaosTransport := NewChaosTransport(primaryNode.Transport, chaosConfig) primary, err := NewHAStandbyReplicator(primaryConfig, primaryNode.Storage) require.NoError(t, err) primary.SetTransport(chaosTransport) primaryNode.Replicator = primary ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, primary.Start(ctx)) // Test D1: Handle 2000ms+ latency t.Run("D1_HighLatencyOperations", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("high-latency")} err := primary.Apply(cmd, 10*time.Second) // Generous timeout assert.NoError(t, err) }) cancel() primaryNode.Transport.Close() } // ============================================================================= // B. HOT STANDBY TESTS - STANDBY PERSPECTIVE // ============================================================================= func TestScenario_HAStandby_Standby_A_BasicOperations(t *testing.T) { cluster := NewTestCluster(t) defer cluster.Shutdown() // Setup standby standbyConfig := DefaultConfig() standbyConfig.Mode = ModeHAStandby standbyConfig.NodeID = "standby-1" standbyConfig.HAStandby.Role = "standby" standbyConfig.HAStandby.PeerAddr = "primary:7688" standbyNode := cluster.AddNode("standby-1", standbyConfig) standby, err := NewHAStandbyReplicator(standbyConfig, standbyNode.Storage) require.NoError(t, err) standby.SetTransport(standbyNode.Transport) standbyNode.Replicator = standby ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, standby.Start(ctx)) // Test A1: Standby should reject writes t.Run("A1_RejectWrites", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("from-standby")} err := standby.Apply(cmd, time.Second) assert.Error(t, err) // Should be rejected }) // Test A2: Standby should not report as leader t.Run("A2_NotLeader", func(t *testing.T) { assert.False(t, standby.IsLeader()) status := standby.Health() assert.Equal(t, "standby", status.Role) }) // Test A3: Standby should track primary address t.Run("A3_TrackPrimaryAddress", func(t *testing.T) { leaderAddr := standby.LeaderAddr() assert.Contains(t, leaderAddr, "primary") }) // Test A4: Standby should be healthy when connected t.Run("A4_HealthWhenConnected", func(t *testing.T) { status := standby.Health() // May not be healthy if not connected, but should not crash assert.NotNil(t, status) }) cancel() standbyNode.Transport.Close() } func TestScenario_HAStandby_Standby_B_WALReplication(t *testing.T) { // Setup both primary and standby primaryStorage := NewMockStorage() standbyStorage := NewMockStorage() primaryConfig := DefaultConfig() primaryConfig.Mode = ModeHAStandby primaryConfig.NodeID = "primary-1" primaryConfig.HAStandby.Role = "primary" primaryConfig.HAStandby.PeerAddr = "standby:7688" standbyConfig := DefaultConfig() standbyConfig.Mode = ModeHAStandby standbyConfig.NodeID = "standby-1" standbyConfig.HAStandby.Role = "standby" standbyConfig.HAStandby.PeerAddr = "primary:7688" primaryTransport := NewMockTransport() standbyTransport := NewMockTransport() primary, err := NewHAStandbyReplicator(primaryConfig, primaryStorage) require.NoError(t, err) primary.SetTransport(primaryTransport) standby, err := NewHAStandbyReplicator(standbyConfig, standbyStorage) require.NoError(t, err) standby.SetTransport(standbyTransport) ctx, cancel := context.WithCancel(context.Background()) require.NoError(t, primary.Start(ctx)) require.NoError(t, standby.Start(ctx)) // Test B1: WAL applier should process entries in order t.Run("B1_WALOrderPreserved", func(t *testing.T) { applier := NewWALApplier(standbyStorage) entries := []*WALEntry{ {Position: 1, Command: &Command{Type: CmdCreateNode, Data: []byte("1")}}, {Position: 2, Command: &Command{Type: CmdCreateNode, Data: []byte("2")}}, {Position: 3, Command: &Command{Type: CmdCreateNode, Data: []byte("3")}}, } lastPos, err := applier.ApplyBatch(entries) require.NoError(t, err) assert.Equal(t, uint64(3), lastPos) assert.Equal(t, 3, standbyStorage.GetApplyCount()) }) // Test B2: WAL applier should handle gaps t.Run("B2_HandleWALGaps", func(t *testing.T) { storage := NewMockStorage() applier := NewWALApplier(storage) // Apply entries 1-3 entries1 := []*WALEntry{ {Position: 1, Command: &Command{Type: CmdCreateNode, Data: []byte("1")}}, {Position: 2, Command: &Command{Type: CmdCreateNode, Data: []byte("2")}}, {Position: 3, Command: &Command{Type: CmdCreateNode, Data: []byte("3")}}, } _, err := applier.ApplyBatch(entries1) require.NoError(t, err) // Skip to position 10 (gap from 4-9) entries2 := []*WALEntry{ {Position: 10, Command: &Command{Type: CmdCreateNode, Data: []byte("10")}}, } _, err = applier.ApplyBatch(entries2) require.NoError(t, err) }) cancel() primaryTransport.Close() standbyTransport.Close() primary.Shutdown() standby.Shutdown() } func TestScenario_HAStandby_Standby_C_Promotion(t *testing.T) { standbyConfig := DefaultConfig() standbyConfig.Mode = ModeHAStandby standbyConfig.NodeID = "standby-1" standbyConfig.HAStandby.Role = "standby" standbyConfig.HAStandby.PeerAddr = "primary:7688" standbyConfig.HAStandby.AutoFailover = true standbyStorage := NewMockStorage() standbyTransport := NewMockTransport() standby, err := NewHAStandbyReplicator(standbyConfig, standbyStorage) require.NoError(t, err) standby.SetTransport(standbyTransport) ctx, cancel := context.WithCancel(context.Background()) require.NoError(t, standby.Start(ctx)) // Test C1: Standby should detect primary failure t.Run("C1_DetectPrimaryFailure", func(t *testing.T) { // Without a real primary, standby should eventually detect failure status := standby.Health() assert.NotNil(t, status) }) // Test C2: After promotion, should accept writes t.Run("C2_AcceptWritesAfterPromotion", func(t *testing.T) { // Note: Manual promotion would be needed in real scenario // For now, verify the standby structure is correct assert.False(t, standby.IsLeader()) // Still standby without promotion }) cancel() standbyTransport.Close() standby.Shutdown() } func TestScenario_HAStandby_Standby_D_HighLatency(t *testing.T) { if testing.Short() { t.Skip("Skipping high-latency test in short mode") } standbyConfig := DefaultConfig() standbyConfig.Mode = ModeHAStandby standbyConfig.NodeID = "standby-1" standbyConfig.HAStandby.Role = "standby" standbyConfig.HAStandby.PeerAddr = "primary:7688" standbyConfig.HAStandby.HeartbeatInterval = 10 * time.Second standbyConfig.HAStandby.FailoverTimeout = 60 * time.Second standbyStorage := NewMockStorage() baseTransport := NewMockTransport() // High latency for cross-region scenario chaosConfig := &ChaosConfig{ CrossRegionLatency: 2500 * time.Millisecond, // 2.5 second latency CrossRegionJitter: 500 * time.Millisecond, } chaosTransport := NewChaosTransport(baseTransport, chaosConfig) standby, err := NewHAStandbyReplicator(standbyConfig, standbyStorage) require.NoError(t, err) standby.SetTransport(chaosTransport) ctx, cancel := context.WithCancel(context.Background()) require.NoError(t, standby.Start(ctx)) // Test D1: Standby should handle high latency without false failover t.Run("D1_NoFalseFailover", func(t *testing.T) { // With high latency settings, standby should not immediately trigger failover time.Sleep(time.Second) status := standby.Health() // Should still be standby, not promoted due to timeout assert.Equal(t, "standby", status.Role) }) cancel() baseTransport.Close() standby.Shutdown() } // ============================================================================= // C. RAFT CLUSTER TESTS - LEADER PERSPECTIVE // ============================================================================= func TestScenario_Raft_Leader_A_BasicOperations(t *testing.T) { leaderConfig := DefaultConfig() leaderConfig.Mode = ModeRaft leaderConfig.NodeID = "leader-1" leaderConfig.Raft.Bootstrap = true leaderStorage := NewMockStorage() leader, err := NewRaftReplicator(leaderConfig, leaderStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, leader.Start(ctx)) // Test A1: Leader should accept writes t.Run("A1_AcceptWrites", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("from-leader")} err := leader.Apply(cmd, time.Second) assert.NoError(t, err) }) // Test A2: Leader should report correct status t.Run("A2_LeaderStatus", func(t *testing.T) { assert.True(t, leader.IsLeader()) status := leader.Health() assert.Equal(t, "leader", status.Role) }) // Test A3: Leader should return own address t.Run("A3_LeaderAddress", func(t *testing.T) { addr := leader.LeaderAddr() assert.NotEmpty(t, addr) }) // Test A4: Leader should report healthy t.Run("A4_HealthCheck", func(t *testing.T) { status := leader.Health() assert.True(t, status.Healthy) }) leader.Shutdown() } func TestScenario_Raft_Leader_B_Consensus(t *testing.T) { // Test with 3-node cluster simulation configs := make([]*Config, 3) storages := make([]*MockStorage, 3) replicators := make([]*RaftReplicator, 3) for i := 0; i < 3; i++ { configs[i] = DefaultConfig() configs[i].Mode = ModeRaft configs[i].NodeID = fmt.Sprintf("node-%d", i+1) configs[i].Raft.Bootstrap = (i == 0) // Only first node bootstraps if i > 0 { // Non-bootstrap nodes need peers configured configs[i].Raft.Peers = []PeerConfig{ {ID: "node-1", Addr: "node1:7688"}, } } storages[i] = NewMockStorage() rep, err := NewRaftReplicator(configs[i], storages[i]) require.NoError(t, err) replicators[i] = rep } ctx, cancel := context.WithCancel(context.Background()) defer cancel() for _, rep := range replicators { require.NoError(t, rep.Start(ctx)) } // Test B1: Only one leader should exist t.Run("B1_SingleLeader", func(t *testing.T) { leaderCount := 0 for _, rep := range replicators { if rep.IsLeader() { leaderCount++ } } // In stub implementation, bootstrap node is always leader assert.GreaterOrEqual(t, leaderCount, 1) }) // Test B2: Leader should handle write t.Run("B2_LeaderHandleWrite", func(t *testing.T) { for _, rep := range replicators { if rep.IsLeader() { cmd := &Command{Type: CmdCreateNode, Data: []byte("consensus-write")} err := rep.Apply(cmd, time.Second) assert.NoError(t, err) break } } }) for _, rep := range replicators { rep.Shutdown() } } func TestScenario_Raft_Leader_C_FollowerFailure(t *testing.T) { // Bootstrap as single-node leader (follower failure is simulated via health check) // In production, the leader would have actual followers connected leaderConfig := DefaultConfig() leaderConfig.Mode = ModeRaft leaderConfig.NodeID = "leader-1" leaderConfig.Raft.Bootstrap = true // No peers - bootstrap as single node, followers would join dynamically leaderStorage := NewMockStorage() leader, err := NewRaftReplicator(leaderConfig, leaderStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, leader.Start(ctx)) // Test C1: Leader should handle writes as single-node cluster t.Run("C1_ContinueWithMajority", func(t *testing.T) { // Single-node cluster has majority of 1 cmd := &Command{Type: CmdCreateNode, Data: []byte("with-majority")} err := leader.Apply(cmd, time.Second) assert.NoError(t, err) }) // Test C2: Leader should report peer status in health t.Run("C2_DetectFollowerHealth", func(t *testing.T) { status := leader.Health() // Peers map may be empty for single-node, but should not be nil assert.NotNil(t, status) }) leader.Shutdown() } func TestScenario_Raft_Leader_D_HighLatency(t *testing.T) { if testing.Short() { t.Skip("Skipping high-latency test in short mode") } leaderConfig := DefaultConfig() leaderConfig.Mode = ModeRaft leaderConfig.NodeID = "leader-1" leaderConfig.Raft.Bootstrap = true leaderConfig.Raft.ElectionTimeout = 10 * time.Second leaderConfig.Raft.HeartbeatTimeout = 5 * time.Second leaderConfig.Raft.CommitTimeout = 5 * time.Second leaderStorage := NewMockStorage() leader, err := NewRaftReplicator(leaderConfig, leaderStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, leader.Start(ctx)) // Test D1: Handle high latency commits t.Run("D1_HighLatencyCommit", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("high-latency-commit")} err := leader.Apply(cmd, 10*time.Second) assert.NoError(t, err) }) leader.Shutdown() } // ============================================================================= // C. RAFT CLUSTER TESTS - FOLLOWER PERSPECTIVE // ============================================================================= func TestScenario_Raft_Follower_A_BasicOperations(t *testing.T) { followerConfig := DefaultConfig() followerConfig.Mode = ModeRaft followerConfig.NodeID = "follower-1" followerConfig.Raft.Bootstrap = false followerConfig.Raft.Peers = []PeerConfig{ {ID: "leader-1", Addr: "leader:7688"}, } followerStorage := NewMockStorage() follower, err := NewRaftReplicator(followerConfig, followerStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, follower.Start(ctx)) // Test A1: Follower should reject direct writes t.Run("A1_RejectDirectWrites", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("from-follower")} err := follower.Apply(cmd, time.Second) // Should be ErrNotLeader or similar assert.Error(t, err) }) // Test A2: Follower should not report as leader t.Run("A2_NotLeader", func(t *testing.T) { assert.False(t, follower.IsLeader()) status := follower.Health() assert.Equal(t, "follower", status.Role) }) // Test A3: Follower should know leader address t.Run("A3_KnowLeaderAddress", func(t *testing.T) { addr := follower.LeaderAddr() // May be empty if not connected, but should not panic _ = addr }) // Test A4: Follower should report status t.Run("A4_ReportStatus", func(t *testing.T) { status := follower.Health() assert.NotNil(t, status) }) follower.Shutdown() } func TestScenario_Raft_Follower_B_LogReplication(t *testing.T) { followerConfig := DefaultConfig() followerConfig.Mode = ModeRaft followerConfig.NodeID = "follower-1" followerConfig.Raft.Bootstrap = false followerConfig.Raft.Peers = []PeerConfig{{ID: "leader-1", Addr: "leader1:7688"}} followerStorage := NewMockStorage() follower, err := NewRaftReplicator(followerConfig, followerStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, follower.Start(ctx)) // Test B1: Follower should apply replicated log entries t.Run("B1_ApplyReplicatedEntries", func(t *testing.T) { // In real implementation, entries would come from leader // For now, verify the applier works applier := NewWALApplier(followerStorage) entries := []*WALEntry{ {Position: 1, Command: &Command{Type: CmdCreateNode, Data: []byte("1")}}, {Position: 2, Command: &Command{Type: CmdCreateNode, Data: []byte("2")}}, } lastPos, err := applier.ApplyBatch(entries) require.NoError(t, err) assert.Equal(t, uint64(2), lastPos) }) // Test B2: Follower should maintain log consistency t.Run("B2_LogConsistency", func(t *testing.T) { pos, err := followerStorage.GetWALPosition() require.NoError(t, err) assert.GreaterOrEqual(t, pos, uint64(0)) }) follower.Shutdown() } func TestScenario_Raft_Follower_C_LeaderElection(t *testing.T) { followerConfig := DefaultConfig() followerConfig.Mode = ModeRaft followerConfig.NodeID = "follower-1" followerConfig.Raft.Bootstrap = false followerConfig.Raft.Peers = []PeerConfig{{ID: "leader-1", Addr: "leader1:7688"}} followerConfig.Raft.ElectionTimeout = 500 * time.Millisecond followerStorage := NewMockStorage() follower, err := NewRaftReplicator(followerConfig, followerStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, follower.Start(ctx)) // Test C1: Follower should be ready for election t.Run("C1_ReadyForElection", func(t *testing.T) { status := follower.Health() assert.NotNil(t, status) }) // Test C2: Follower should become candidate when leader unavailable t.Run("C2_BecomeCandidate", func(t *testing.T) { // In stub implementation, this would require timeout // Just verify no panic status := follower.Health() assert.NotNil(t, status) }) follower.Shutdown() } func TestScenario_Raft_Follower_D_HighLatency(t *testing.T) { if testing.Short() { t.Skip("Skipping high-latency test in short mode") } followerConfig := DefaultConfig() followerConfig.Mode = ModeRaft followerConfig.NodeID = "follower-1" followerConfig.Raft.Bootstrap = false followerConfig.Raft.Peers = []PeerConfig{{ID: "leader-1", Addr: "leader1:7688"}} followerConfig.Raft.ElectionTimeout = 30 * time.Second // Very long for high latency followerConfig.Raft.HeartbeatTimeout = 10 * time.Second followerStorage := NewMockStorage() follower, err := NewRaftReplicator(followerConfig, followerStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, follower.Start(ctx)) // Test D1: Follower should not trigger false elections t.Run("D1_NoFalseElections", func(t *testing.T) { // With long timeout, should remain follower time.Sleep(500 * time.Millisecond) assert.False(t, follower.IsLeader()) }) follower.Shutdown() } // ============================================================================= // D. MULTI-REGION TESTS // ============================================================================= func TestScenario_MultiRegion_A_LocalCluster(t *testing.T) { regionConfig := DefaultConfig() regionConfig.Mode = ModeMultiRegion regionConfig.NodeID = "us-east-1" regionConfig.MultiRegion.RegionID = "us-east" regionConfig.MultiRegion.LocalCluster.Bootstrap = true regionStorage := NewMockStorage() region, err := NewMultiRegionReplicator(regionConfig, regionStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, region.Start(ctx)) // Test A1: Local writes should succeed t.Run("A1_LocalWrites", func(t *testing.T) { cmd := &Command{Type: CmdCreateNode, Data: []byte("local-write")} err := region.Apply(cmd, time.Second) assert.NoError(t, err) }) // Test A2: Should report region info t.Run("A2_RegionInfo", func(t *testing.T) { status := region.Health() assert.NotNil(t, status) }) // Test A3: Should handle local consensus t.Run("A3_LocalConsensus", func(t *testing.T) { for i := 0; i < 5; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} err := region.Apply(cmd, time.Second) assert.NoError(t, err) } }) // Test A4: Should be healthy t.Run("A4_HealthStatus", func(t *testing.T) { status := region.Health() assert.True(t, status.Healthy) }) region.Shutdown() } func TestScenario_MultiRegion_B_CrossRegion(t *testing.T) { usEastConfig := DefaultConfig() usEastConfig.Mode = ModeMultiRegion usEastConfig.NodeID = "us-east-1" usEastConfig.MultiRegion.RegionID = "us-east" usEastConfig.MultiRegion.LocalCluster.Bootstrap = true usEastConfig.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "eu-west", Addrs: []string{"eu-coordinator:7688"}, Priority: 1}, } usEastConfig.MultiRegion.CrossRegionSyncMode = SyncAsync euWestConfig := DefaultConfig() euWestConfig.Mode = ModeMultiRegion euWestConfig.NodeID = "eu-west-1" euWestConfig.MultiRegion.RegionID = "eu-west" euWestConfig.MultiRegion.LocalCluster.Bootstrap = true euWestConfig.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "us-east", Addrs: []string{"us-coordinator:7688"}, Priority: 1}, } euWestConfig.MultiRegion.CrossRegionSyncMode = SyncAsync usEastStorage := NewMockStorage() euWestStorage := NewMockStorage() usEast, err := NewMultiRegionReplicator(usEastConfig, usEastStorage) require.NoError(t, err) euWest, err := NewMultiRegionReplicator(euWestConfig, euWestStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, usEast.Start(ctx)) require.NoError(t, euWest.Start(ctx)) // Test B1: Both regions should accept local writes t.Run("B1_BothRegionsAcceptWrites", func(t *testing.T) { cmdUS := &Command{Type: CmdCreateNode, Data: []byte("us-write")} require.NoError(t, usEast.Apply(cmdUS, time.Second)) cmdEU := &Command{Type: CmdCreateNode, Data: []byte("eu-write")} require.NoError(t, euWest.Apply(cmdEU, time.Second)) }) // Test B2: Both regions should report healthy t.Run("B2_BothRegionsHealthy", func(t *testing.T) { usStatus := usEast.Health() euStatus := euWest.Health() assert.True(t, usStatus.Healthy) assert.True(t, euStatus.Healthy) }) usEast.Shutdown() euWest.Shutdown() } func TestScenario_MultiRegion_C_RegionFailover(t *testing.T) { primaryRegionConfig := DefaultConfig() primaryRegionConfig.Mode = ModeMultiRegion primaryRegionConfig.NodeID = "primary-region-1" primaryRegionConfig.MultiRegion.RegionID = "primary" primaryRegionConfig.MultiRegion.LocalCluster.Bootstrap = true primaryRegionConfig.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "secondary", Addrs: []string{"secondary:7688"}, Priority: 1}, } primaryStorage := NewMockStorage() primaryRegion, err := NewMultiRegionReplicator(primaryRegionConfig, primaryStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, primaryRegion.Start(ctx)) // Test C1: Primary region should handle writes t.Run("C1_PrimaryHandlesWrites", func(t *testing.T) { for i := 0; i < 10; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} err := primaryRegion.Apply(cmd, time.Second) assert.NoError(t, err) } }) // Test C2: Primary should track remote region status t.Run("C2_TrackRemoteRegions", func(t *testing.T) { status := primaryRegion.Health() assert.NotNil(t, status) }) primaryRegion.Shutdown() } func TestScenario_MultiRegion_D_HighLatency(t *testing.T) { if testing.Short() { t.Skip("Skipping high-latency cross-region test in short mode") } // Simulate US-EU cross-region (100-200ms typical) usConfig := DefaultConfig() usConfig.Mode = ModeMultiRegion usConfig.NodeID = "us-1" usConfig.MultiRegion.RegionID = "us" usConfig.MultiRegion.LocalCluster.Bootstrap = true usConfig.MultiRegion.CrossRegionBatchTimeout = 5 * time.Second usStorage := NewMockStorage() us, err := NewMultiRegionReplicator(usConfig, usStorage) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, us.Start(ctx)) // Test D1: Local writes should not be affected by cross-region latency t.Run("D1_LocalWritesUnaffected", func(t *testing.T) { start := time.Now() for i := 0; i < 10; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} err := us.Apply(cmd, time.Second) require.NoError(t, err) } elapsed := time.Since(start) t.Logf("10 local writes took: %v", elapsed) // Local writes should be fast (< 1 second for 10) assert.Less(t, elapsed, 5*time.Second) }) // Test D2: Handle 2000ms+ cross-region latency t.Run("D2_HighCrossRegionLatency", func(t *testing.T) { // The async replication should handle high latency without blocking cmd := &Command{Type: CmdCreateNode, Data: []byte("cross-region-latency")} err := us.Apply(cmd, time.Second) // Local write should still be fast assert.NoError(t, err) }) us.Shutdown() } // ============================================================================= // CROSS-CUTTING SCENARIO TESTS // ============================================================================= func TestScenario_CrossCutting_A_ModeTransitions(t *testing.T) { storage := NewMockStorage() // Test A1: Standalone to HA transition t.Run("A1_StandaloneToHA", func(t *testing.T) { // Start standalone standaloneConfig := DefaultConfig() standaloneConfig.Mode = ModeStandalone standaloneConfig.NodeID = "node-1" standalone := NewStandaloneReplicator(standaloneConfig, storage) ctx := context.Background() require.NoError(t, standalone.Start(ctx)) // Write some data cmd := &Command{Type: CmdCreateNode, Data: []byte("standalone-data")} require.NoError(t, standalone.Apply(cmd, time.Second)) standalone.Shutdown() // Transition to HA primary haConfig := DefaultConfig() haConfig.Mode = ModeHAStandby haConfig.NodeID = "node-1" haConfig.HAStandby.Role = "primary" haConfig.HAStandby.PeerAddr = "standby:7688" transport := NewMockTransport() ha, err := NewHAStandbyReplicator(haConfig, storage) require.NoError(t, err) ha.SetTransport(transport) haCtx, cancel := context.WithCancel(context.Background()) require.NoError(t, ha.Start(haCtx)) // Should still work cmd2 := &Command{Type: CmdCreateNode, Data: []byte("ha-data")} require.NoError(t, ha.Apply(cmd2, time.Second)) cancel() transport.Close() ha.Shutdown() }) // Test A2: HA to Raft transition t.Run("A2_HAToRaft", func(t *testing.T) { storage := NewMockStorage() // Start as HA primary haConfig := DefaultConfig() haConfig.Mode = ModeHAStandby haConfig.NodeID = "node-1" haConfig.HAStandby.Role = "primary" haConfig.HAStandby.PeerAddr = "standby:7688" transport := NewMockTransport() ha, err := NewHAStandbyReplicator(haConfig, storage) require.NoError(t, err) ha.SetTransport(transport) haCtx, cancel := context.WithCancel(context.Background()) require.NoError(t, ha.Start(haCtx)) cmd := &Command{Type: CmdCreateNode, Data: []byte("ha-data")} require.NoError(t, ha.Apply(cmd, time.Second)) cancel() transport.Close() ha.Shutdown() // Transition to Raft raftConfig := DefaultConfig() raftConfig.Mode = ModeRaft raftConfig.NodeID = "node-1" raftConfig.Raft.Bootstrap = true raft, err := NewRaftReplicator(raftConfig, storage) require.NoError(t, err) raftCtx, cancel2 := context.WithCancel(context.Background()) require.NoError(t, raft.Start(raftCtx)) cmd2 := &Command{Type: CmdCreateNode, Data: []byte("raft-data")} require.NoError(t, raft.Apply(cmd2, time.Second)) cancel2() raft.Shutdown() }) } func TestScenario_CrossCutting_B_ConcurrentModes(t *testing.T) { // Test B1: Multiple standalone instances (no conflict, separate storage) t.Run("B1_MultipleStandalone", func(t *testing.T) { var wg sync.WaitGroup errors := make(chan error, 3) for i := 0; i < 3; i++ { wg.Add(1) go func(n int) { defer wg.Done() storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = fmt.Sprintf("standalone-%d", n) rep := NewStandaloneReplicator(config, storage) ctx := context.Background() if err := rep.Start(ctx); err != nil { errors <- err return } for j := 0; j < 100; j++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(j)}} if err := rep.Apply(cmd, time.Second); err != nil { errors <- err return } } rep.Shutdown() }(i) } wg.Wait() close(errors) var errCount int for err := range errors { t.Logf("Error: %v", err) errCount++ } assert.Equal(t, 0, errCount) }) } func TestScenario_CrossCutting_C_ConfigValidation(t *testing.T) { // Test C1: Invalid mode t.Run("C1_InvalidMode", func(t *testing.T) { config := DefaultConfig() config.Mode = "invalid" err := config.Validate() assert.Error(t, err) }) // Test C2: Missing required fields t.Run("C2_MissingNodeID", func(t *testing.T) { config := DefaultConfig() config.Mode = ModeHAStandby config.NodeID = "" config.HAStandby.Role = "primary" config.HAStandby.PeerAddr = "standby:7688" err := config.Validate() assert.Error(t, err) }) // Test C3: Invalid HA role t.Run("C3_InvalidHARole", func(t *testing.T) { config := DefaultConfig() config.Mode = ModeHAStandby config.NodeID = "node-1" config.HAStandby.Role = "invalid" err := config.Validate() assert.Error(t, err) }) // Test C4: Missing peer address t.Run("C4_MissingPeerAddr", func(t *testing.T) { config := DefaultConfig() config.Mode = ModeHAStandby config.NodeID = "node-1" config.HAStandby.Role = "primary" config.HAStandby.PeerAddr = "" err := config.Validate() assert.Error(t, err) }) } func TestScenario_CrossCutting_D_Metrics(t *testing.T) { // Test D1: WAL metrics t.Run("D1_WALMetrics", func(t *testing.T) { storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "metrics-test" rep := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, rep.Start(ctx)) // Apply some commands for i := 0; i < 100; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte{byte(i)}} require.NoError(t, rep.Apply(cmd, time.Second)) } // Check WAL position pos, err := storage.GetWALPosition() require.NoError(t, err) assert.Equal(t, uint64(100), pos) rep.Shutdown() }) // Test D2: Status reporting t.Run("D2_StatusReporting", func(t *testing.T) { storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeStandalone config.NodeID = "status-test" rep := NewStandaloneReplicator(config, storage) ctx := context.Background() require.NoError(t, rep.Start(ctx)) status := rep.Health() assert.True(t, status.Healthy) assert.Equal(t, "standalone", status.Role) // Standalone reports as "standalone" assert.NotEmpty(t, status.NodeID) rep.Shutdown() }) } // ============================================================================= // LATENCY-SPECIFIC TESTS FOR CROSS-REGION (2000ms+) // These tests ACTUALLY simulate network latency - not stubbed or skipped // ============================================================================= // TestScenario_HighLatency_DirectConnection tests that actual latency is applied // to peer connections. This verifies the ChaosPeerConn is working correctly. func TestScenario_HighLatency_DirectConnection(t *testing.T) { // Test various latencies directly on the connection layer testCases := []struct { name string latency time.Duration minExpected time.Duration // Minimum expected elapsed time maxExpected time.Duration // Maximum expected elapsed time }{ {"100ms", 100 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond}, {"500ms", 500 * time.Millisecond, 500 * time.Millisecond, 700 * time.Millisecond}, {"1000ms", 1000 * time.Millisecond, 1000 * time.Millisecond, 1300 * time.Millisecond}, {"2000ms", 2000 * time.Millisecond, 2000 * time.Millisecond, 2500 * time.Millisecond}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Create a mock peer connection basePeer := &MockPeerConn{ connected: true, } // Configure chaos with the specific latency chaosConfig := &ChaosConfig{ CrossRegionLatency: tc.latency, CrossRegionJitter: tc.latency / 20, // 5% jitter } // Wrap in chaos peer chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) // Prepare a WAL entry entries := []*WALEntry{ { Position: 1, Command: &Command{ Type: CmdCreateNode, Data: []byte("latency-test-data"), }, }, } // Measure actual time taken ctx, cancel := context.WithTimeout(context.Background(), tc.maxExpected+5*time.Second) defer cancel() start := time.Now() _, err := chaosPeer.SendWALBatch(ctx, entries) elapsed := time.Since(start) t.Logf("SendWALBatch with %v configured latency took %v", tc.latency, elapsed) // Verify latency was actually simulated require.NoError(t, err, "SendWALBatch should succeed") assert.GreaterOrEqual(t, elapsed, tc.minExpected, "Latency should be at least %v, got %v", tc.minExpected, elapsed) assert.LessOrEqual(t, elapsed, tc.maxExpected, "Latency should be at most %v, got %v", tc.maxExpected, elapsed) }) } } // TestScenario_HighLatency_Heartbeat tests that heartbeats also experience latency. func TestScenario_HighLatency_Heartbeat(t *testing.T) { latencies := []time.Duration{ 200 * time.Millisecond, 500 * time.Millisecond, 1000 * time.Millisecond, } for _, latency := range latencies { t.Run(fmt.Sprintf("Heartbeat_%dms", latency.Milliseconds()), func(t *testing.T) { basePeer := &MockPeerConn{ connected: true, } chaosConfig := &ChaosConfig{ CrossRegionLatency: latency, CrossRegionJitter: latency / 10, } chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) req := &HeartbeatRequest{ NodeID: "test-primary", WALPosition: 100, } ctx, cancel := context.WithTimeout(context.Background(), latency*5) defer cancel() start := time.Now() _, err := chaosPeer.SendHeartbeat(ctx, req) elapsed := time.Since(start) t.Logf("SendHeartbeat with %v configured latency took %v", latency, elapsed) require.NoError(t, err) assert.GreaterOrEqual(t, elapsed, latency, "Heartbeat should take at least %v, got %v", latency, elapsed) }) } } // TestScenario_HighLatency_CrossRegion_2000ms_Plus tests extreme cross-region latency. // This is NOT SKIPPED - it tests actual 2000ms+ latencies. func TestScenario_HighLatency_CrossRegion_2000ms_Plus(t *testing.T) { // Cross-region latencies we need to handle // US-EU: ~100-150ms // US-Asia: ~200-300ms // Extreme cases: 2000ms+ (bad network, satellite, etc.) testCases := []struct { name string latency time.Duration operations int description string }{ { name: "US_to_EU_150ms", latency: 150 * time.Millisecond, operations: 5, description: "Typical US-EU latency", }, { name: "US_to_Asia_300ms", latency: 300 * time.Millisecond, operations: 5, description: "Typical US-Asia latency", }, { name: "Degraded_Network_1000ms", latency: 1000 * time.Millisecond, operations: 3, description: "Degraded network conditions", }, { name: "Extreme_2000ms", latency: 2000 * time.Millisecond, operations: 2, description: "Extreme latency scenario", }, { name: "Satellite_2500ms", latency: 2500 * time.Millisecond, operations: 2, description: "Satellite connection latency", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { basePeer := &MockPeerConn{connected: true} chaosConfig := &ChaosConfig{ CrossRegionLatency: tc.latency, CrossRegionJitter: tc.latency / 10, // 10% jitter } chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) t.Logf("Testing %s: %v latency with %d operations", tc.description, tc.latency, tc.operations) totalTime := time.Duration(0) for i := 0; i < tc.operations; i++ { entries := []*WALEntry{ { Position: uint64(i + 1), Command: &Command{ Type: CmdCreateNode, Data: []byte(fmt.Sprintf("cross-region-op-%d", i)), }, }, } ctx, cancel := context.WithTimeout(context.Background(), tc.latency*3) start := time.Now() _, err := chaosPeer.SendWALBatch(ctx, entries) elapsed := time.Since(start) cancel() require.NoError(t, err, "Operation %d should succeed", i) totalTime += elapsed // Each operation should take at least the configured latency assert.GreaterOrEqual(t, elapsed, tc.latency*9/10, // Allow 10% under for timing variance "Operation %d should take at least %v, got %v", i, tc.latency*9/10, elapsed) } avgTime := totalTime / time.Duration(tc.operations) t.Logf("Completed %d operations, avg latency: %v, total: %v", tc.operations, avgTime, totalTime) // Average should be close to configured latency assert.GreaterOrEqual(t, avgTime, tc.latency*9/10, "Average latency should be at least %v", tc.latency*9/10) }) } } // TestScenario_HighLatency_Transport_Connection tests transport layer connection latency. func TestScenario_HighLatency_Transport_Connection(t *testing.T) { latencies := []time.Duration{ 100 * time.Millisecond, 500 * time.Millisecond, 1000 * time.Millisecond, } for _, latency := range latencies { t.Run(fmt.Sprintf("Connect_%dms", latency.Milliseconds()), func(t *testing.T) { baseTransport := NewMockTransport() defer baseTransport.Close() chaosConfig := &ChaosConfig{ CrossRegionLatency: latency, CrossRegionJitter: latency / 10, } chaosTransport := NewChaosTransport(baseTransport, chaosConfig) ctx, cancel := context.WithTimeout(context.Background(), latency*3) defer cancel() start := time.Now() conn, err := chaosTransport.Connect(ctx, "peer:7688") elapsed := time.Since(start) t.Logf("Transport.Connect with %v configured latency took %v", latency, elapsed) require.NoError(t, err) require.NotNil(t, conn) // Connection should take at least the configured latency assert.GreaterOrEqual(t, elapsed, latency*9/10, "Connect should take at least %v, got %v", latency*9/10, elapsed) }) } } // TestScenario_HighLatency_ContextTimeout verifies operations timeout correctly with latency. func TestScenario_HighLatency_ContextTimeout(t *testing.T) { basePeer := &MockPeerConn{connected: true} // Configure 5 second latency chaosConfig := &ChaosConfig{ CrossRegionLatency: 5 * time.Second, } chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) entries := []*WALEntry{ {Position: 1, Command: &Command{Type: CmdCreateNode, Data: []byte("timeout-test")}}, } // Use a 1 second timeout - should fail because latency is 5 seconds ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() start := time.Now() _, err := chaosPeer.SendWALBatch(ctx, entries) elapsed := time.Since(start) t.Logf("Operation with 5s latency and 1s timeout took %v", elapsed) // Should timeout require.Error(t, err, "Should timeout when latency exceeds context deadline") assert.Contains(t, err.Error(), "context deadline exceeded", "Error should indicate timeout") // Should take approximately 1 second (the timeout duration) assert.GreaterOrEqual(t, elapsed, 900*time.Millisecond, "Should wait close to timeout duration") assert.LessOrEqual(t, elapsed, 1500*time.Millisecond, "Should not wait much longer than timeout") } // TestScenario_HighLatency_MultipleOperations_Sequential tests sequential operations with latency. func TestScenario_HighLatency_MultipleOperations_Sequential(t *testing.T) { basePeer := &MockPeerConn{connected: true} latency := 200 * time.Millisecond chaosConfig := &ChaosConfig{ CrossRegionLatency: latency, CrossRegionJitter: 20 * time.Millisecond, } chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) numOperations := 10 start := time.Now() for i := 0; i < numOperations; i++ { entries := []*WALEntry{ {Position: uint64(i), Command: &Command{Type: CmdCreateNode, Data: []byte{byte(i)}}}, } ctx, cancel := context.WithTimeout(context.Background(), latency*2) _, err := chaosPeer.SendWALBatch(ctx, entries) cancel() require.NoError(t, err, "Operation %d should succeed", i) } totalElapsed := time.Since(start) expectedMin := latency * time.Duration(numOperations) * 9 / 10 // 90% of expected t.Logf("%d sequential operations with %v latency took %v (expected ~%v)", numOperations, latency, totalElapsed, latency*time.Duration(numOperations)) assert.GreaterOrEqual(t, totalElapsed, expectedMin, "Total time should be at least %v, got %v", expectedMin, totalElapsed) } // TestScenario_HighLatency_Concurrent tests concurrent operations with latency. func TestScenario_HighLatency_Concurrent(t *testing.T) { basePeer := &MockPeerConn{connected: true} latency := 300 * time.Millisecond chaosConfig := &ChaosConfig{ CrossRegionLatency: latency, CrossRegionJitter: 30 * time.Millisecond, } chaosPeer := NewChaosPeerConn(basePeer, chaosConfig) numWorkers := 5 var wg sync.WaitGroup results := make(chan time.Duration, numWorkers) start := time.Now() for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() entries := []*WALEntry{ {Position: uint64(workerID), Command: &Command{Type: CmdCreateNode, Data: []byte{byte(workerID)}}}, } ctx, cancel := context.WithTimeout(context.Background(), latency*3) defer cancel() opStart := time.Now() _, _ = chaosPeer.SendWALBatch(ctx, entries) results <- time.Since(opStart) }(i) } wg.Wait() close(results) totalElapsed := time.Since(start) var totalLatency time.Duration count := 0 for elapsed := range results { totalLatency += elapsed count++ } avgLatency := totalLatency / time.Duration(count) t.Logf("%d concurrent workers with %v latency completed in %v, avg per-worker: %v", numWorkers, latency, totalElapsed, avgLatency) // Each worker should experience the full latency assert.GreaterOrEqual(t, avgLatency, latency*9/10, "Average per-worker latency should be at least %v", latency*9/10) // But total time should be less than sequential (due to concurrency) // With true parallelism, should complete in ~1x latency, not Nx assert.LessOrEqual(t, totalElapsed, latency*time.Duration(numWorkers), "Concurrent operations should complete faster than sequential") } // ============================================================================= // STATISTICS TRACKING // ============================================================================= // TestStats tracks test execution statistics. type TestStats struct { mu sync.Mutex testsRun int64 testsPassed int64 testsFailed int64 latencyMax time.Duration latencyTotal time.Duration operationsCount int64 } var globalStats = &TestStats{} func (s *TestStats) RecordTest(passed bool, latency time.Duration) { s.mu.Lock() defer s.mu.Unlock() atomic.AddInt64(&s.testsRun, 1) if passed { atomic.AddInt64(&s.testsPassed, 1) } else { atomic.AddInt64(&s.testsFailed, 1) } s.latencyTotal += latency if latency > s.latencyMax { s.latencyMax = latency } atomic.AddInt64(&s.operationsCount, 1) } func (s *TestStats) Summary() string { s.mu.Lock() defer s.mu.Unlock() avgLatency := time.Duration(0) if s.operationsCount > 0 { avgLatency = s.latencyTotal / time.Duration(s.operationsCount) } return fmt.Sprintf( "Tests: %d run, %d passed, %d failed | Max latency: %v | Avg latency: %v", s.testsRun, s.testsPassed, s.testsFailed, s.latencyMax, avgLatency, ) } // ============================================================================= // MULTI-REGION REPLICATION TESTS // ============================================================================= func TestMultiRegion_SingleRegion_Bootstrap(t *testing.T) { // Test A: Single region (no remote regions) acts as primary storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true config.MultiRegion.RemoteRegions = nil // No remote regions replicator, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) ctx := context.Background() require.NoError(t, replicator.Start(ctx)) // Should be primary region when no remotes assert.True(t, replicator.IsPrimaryRegion()) assert.Equal(t, "us-east", replicator.RegionID()) assert.Equal(t, ModeMultiRegion, replicator.Mode()) // Wait for Raft leader election require.NoError(t, replicator.WaitForLeader(ctx)) assert.True(t, replicator.IsLeader()) // Should be able to apply commands cmd := &Command{Type: CmdCreateNode, Data: []byte("test")} require.NoError(t, replicator.Apply(cmd, time.Second)) // Health check health := replicator.Health() assert.True(t, health.Healthy) assert.Equal(t, "us-east", health.Region) assert.True(t, health.IsLeader) replicator.Shutdown() } func TestMultiRegion_TwoRegions_CrossRegionConnection(t *testing.T) { // Test B: Two regions - verify cross-region connection storage1 := NewMockStorage() storage2 := NewMockStorage() // Primary region (us-east) config1 := DefaultConfig() config1.Mode = ModeMultiRegion config1.NodeID = "us-east-1" config1.MultiRegion.RegionID = "us-east" config1.MultiRegion.LocalCluster.Bootstrap = true config1.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "eu-west", Addrs: []string{"eu-west-1:7688"}, Priority: 1}, } // Secondary region (eu-west) config2 := DefaultConfig() config2.Mode = ModeMultiRegion config2.NodeID = "eu-west-1" config2.MultiRegion.RegionID = "eu-west" config2.MultiRegion.LocalCluster.Bootstrap = true config2.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "us-east", Addrs: []string{"us-east-1:7688"}, Priority: 1}, } region1, err := NewMultiRegionReplicator(config1, storage1) require.NoError(t, err) region2, err := NewMultiRegionReplicator(config2, storage2) require.NoError(t, err) // Set up mock transports transport1 := NewMockTransport() transport2 := NewMockTransport() region1.SetTransport(transport1) region2.SetTransport(transport2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start both regions require.NoError(t, region1.Start(ctx)) require.NoError(t, region2.Start(ctx)) // Wait for both to elect leaders require.NoError(t, region1.WaitForLeader(ctx)) require.NoError(t, region2.WaitForLeader(ctx)) // Verify both regions are running assert.True(t, region1.IsLeader()) assert.True(t, region2.IsLeader()) // Primary region should have eu-west as peer health1 := region1.Health() assert.Equal(t, "us-east", health1.Region) health2 := region2.Health() assert.Equal(t, "eu-west", health2.Region) // Cancel context first to stop listeners cancel() transport1.Close() transport2.Close() region1.Shutdown() region2.Shutdown() } func TestMultiRegion_RegionFailover(t *testing.T) { // Test C: Region failover - secondary becomes primary storage1 := NewMockStorage() storage2 := NewMockStorage() // Primary region (us-east) config1 := DefaultConfig() config1.Mode = ModeMultiRegion config1.NodeID = "us-east-1" config1.MultiRegion.RegionID = "us-east" config1.MultiRegion.LocalCluster.Bootstrap = true config1.MultiRegion.RemoteRegions = nil // Start as sole primary // Secondary region (eu-west) config2 := DefaultConfig() config2.Mode = ModeMultiRegion config2.NodeID = "eu-west-1" config2.MultiRegion.RegionID = "eu-west" config2.MultiRegion.LocalCluster.Bootstrap = true config2.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "us-east", Addrs: []string{"us-east-1:7688"}, Priority: 1}, } region1, err := NewMultiRegionReplicator(config1, storage1) require.NoError(t, err) region2, err := NewMultiRegionReplicator(config2, storage2) require.NoError(t, err) ctx := context.Background() // Start both regions require.NoError(t, region1.Start(ctx)) require.NoError(t, region2.Start(ctx)) // Wait for leaders require.NoError(t, region1.WaitForLeader(ctx)) require.NoError(t, region2.WaitForLeader(ctx)) // Region1 (us-east) is primary (no remote regions) assert.True(t, region1.IsPrimaryRegion()) // Region2 (eu-west) is not primary initially (has remote regions) assert.False(t, region2.IsPrimaryRegion()) // Simulate region1 failure by shutting it down region1.Shutdown() // Region2 should be able to perform failover err = region2.RegionFailover(ctx) require.NoError(t, err) // Now region2 should be primary assert.True(t, region2.IsPrimaryRegion()) // Should still be able to write cmd := &Command{Type: CmdCreateNode, Data: []byte("after failover")} require.NoError(t, region2.Apply(cmd, time.Second)) region2.Shutdown() } func TestMultiRegion_FailoverRequiresLeader(t *testing.T) { // Test D: Failover should fail if not local Raft leader storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "eu-west-1" config.MultiRegion.RegionID = "eu-west" config.MultiRegion.LocalCluster.Bootstrap = false config.MultiRegion.LocalCluster.Peers = []PeerConfig{ {ID: "eu-west-2", Addr: "eu-west-2:7688"}, } config.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "us-east", Addrs: []string{"us-east-1:7688"}, Priority: 1}, } region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) // Use failing transport so it can't become leader failingTransport := &FailingTransport{connectError: fmt.Errorf("connection refused")} region.SetTransport(failingTransport) ctx := context.Background() require.NoError(t, region.Start(ctx)) // Node is not leader (can't reach peers) assert.False(t, region.IsLeader()) // Failover should fail because not leader err = region.RegionFailover(ctx) assert.Equal(t, ErrNotLeader, err) // Should still not be primary assert.False(t, region.IsPrimaryRegion()) region.Shutdown() } func TestMultiRegion_WALStreaming(t *testing.T) { // Test E: WAL entries are streamed to remote regions storage1 := NewMockStorage() storage2 := NewMockStorage() // Primary region config1 := DefaultConfig() config1.Mode = ModeMultiRegion config1.NodeID = "us-east-1" config1.MultiRegion.RegionID = "us-east" config1.MultiRegion.LocalCluster.Bootstrap = true config1.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "eu-west", Addrs: []string{"eu-west-1:7688"}, Priority: 1}, } // Secondary region config2 := DefaultConfig() config2.Mode = ModeMultiRegion config2.NodeID = "eu-west-1" config2.MultiRegion.RegionID = "eu-west" config2.MultiRegion.LocalCluster.Bootstrap = true config2.MultiRegion.RemoteRegions = nil // No outbound replication from secondary region1, err := NewMultiRegionReplicator(config1, storage1) require.NoError(t, err) region2, err := NewMultiRegionReplicator(config2, storage2) require.NoError(t, err) // Set up mock transport transport1 := NewMockTransport() region1.SetTransport(transport1) ctx := context.Background() // Start regions require.NoError(t, region1.Start(ctx)) require.NoError(t, region2.Start(ctx)) // Wait for leaders require.NoError(t, region1.WaitForLeader(ctx)) // Apply some commands for i := 0; i < 5; i++ { cmd := &Command{Type: CmdCreateNode, Data: []byte(fmt.Sprintf("node-%d", i))} require.NoError(t, region1.Apply(cmd, time.Second)) } // Verify WAL position increased pos, err := storage1.GetWALPosition() require.NoError(t, err) assert.Equal(t, uint64(5), pos) region1.Shutdown() region2.Shutdown() } func TestMultiRegion_HealthStatus(t *testing.T) { // Test F: Health status correctly reports region info storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true config.MultiRegion.RemoteRegions = nil region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) // Before start health := region.Health() assert.Equal(t, "initializing", health.State) assert.False(t, health.Healthy) assert.Equal(t, ModeMultiRegion, health.Mode) assert.Equal(t, "us-east", health.Region) ctx := context.Background() require.NoError(t, region.Start(ctx)) require.NoError(t, region.WaitForLeader(ctx)) // After start health = region.Health() assert.Equal(t, "ready", health.State) assert.True(t, health.Healthy) assert.True(t, health.IsLeader) assert.Equal(t, "us-east-1", health.NodeID) // After shutdown region.Shutdown() health = region.Health() assert.Equal(t, "closed", health.State) assert.False(t, health.Healthy) } func TestMultiRegion_ShutdownCleansUpConnections(t *testing.T) { // Test G: Shutdown properly closes all cross-region connections storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true config.MultiRegion.RemoteRegions = []RemoteRegionConfig{ {RegionID: "eu-west", Addrs: []string{"eu-west-1:7688"}, Priority: 1}, {RegionID: "ap-south", Addrs: []string{"ap-south-1:7688"}, Priority: 2}, } region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) // Set up mock transport transport := NewMockTransport() region.SetTransport(transport) ctx := context.Background() require.NoError(t, region.Start(ctx)) require.NoError(t, region.WaitForLeader(ctx)) // Shutdown should close all connections region.Shutdown() // Health should show closed state health := region.Health() assert.Equal(t, "closed", health.State) } func TestMultiRegion_ApplyBeforeStart(t *testing.T) { // Test H: Apply should fail before Start storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) cmd := &Command{Type: CmdCreateNode, Data: []byte("test")} err = region.Apply(cmd, time.Second) assert.Equal(t, ErrNotReady, err) } func TestMultiRegion_ApplyAfterShutdown(t *testing.T) { // Test I: Apply should fail after Shutdown storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) ctx := context.Background() require.NoError(t, region.Start(ctx)) require.NoError(t, region.WaitForLeader(ctx)) region.Shutdown() cmd := &Command{Type: CmdCreateNode, Data: []byte("test")} err = region.Apply(cmd, time.Second) assert.Equal(t, ErrClosed, err) } func TestMultiRegion_ConcurrentApply(t *testing.T) { // Test J: Concurrent applies should work correctly storage := NewMockStorage() config := DefaultConfig() config.Mode = ModeMultiRegion config.NodeID = "us-east-1" config.MultiRegion.RegionID = "us-east" config.MultiRegion.LocalCluster.Bootstrap = true region, err := NewMultiRegionReplicator(config, storage) require.NoError(t, err) ctx := context.Background() require.NoError(t, region.Start(ctx)) require.NoError(t, region.WaitForLeader(ctx)) // Concurrent applies var wg sync.WaitGroup errChan := make(chan error, 100) for i := 0; i < 100; i++ { wg.Add(1) go func(idx int) { defer wg.Done() cmd := &Command{Type: CmdCreateNode, Data: []byte(fmt.Sprintf("node-%d", idx))} if err := region.Apply(cmd, time.Second); err != nil { errChan <- err } }(i) } wg.Wait() close(errChan) // Check for errors errCount := 0 for err := range errChan { t.Logf("Apply error: %v", err) errCount++ } assert.Equal(t, 0, errCount) // Verify all commands applied pos, err := storage.GetWALPosition() require.NoError(t, err) assert.Equal(t, uint64(100), pos) region.Shutdown() } // MockPeerConnection with configurable handlers for testing type MockPeerConnection struct { connected bool walBatchHandler func([]*WALEntry) (*WALBatchResponse, error) closeHandler func() error } func (m *MockPeerConnection) SendWALBatch(ctx context.Context, entries []*WALEntry) (*WALBatchResponse, error) { if m.walBatchHandler != nil { return m.walBatchHandler(entries) } return &WALBatchResponse{AckedPosition: entries[len(entries)-1].Position}, nil } func (m *MockPeerConnection) SendHeartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) { return &HeartbeatResponse{NodeID: "mock", Role: "standby"}, nil } func (m *MockPeerConnection) SendFence(ctx context.Context, req *FenceRequest) (*FenceResponse, error) { return &FenceResponse{Fenced: true}, nil } func (m *MockPeerConnection) SendPromote(ctx context.Context, req *PromoteRequest) (*PromoteResponse, error) { return &PromoteResponse{Ready: true}, nil } func (m *MockPeerConnection) SendRaftVote(ctx context.Context, req *RaftVoteRequest) (*RaftVoteResponse, error) { return &RaftVoteResponse{VoteGranted: true, Term: req.Term}, nil } func (m *MockPeerConnection) SendRaftAppendEntries(ctx context.Context, req *RaftAppendEntriesRequest) (*RaftAppendEntriesResponse, error) { return &RaftAppendEntriesResponse{Success: true, Term: req.Term}, nil } func (m *MockPeerConnection) Close() error { if m.closeHandler != nil { return m.closeHandler() } m.connected = false return nil } func (m *MockPeerConnection) IsConnected() bool { return m.connected }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server