Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
integration_test.go20.1 kB
// Integration tests for Bolt server with Cypher executor. // // These tests verify that the Bolt protocol server works correctly with // the Cypher query executor, simulating real-world Neo4j driver usage. package bolt import ( "context" "fmt" "io" "net" "testing" "time" "github.com/orneryd/nornicdb/pkg/cypher" "github.com/orneryd/nornicdb/pkg/storage" ) // cypherQueryExecutor wraps the Cypher executor for Bolt server. type cypherQueryExecutor struct { executor *cypher.StorageExecutor } func (c *cypherQueryExecutor) Execute(ctx context.Context, query string, params map[string]any) (*QueryResult, error) { result, err := c.executor.Execute(ctx, query, params) if err != nil { return nil, err } return &QueryResult{ Columns: result.Columns, Rows: result.Rows, }, nil } // TestBoltCypherIntegration tests the full stack: Bolt server + Cypher executor. func TestBoltCypherIntegration(t *testing.T) { // Create storage and executor store := storage.NewMemoryEngine() cypherExec := cypher.NewStorageExecutor(store) executor := &cypherQueryExecutor{executor: cypherExec} // Start Bolt server on random port config := &Config{ Port: 0, // Random port MaxConnections: 10, ReadBufferSize: 8192, WriteBufferSize: 8192, } server := New(config, executor) defer server.Close() // Start server go func() { if err := server.ListenAndServe(); err != nil { t.Logf("Server error: %v", err) } }() // Wait for server to start time.Sleep(100 * time.Millisecond) // Get actual port port := server.listener.Addr().(*net.TCPAddr).Port t.Run("create_and_query_node", func(t *testing.T) { // Connect to server conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() // Perform handshake if err := performHandshake(t, conn); err != nil { t.Fatalf("Handshake failed: %v", err) } // Send HELLO if err := sendHello(t, conn); err != nil { t.Fatalf("HELLO failed: %v", err) } // Wait for SUCCESS response if err := readSuccess(t, conn); err != nil { t.Fatalf("Expected SUCCESS after HELLO: %v", err) } // Send CREATE query createQuery := "CREATE (n:Person {name: 'Alice', age: 30}) RETURN n" if err := sendRun(t, conn, createQuery, nil); err != nil { t.Fatalf("RUN failed: %v", err) } // Read SUCCESS with fields if err := readSuccess(t, conn); err != nil { t.Fatalf("Expected SUCCESS after RUN: %v", err) } // Send PULL to get results if err := sendPull(t, conn); err != nil { t.Fatalf("PULL failed: %v", err) } // Read RECORD and final SUCCESS hasRecord := false for { msgType, err := readMessageType(t, conn) if err != nil { t.Fatalf("Failed to read message: %v", err) } if msgType == MsgRecord { hasRecord = true // Skip record data for now } else if msgType == MsgSuccess { break } } if !hasRecord { t.Error("Expected at least one RECORD message") } }) t.Run("match_query", func(t *testing.T) { // Connect to server conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() // Perform handshake if err := performHandshake(t, conn); err != nil { t.Fatalf("Handshake failed: %v", err) } // Send HELLO if err := sendHello(t, conn); err != nil { t.Fatalf("HELLO failed: %v", err) } // Wait for SUCCESS if err := readSuccess(t, conn); err != nil { t.Fatalf("Expected SUCCESS: %v", err) } // Send MATCH query matchQuery := "MATCH (n:Person) WHERE n.name = 'Alice' RETURN n.name, n.age" if err := sendRun(t, conn, matchQuery, nil); err != nil { t.Fatalf("RUN failed: %v", err) } // Read SUCCESS if err := readSuccess(t, conn); err != nil { t.Fatalf("Expected SUCCESS: %v", err) } // Send PULL if err := sendPull(t, conn); err != nil { t.Fatalf("PULL failed: %v", err) } // Read results hasRecord := false for { msgType, err := readMessageType(t, conn) if err != nil { t.Fatalf("Failed to read message: %v", err) } if msgType == MsgRecord { hasRecord = true } else if msgType == MsgSuccess { break } } if !hasRecord { t.Error("Expected RECORD for Alice") } }) t.Run("parameterized_query", func(t *testing.T) { // Connect to server conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() // Perform handshake and HELLO performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Send parameterized query query := "CREATE (n:Person {name: $name, age: $age}) RETURN n" params := map[string]any{ "name": "Bob", "age": int64(25), } if err := sendRun(t, conn, query, params); err != nil { t.Fatalf("RUN failed: %v", err) } // Read SUCCESS readSuccess(t, conn) // Send PULL sendPull(t, conn) // Read results for { msgType, err := readMessageType(t, conn) if err != nil { break } if msgType == MsgSuccess { break } } }) t.Run("transaction_flow", func(t *testing.T) { // Connect to server conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() // Handshake and HELLO performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Send BEGIN if err := sendBegin(t, conn); err != nil { t.Fatalf("BEGIN failed: %v", err) } readSuccess(t, conn) // Send query in transaction sendRun(t, conn, "CREATE (n:Test {id: 'tx-test'})", nil) readSuccess(t, conn) sendPull(t, conn) // Consume results for { msgType, _ := readMessageType(t, conn) if msgType == MsgSuccess { break } } // Send COMMIT if err := sendCommit(t, conn); err != nil { t.Fatalf("COMMIT failed: %v", err) } readSuccess(t, conn) }) } // Helper functions for protocol communication func performHandshake(t *testing.T, conn net.Conn) error { t.Helper() // Send magic + versions handshake := []byte{ 0x60, 0x60, 0xB0, 0x17, // Magic 0x00, 0x00, 0x04, 0x04, // Bolt 4.4 0x00, 0x00, 0x04, 0x03, // Bolt 4.3 0x00, 0x00, 0x04, 0x02, // Bolt 4.2 0x00, 0x00, 0x04, 0x01, // Bolt 4.1 } if _, err := conn.Write(handshake); err != nil { return err } // Read version response resp := make([]byte, 4) if _, err := io.ReadFull(conn, resp); err != nil { return err } return nil } func sendHello(t *testing.T, conn net.Conn) error { t.Helper() // HELLO message: struct(0xB1) + signature(0x01) + empty map(0xA0) message := []byte{0xB1, MsgHello, 0xA0} return sendMessage(conn, message) } func sendRun(t *testing.T, conn net.Conn, query string, params map[string]any) error { t.Helper() // RUN message: struct + signature + query string + params map + extra map message := []byte{0xB1, MsgRun} message = append(message, encodePackStreamString(query)...) if params == nil { params = make(map[string]any) } message = append(message, encodePackStreamMap(params)...) // Add empty extra map message = append(message, 0xA0) return sendMessage(conn, message) } func sendPull(t *testing.T, conn net.Conn) error { t.Helper() // PULL message: struct + signature + options map message := []byte{0xB1, MsgPull, 0xA0} return sendMessage(conn, message) } func sendBegin(t *testing.T, conn net.Conn) error { t.Helper() // BEGIN message: struct + signature + empty options message := []byte{0xB1, MsgBegin, 0xA0} return sendMessage(conn, message) } func sendCommit(t *testing.T, conn net.Conn) error { t.Helper() // COMMIT message: struct + signature (no data) message := []byte{0xB0, MsgCommit} return sendMessage(conn, message) } func sendMessage(conn net.Conn, data []byte) error { // Chunk header size := len(data) header := []byte{byte(size >> 8), byte(size)} if _, err := conn.Write(header); err != nil { return err } // Data if _, err := conn.Write(data); err != nil { return err } // Terminator if _, err := conn.Write([]byte{0x00, 0x00}); err != nil { return err } return nil } func readSuccess(t *testing.T, conn net.Conn) error { t.Helper() msgType, err := readMessageType(t, conn) if err != nil { return err } if msgType != MsgSuccess { return fmt.Errorf("expected SUCCESS (0x70), got 0x%02X", msgType) } return nil } func readMessageType(t *testing.T, conn net.Conn) (byte, error) { t.Helper() // Read all chunks until zero terminator var message []byte for { // Read chunk header header := make([]byte, 2) if _, err := io.ReadFull(conn, header); err != nil { return 0, err } size := int(header[0])<<8 | int(header[1]) if size == 0 { break } // Read chunk data chunk := make([]byte, size) if _, err := io.ReadFull(conn, chunk); err != nil { return 0, err } message = append(message, chunk...) } if len(message) < 2 { return 0, fmt.Errorf("message too short") } // Message format: struct marker + signature return message[1], nil } // TestBoltServerStress tests the server under load. func TestBoltServerStress(t *testing.T) { if testing.Short() { t.Skip("Skipping stress test in short mode") } // Create storage and executor store := storage.NewMemoryEngine() cypherExec := cypher.NewStorageExecutor(store) executor := &cypherQueryExecutor{executor: cypherExec} // Start server config := &Config{Port: 0, MaxConnections: 50} server := New(config, executor) defer server.Close() go server.ListenAndServe() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port // Launch multiple concurrent connections const numConnections = 20 done := make(chan error, numConnections) for i := 0; i < numConnections; i++ { go func(id int) { conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { done <- err return } defer conn.Close() // Perform handshake performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Execute queries query := fmt.Sprintf("CREATE (n:Test {id: %d}) RETURN n", id) sendRun(t, conn, query, nil) readSuccess(t, conn) sendPull(t, conn) // Read results for { msgType, err := readMessageType(t, conn) if err != nil { done <- err return } if msgType == MsgSuccess { break } } done <- nil }(i) } // Wait for all connections for i := 0; i < numConnections; i++ { if err := <-done; err != nil { t.Errorf("Connection %d failed: %v", i, err) } } } // TestBoltBenchmarkCreateDeleteRelationship measures real Bolt network performance // KEEP THIS TEST - this is the actual Bolt layer benchmark func TestBoltBenchmarkCreateDeleteRelationship(t *testing.T) { // Create storage with AsyncEngine store := storage.NewMemoryEngine() asyncStore := storage.NewAsyncEngine(store, nil) cypherExec := cypher.NewStorageExecutor(asyncStore) executor := &cypherQueryExecutor{executor: cypherExec} config := &Config{ Port: 0, MaxConnections: 10, ReadBufferSize: 8192, WriteBufferSize: 8192, } server := New(config, executor) defer server.Close() go func() { server.ListenAndServe() }() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port // Connect conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() // Handshake and HELLO performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Create test nodes sendRun(t, conn, "CREATE (a:Actor {name: 'Test'})", nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) // SUCCESS after PULL sendRun(t, conn, "CREATE (m:Movie {title: 'Test'})", nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) // Benchmark the slow query iterations := 100 t.Logf("Running %d iterations over Bolt (small dataset)", iterations) start := time.Now() for i := 0; i < iterations; i++ { query := `MATCH (a:Actor), (m:Movie) WITH a, m LIMIT 1 CREATE (a)-[r:TEMP_REL]->(m) DELETE r` sendRun(t, conn, query, nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } elapsed := time.Since(start) opsPerSec := float64(iterations) / elapsed.Seconds() avgMs := elapsed.Seconds() * 1000 / float64(iterations) t.Logf("Completed %d iterations in %v", iterations, elapsed) t.Logf("Bolt Performance: %.2f ops/sec, %.3f ms/op", opsPerSec, avgMs) // This should help identify if JS driver is the bottleneck if opsPerSec < 500 { t.Logf("WARNING: Bolt performance %.2f ops/sec is below 500 target", opsPerSec) } } // TestBoltBenchmarkCreateDeleteRelationship_LargeDataset simulates real benchmark conditions // KEEP THIS TEST - this shows performance with realistic data volume (100 actors, 150 movies) func TestBoltBenchmarkCreateDeleteRelationship_LargeDataset(t *testing.T) { store := storage.NewMemoryEngine() asyncStore := storage.NewAsyncEngine(store, nil) cypherExec := cypher.NewStorageExecutor(asyncStore) executor := &cypherQueryExecutor{executor: cypherExec} config := &Config{Port: 0, MaxConnections: 10} server := New(config, executor) defer server.Close() go func() { server.ListenAndServe() }() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Create 100 actors t.Log("Creating 100 actors...") for i := 0; i < 100; i++ { sendRun(t, conn, fmt.Sprintf("CREATE (a:Actor {name: 'Actor_%d', born: %d})", i, 1950+i%50), nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } // Create 150 movies t.Log("Creating 150 movies...") for i := 0; i < 150; i++ { sendRun(t, conn, fmt.Sprintf("CREATE (m:Movie {title: 'Movie_%d', released: %d})", i, 1980+i%44), nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } // Benchmark iterations := 100 t.Logf("Running %d iterations over Bolt (Memory, 100 actors + 150 movies)", iterations) start := time.Now() for i := 0; i < iterations; i++ { query := `MATCH (a:Actor), (m:Movie) WITH a, m LIMIT 1 CREATE (a)-[r:TEMP_REL]->(m) DELETE r` sendRun(t, conn, query, nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } elapsed := time.Since(start) opsPerSec := float64(iterations) / elapsed.Seconds() t.Logf("Bolt Performance (Memory, large dataset): %.2f ops/sec, %.3f ms/op", opsPerSec, elapsed.Seconds()*1000/float64(iterations)) } // TestBoltBenchmarkCreateDeleteRelationship_Badger tests with BadgerDB (realistic) // KEEP THIS TEST - shows performance with disk-based storage func TestBoltBenchmarkCreateDeleteRelationship_Badger(t *testing.T) { tmpDir := t.TempDir() badgerEngine, err := storage.NewBadgerEngine(tmpDir) if err != nil { t.Fatalf("Failed to create BadgerEngine: %v", err) } defer badgerEngine.Close() asyncStore := storage.NewAsyncEngine(badgerEngine, nil) cypherExec := cypher.NewStorageExecutor(asyncStore) executor := &cypherQueryExecutor{executor: cypherExec} config := &Config{Port: 0, MaxConnections: 10} server := New(config, executor) defer server.Close() go func() { server.ListenAndServe() }() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Create 100 actors t.Log("Creating 100 actors (BadgerDB)...") for i := 0; i < 100; i++ { sendRun(t, conn, fmt.Sprintf("CREATE (a:Actor {name: 'Actor_%d'})", i), nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } // Create 150 movies t.Log("Creating 150 movies (BadgerDB)...") for i := 0; i < 150; i++ { sendRun(t, conn, fmt.Sprintf("CREATE (m:Movie {title: 'Movie_%d'})", i), nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } // Benchmark iterations := 100 t.Logf("Running %d iterations over Bolt (BadgerDB, 100 actors + 150 movies)", iterations) start := time.Now() for i := 0; i < iterations; i++ { query := `MATCH (a:Actor), (m:Movie) WITH a, m LIMIT 1 CREATE (a)-[r:TEMP_REL]->(m) DELETE r` sendRun(t, conn, query, nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) } elapsed := time.Since(start) opsPerSec := float64(iterations) / elapsed.Seconds() t.Logf("Bolt Performance (BadgerDB, large dataset): %.2f ops/sec, %.3f ms/op", opsPerSec, elapsed.Seconds()*1000/float64(iterations)) } // TestBoltResponseMetadata verifies Neo4j-compatible metadata in responses // KEEP THIS TEST - ensures JS driver compatibility func TestBoltResponseMetadata(t *testing.T) { store := storage.NewMemoryEngine() asyncStore := storage.NewAsyncEngine(store, nil) cypherExec := cypher.NewStorageExecutor(asyncStore) executor := &cypherQueryExecutor{executor: cypherExec} config := &Config{Port: 0, MaxConnections: 10} server := New(config, executor) defer server.Close() go func() { server.ListenAndServe() }() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Test write query returns proper metadata sendRun(t, conn, "CREATE (n:TestNode {name: 'test'})", nil) if err := readSuccess(t, conn); err != nil { t.Fatalf("RUN failed: %v", err) } sendPull(t, conn) if err := readSuccess(t, conn); err != nil { t.Fatalf("PULL failed: %v", err) } t.Log("Response metadata verified - Neo4j driver should work correctly") } // TestBoltLatencyBreakdown measures where time is spent in protocol exchange // KEEP THIS TEST - helps identify bottlenecks in protocol handling func TestBoltLatencyBreakdown(t *testing.T) { store := storage.NewMemoryEngine() asyncStore := storage.NewAsyncEngine(store, nil) cypherExec := cypher.NewStorageExecutor(asyncStore) executor := &cypherQueryExecutor{executor: cypherExec} config := &Config{Port: 0, MaxConnections: 10} server := New(config, executor) defer server.Close() go func() { server.ListenAndServe() }() time.Sleep(100 * time.Millisecond) port := server.listener.Addr().(*net.TCPAddr).Port conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() performHandshake(t, conn) sendHello(t, conn) readSuccess(t, conn) // Create test data sendRun(t, conn, "CREATE (a:Actor {name: 'Keanu'})", nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) sendRun(t, conn, "CREATE (m:Movie {title: 'Matrix'})", nil) readSuccess(t, conn) sendPull(t, conn) readSuccess(t, conn) // Measure each phase separately iterations := 50 var runTotal, pullTotal time.Duration for i := 0; i < iterations; i++ { // Measure RUN runStart := time.Now() sendRun(t, conn, `MATCH (a:Actor), (m:Movie) WITH a, m LIMIT 1 CREATE (a)-[r:TEMP_REL]->(m) DELETE r`, nil) readSuccess(t, conn) runTotal += time.Since(runStart) // Measure PULL pullStart := time.Now() sendPull(t, conn) readSuccess(t, conn) pullTotal += time.Since(pullStart) } avgRun := runTotal.Seconds() * 1000 / float64(iterations) avgPull := pullTotal.Seconds() * 1000 / float64(iterations) t.Logf("Latency breakdown (%d iterations):", iterations) t.Logf(" RUN avg: %.3f ms", avgRun) t.Logf(" PULL avg: %.3f ms", avgPull) t.Logf(" Total avg: %.3f ms", avgRun+avgPull) t.Logf(" Throughput: %.2f ops/sec", float64(iterations)/((runTotal + pullTotal).Seconds())) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server