# Lock-Free Alternatives Analysis for Brummer

## Executive Summary

This document analyzes opportunities for lock-free programming in the Brummer codebase using Go's sync/atomic package. We identify specific areas where atomic operations can replace mutex-based synchronization, providing performance benefits while maintaining correctness.

## Current Mutex Usage Analysis

### 1. Critical Path Mutex Bottlenecks

#### EventBus Handler Registration (pkg/events/events.go)

```go
// Current mutex-heavy approach
type EventBus struct {
    handlers map[EventType][]Handler
    mu       sync.RWMutex // Bottleneck for concurrent Subscribe/Publish
}

func (eb *EventBus) Subscribe(eventType EventType, handler Handler) {
    eb.mu.Lock() // Blocks all operations
    defer eb.mu.Unlock()
    eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}
```

**Analysis:**
- **Contention**: High during startup when many handlers register
- **Read/Write Ratio**: 95% reads (Publish) vs 5% writes (Subscribe)
- **Lock-Free Opportunity**: High - use atomic.Value for immutable handler maps

#### Process Manager State Tracking (internal/process/manager.go)

```go
// Current implementation
type Manager struct {
    processes map[string]*Process
    mu        sync.RWMutex // Frequent contention
}

type Process struct {
    Status ProcessStatus
    mu     sync.RWMutex // Per-process lock contention
}
```

**Analysis:**
- **Contention**: Medium-high for status checks
- **Access Pattern**: Frequent status reads, infrequent updates
- **Lock-Free Opportunity**: Medium - use atomic operations for status fields

#### Proxy Server Request Counting (internal/proxy/server.go)

```go
// Current mutex-protected counters
type Server struct {
    requests []Request
    mu       sync.RWMutex // Lock for every request addition
}
```

**Analysis:**
- **Contention**: High during request processing
- **Access Pattern**: Continuous appends with periodic reads
- **Lock-Free Opportunity**: High - use atomic counters and lock-free data structures
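Before converting any of these paths, the contention estimates above can be validated empirically with Go's built-in mutex profiler. A minimal sketch follows; the sampling rate and HTTP port are arbitrary choices, not Brummer configuration:

```go
package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* handlers
    "runtime"
)

func main() {
    // Record roughly 1 in 5 mutex contention events.
    runtime.SetMutexProfileFraction(5)

    // Contention data is then available at /debug/pprof/mutex, e.g.:
    //   go tool pprof http://localhost:6060/debug/pprof/mutex
    log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
```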
## Lock-Free Implementation Strategies

### 1. Atomic Operations for Simple Values

#### Process Status Updates

```go
// Before: Mutex-protected status
type Process struct {
    ID     string
    Name   string
    Status ProcessStatus
    mu     sync.RWMutex
}

func (p *Process) GetStatus() ProcessStatus {
    p.mu.RLock()
    defer p.mu.RUnlock()
    return p.Status
}

func (p *Process) SetStatus(status ProcessStatus) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.Status = status
}

// After: Lock-free atomic status
type Process struct {
    ID     string
    Name   string
    status int32 // Atomic field; int32 so sync/atomic can operate on it
}

const (
    StatusPendingInt int32 = iota
    StatusRunningInt
    StatusStoppedInt
    StatusFailedInt
    StatusSuccessInt
)

func (p *Process) GetStatus() ProcessStatus {
    val := atomic.LoadInt32(&p.status)
    return ProcessStatus(val)
}

func (p *Process) SetStatus(status ProcessStatus) {
    atomic.StoreInt32(&p.status, int32(status))
}

func (p *Process) CompareAndSwapStatus(old, new ProcessStatus) bool {
    return atomic.CompareAndSwapInt32(&p.status, int32(old), int32(new))
}
```

#### Request Counters and Metrics

```go
// Lock-free counters for proxy server
type ProxyMetrics struct {
    totalRequests    int64 // atomic counter
    successfulReqs   int64 // atomic counter
    failedReqs       int64 // atomic counter
    bytesTransferred int64 // atomic counter
    avgResponseTime  int64 // atomic (nanoseconds)
}

func (pm *ProxyMetrics) IncrementRequests() {
    atomic.AddInt64(&pm.totalRequests, 1)
}

func (pm *ProxyMetrics) RecordSuccess() {
    atomic.AddInt64(&pm.successfulReqs, 1)
}

func (pm *ProxyMetrics) RecordFailure() {
    atomic.AddInt64(&pm.failedReqs, 1)
}

func (pm *ProxyMetrics) AddBytes(bytes int64) {
    atomic.AddInt64(&pm.bytesTransferred, bytes)
}

func (pm *ProxyMetrics) UpdateAvgResponseTime(duration time.Duration) {
    nanos := duration.Nanoseconds()
    // Exponential moving average using atomic operations
    for {
        oldAvg := atomic.LoadInt64(&pm.avgResponseTime)
        newAvg := oldAvg + (nanos-oldAvg)/10 // Alpha = 0.1
        if atomic.CompareAndSwapInt64(&pm.avgResponseTime, oldAvg, newAvg) {
            break
        }
        // Retry on contention
    }
}

func (pm *ProxyMetrics) GetStats() (total, success, failed, bytes int64, avgTime time.Duration) {
    return atomic.LoadInt64(&pm.totalRequests),
        atomic.LoadInt64(&pm.successfulReqs),
        atomic.LoadInt64(&pm.failedReqs),
        atomic.LoadInt64(&pm.bytesTransferred),
        time.Duration(atomic.LoadInt64(&pm.avgResponseTime))
}
```
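As a usage sketch, CompareAndSwapStatus makes state transitions race-free without a mutex: only one caller can move a process from pending to running. The helper below is hypothetical, not existing Brummer code, and assumes the status constants defined above:

```go
// Hypothetical helper: guard the pending -> running transition so that
// exactly one goroutine wins, even if several try to start the process.
func startProcess(p *Process) error {
    if !p.CompareAndSwapStatus(ProcessStatus(StatusPendingInt), ProcessStatus(StatusRunningInt)) {
        return fmt.Errorf("process %s is not pending", p.ID)
    }
    // Only the winning caller reaches this point and launches the command.
    return nil
}
```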
### 2. Copy-on-Write Patterns with atomic.Value

#### EventBus Handler Registry

```go
// Lock-free handler registry using atomic.Value
type EventBus struct {
    handlers atomic.Value // stores map[EventType][]Handler
    // For updates, we still need a mutex but only during registration
    updateMu sync.Mutex
}

func NewEventBus() *EventBus {
    eb := &EventBus{}
    eb.handlers.Store(make(map[EventType][]Handler))
    return eb
}

func (eb *EventBus) Subscribe(eventType EventType, handler Handler) {
    eb.updateMu.Lock()
    defer eb.updateMu.Unlock()

    // Get current map
    oldMap := eb.handlers.Load().(map[EventType][]Handler)

    // Create new map with all existing entries
    newMap := make(map[EventType][]Handler, len(oldMap)+1)
    for k, v := range oldMap {
        newMap[k] = v
    }

    // Add new handler
    newMap[eventType] = append(newMap[eventType], handler)

    // Atomically replace the map
    eb.handlers.Store(newMap)
}

func (eb *EventBus) Publish(event Event) {
    // Lock-free read of handlers
    handlersMap := eb.handlers.Load().(map[EventType][]Handler)
    handlers := handlersMap[event.Type]

    // Process handlers (can be done concurrently with updates)
    for _, handler := range handlers {
        go handler(event)
    }
}
```

#### Process Registry with Atomic Snapshots

```go
// Lock-free process registry for read-heavy workloads
type ProcessRegistry struct {
    processes atomic.Value // stores map[string]*Process
    updateMu  sync.Mutex   // Only for updates
}

func NewProcessRegistry() *ProcessRegistry {
    pr := &ProcessRegistry{}
    pr.processes.Store(make(map[string]*Process))
    return pr
}

func (pr *ProcessRegistry) GetProcess(id string) (*Process, bool) {
    // Lock-free read
    processMap := pr.processes.Load().(map[string]*Process)
    process, exists := processMap[id]
    return process, exists
}

func (pr *ProcessRegistry) GetAllProcesses() []*Process {
    // Lock-free snapshot
    processMap := pr.processes.Load().(map[string]*Process)
    result := make([]*Process, 0, len(processMap))
    for _, process := range processMap {
        result = append(result, process)
    }
    return result
}

func (pr *ProcessRegistry) AddProcess(process *Process) {
    pr.updateMu.Lock()
    defer pr.updateMu.Unlock()

    oldMap := pr.processes.Load().(map[string]*Process)
    newMap := make(map[string]*Process, len(oldMap)+1)

    // Copy existing processes
    for k, v := range oldMap {
        newMap[k] = v
    }

    // Add new process
    newMap[process.ID] = process

    // Atomic update
    pr.processes.Store(newMap)
}

func (pr *ProcessRegistry) RemoveProcess(id string) bool {
    pr.updateMu.Lock()
    defer pr.updateMu.Unlock()

    oldMap := pr.processes.Load().(map[string]*Process)
    if _, exists := oldMap[id]; !exists {
        return false
    }

    newMap := make(map[string]*Process, len(oldMap)-1)
    for k, v := range oldMap {
        if k != id {
            newMap[k] = v
        }
    }

    pr.processes.Store(newMap)
    return true
}
```
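The reason this pattern is safe for readers is that every loaded map is an immutable snapshot, so iteration never races with AddProcess or RemoveProcess. A brief illustrative sketch (the goroutine counts and IDs are arbitrary, not Brummer code):

```go
// Illustrative only: readers iterate snapshots with no locking while a
// writer registers processes concurrently.
func demoRegistry(reg *ProcessRegistry) {
    go func() { // writer
        for i := 0; i < 100; i++ {
            reg.AddProcess(&Process{ID: fmt.Sprintf("proc-%d", i)})
        }
    }()

    for r := 0; r < 4; r++ { // readers
        go func() {
            for i := 0; i < 1000; i++ {
                for _, p := range reg.GetAllProcesses() {
                    _ = p.GetStatus() // the loaded snapshot is never mutated in place
                }
            }
        }()
    }
}
```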
### 3. Lock-Free Data Structures

#### Ring Buffer for Log Entries

```go
// Lock-free ring buffer for high-throughput log storage
type LockFreeRingBuffer struct {
    buffer   []atomic.Value // Each element is an atomic.Value
    capacity int
    writePos int64 // Atomic write position
    readPos  int64 // Atomic read position
}

func NewLockFreeRingBuffer(capacity int) *LockFreeRingBuffer {
    rb := &LockFreeRingBuffer{
        buffer:   make([]atomic.Value, capacity),
        capacity: capacity,
    }
    return rb
}

func (rb *LockFreeRingBuffer) Push(item interface{}) bool {
    for {
        writePos := atomic.LoadInt64(&rb.writePos)
        nextPos := (writePos + 1) % int64(rb.capacity)

        // Check if buffer is full
        readPos := atomic.LoadInt64(&rb.readPos)
        if nextPos == readPos {
            return false // Buffer full
        }

        // Try to claim this position
        if atomic.CompareAndSwapInt64(&rb.writePos, writePos, nextPos) {
            // Successfully claimed position, store the item.
            // NOTE: there is a window between the CAS above and this Store
            // in which a concurrent Pop can observe the advanced writePos
            // and load a slot that has not been written yet; production
            // implementations typically close this window with per-slot
            // sequence numbers.
            rb.buffer[writePos].Store(item)
            return true
        }
        // Someone else claimed it, retry
    }
}

func (rb *LockFreeRingBuffer) Pop() (interface{}, bool) {
    for {
        readPos := atomic.LoadInt64(&rb.readPos)
        writePos := atomic.LoadInt64(&rb.writePos)

        // Check if buffer is empty
        if readPos == writePos {
            return nil, false
        }

        // Try to claim this position
        nextPos := (readPos + 1) % int64(rb.capacity)
        if atomic.CompareAndSwapInt64(&rb.readPos, readPos, nextPos) {
            // Successfully claimed position, load the item
            item := rb.buffer[readPos].Load()
            return item, true
        }
        // Someone else claimed it, retry
    }
}

func (rb *LockFreeRingBuffer) Size() int {
    writePos := atomic.LoadInt64(&rb.writePos)
    readPos := atomic.LoadInt64(&rb.readPos)

    if writePos >= readPos {
        return int(writePos - readPos)
    }
    return int(int64(rb.capacity) - readPos + writePos)
}
```

#### Lock-Free Request History

```go
// Lock-free request history for proxy server
type LockFreeRequestHistory struct {
    requests *LockFreeRingBuffer
    metrics  *ProxyMetrics
}

func NewLockFreeRequestHistory(size int) *LockFreeRequestHistory {
    return &LockFreeRequestHistory{
        requests: NewLockFreeRingBuffer(size),
        metrics:  &ProxyMetrics{},
    }
}

func (lh *LockFreeRequestHistory) AddRequest(req *proxy.Request) {
    // Update metrics atomically
    lh.metrics.IncrementRequests()

    if req.StatusCode >= 200 && req.StatusCode < 400 {
        lh.metrics.RecordSuccess()
    } else {
        lh.metrics.RecordFailure()
    }

    lh.metrics.AddBytes(req.Size)
    lh.metrics.UpdateAvgResponseTime(req.Duration)

    // Add to ring buffer; Push returns false and the request is
    // silently dropped when the buffer is full
    lh.requests.Push(req)
}

func (lh *LockFreeRequestHistory) GetRecentRequests(count int) []*proxy.Request {
    var requests []*proxy.Request

    // Simple implementation: drains up to count entries from the buffer
    // (a destructive read, oldest first). A more sophisticated version
    // would take a non-destructive snapshot and traverse backwards.
    for {
        if req, ok := lh.requests.Pop(); ok {
            if reqTyped, ok := req.(*proxy.Request); ok {
                requests = append(requests, reqTyped)
                if len(requests) >= count {
                    break
                }
            }
        } else {
            break
        }
    }

    return requests
}
```
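One way to close the claim/store window noted in Push, sketched here only for the single-producer, single-consumer case: store the item before publishing the write index, so a consumer that observes the new index also observes the item. This is an illustrative variant under those assumptions, not the proposed Brummer implementation, and it also assumes all items share one concrete type (a requirement of atomic.Value):

```go
// SPSC variant: the item is stored before the write index is published,
// so a consumer that observes the advanced index also sees the item.
// Safe only with exactly one producer and one consumer.
type SPSCRingBuffer struct {
    writePos int64 // advanced only by the producer
    readPos  int64 // advanced only by the consumer
    capacity int64
    buffer   []atomic.Value
}

func NewSPSCRingBuffer(capacity int) *SPSCRingBuffer {
    return &SPSCRingBuffer{
        capacity: int64(capacity),
        buffer:   make([]atomic.Value, capacity),
    }
}

func (rb *SPSCRingBuffer) Push(item interface{}) bool {
    writePos := atomic.LoadInt64(&rb.writePos)
    nextPos := (writePos + 1) % rb.capacity
    if nextPos == atomic.LoadInt64(&rb.readPos) {
        return false // full
    }
    rb.buffer[writePos].Store(item)          // publish the item first...
    atomic.StoreInt64(&rb.writePos, nextPos) // ...then advance the index
    return true
}

func (rb *SPSCRingBuffer) Pop() (interface{}, bool) {
    readPos := atomic.LoadInt64(&rb.readPos)
    if readPos == atomic.LoadInt64(&rb.writePos) {
        return nil, false // empty
    }
    item := rb.buffer[readPos].Load()
    atomic.StoreInt64(&rb.readPos, (readPos+1)%rb.capacity)
    return item, true
}
```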
## Memory Ordering Considerations

### 1. Memory Barriers and Synchronization

#### Producer-Consumer Pattern

```go
// Careful memory ordering for producer-consumer
type MessageQueue struct {
    data  atomic.Value // stores []Message
    ready int32        // Atomic flag
}

func (mq *MessageQueue) Publish(messages []Message) {
    // Store data first
    mq.data.Store(messages)

    // Set the ready flag last. Go's sync/atomic operations establish a
    // happens-before relationship, so a consumer that observes ready == 1
    // is guaranteed to also see the data stored above.
    atomic.StoreInt32(&mq.ready, 1)
}

func (mq *MessageQueue) Consume() []Message {
    // Check ready flag
    if atomic.LoadInt32(&mq.ready) == 0 {
        return nil
    }

    // The flag was observed as set, so the data store is visible here
    if data := mq.data.Load(); data != nil {
        return data.([]Message)
    }
    return nil
}
```

#### Double-Checked Locking for Initialization

```go
// Safe double-checked locking using atomic operations
type LazyInitializer struct {
    initialized int32        // Atomic flag
    mu          sync.Mutex   // Mutex for initialization
    value       atomic.Value // The actual value
}

func (li *LazyInitializer) Get() interface{} {
    // Fast path: check if already initialized
    if atomic.LoadInt32(&li.initialized) == 1 {
        return li.value.Load()
    }

    // Slow path: need to initialize
    li.mu.Lock()
    defer li.mu.Unlock()

    // Double-check pattern (use an atomic load here as well)
    if atomic.LoadInt32(&li.initialized) == 0 {
        // Perform expensive initialization
        result := performInitialization()

        // Store result
        li.value.Store(result)

        // Set the flag last so fast-path readers see the stored value
        atomic.StoreInt32(&li.initialized, 1)
    }

    return li.value.Load()
}
```

## Performance Trade-offs Analysis

### 1. When to Use Lock-Free vs Mutex

#### Performance Matrix

| Scenario | Contention | Read/Write | Best Choice | Reason |
|----------|------------|------------|-------------|--------|
| Process status | Low | 90/10 | Atomic ops | Simple values, frequent reads |
| Event handlers | Medium | 95/5 | atomic.Value | Immutable snapshots |
| Request counters | High | 80/20 | Atomic ops | Counter operations |
| Process registry | Medium | 85/15 | atomic.Value | Copy-on-write |
| Log buffer | High | 50/50 | Lock-free queue | Continuous append/read |
| Complex state | Any | Any | Mutex | Complex invariants |

#### Performance Benchmarks

```go
func BenchmarkMutexVsAtomic(b *testing.B) {
    // Mutex-based counter
    b.Run("Mutex", func(b *testing.B) {
        var counter int64
        var mu sync.Mutex
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                mu.Lock()
                counter++
                mu.Unlock()
            }
        })
    })

    // Atomic counter
    b.Run("Atomic", func(b *testing.B) {
        var counter int64
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                atomic.AddInt64(&counter, 1)
            }
        })
    })
}

// Expected results on modern hardware:
// BenchmarkMutexVsAtomic/Mutex-8     50000000    30.2 ns/op
// BenchmarkMutexVsAtomic/Atomic-8   200000000     7.8 ns/op
// Atomic operations are ~4x faster for simple operations
```
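A companion micro-benchmark sketch (not from the Brummer test suite; it assumes the usual sync, sync/atomic, and testing imports in a _test.go file) for the read-mostly map case, comparing an RWMutex-guarded lookup with an atomic.Value snapshot load, the pattern proposed for EventBus handler lookups:

```go
func BenchmarkMapReadRWMutexVsAtomicValue(b *testing.B) {
    type handlerMap map[string][]func()

    b.Run("RWMutex", func(b *testing.B) {
        m := handlerMap{"evt": {func() {}}}
        var mu sync.RWMutex
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                mu.RLock()
                _ = m["evt"] // guarded lookup
                mu.RUnlock()
            }
        })
    })

    b.Run("AtomicValue", func(b *testing.B) {
        var v atomic.Value
        v.Store(handlerMap{"evt": {func() {}}})
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                _ = v.Load().(handlerMap)["evt"] // lock-free snapshot lookup
            }
        })
    })
}
```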
### 2. Memory Usage Implications

#### atomic.Value Memory Overhead

```go
// Memory comparison for handler storage (64-bit platform)
type MutexEventBus struct {
    handlers map[EventType][]Handler // 8 bytes (pointer to map data)
    mu       sync.RWMutex            // 24 bytes
    // Total: ~32 bytes + map data
}

type AtomicEventBus struct {
    handlers atomic.Value // 16 bytes (interface value)
    updateMu sync.Mutex   // 8 bytes
    // Total: ~24 bytes + map data
}

// The atomic.Value variant saves ~8 bytes per EventBus instance
// Trade-off: Creates new map copies on updates (higher temporary memory)
```

#### Copy-on-Write Memory Cost

```go
func calculateCopyOnWriteCost(mapSize int, updateFreq float64) {
    // Memory cost per update = mapSize * pointerSize
    memoryPerUpdate := mapSize * 8 // 64-bit pointers

    // Updates per second * memory per update = memory churn rate
    memoryChurnRate := updateFreq * float64(memoryPerUpdate)

    fmt.Printf("Map size: %d entries\n", mapSize)
    fmt.Printf("Update frequency: %.2f/sec\n", updateFreq)
    fmt.Printf("Memory churn: %.2f bytes/sec\n", memoryChurnRate)

    // Rule of thumb: COW is efficient when updates < 10/sec for large maps
}
```

## Implementation Recommendations

### 1. High-Priority Lock-Free Conversions

#### Process Status (High Impact, Low Risk)

```go
// Priority: HIGH - Simple conversion, high-frequency access
type Process struct {
    ID        string
    Name      string
    status    int32 // Convert to atomic
    startTime int64 // Convert to atomic (Unix timestamp)
    exitCode  int32 // Convert to atomic

    // Keep complex fields mutex-protected
    mu     sync.RWMutex
    Cmd    *exec.Cmd
    cancel context.CancelFunc
}
```

#### Request Metrics (High Impact, Low Risk)

```go
// Priority: HIGH - Continuous updates, simple values
type Server struct {
    // Convert to atomic counters
    totalRequests int64
    requestSize   int64
    responseSize  int64

    // Keep complex structures mutex-protected
    mu          sync.RWMutex
    requests    []Request
    urlMappings map[string]*URLMapping
}
```

### 2. Medium-Priority Conversions

#### EventBus Handlers (Medium Impact, Medium Risk)

```go
// Priority: MEDIUM - High read frequency, infrequent updates
// Requires careful testing due to copy-on-write complexity
type EventBus struct {
    handlers atomic.Value // map[EventType][]Handler
    updateMu sync.Mutex   // Only for updates
}
```

### 3. Low-Priority Future Enhancements

#### Process Registry (Medium Impact, Higher Risk)

```go
// Priority: LOW - More complex, requires extensive testing
// Consider after high-priority items are stable
type Manager struct {
    processes atomic.Value // map[string]*Process
    updateMu  sync.Mutex
}
```

## Testing Strategy for Lock-Free Code

### 1. Race Detection

```go
func TestLockFreeOperations(t *testing.T) {
    // Enable race detector: go test -race
    var counter int64
    const goroutines = 100
    const increments = 1000

    var wg sync.WaitGroup
    wg.Add(goroutines)

    for i := 0; i < goroutines; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < increments; j++ {
                atomic.AddInt64(&counter, 1)
            }
        }()
    }

    wg.Wait()

    expected := int64(goroutines * increments)
    if counter != expected {
        t.Errorf("Expected %d, got %d", expected, counter)
    }
}
```
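A further test sketch along the same lines, exercising the CompareAndSwapStatus method from the earlier section to confirm that exactly one of many concurrent callers wins a contended transition. It is illustrative only and assumes the zero value of the status field corresponds to StatusPendingInt:

```go
func TestCompareAndSwapStatusSingleWinner(t *testing.T) {
    p := &Process{ID: "p1"} // status zero value == StatusPendingInt

    const goroutines = 100
    var winners int64
    var wg sync.WaitGroup
    wg.Add(goroutines)

    for i := 0; i < goroutines; i++ {
        go func() {
            defer wg.Done()
            if p.CompareAndSwapStatus(ProcessStatus(StatusPendingInt), ProcessStatus(StatusRunningInt)) {
                atomic.AddInt64(&winners, 1)
            }
        }()
    }
    wg.Wait()

    if winners != 1 {
        t.Errorf("expected exactly 1 winning transition, got %d", winners)
    }
}
```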
### 2. Stress Testing

```go
func TestAtomicValueStress(t *testing.T) {
    var data atomic.Value
    data.Store(make(map[string]int))

    const readers = 50
    const writers = 5
    const duration = 5 * time.Second

    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()

    // Start readers
    for i := 0; i < readers; i++ {
        go func() {
            for ctx.Err() == nil {
                m := data.Load().(map[string]int)
                _ = m["key"] // Read operation
            }
        }()
    }

    // Start writers
    for i := 0; i < writers; i++ {
        go func(id int) {
            for ctx.Err() == nil {
                oldMap := data.Load().(map[string]int)
                newMap := make(map[string]int)
                for k, v := range oldMap {
                    newMap[k] = v
                }
                newMap[fmt.Sprintf("key%d", id)] = id
                data.Store(newMap)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }

    <-ctx.Done()
    // Test passes if no races detected
}
```

## Conclusion

Lock-free programming offers significant performance benefits for specific use cases in Brummer.

### Recommended Implementation Order:
1. **Phase 1**: Convert simple atomic values (process status, counters)
2. **Phase 2**: Implement atomic.Value for read-heavy maps (EventBus handlers)
3. **Phase 3**: Consider lock-free data structures for high-throughput scenarios

### Expected Performance Gains:
- **Process status checks**: 4-5x faster
- **Request counters**: 3-4x faster
- **EventBus handler lookup**: 2-3x faster (under contention)

### Risk Mitigation:
- Comprehensive race detection testing
- Gradual rollout with fallback mechanisms
- Performance monitoring to validate improvements

The key is to apply lock-free techniques judiciously, focusing on high-contention, simple-value scenarios where the benefits clearly outweigh the complexity costs.
