// Package bolt implements the Neo4j Bolt protocol server for NornicDB.
//
// This package provides a Bolt protocol server that allows existing Neo4j drivers
// and tools to connect to NornicDB without modification. The server implements
// Bolt 4.x protocol specifications for maximum compatibility.
//
// Neo4j Bolt Protocol Compatibility:
// - Bolt 4.0, 4.1, 4.2, 4.3, 4.4 support
// - PackStream serialization format
// - Transaction management (BEGIN, COMMIT, ROLLBACK)
// - Streaming result sets (RUN, PULL, DISCARD)
// - Authentication handshake
// - Connection pooling support
//
// Supported Neo4j Drivers:
// - Neo4j Java Driver
// - Neo4j Python Driver (neo4j-driver)
// - Neo4j JavaScript Driver
// - Neo4j .NET Driver
// - Neo4j Go Driver
// - Community drivers (Rust, Ruby, etc.)
//
// Example Usage:
//
// // Create Bolt server with Cypher executor
// config := bolt.DefaultConfig()
// config.Port = 7687
// config.MaxConnections = 100
//
// // Implement query executor
// executor := &MyQueryExecutor{db: nornicDB}
//
// server := bolt.New(config, executor)
//
// // Start server
// if err := server.ListenAndServe(); err != nil {
// log.Fatal(err)
// }
//
// // Server is now accepting Bolt connections on port 7687
//
// Client Usage (any Neo4j driver):
//
// // Python example
// from neo4j import GraphDatabase
//
// driver = GraphDatabase.driver("bolt://localhost:7687")
// with driver.session() as session:
// result = session.run("MATCH (n) RETURN count(n)")
// print(result.single()[0])
//
// // Go example
// driver, _ := neo4j.NewDriver("bolt://localhost:7687", neo4j.NoAuth())
// session := driver.NewSession(neo4j.SessionConfig{})
// result, _ := session.Run("MATCH (n) RETURN count(n)", nil)
//
// Protocol Flow:
//
// 1. **Handshake**:
// - Client sends magic number (0x6060B017)
// - Client sends supported versions
// - Server responds with selected version
//
// 2. **Authentication**:
// - Client sends HELLO message with credentials
// - Server responds with SUCCESS or FAILURE
//
// 3. **Query Execution**:
// - Client sends RUN message with Cypher query
// - Server responds with SUCCESS (field names)
// - Client sends PULL to stream results
// - Server sends RECORD messages + final SUCCESS
//
// 4. **Transaction Management**:
// - BEGIN: Start explicit transaction
// - COMMIT: Commit transaction
// - ROLLBACK: Rollback transaction
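//
// For example, the handshake exchanges these bytes (a sketch; the version
// proposals shown are one possible client offer):
//
// C: 60 60 B0 17                                          magic preamble
// C: 00 00 04 04  00 00 03 04  00 00 02 04  00 00 01 04   proposals: 4.4, 4.3, 4.2, 4.1
// S: 00 00 04 04                                          server selects Bolt 4.4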
//
// Message Types:
// - HELLO: Authentication
// - RUN: Execute Cypher query
// - PULL: Stream result records
// - DISCARD: Discard remaining results
// - BEGIN/COMMIT/ROLLBACK: Transaction control
// - RESET: Reset session state
// - GOODBYE: Close connection
//
// PackStream Encoding:
//
// The Bolt protocol uses PackStream for efficient binary serialization:
// - Compact representation of common types
// - Support for nested structures
// - Streaming-friendly format
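//
// For example, the map {"n": 1} encodes to just four bytes:
// - 0xA1: tiny map with 1 entry
// - 0x81 0x6E: tiny string "n" (length 1)
// - 0x01: tiny positive int 1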
//
// Performance:
// - Binary protocol (faster than HTTP/JSON)
// - Connection pooling and reuse
// - Streaming results (low memory usage)
// - Pipelining support
//
// ELI12 (Explain Like I'm 12):
//
// Think of the Bolt server like a translator at the United Nations:
//
// 1. **Different languages**: Neo4j drivers speak "Bolt language" but NornicDB
// speaks "NornicDB language". The Bolt server translates between them.
//
// 2. **Same conversation**: The drivers can have the same conversation they
// always had (asking questions in Cypher), they just don't know they're
// talking to a different database!
//
// 3. **Binary messages**: Instead of sending text messages (like HTTP), Bolt
// sends compact binary messages - like sending a compressed file instead
// of a text document. Much faster!
//
// 4. **Streaming**: Instead of waiting for ALL results before sending anything,
// Bolt can send results one-by-one as they're found, like a live news feed.
//
// This lets existing Neo4j tools work with NornicDB without any changes!
package bolt
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"net"
"strings"
"sync"
"sync/atomic"
)
// Protocol versions supported, encoded as major<<8 | minor (0x0404 = Bolt 4.4)
const (
BoltV4_4 = 0x0404 // Bolt 4.4
BoltV4_3 = 0x0403 // Bolt 4.3
BoltV4_2 = 0x0402 // Bolt 4.2
BoltV4_1 = 0x0401 // Bolt 4.1
BoltV4_0 = 0x0400 // Bolt 4.0
)
// Message types
const (
MsgHello byte = 0x01
MsgGoodbye byte = 0x02
MsgReset byte = 0x0F
MsgRun byte = 0x10
MsgDiscard byte = 0x2F
MsgPull byte = 0x3F
MsgBegin byte = 0x11
MsgCommit byte = 0x12
MsgRollback byte = 0x13
MsgRoute byte = 0x66
// Response messages
MsgSuccess byte = 0x70
MsgRecord byte = 0x71
MsgIgnored byte = 0x7E
MsgFailure byte = 0x7F
)
// Buffer pool for record serialization (reduces allocations for large result sets)
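//
// Typical Get/Put pattern for this pool (a sketch; the batched send path
// below currently writes through the bufio.Writer instead):
//
// bufPtr := recordBufferPool.Get().(*[]byte)
// buf := (*bufPtr)[:0] // reset length, keep capacity
// // ... serialize a record into buf ...
// *bufPtr = buf
// recordBufferPool.Put(bufPtr)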
var recordBufferPool = sync.Pool{
New: func() any {
// Pre-allocate 4KB buffer for typical records
buf := make([]byte, 0, 4096)
return &buf
},
}
// Server implements a Neo4j Bolt protocol server for NornicDB.
//
// The server handles multiple concurrent client connections, each running
// in its own goroutine. It manages the Bolt protocol handshake, authentication,
// and message routing to the configured query executor.
//
// Example:
//
// config := bolt.DefaultConfig()
// executor := &MyExecutor{} // Implements QueryExecutor
// server := bolt.New(config, executor)
//
// go func() {
// if err := server.ListenAndServe(); err != nil {
// log.Printf("Bolt server error: %v", err)
// }
// }()
//
// // Server is now accepting connections
// fmt.Printf("Bolt server listening on bolt://localhost:%d\n", config.Port)
//
// Thread Safety:
//
// The server is thread-safe and handles concurrent connections safely.
type Server struct {
config *Config
listener net.Listener
mu sync.RWMutex
sessions map[string]*Session
closed atomic.Bool
// Query executor (injected dependency)
executor QueryExecutor
}
// QueryExecutor executes Cypher queries for the Bolt server.
//
// This interface allows the Bolt server to be decoupled from the specific
// database implementation. The executor receives Cypher queries and parameters
// from Bolt clients and returns results in a standard format.
//
// Example Implementation:
//
// type MyExecutor struct {
// db *nornicdb.DB
// }
//
// func (e *MyExecutor) Execute(ctx context.Context, query string, params map[string]any) (*QueryResult, error) {
// // Execute query against NornicDB
// result, err := e.db.ExecuteCypher(ctx, query, params)
// if err != nil {
// return nil, err
// }
//
// // Convert to Bolt format
// return &QueryResult{
// Columns: result.Columns,
// Rows: result.Rows,
// }, nil
// }
//
// The executor should handle:
// - Cypher query parsing and execution
// - Parameter substitution
// - Result formatting
// - Error handling and reporting
type QueryExecutor interface {
Execute(ctx context.Context, query string, params map[string]any) (*QueryResult, error)
}
// TransactionalExecutor extends QueryExecutor with transaction support.
//
// If the executor implements this interface, the Bolt server will use
// real transactions for BEGIN/COMMIT/ROLLBACK messages. Otherwise,
// transaction messages are acknowledged but operations are auto-committed.
//
// Example Implementation:
//
// type TxExecutor struct {
// db *nornicdb.DB
// tx *storage.Transaction // Active transaction (nil if none)
// }
//
// func (e *TxExecutor) BeginTransaction(ctx context.Context, metadata map[string]any) error {
// e.tx = storage.NewTransaction(e.db.Engine())
// return nil
// }
//
// func (e *TxExecutor) CommitTransaction(ctx context.Context) error {
// if e.tx == nil {
// return nil
// }
// err := e.tx.Commit()
// e.tx = nil
// return err
// }
//
// func (e *TxExecutor) RollbackTransaction(ctx context.Context) error {
// if e.tx == nil {
// return nil
// }
// err := e.tx.Rollback()
// e.tx = nil
// return err
// }
type TransactionalExecutor interface {
QueryExecutor
BeginTransaction(ctx context.Context, metadata map[string]any) error
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
}
// FlushableExecutor extends QueryExecutor with deferred commit support.
// This enables Neo4j-style optimization where writes are buffered until PULL.
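//
// Example Implementation (a sketch, assuming an AsyncEngine that buffers
// writes in memory until flushed):
//
// type BufferedExecutor struct {
// QueryExecutor
// engine *AsyncEngine
// }
//
// func (e *BufferedExecutor) Flush() error {
// return e.engine.Sync() // persist buffered writes to storage
// }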
type FlushableExecutor interface {
QueryExecutor
// Flush persists all pending writes to storage.
Flush() error
}
// DeferrableExecutor extends FlushableExecutor with deferred flush mode control.
type DeferrableExecutor interface {
FlushableExecutor
// SetDeferFlush enables/disables deferred flush mode.
SetDeferFlush(enabled bool)
}
// QueryResult holds the result of a query.
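//
// Rows are positional: each row slice aligns with Columns by index.
//
// Example:
//
// result := &QueryResult{
// Columns: []string{"name", "age"},
// Rows: [][]any{
// {"Alice", int64(30)},
// {"Bob", int64(25)},
// },
// }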
type QueryResult struct {
Columns []string
Rows [][]any
}
// BoltAuthenticator is the interface for authenticating Bolt protocol connections.
// This supports Neo4j-compatible authentication schemes:
// - "basic": Username/password authentication
// - "bearer": JWT token authentication (for cluster inter-node auth)
// - "none": Anonymous access (if allowed)
//
// The Bolt protocol HELLO message contains authentication credentials:
// - scheme: "basic", "bearer", or "none"
// - principal: username (basic) or empty (bearer/none)
// - credentials: password (basic) or JWT token (bearer)
//
// For cluster deployments, use "bearer" scheme with a shared JWT secret:
//
// # Generate cluster token on any node:
// curl -X POST http://node1:7474/api/v1/auth/cluster-token \
// -H "Authorization: Bearer $ADMIN_TOKEN" \
// -d '{"node_id": "node-2", "role": "admin"}'
//
// # Use token to connect from other nodes:
// driver = GraphDatabase.driver("bolt://node1:7687",
// auth=("", token)) # scheme=bearer when principal is empty
//
// Example Implementation:
//
// type MyAuthenticator struct {
// auth *auth.Authenticator
// allowAnonymous bool
// }
//
// func (a *MyAuthenticator) Authenticate(scheme, principal, credentials string) (*BoltAuthResult, error) {
// switch scheme {
// case "none":
// if a.allowAnonymous {
// return &BoltAuthResult{Authenticated: true, Roles: []string{"viewer"}}, nil
// }
// return nil, fmt.Errorf("anonymous auth not allowed")
// case "bearer":
// claims, err := a.auth.ValidateToken(credentials)
// if err != nil {
// return nil, err
// }
// return &BoltAuthResult{Authenticated: true, Username: claims.Username, Roles: claims.Roles}, nil
// case "basic":
// // ... username/password validation
// }
// }
type BoltAuthenticator interface {
// Authenticate validates credentials from the Bolt HELLO message.
// Returns auth result on success, error on failure.
// scheme: "basic", "bearer", or "none"
// principal: username (basic), empty (bearer/none)
// credentials: password (basic), JWT token (bearer), empty (none)
Authenticate(scheme, principal, credentials string) (*BoltAuthResult, error)
}
// BoltAuthResult contains the result of Bolt authentication.
type BoltAuthResult struct {
Authenticated bool // Whether authentication succeeded
Username string // Authenticated username
Roles []string // User roles (admin, editor, viewer, etc.)
}
// HasRole checks if the auth result has a specific role.
func (r *BoltAuthResult) HasRole(role string) bool {
for _, r2 := range r.Roles {
if r2 == role {
return true
}
}
return false
}
// HasPermission checks if the auth result has a specific permission based on roles.
// Maps to standard RBAC permissions:
// - admin: read, write, create, delete, admin, schema, user_manage
// - editor: read, write, create, delete
// - viewer: read
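//
// Example:
//
// if !authResult.HasPermission("write") {
// // reject the write query
// }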
func (r *BoltAuthResult) HasPermission(perm string) bool {
rolePerms := map[string][]string{
"admin": {"read", "write", "create", "delete", "admin", "schema", "user_manage"},
"editor": {"read", "write", "create", "delete"},
"viewer": {"read"},
}
for _, role := range r.Roles {
if perms, ok := rolePerms[role]; ok {
for _, p := range perms {
if p == perm {
return true
}
}
}
}
return false
}
// Config holds Bolt protocol server configuration.
//
// All settings have sensible defaults via DefaultConfig(). The configuration
// follows Neo4j Bolt server conventions where applicable.
//
// Authentication:
// - Set Authenticator to enable auth (nil = no auth, accepts all)
// - RequireAuth: if true, connections without valid credentials are rejected
// - AllowAnonymous: if true, "none" auth scheme is accepted (viewer role)
//
// Example:
//
// // Production configuration with auth
// config := &bolt.Config{
// Port: 7687, // Standard Bolt port
// MaxConnections: 1000, // High concurrency
// ReadBufferSize: 32768, // 32KB read buffer
// WriteBufferSize: 32768, // 32KB write buffer
// Authenticator: myAuth,
// RequireAuth: true,
// }
//
// // Development configuration (no auth)
// config = bolt.DefaultConfig()
// config.Port = 7688 // Use different port
type Config struct {
Port int
MaxConnections int
ReadBufferSize int
WriteBufferSize int
LogQueries bool // Log all queries to stdout (for debugging)
// Authentication
Authenticator BoltAuthenticator // Authentication handler (nil = no auth)
RequireAuth bool // Require authentication for all connections
AllowAnonymous bool // Allow "none" auth scheme (grants viewer role)
}
// DefaultConfig returns Neo4j-compatible default Bolt server configuration.
//
// Defaults match Neo4j Bolt server settings:
// - Port 7687 (standard Bolt port)
// - 100 max concurrent connections
// - 8KB read/write buffers
//
// Example:
//
// config := bolt.DefaultConfig()
// server := bolt.New(config, executor)
func DefaultConfig() *Config {
return &Config{
Port: 7687,
MaxConnections: 100,
ReadBufferSize: 8192,
WriteBufferSize: 8192,
}
}
// New creates a new Bolt protocol server with the given configuration and executor.
//
// Parameters:
// - config: Server configuration (uses DefaultConfig() if nil)
// - executor: Query executor for handling Cypher queries (required)
//
// Returns:
// - Server instance ready to start
//
// Example:
//
// config := bolt.DefaultConfig()
// executor := &MyQueryExecutor{db: nornicDB}
// server := bolt.New(config, executor)
//
// // Start server
// if err := server.ListenAndServe(); err != nil {
// log.Fatal(err)
// }
//
// Example 1 - Basic Setup with Cypher Executor:
//
// // Create storage engine
// storage := storage.NewBadgerEngine("./data/nornicdb")
// defer storage.Close()
//
// // Create Cypher executor
// cypherExec := cypher.NewStorageExecutor(storage)
//
// // Create Bolt server
// config := bolt.DefaultConfig()
// config.Port = 7687
//
// server := bolt.New(config, cypherExec)
//
// // Start server (blocks until shutdown)
// log.Fatal(server.ListenAndServe())
//
// Example 2 - Production with Connection Limits:
//
// config := bolt.DefaultConfig()
// config.Port = 7687
// config.MaxConnections = 500 // Handle 500 concurrent clients
// config.ReadBufferSize = 8192 // 8KB buffer
// config.WriteBufferSize = 8192
//
// executor := cypher.NewStorageExecutor(storage)
// server := bolt.New(config, executor)
//
// // Graceful shutdown
// go func() {
// sigChan := make(chan os.Signal, 1)
// signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
// <-sigChan
// log.Println("Shutting down Bolt server...")
// server.Close()
// }()
//
// if err := server.ListenAndServe(); err != nil {
// log.Fatal(err)
// }
//
// Example 3 - Custom Query Executor with Middleware:
//
// // Create custom executor with auth and logging
// type AuthExecutor struct {
// inner cypher.Executor
// auth *auth.Authenticator
// audit *audit.Logger
// }
//
// func (e *AuthExecutor) Execute(ctx context.Context, query string, params map[string]any) (*bolt.QueryResult, error) {
// // Extract user from context (set by auth middleware)
// user, _ := ctx.Value("user").(string)
//
// // Audit log
// e.audit.LogDataAccess(user, user, "query", query, "EXECUTE", true, "")
//
// // Execute query
// result, err := e.inner.Execute(ctx, query, params)
// if err != nil {
// return nil, err
// }
//
// // Convert to Bolt result format
// return &bolt.QueryResult{
// Columns: result.Fields,
// Rows: result.Records,
// }, nil
// }
// }
//
// executor := &AuthExecutor{
// inner: cypher.NewStorageExecutor(storage),
// auth: authenticator,
// audit: auditLogger,
// }
//
// server := bolt.New(bolt.DefaultConfig(), executor)
// server.ListenAndServe()
//
// Example 4 - Testing with In-Memory Storage:
//
// func TestMyBoltIntegration(t *testing.T) {
// // In-memory storage for tests
// storage := storage.NewMemoryEngine()
// executor := cypher.NewStorageExecutor(storage)
//
// // Bolt server on random port
// config := bolt.DefaultConfig()
// config.Port = 0 // OS assigns random available port
//
// server := bolt.New(config, executor)
//
// // Start server in background
// go server.ListenAndServe()
// defer server.Close()
//
// // Connect with Neo4j driver (Port() assumed to report the bound port)
// driver, _ := neo4j.NewDriver(
// fmt.Sprintf("bolt://localhost:%d", server.Port()),
// neo4j.NoAuth(),
// )
// defer driver.Close()
//
// // Run test queries
// session := driver.NewSession(neo4j.SessionConfig{})
// result, _ := session.Run("CREATE (n:Test {value: 42}) RETURN n", nil)
// // ... assertions ...
// }
//
// ELI12:
//
// Think of the Bolt server like a translator at the UN:
//
// - Neo4j drivers speak "Bolt language" (binary protocol)
// - NornicDB speaks "Cypher language" (graph queries)
// - The Bolt server translates between them!
//
// Why do we need this translator?
// 1. Neo4j drivers already exist (Python, Java, JavaScript, Go, etc.)
// 2. Tools like Neo4j Browser, Bloom, and Cypher Shell work out of the box
// 3. No need to write new drivers for every programming language
//
// How it works:
// 1. Driver connects: "Hi, I speak Bolt 4.3"
// 2. Server responds: "Cool, I understand Bolt 4.3"
// 3. Driver sends: "RUN: MATCH (n) RETURN n LIMIT 10"
// 4. Server executes Cypher and sends back results
// 5. Driver receives results in Bolt format
//
// Real-world analogy:
// - HTTP is like writing letters (text-based, verbose)
// - Bolt is like speaking on the phone (binary, efficient)
// - Bolt is ~3-5x faster than HTTP for graph queries!
//
// Compatible Tools:
// - Neo4j Browser (web UI)
// - Neo4j Desktop
// - Cypher Shell (CLI)
// - Neo4j Bloom (graph visualization)
// - Any app using Neo4j drivers
//
// Protocol Advantages:
// - Binary format (smaller, faster)
// - Connection pooling (reuse connections)
// - Streaming results (low memory)
// - Transaction support (BEGIN/COMMIT/ROLLBACK)
// - Pipelining (send multiple queries without waiting)
//
// Performance:
// - Handles 100-500 concurrent connections easily
// - ~1ms overhead per query
// - Streaming results use O(1) memory per connection
// - Binary PackStream is ~40% smaller than JSON
//
// Thread Safety:
//
// Server handles concurrent connections safely.
func New(config *Config, executor QueryExecutor) *Server {
if config == nil {
config = DefaultConfig()
}
return &Server{
config: config,
sessions: make(map[string]*Session),
executor: executor,
}
}
// ListenAndServe starts the Bolt server and begins accepting connections.
//
// The server listens on the configured port and handles incoming Bolt
// connections. Each connection is handled in a separate goroutine.
//
// Returns:
// - nil if server shuts down cleanly
// - Error if failed to bind to port or other startup error
//
// Example:
//
// server := bolt.New(config, executor)
//
// // Start server (blocks until shutdown)
// if err := server.ListenAndServe(); err != nil {
// log.Fatalf("Bolt server failed: %v", err)
// }
//
// The server will print its listening address when started successfully.
func (s *Server) ListenAndServe() error {
addr := fmt.Sprintf(":%d", s.config.Port)
listener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", addr, err)
}
s.listener = listener
fmt.Printf("Bolt server listening on bolt://localhost:%d\n", s.config.Port)
return s.serve()
}
// serve accepts connections in a loop.
func (s *Server) serve() error {
for {
if s.closed.Load() {
return nil
}
conn, err := s.listener.Accept()
if err != nil {
if s.closed.Load() {
return nil // Clean shutdown
}
continue
}
go s.handleConnection(conn)
}
}
// Close stops the Bolt server.
func (s *Server) Close() error {
s.closed.Store(true)
if s.listener != nil {
return s.listener.Close()
}
return nil
}
// IsClosed returns whether the server is closed.
func (s *Server) IsClosed() bool {
return s.closed.Load()
}
// handleConnection handles a single client connection.
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
// Disable Nagle's algorithm for lower latency
// Without this, small packets get delayed up to 40ms
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetNoDelay(true)
}
// Recover from panics to prevent crashing the server
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic in connection handler: %v\n", r)
}
}()
session := &Session{
conn: conn,
reader: bufio.NewReaderSize(conn, 8192), // 8KB read buffer
writer: bufio.NewWriterSize(conn, 8192), // 8KB write buffer
server: s,
executor: s.executor,
messageBuf: make([]byte, 0, 4096), // Pre-allocate 4KB message buffer
}
// Enable deferred flush mode for Neo4j-style write batching
if deferrable, ok := s.executor.(DeferrableExecutor); ok {
deferrable.SetDeferFlush(true)
}
// Ensure cleanup on session end
defer func() {
// Flush any pending writes
if flushable, ok := s.executor.(FlushableExecutor); ok {
flushable.Flush()
}
// Disable deferred flush mode
if deferrable, ok := s.executor.(DeferrableExecutor); ok {
deferrable.SetDeferFlush(false)
}
}()
// Perform handshake
if err := session.handshake(); err != nil {
fmt.Printf("Handshake failed: %v\n", err)
return
}
// Handle messages synchronously (simpler, lower overhead for request-response)
for {
if s.closed.Load() {
return
}
if err := session.handleMessage(); err != nil {
if err == io.EOF {
return
}
errStr := err.Error()
if strings.Contains(errStr, "connection reset") ||
strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "use of closed network connection") {
return
}
fmt.Printf("Message handling error: %v\n", err)
return
}
}
}
// Session represents a client session.
type Session struct {
conn net.Conn
reader *bufio.Reader // Buffered reader for reduced syscalls
writer *bufio.Writer // Buffered writer for reduced syscalls
server *Server
executor QueryExecutor
version uint32
// Authentication state
authenticated bool // Whether HELLO auth succeeded
authResult *BoltAuthResult // Auth result with roles/permissions
// Transaction state
inTransaction bool
txMetadata map[string]any // Transaction metadata from BEGIN
// Query result state (for streaming with PULL)
lastResult *QueryResult
resultIndex int
// Deferred commit state (Neo4j-style optimization)
// Writes are buffered in AsyncEngine until PULL completes
pendingFlush bool
// Query metadata for Neo4j driver compatibility
queryId int64 // Query ID counter for qid field
lastQueryIsWrite bool // Was last query a write operation
// Reusable buffers to reduce allocations
headerBuf [2]byte // For reading chunk headers
messageBuf []byte // Reusable message buffer
// Async message processing (Neo4j-style batching)
messageQueue chan *boltMessage // Incoming messages queue
writeMu sync.Mutex // Protects writer for concurrent access
}
// boltMessage represents a parsed Bolt message ready for processing
type boltMessage struct {
msgType byte
data []byte
}
// handshake performs the Bolt handshake.
func (s *Session) handshake() error {
// Read magic number (4 bytes: 0x60 0x60 0xB0 0x17)
var magic [4]byte
if _, err := io.ReadFull(s.reader, magic[:]); err != nil {
return fmt.Errorf("failed to read magic: %w", err)
}
if magic[0] != 0x60 || magic[1] != 0x60 || magic[2] != 0xB0 || magic[3] != 0x17 {
return fmt.Errorf("invalid magic number: %x", magic)
}
// Read supported versions (4 x 4 bytes)
var versions [16]byte
if _, err := io.ReadFull(s.reader, versions[:]); err != nil {
return fmt.Errorf("failed to read versions: %w", err)
}
// Select the highest version we support from the client's four proposals.
// Each proposal is 4 bytes, big-endian: the last byte is the major version,
// the second-to-last the minor (e.g. 00 00 04 04 = Bolt 4.4).
s.version = 0
for i := 0; i < 4; i++ {
proposed := uint32(versions[i*4+3])<<8 | uint32(versions[i*4+2])
switch proposed {
case BoltV4_4, BoltV4_3, BoltV4_2, BoltV4_1, BoltV4_0:
if proposed > s.version {
s.version = proposed
}
}
}
if s.version == 0 {
// No common version: per the Bolt spec, respond with all zeros
s.writer.Write([]byte{0x00, 0x00, 0x00, 0x00})
s.writer.Flush()
return fmt.Errorf("no mutually supported Bolt version in %x", versions)
}
// Send selected version using buffered writer
response := []byte{0x00, 0x00, byte(s.version), byte(s.version >> 8)}
if _, err := s.writer.Write(response); err != nil {
return fmt.Errorf("failed to send version: %w", err)
}
if err := s.writer.Flush(); err != nil {
return fmt.Errorf("failed to flush version: %w", err)
}
return nil
}
// readMessage reads a single Bolt message from the connection.
// Returns the parsed message or nil for empty messages.
func (s *Session) readMessage() (*boltMessage, error) {
// Create a new buffer for this message (since we're async, can't reuse)
var msgBuf []byte
// Read chunks until we get a zero-size chunk (message terminator)
var headerBuf [2]byte
for {
if _, err := io.ReadFull(s.reader, headerBuf[:]); err != nil {
return nil, err
}
size := int(headerBuf[0])<<8 | int(headerBuf[1])
if size == 0 {
break
}
// Grow buffer
oldLen := len(msgBuf)
newLen := oldLen + size
if cap(msgBuf) < newLen {
newBuf := make([]byte, newLen, newLen*2)
copy(newBuf, msgBuf)
msgBuf = newBuf
} else {
msgBuf = msgBuf[:newLen]
}
if _, err := io.ReadFull(s.reader, msgBuf[oldLen:newLen]); err != nil {
return nil, err
}
}
if len(msgBuf) == 0 {
return nil, nil // Empty message (no-op)
}
// Parse message type
if len(msgBuf) < 2 {
return nil, fmt.Errorf("message too short: %d bytes", len(msgBuf))
}
// Bolt messages are PackStream structures
structMarker := msgBuf[0]
var msgType byte
var data []byte
if structMarker >= 0xB0 && structMarker <= 0xBF {
msgType = msgBuf[1]
data = msgBuf[2:]
} else {
msgType = msgBuf[0]
data = msgBuf[1:]
}
// Copy data so the returned message owns its bytes independently of msgBuf
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
return &boltMessage{msgType: msgType, data: dataCopy}, nil
}
// processMessage processes a parsed Bolt message.
func (s *Session) processMessage(msg *boltMessage) error {
// No mutex needed - messages are processed sequentially from the queue
return s.dispatchMessage(msg.msgType, msg.data)
}
// handleMessage handles a single Bolt message synchronously (for compatibility).
// Bolt messages can span multiple chunks - we read until we get a 0-size chunk.
func (s *Session) handleMessage() error {
// Reuse message buffer - reset length but keep capacity
s.messageBuf = s.messageBuf[:0]
// Read chunks until we get a zero-size chunk (message terminator)
for {
// Read chunk header using reusable buffer (no allocation)
if _, err := io.ReadFull(s.reader, s.headerBuf[:]); err != nil {
return err
}
size := int(s.headerBuf[0])<<8 | int(s.headerBuf[1])
if size == 0 {
// Zero-size chunk marks end of message
break
}
// Ensure message buffer has capacity
oldLen := len(s.messageBuf)
newLen := oldLen + size
if cap(s.messageBuf) < newLen {
// Need to grow - double capacity or use needed size
newCap := cap(s.messageBuf) * 2
if newCap < newLen {
newCap = newLen
}
newBuf := make([]byte, newLen, newCap)
copy(newBuf, s.messageBuf)
s.messageBuf = newBuf
} else {
s.messageBuf = s.messageBuf[:newLen]
}
// Read chunk data directly into message buffer
if _, err := io.ReadFull(s.reader, s.messageBuf[oldLen:newLen]); err != nil {
return err
}
}
if len(s.messageBuf) == 0 {
return nil // Empty message (no-op)
}
// Parse and handle message
if len(s.messageBuf) < 2 {
return fmt.Errorf("message too short: %d bytes", len(s.messageBuf))
}
// Bolt messages are PackStream structures
structMarker := s.messageBuf[0]
// Check if it's a tiny struct (0xB0-0xBF)
if structMarker >= 0xB0 && structMarker <= 0xBF {
msgType := s.messageBuf[1]
msgData := s.messageBuf[2:]
return s.dispatchMessage(msgType, msgData)
}
// For non-struct markers, try direct message type (fallback)
return s.dispatchMessage(s.messageBuf[0], s.messageBuf[1:])
}
// dispatchMessage routes the message to the appropriate handler.
func (s *Session) dispatchMessage(msgType byte, data []byte) error {
switch msgType {
case MsgHello:
return s.handleHello(data)
case MsgGoodbye:
return io.EOF
case MsgRun:
return s.handleRun(data)
case MsgPull:
return s.handlePull(data)
case MsgDiscard:
return s.handleDiscard(data)
case MsgReset:
return s.handleReset(data)
case MsgBegin:
return s.handleBegin(data)
case MsgCommit:
return s.handleCommit(data)
case MsgRollback:
return s.handleRollback(data)
case MsgRoute:
return s.handleRoute(data)
default:
return fmt.Errorf("unknown message type: 0x%02X", msgType)
}
}
// handleHello handles the HELLO message with authentication.
// Neo4j HELLO message format:
//
// HELLO { user_agent: String, scheme: String, principal: String, credentials: String, ... }
//
// Authentication schemes:
// - "none": Anonymous access (if AllowAnonymous is true)
// - "basic": Username/password authentication
// - "bearer": JWT token authentication (credentials contains the token)
//
// For cluster authentication with shared JWT:
// - All nodes share the same JWT secret (NORNICDB_JWT_SECRET)
// - Generate a cluster token: POST /api/v1/auth/cluster-token
// - Connect using bearer scheme with the token as credentials
//
// Server-to-server clustering uses the same auth mechanism with service accounts.
func (s *Session) handleHello(data []byte) error {
// Parse HELLO message to extract authentication details
authParams, err := s.parseHelloAuth(data)
if err != nil {
return s.sendFailure("Neo.ClientError.Request.Invalid", fmt.Sprintf("Failed to parse HELLO: %v", err))
}
// If an authenticator is configured, validate the HELLO credentials
if s.server != nil && s.server.config.Authenticator != nil {
scheme := authParams["scheme"]
principal := authParams["principal"]
credentials := authParams["credentials"]
// Handle anonymous auth
if scheme == "none" || scheme == "" {
if !s.server.config.AllowAnonymous {
return s.sendFailure("Neo.ClientError.Security.Unauthorized", "Authentication required")
}
// Anonymous user gets viewer role
s.authenticated = true
s.authResult = &BoltAuthResult{
Authenticated: true,
Username: "anonymous",
Roles: []string{"viewer"},
}
} else if scheme == "basic" {
// Authenticate with provided credentials
result, err := s.server.config.Authenticator.Authenticate(scheme, principal, credentials)
if err != nil {
remoteAddr := "unknown"
if s.conn != nil {
remoteAddr = s.conn.RemoteAddr().String()
}
fmt.Printf("[BOLT] Auth failed for %q from %s: %v\n", principal, remoteAddr, err)
return s.sendFailure("Neo.ClientError.Security.Unauthorized", "Invalid credentials")
}
s.authenticated = true
s.authResult = result
} else if scheme == "bearer" {
// JWT token authentication - used for cluster inter-node auth
result, err := s.server.config.Authenticator.Authenticate(scheme, principal, credentials)
if err != nil {
remoteAddr := "unknown"
if s.conn != nil {
remoteAddr = s.conn.RemoteAddr().String()
}
fmt.Printf("[BOLT] Bearer auth failed from %s: %v\n", remoteAddr, err)
return s.sendFailure("Neo.ClientError.Security.Unauthorized", "Invalid or expired token")
}
s.authenticated = true
s.authResult = result
} else {
return s.sendFailure("Neo.ClientError.Security.Unauthorized", fmt.Sprintf("Unsupported auth scheme: %s", scheme))
}
} else if s.server != nil && s.server.config.RequireAuth {
// Auth required but no authenticator configured - reject all
return s.sendFailure("Neo.ClientError.Security.Unauthorized", "Authentication required but not configured")
} else {
// No auth configured - allow all (development mode)
s.authenticated = true
s.authResult = &BoltAuthResult{
Authenticated: true,
Username: "anonymous",
Roles: []string{"admin"}, // Full access in dev mode
}
}
// Log successful auth
if s.server != nil && s.server.config.LogQueries {
remoteAddr := "unknown"
if s.conn != nil {
remoteAddr = s.conn.RemoteAddr().String()
}
fmt.Printf("[BOLT] Auth success: user=%s roles=%v from=%s\n",
s.authResult.Username, s.authResult.Roles, remoteAddr)
}
return s.sendSuccess(map[string]any{
"server": "NornicDB/0.1.0",
"connection_id": "nornic-1",
"hints": map[string]any{},
})
}
// parseHelloAuth parses authentication parameters from a HELLO message.
// Returns a map with keys: scheme, principal, credentials
func (s *Session) parseHelloAuth(data []byte) (map[string]string, error) {
result := map[string]string{
"scheme": "",
"principal": "",
"credentials": "",
}
if len(data) == 0 {
return result, nil
}
// HELLO is a structure: [extra: Map]
// First byte is marker for structure
marker := data[0]
// Check for tiny struct marker (0xB0-0xBF = struct with 0-15 fields)
// HELLO has signature 0x01 and one field (the extra map)
offset := 0
if marker >= 0xB0 && marker <= 0xBF {
offset = 2 // Skip struct marker and signature byte
} else {
// Try to find the map directly
offset = 0
}
if offset >= len(data) {
return result, nil
}
// Parse the extra map
extraMap, _, err := decodePackStreamMap(data, offset)
if err != nil {
return result, fmt.Errorf("failed to decode HELLO extra map: %w", err)
}
// Extract auth fields
if scheme, ok := extraMap["scheme"].(string); ok {
result["scheme"] = scheme
}
if principal, ok := extraMap["principal"].(string); ok {
result["principal"] = principal
}
if credentials, ok := extraMap["credentials"].(string); ok {
result["credentials"] = credentials
}
return result, nil
}
// handleRun handles the RUN message (execute Cypher).
func (s *Session) handleRun(data []byte) error {
// Check authentication
if s.server != nil && s.server.config.RequireAuth && !s.authenticated {
return s.sendFailure("Neo.ClientError.Security.Unauthorized", "Not authenticated")
}
// Parse PackStream to extract query and params
query, params, err := s.parseRunMessage(data)
if err != nil {
return s.sendFailure("Neo.ClientError.Request.Invalid", fmt.Sprintf("Failed to parse RUN message: %v", err))
}
// Classify query type once (used for auth and deferred flush).
// This is a keyword heuristic on the uppercased query text, so it can
// misclassify queries whose string literals contain these keywords.
upperQuery := strings.ToUpper(query)
isWrite := strings.Contains(upperQuery, "CREATE") ||
strings.Contains(upperQuery, "DELETE") ||
strings.Contains(upperQuery, "SET ") ||
strings.Contains(upperQuery, "MERGE") ||
strings.Contains(upperQuery, "REMOVE ")
isSchema := strings.Contains(upperQuery, "INDEX") ||
strings.Contains(upperQuery, "CONSTRAINT")
// Check permissions based on query type
if s.authResult != nil {
if isSchema && !s.authResult.HasPermission("schema") {
return s.sendFailure("Neo.ClientError.Security.Forbidden", "Schema operations require schema permission")
}
if isWrite && !s.authResult.HasPermission("write") {
return s.sendFailure("Neo.ClientError.Security.Forbidden", "Write operations require write permission")
}
if !s.authResult.HasPermission("read") {
return s.sendFailure("Neo.ClientError.Security.Forbidden", "Read operations require read permission")
}
}
// Log query if enabled
if s.server != nil && s.server.config.LogQueries {
remoteAddr := "unknown"
if s.conn != nil {
remoteAddr = s.conn.RemoteAddr().String()
}
user := "unknown"
if s.authResult != nil {
user = s.authResult.Username
}
if len(params) > 0 {
fmt.Printf("[BOLT] %s@%s: %s (params: %v)\n", user, remoteAddr, truncateQuery(query, 200), params)
} else {
fmt.Printf("[BOLT] %s@%s: %s\n", user, remoteAddr, truncateQuery(query, 200))
}
}
// Execute query
ctx := context.Background()
result, err := s.executor.Execute(ctx, query, params)
if err != nil {
if s.server != nil && s.server.config.LogQueries {
fmt.Printf("[BOLT] ERROR: %v\n", err)
}
return s.sendFailure("Neo.ClientError.Statement.SyntaxError", err.Error())
}
// Track write operation for deferred flush
if isWrite {
s.pendingFlush = true
}
s.lastQueryIsWrite = isWrite
// Store result for PULL
s.lastResult = result
s.resultIndex = 0
s.queryId++
// Return SUCCESS with field names (Neo4j compatible metadata)
// Note: Neo4j only sends qid for EXPLICIT transactions, not implicit/autocommit
// For implicit transactions, only send fields and t_first
if s.inTransaction {
return s.sendSuccess(map[string]any{
"fields": result.Columns,
"t_first": int64(0),
"qid": s.queryId,
})
}
return s.sendSuccess(map[string]any{
"fields": result.Columns,
"t_first": int64(0),
})
}
// truncateQuery truncates a query for logging.
func truncateQuery(q string, maxLen int) string {
if len(q) <= maxLen {
return q
}
return q[:maxLen] + "..."
}
// parseRunMessage parses a RUN message to extract query and parameters.
// Bolt v4+ RUN message format: [query: String, parameters: Map, extra: Map]
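// For example, RUN "RETURN 1" with no parameters arrives here as (a sketch;
// the struct marker 0xB3 and signature 0x10 were already stripped by the
// dispatcher, so this function sees only the fields):
//
// 88 52 45 54 55 52 4E 20 31   tiny string "RETURN 1"
// A0                           empty parameters map
// A0                           empty extra map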
func (s *Session) parseRunMessage(data []byte) (string, map[string]any, error) {
if len(data) == 0 {
return "", nil, fmt.Errorf("empty RUN message")
}
offset := 0
// Parse query string
query, n, err := decodePackStreamString(data, offset)
if err != nil {
return "", nil, fmt.Errorf("failed to parse query: %w", err)
}
offset += n
// Parse parameters map
params := make(map[string]any)
if offset < len(data) {
p, consumed, err := decodePackStreamMap(data, offset)
if err != nil {
// Params parse failed, use empty map
params = make(map[string]any)
} else {
params = p
offset += consumed
}
}
// Bolt v4+ appends an extra metadata map after params (bookmarks, tx_timeout,
// etc.). offset now points at it if present; it isn't needed here, so we
// leave it unparsed.
return query, params, nil
}
// handlePull handles the PULL message.
func (s *Session) handlePull(data []byte) error {
if s.lastResult == nil {
// Neo4j doesn't send has_more when false - just empty metadata
return s.sendSuccess(map[string]any{})
}
// Parse PULL options (n = number of records to pull)
pullN := -1 // Default: all records
if len(data) > 0 {
opts, _, err := decodePackStreamMap(data, 0)
if err == nil {
if n, ok := opts["n"]; ok {
switch v := n.(type) {
case int64:
pullN = int(v)
case int:
pullN = v
}
}
}
}
// Stream records - use batched writing for large result sets
remaining := len(s.lastResult.Rows) - s.resultIndex
if pullN > 0 && remaining > pullN {
remaining = pullN
}
// For large batches (>50 records), use batched writing to reduce syscalls
if remaining > 50 {
if err := s.sendRecordsBatched(s.lastResult.Rows[s.resultIndex : s.resultIndex+remaining]); err != nil {
return err
}
s.resultIndex += remaining
} else {
// Small batches: send individually (avoids buffer allocation overhead)
for s.resultIndex < len(s.lastResult.Rows) {
if pullN == 0 {
break
}
row := s.lastResult.Rows[s.resultIndex]
if err := s.sendRecord(row); err != nil {
return err
}
s.resultIndex++
if pullN > 0 {
pullN--
}
}
}
// Check if more records available
hasMore := s.resultIndex < len(s.lastResult.Rows)
// Clear result if done
if !hasMore {
s.lastResult = nil
s.resultIndex = 0
// Neo4j-style deferred commit: flush pending writes after streaming completes
if s.pendingFlush {
if flushable, ok := s.executor.(FlushableExecutor); ok {
flushable.Flush()
}
s.pendingFlush = false
}
// Return metadata for completed query (Neo4j compatibility)
// Neo4j sends: type, bookmark, t_last, stats, db (but NOT has_more when false)
queryType := "r"
if s.lastQueryIsWrite {
queryType = "w"
}
// Build stats matching Neo4j format (only if there are updates)
metadata := map[string]any{
"bookmark": "nornicdb:tx:auto",
"type": queryType,
"t_last": int64(0), // Streaming time
"db": "neo4j", // Default database name
}
// Note: Neo4j does NOT send has_more when it's false
return s.sendSuccess(metadata)
}
// When there are more records, send has_more: true
return s.sendSuccess(map[string]any{
"has_more": true,
})
}
// handleDiscard handles the DISCARD message.
func (s *Session) handleDiscard(data []byte) error {
s.lastResult = nil
s.resultIndex = 0
// Flush pending writes, mirroring PULL completion, so discarded write
// queries are still persisted
if s.pendingFlush {
if flushable, ok := s.executor.(FlushableExecutor); ok {
flushable.Flush()
}
s.pendingFlush = false
}
// Neo4j doesn't send has_more when false - just empty metadata
return s.sendSuccess(map[string]any{})
}
// handleRoute handles the ROUTE message (for cluster routing).
func (s *Session) handleRoute(data []byte) error {
return s.sendSuccess(map[string]any{
"rt": map[string]any{
"ttl": 300,
"servers": []map[string]any{},
},
})
}
// handleReset handles the RESET message.
// Resets the session state and rolls back any active transaction.
func (s *Session) handleReset(data []byte) error {
// Rollback any active transaction
if s.inTransaction {
if txExec, ok := s.executor.(TransactionalExecutor); ok {
ctx := context.Background()
_ = txExec.RollbackTransaction(ctx) // Ignore error on reset
}
}
s.inTransaction = false
s.txMetadata = nil
s.lastResult = nil
s.resultIndex = 0
return s.sendSuccess(nil)
}
// handleBegin handles the BEGIN message.
// If the executor implements TransactionalExecutor, starts a real transaction.
// Otherwise, just tracks the transaction state for protocol compliance.
func (s *Session) handleBegin(data []byte) error {
// Parse BEGIN metadata (contains tx_timeout, bookmarks, etc.)
var metadata map[string]any
if len(data) > 0 {
m, _, err := decodePackStreamMap(data, 0)
if err == nil {
metadata = m
}
}
s.txMetadata = metadata
// If executor supports transactions, start one
if txExec, ok := s.executor.(TransactionalExecutor); ok {
ctx := context.Background()
if err := txExec.BeginTransaction(ctx, metadata); err != nil {
return s.sendFailure("Neo.TransactionError.Begin", err.Error())
}
}
s.inTransaction = true
return s.sendSuccess(nil)
}
// handleCommit handles the COMMIT message.
// If the executor implements TransactionalExecutor, commits the real transaction.
func (s *Session) handleCommit(data []byte) error {
if !s.inTransaction {
return s.sendFailure("Neo.ClientError.Transaction.TransactionNotFound",
"No transaction to commit")
}
// If executor supports transactions, commit
if txExec, ok := s.executor.(TransactionalExecutor); ok {
ctx := context.Background()
if err := txExec.CommitTransaction(ctx); err != nil {
s.inTransaction = false
s.txMetadata = nil
return s.sendFailure("Neo.TransactionError.Commit", err.Error())
}
}
s.inTransaction = false
s.txMetadata = nil
// Return bookmark for client tracking
return s.sendSuccess(map[string]any{
"bookmark": "nornicdb:bookmark:1",
})
}
// handleRollback handles the ROLLBACK message.
// If the executor implements TransactionalExecutor, rolls back the real transaction.
func (s *Session) handleRollback(data []byte) error {
if !s.inTransaction {
// Not an error to rollback when not in transaction (Neo4j behavior)
return s.sendSuccess(nil)
}
// If executor supports transactions, rollback
if txExec, ok := s.executor.(TransactionalExecutor); ok {
ctx := context.Background()
if err := txExec.RollbackTransaction(ctx); err != nil {
// Rollback failed, but we still clear state
s.inTransaction = false
s.txMetadata = nil
return s.sendFailure("Neo.TransactionError.Rollback", err.Error())
}
}
s.inTransaction = false
s.txMetadata = nil
return s.sendSuccess(nil)
}
// sendRecord sends a RECORD response.
func (s *Session) sendRecord(fields []any) error {
// Format: <struct marker 0xB1> <signature 0x71> <list of fields>
buf := []byte{0xB1, MsgRecord}
buf = append(buf, encodePackStreamList(fields)...)
return s.sendChunk(buf)
}
// sendRecordsBatched sends multiple RECORD responses using buffered I/O.
// This dramatically reduces syscall overhead for large result sets.
// For 500 records: ~500 syscalls → 1 syscall = ~8x faster
func (s *Session) sendRecordsBatched(rows [][]any) error {
if len(rows) == 0 {
return nil
}
// Write all records to buffer
for _, row := range rows {
recordData := []byte{0xB1, MsgRecord}
recordData = append(recordData, encodePackStreamList(row)...)
// Write chunk header
size := len(recordData)
s.writer.WriteByte(byte(size >> 8))
s.writer.WriteByte(byte(size))
// Write record data
s.writer.Write(recordData)
// Write terminator
s.writer.WriteByte(0)
s.writer.WriteByte(0)
}
// Don't flush here - let the final SUCCESS message flush everything
return nil
}
// successHeader is the pre-allocated SUCCESS struct header shared by all responses.
var successHeader = []byte{0xB1, MsgSuccess}
// sendSuccess sends a SUCCESS response with PackStream encoding.
func (s *Session) sendSuccess(metadata map[string]any) error {
// Small pre-sized buffer; typical SUCCESS metadata fits in 128 bytes
buf := make([]byte, 0, 128)
buf = append(buf, successHeader...)
buf = append(buf, encodePackStreamMap(metadata)...)
return s.sendChunk(buf)
}
// sendFailure sends a FAILURE response.
func (s *Session) sendFailure(code, message string) error {
buf := []byte{0xB1, MsgFailure}
metadata := map[string]any{
"code": code,
"message": message,
}
buf = append(buf, encodePackStreamMap(metadata)...)
return s.sendChunk(buf)
}
// sendChunk sends a chunk to the client using buffered I/O.
// The buffer is flushed after each complete message response.
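// For example, a SUCCESS with empty metadata (3 message bytes) is sent as:
//
// 00 03       chunk header (size 3)
// B1 70 A0    tiny struct, SUCCESS signature, empty map
// 00 00       message terminator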
func (s *Session) sendChunk(data []byte) error {
size := len(data)
// Write chunk header (2 bytes)
s.writer.WriteByte(byte(size >> 8))
s.writer.WriteByte(byte(size))
// Write data
s.writer.Write(data)
// Write terminator (0x00 0x00)
s.writer.WriteByte(0)
s.writer.WriteByte(0)
// Flush immediately to ensure response is sent
// This is critical for request-response protocols
return s.writer.Flush()
}
// ============================================================================
// PackStream Encoding
// ============================================================================
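// encodePackStreamMap encodes a string-keyed map: marker 0xA0+n for tiny maps
// (n < 16), 0xD8 (MAP8) or 0xD9 (MAP16) for larger ones, followed by
// alternating encoded keys and values.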
func encodePackStreamMap(m map[string]any) []byte {
if len(m) == 0 {
return []byte{0xA0}
}
var buf []byte
size := len(m)
if size < 16 {
buf = append(buf, byte(0xA0+size))
} else if size < 256 {
buf = append(buf, 0xD8, byte(size))
} else {
buf = append(buf, 0xD9, byte(size>>8), byte(size))
}
for k, v := range m {
buf = append(buf, encodePackStreamString(k)...)
buf = append(buf, encodePackStreamValue(v)...)
}
return buf
}
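// encodePackStreamList encodes a list: marker 0x90+n for tiny lists (n < 16),
// 0xD4 (LIST8) or 0xD5 (LIST16) for larger ones, followed by the encoded items.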
func encodePackStreamList(items []any) []byte {
if len(items) == 0 {
return []byte{0x90}
}
var buf []byte
size := len(items)
if size < 16 {
buf = append(buf, byte(0x90+size))
} else if size < 256 {
buf = append(buf, 0xD4, byte(size))
} else {
buf = append(buf, 0xD5, byte(size>>8), byte(size))
}
for _, item := range items {
buf = append(buf, encodePackStreamValue(item)...)
}
return buf
}
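// encodePackStreamString encodes a UTF-8 string: marker 0x80+n for tiny
// strings (n < 16), 0xD0 (STRING8), 0xD1 (STRING16), or 0xD2 (STRING32),
// followed by the raw bytes.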
func encodePackStreamString(s string) []byte {
length := len(s)
var buf []byte
if length < 16 {
buf = append(buf, byte(0x80+length))
} else if length < 256 {
buf = append(buf, 0xD0, byte(length))
} else if length < 65536 {
buf = append(buf, 0xD1, byte(length>>8), byte(length))
} else {
buf = append(buf, 0xD2, byte(length>>24), byte(length>>16), byte(length>>8), byte(length))
}
buf = append(buf, []byte(s)...)
return buf
}
func encodePackStreamValue(v any) []byte {
switch val := v.(type) {
case nil:
return []byte{0xC0}
case bool:
if val {
return []byte{0xC3}
}
return []byte{0xC2}
// All integer types - encode as INT64 for Neo4j driver compatibility
case int:
return encodePackStreamInt(int64(val))
case int8:
return encodePackStreamInt(int64(val))
case int16:
return encodePackStreamInt(int64(val))
case int32:
return encodePackStreamInt(int64(val))
case int64:
return encodePackStreamInt(val)
case uint:
return encodePackStreamInt(int64(val))
case uint8:
return encodePackStreamInt(int64(val))
case uint16:
return encodePackStreamInt(int64(val))
case uint32:
return encodePackStreamInt(int64(val))
case uint64:
return encodePackStreamInt(int64(val))
// Float types
case float32:
buf := make([]byte, 9)
buf[0] = 0xC1
binary.BigEndian.PutUint64(buf[1:], math.Float64bits(float64(val)))
return buf
case float64:
buf := make([]byte, 9)
buf[0] = 0xC1
binary.BigEndian.PutUint64(buf[1:], math.Float64bits(val))
return buf
case string:
return encodePackStreamString(val)
// List types
case []string:
items := make([]any, len(val))
for i, s := range val {
items[i] = s
}
return encodePackStreamList(items)
case []any:
return encodePackStreamList(val)
case []int:
items := make([]any, len(val))
for i, n := range val {
items[i] = int64(n)
}
return encodePackStreamList(items)
case []int64:
items := make([]any, len(val))
for i, n := range val {
items[i] = n
}
return encodePackStreamList(items)
case []float64:
items := make([]any, len(val))
for i, n := range val {
items[i] = n
}
return encodePackStreamList(items)
case []float32:
items := make([]any, len(val))
for i, n := range val {
items[i] = float64(n)
}
return encodePackStreamList(items)
case []map[string]any:
items := make([]any, len(val))
for i, m := range val {
items[i] = m
}
return encodePackStreamList(items)
// Map types
case map[string]any:
// Check if this is a node (has _nodeId and labels)
if nodeId, hasNodeId := val["_nodeId"]; hasNodeId {
if labels, hasLabels := val["labels"]; hasLabels {
return encodeNode(nodeId, labels, val)
}
}
return encodePackStreamMap(val)
default:
// Unknown type - encode as null
return []byte{0xC0}
}
}
// encodeNode encodes a node as a proper Bolt Node structure (signature 0x4E).
// This makes nodes compatible with Neo4j drivers that expect Node instances with .properties.
// Format: STRUCT(3 fields, signature 0x4E) + id + labels + properties
func encodeNode(nodeId any, labels any, nodeMap map[string]any) []byte {
// Bolt Node structure: B3 4E (tiny struct, 3 fields, signature 'N')
buf := []byte{0xB3, 0x4E}
// Field 1: Node ID (as int64 for Neo4j compatibility)
// Use element_id or _nodeId string, hash it to int64 for now
idStr, _ := nodeId.(string)
// Use a simple hash - Neo4j drivers use int64 IDs
var id int64 = 0
for _, c := range idStr {
id = id*31 + int64(c)
}
buf = append(buf, encodePackStreamInt(id)...)
// Field 2: Labels (list of strings)
labelList := make([]any, 0)
switch l := labels.(type) {
case []string:
for _, s := range l {
labelList = append(labelList, s)
}
case []any:
labelList = l
}
buf = append(buf, encodePackStreamList(labelList)...)
// Field 3: Properties (map) - exclude internal fields
props := make(map[string]any)
for k, v := range nodeMap {
// Skip internal fields
if k == "_nodeId" || k == "labels" {
continue
}
props[k] = v
}
buf = append(buf, encodePackStreamMap(props)...)
return buf
}
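// encodePackStreamInt encodes an int64 in the smallest PackStream form:
// TINY_INT (one byte) for -16..127, then 0xC8 (INT8), 0xC9 (INT16),
// 0xCA (INT32), or 0xCB (INT64). For example, 42 encodes as 0x2A and
// 1000 as 0xC9 0x03 0xE8.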
func encodePackStreamInt(val int64) []byte {
if val >= -16 && val <= 127 {
return []byte{byte(val)}
}
if val >= -128 && val < -16 {
return []byte{0xC8, byte(val)}
}
if val >= -32768 && val <= 32767 {
return []byte{0xC9, byte(val >> 8), byte(val)}
}
if val >= -2147483648 && val <= 2147483647 {
return []byte{0xCA, byte(val >> 24), byte(val >> 16), byte(val >> 8), byte(val)}
}
return []byte{0xCB, byte(val >> 56), byte(val >> 48), byte(val >> 40), byte(val >> 32),
byte(val >> 24), byte(val >> 16), byte(val >> 8), byte(val)}
}
// ============================================================================
// PackStream Decoding
// ============================================================================
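// decodePackStreamString decodes a string value starting at offset. All
// decoders below return the decoded value, the total number of bytes consumed
// (including the marker), and an error on malformed input.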
func decodePackStreamString(data []byte, offset int) (string, int, error) {
if offset >= len(data) {
return "", 0, fmt.Errorf("offset out of bounds")
}
startOffset := offset
marker := data[offset]
offset++
var length int
// Tiny string (0x80-0x8F)
if marker >= 0x80 && marker <= 0x8F {
length = int(marker - 0x80)
} else if marker == 0xD0 { // STRING8
if offset >= len(data) {
return "", 0, fmt.Errorf("incomplete STRING8")
}
length = int(data[offset])
offset++
} else if marker == 0xD1 { // STRING16
if offset+1 >= len(data) {
return "", 0, fmt.Errorf("incomplete STRING16")
}
length = int(data[offset])<<8 | int(data[offset+1])
offset += 2
} else if marker == 0xD2 { // STRING32
if offset+3 >= len(data) {
return "", 0, fmt.Errorf("incomplete STRING32")
}
length = int(data[offset])<<24 | int(data[offset+1])<<16 | int(data[offset+2])<<8 | int(data[offset+3])
offset += 4
} else {
return "", 0, fmt.Errorf("not a string marker: 0x%02X", marker)
}
if offset+length > len(data) {
return "", 0, fmt.Errorf("string data out of bounds")
}
str := string(data[offset : offset+length])
return str, (offset + length) - startOffset, nil
}
func decodePackStreamMap(data []byte, offset int) (map[string]any, int, error) {
if offset >= len(data) {
return nil, 0, fmt.Errorf("offset out of bounds")
}
marker := data[offset]
startOffset := offset
offset++
var size int
// Tiny map (0xA0-0xAF)
if marker >= 0xA0 && marker <= 0xAF {
size = int(marker - 0xA0)
} else if marker == 0xD8 { // MAP8
if offset >= len(data) {
return nil, 0, fmt.Errorf("incomplete MAP8")
}
size = int(data[offset])
offset++
} else if marker == 0xD9 { // MAP16
if offset+1 >= len(data) {
return nil, 0, fmt.Errorf("incomplete MAP16")
}
size = int(data[offset])<<8 | int(data[offset+1])
offset += 2
} else if marker == 0xDA { // MAP32
if offset+3 >= len(data) {
return nil, 0, fmt.Errorf("incomplete MAP32")
}
size = int(data[offset])<<24 | int(data[offset+1])<<16 | int(data[offset+2])<<8 | int(data[offset+3])
offset += 4
} else {
return nil, 0, fmt.Errorf("not a map marker: 0x%02X", marker)
}
result := make(map[string]any)
for i := 0; i < size; i++ {
// Decode key (must be string)
key, n, err := decodePackStreamString(data, offset)
if err != nil {
return nil, 0, fmt.Errorf("failed to decode map key: %w", err)
}
offset += n
// Decode value
value, n, err := decodePackStreamValue(data, offset)
if err != nil {
return nil, 0, fmt.Errorf("failed to decode map value for key %s: %w", key, err)
}
offset += n
result[key] = value
}
return result, offset - startOffset, nil
}
func decodePackStreamValue(data []byte, offset int) (any, int, error) {
if offset >= len(data) {
return nil, 0, fmt.Errorf("offset out of bounds")
}
marker := data[offset]
// Null
if marker == 0xC0 {
return nil, 1, nil
}
// Boolean
if marker == 0xC2 {
return false, 1, nil
}
if marker == 0xC3 {
return true, 1, nil
}
// Tiny positive int (0x00-0x7F)
if marker <= 0x7F {
return int64(marker), 1, nil
}
// Tiny negative int (0xF0-0xFF = -16 to -1)
if marker >= 0xF0 {
return int64(int8(marker)), 1, nil
}
// INT8
if marker == 0xC8 {
if offset+1 >= len(data) {
return nil, 0, fmt.Errorf("incomplete INT8")
}
return int64(int8(data[offset+1])), 2, nil
}
// INT16
if marker == 0xC9 {
if offset+2 >= len(data) {
return nil, 0, fmt.Errorf("incomplete INT16")
}
val := int16(data[offset+1])<<8 | int16(data[offset+2])
return int64(val), 3, nil
}
// INT32
if marker == 0xCA {
if offset+4 >= len(data) {
return nil, 0, fmt.Errorf("incomplete INT32")
}
val := int32(data[offset+1])<<24 | int32(data[offset+2])<<16 | int32(data[offset+3])<<8 | int32(data[offset+4])
return int64(val), 5, nil
}
// INT64
if marker == 0xCB {
if offset+8 >= len(data) {
return nil, 0, fmt.Errorf("incomplete INT64")
}
val := int64(data[offset+1])<<56 | int64(data[offset+2])<<48 | int64(data[offset+3])<<40 | int64(data[offset+4])<<32 |
int64(data[offset+5])<<24 | int64(data[offset+6])<<16 | int64(data[offset+7])<<8 | int64(data[offset+8])
return val, 9, nil
}
// Float64
if marker == 0xC1 {
if offset+8 >= len(data) {
return nil, 0, fmt.Errorf("incomplete Float64")
}
bits := binary.BigEndian.Uint64(data[offset+1 : offset+9])
return math.Float64frombits(bits), 9, nil
}
// String
if marker >= 0x80 && marker <= 0x8F || marker == 0xD0 || marker == 0xD1 || marker == 0xD2 {
return decodePackStreamString(data, offset)
}
// List
if marker >= 0x90 && marker <= 0x9F || marker == 0xD4 || marker == 0xD5 || marker == 0xD6 {
return decodePackStreamList(data, offset)
}
// Map
if marker >= 0xA0 && marker <= 0xAF || marker == 0xD8 || marker == 0xD9 || marker == 0xDA {
return decodePackStreamMap(data, offset)
}
// Structure (nodes, relationships, etc.) - not materialized, but we must
// consume the signature byte and all fields to keep the parser aligned
if marker >= 0xB0 && marker <= 0xBF {
fieldCount := int(marker - 0xB0)
if offset+1 >= len(data) {
return nil, 0, fmt.Errorf("incomplete structure")
}
consumed := 2 // marker + signature
for i := 0; i < fieldCount; i++ {
_, n, err := decodePackStreamValue(data, offset+consumed)
if err != nil {
return nil, 0, fmt.Errorf("failed to skip structure field %d: %w", i, err)
}
consumed += n
}
return nil, consumed, nil
}
return nil, 0, fmt.Errorf("unknown marker: 0x%02X", marker)
}
func decodePackStreamList(data []byte, offset int) ([]any, int, error) {
if offset >= len(data) {
return nil, 0, fmt.Errorf("offset out of bounds")
}
marker := data[offset]
startOffset := offset
offset++
var size int
// Tiny list (0x90-0x9F)
if marker >= 0x90 && marker <= 0x9F {
size = int(marker - 0x90)
} else if marker == 0xD4 { // LIST8
if offset >= len(data) {
return nil, 0, fmt.Errorf("incomplete LIST8")
}
size = int(data[offset])
offset++
} else if marker == 0xD5 { // LIST16
if offset+1 >= len(data) {
return nil, 0, fmt.Errorf("incomplete LIST16")
}
size = int(data[offset])<<8 | int(data[offset+1])
offset += 2
} else if marker == 0xD6 { // LIST32
if offset+3 >= len(data) {
return nil, 0, fmt.Errorf("incomplete LIST32")
}
size = int(data[offset])<<24 | int(data[offset+1])<<16 | int(data[offset+2])<<8 | int(data[offset+3])
offset += 4
} else {
return nil, 0, fmt.Errorf("not a list marker: 0x%02X", marker)
}
result := make([]any, size)
for i := 0; i < size; i++ {
value, n, err := decodePackStreamValue(data, offset)
if err != nil {
return nil, 0, fmt.Errorf("failed to decode list item %d: %w", i, err)
}
result[i] = value
offset += n
}
return result, offset - startOffset, nil
}