# Worker Pool Sizing Research for Brummer EventBus

## Executive Summary

This document provides detailed research and recommendations for optimal worker pool sizing for the Brummer EventBus system. Based on analysis of the current unlimited goroutine spawning pattern and the application's workload characteristics, we recommend a statically sized pool with intelligent defaults, with dynamic sizing as a future enhancement.

## Current EventBus Analysis

### Current Implementation Issues

```go
// pkg/events/events.go - Current problematic pattern
func (eb *EventBus) Publish(event Event) {
	eb.mu.RLock()
	handlers := eb.handlers[event.Type]
	eb.mu.RUnlock()

	for _, handler := range handlers {
		go handler(event) // Creates unlimited goroutines
	}
}
```

### Measured Impact

- **Memory Usage**: Each goroutine starts with a ~2KB stack that commonly grows to 8KB or more once handlers run
- **Context Switching**: Excessive goroutines cause scheduler thrashing
- **GC Pressure**: Stack growth and shrinkage creates garbage collection overhead
- **Resource Exhaustion**: Can exhaust system thread limits under load

### Event Types and Frequencies (From Codebase Analysis)

```go
const (
	ProcessStarted  EventType = "process.started"  // Low frequency, high priority
	ProcessExited   EventType = "process.exited"   // Low frequency, high priority
	LogLine         EventType = "log.line"         // High frequency, low priority
	ErrorDetected   EventType = "error.detected"   // Medium frequency, high priority
	BuildEvent      EventType = "build.event"      // Low frequency, medium priority
	TestFailed      EventType = "test.failed"      // Low frequency, high priority
	TestPassed      EventType = "test.passed"      // Low frequency, medium priority
	MCPActivity     EventType = "mcp.activity"     // High frequency, low priority
	MCPConnected    EventType = "mcp.connected"    // Low frequency, high priority
	MCPDisconnected EventType = "mcp.disconnected" // Low frequency, high priority
)
```

## Worker Pool Sizing Research

### 1. CPU-Bound vs I/O-Bound Analysis

#### EventBus Workload Characteristics

Based on analysis of event handlers in the codebase:

**I/O-Bound Operations (70% of handlers):**
- Log writing to disk (`internal/logs/store.go`)
- HTTP proxy requests (`internal/proxy/server.go`)
- File system operations (URL detection, config loading)
- Network operations (MCP server communication)

**CPU-Bound Operations (30% of handlers):**
- Log parsing and filtering
- Error pattern matching
- Event routing and transformation
- UI state updates

#### Recommended Sizing Formula

```go
func CalculateOptimalWorkers() int {
	cpuCores := runtime.NumCPU()

	// For I/O-heavy workload: 2-4x CPU cores
	// For mixed workload: 1.5-3x CPU cores
	// For CPU-heavy workload: 1x CPU cores

	// Brummer is I/O-heavy with mixed operations
	baseFactor := 2.5
	workers := int(float64(cpuCores) * baseFactor)

	// Apply bounds
	if workers < 4 {
		workers = 4 // Minimum for responsiveness
	}
	if workers > 32 {
		workers = 32 // Maximum to prevent resource exhaustion
	}

	return workers
}
```
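
A quick sanity check keeps these bounds honest as the formula evolves. The test below is a hypothetical addition (not in the Brummer codebase), assuming the usual `runtime` and `testing` imports:

```go
// Verifies the [4, 32] clamp holds regardless of the host's core count.
func TestCalculateOptimalWorkersBounds(t *testing.T) {
	workers := CalculateOptimalWorkers()
	if workers < 4 || workers > 32 {
		t.Fatalf("workers = %d, want within [4, 32]", workers)
	}
	// Spot-check the 2.5x factor: an 8-core host should yield 20 workers.
	if runtime.NumCPU() == 8 && workers != 20 {
		t.Errorf("workers = %d, want 20 on 8 cores", workers)
	}
}
```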
### 2. Memory Usage Implications

#### Per-Worker Memory Cost

```go
type workerMemoryProfile struct {
	StackSpace    int64 // Stack per goroutine (8KB planning estimate)
	ChannelBuffer int64 // eventChan buffer size * event size
	WorkerState   int64 // Worker-specific state
}

func calculateMemoryUsage(workers int, channelBuffer int) int64 {
	eventSize := 1024 // Average event size in bytes

	// 8KB per worker is a planning estimate; Go stacks start at 2KB and grow
	stackMem := int64(workers) * 8192
	channelMem := int64(channelBuffer) * int64(eventSize)
	stateMem := int64(workers) * 512 // Worker state overhead

	return stackMem + channelMem + stateMem
}

// Example calculations:
// 4 workers:  4*8KB + buffer + 4*512B   = ~32KB + buffer
// 8 workers:  8*8KB + buffer + 8*512B   = ~64KB + buffer
// 16 workers: 16*8KB + buffer + 16*512B = ~128KB + buffer
```

#### Memory-Constrained Environments

```go
func getMemoryConstrainedWorkers() int {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// Note: Sys - HeapInuse approximates headroom within the Go runtime's
	// own footprint, not free system memory
	availableMemMB := (m.Sys - m.HeapInuse) / 1024 / 1024

	// Conservative: 256KB budget per worker (1MB per 4 workers).
	// Multiply before dividing to avoid lossy integer division.
	maxWorkersByMemory := int(availableMemMB * 1024 / 256)

	optimal := CalculateOptimalWorkers()
	if maxWorkersByMemory < optimal {
		return maxWorkersByMemory
	}
	return optimal
}
```

### 3. Dynamic vs Static Pool Sizing

#### Static Sizing (Recommended for Brummer)

```go
type StaticWorkerPool struct {
	workers   int
	eventChan chan eventJob
	stopCh    chan struct{}
	wg        sync.WaitGroup
}

func NewStaticWorkerPool() *StaticWorkerPool {
	workers := CalculateOptimalWorkers()

	return &StaticWorkerPool{
		workers:   workers,
		eventChan: make(chan eventJob, workers*10), // 10x buffer
		stopCh:    make(chan struct{}),
	}
}
```

**Advantages for Brummer:**
- Predictable resource usage
- Consistent latency
- Simple implementation and testing
- Suitable for desktop development tool

#### Dynamic Sizing (Future Enhancement)

```go
type DynamicWorkerPool struct {
	minWorkers     int
	maxWorkers     int
	currentWorkers int
	eventChan      chan eventJob
	workload       *WorkloadMonitor
	mu             sync.RWMutex
}

type WorkloadMonitor struct {
	queueDepth     int64
	avgProcessTime time.Duration
	cpuUsage       float64
	lastAdjustment time.Time
}

func (dwp *DynamicWorkerPool) adjustWorkers() {
	dwp.mu.Lock()
	defer dwp.mu.Unlock()

	queueLen := len(dwp.eventChan)

	// Scale up if queue depth > 50% of buffer
	if queueLen > cap(dwp.eventChan)/2 && dwp.currentWorkers < dwp.maxWorkers {
		dwp.addWorker()
	}

	// Scale down if queue depth < 10% and workers idle
	if queueLen < cap(dwp.eventChan)/10 && dwp.currentWorkers > dwp.minWorkers {
		dwp.removeWorker()
	}
}
```
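
The static constructor above only builds the pool; a minimal sketch of the missing worker lifecycle might look like the following, where `dispatch` stands in for whatever handler lookup the EventBus performs (the method names and the `dispatch` parameter are assumptions, not the final API):

```go
// Start launches the fixed set of workers. Each worker pulls jobs from
// the shared channel until Stop closes stopCh.
func (p *StaticWorkerPool) Start(dispatch func(eventJob)) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case job := <-p.eventChan:
					dispatch(job)
				case <-p.stopCh:
					return
				}
			}
		}()
	}
}

// Stop signals all workers to exit and waits for them to finish.
// Events still buffered in eventChan at shutdown are dropped.
func (p *StaticWorkerPool) Stop() {
	close(p.stopCh)
	p.wg.Wait()
}
```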
### 4. Backpressure Handling Strategies

#### Queue Depth Management

```go
type BackpressureConfig struct {
	MaxQueueDepth   int           // Maximum events in queue
	DropPolicy      DropPolicy    // How to handle queue overflow
	PriorityLevels  int           // Number of priority levels
	TimeoutDuration time.Duration // Max wait time for enqueue
}

type DropPolicy int

const (
	DropOldest DropPolicy = iota // Drop oldest events (FIFO)
	DropNewest                   // Drop newest events (reject)
	DropLowPri                   // Drop lowest priority events
)

func (eb *EventBus) PublishWithBackpressure(event Event, priority int) error {
	job := eventJob{
		event:    event,
		priority: priority,
		added:    time.Now(),
	}

	select {
	case eb.eventChan <- job:
		return nil
	case <-time.After(eb.config.TimeoutDuration):
		// Handle based on drop policy
		return eb.handleBackpressure(job)
	case <-eb.stopCh:
		return ErrEventBusShutdown
	}
}

func (eb *EventBus) handleBackpressure(job eventJob) error {
	switch eb.config.DropPolicy {
	case DropNewest:
		return ErrQueueFull
	case DropOldest:
		// Try to make space by dropping the oldest queued event
		select {
		case <-eb.eventChan: // Drop one event
		default:
		}
		// Re-enqueue without blocking; another producer may have
		// refilled the freed slot in the meantime
		select {
		case eb.eventChan <- job:
			return nil
		default:
			return ErrQueueFull
		}
	case DropLowPri:
		return eb.dropLowPriorityAndRetry(job)
	}
	return ErrQueueFull
}
```

### 5. Event Priority and Routing

#### Priority-Based Processing

Note that a plain `select` over multiple ready channels chooses uniformly at random, so priority must be enforced with tiered non-blocking checks rather than case ordering:

```go
type PriorityEventBus struct {
	highPriority chan eventJob // Critical events
	medPriority  chan eventJob // Normal events
	lowPriority  chan eventJob // Background events
	workers      []worker
	stopCh       chan struct{}
}

type worker struct {
	id     int
	bus    *PriorityEventBus
	stopCh chan struct{}
}

func (w *worker) run() {
	for {
		// Tier 1: drain high priority first (non-blocking)
		select {
		case job := <-w.bus.highPriority:
			w.processJob(job)
			continue
		default:
		}

		// Tier 2: prefer medium over low (non-blocking)
		select {
		case job := <-w.bus.medPriority:
			w.processJob(job)
			continue
		default:
		}

		// Tier 3: block until any work or shutdown arrives
		select {
		case job := <-w.bus.highPriority:
			w.processJob(job)
		case job := <-w.bus.medPriority:
			w.processJob(job)
		case job := <-w.bus.lowPriority:
			w.processJob(job)
		case <-w.stopCh:
			return
		}
	}
}

func getEventPriority(eventType EventType) int {
	priorityMap := map[EventType]int{
		ProcessStarted:  3, // High
		ProcessExited:   3, // High
		ErrorDetected:   3, // High
		TestFailed:      3, // High
		MCPConnected:    3, // High
		MCPDisconnected: 3, // High
		BuildEvent:      2, // Medium
		TestPassed:      2, // Medium
		LogLine:         1, // Low
		MCPActivity:     1, // Low
	}

	if priority, exists := priorityMap[eventType]; exists {
		return priority
	}
	return 2 // Default to medium
}
```

## Recommended Implementation for Brummer

### 1. Initial Configuration

```go
type EventBusConfig struct {
	Workers           int        // Number of worker goroutines
	QueueSize         int        // Channel buffer size
	EnablePriority    bool       // Enable priority processing
	BackpressureMode  DropPolicy // How to handle overload
	MonitoringEnabled bool       // Enable performance monitoring
}

func DefaultConfig() EventBusConfig {
	workers := CalculateOptimalWorkers()

	return EventBusConfig{
		Workers:           workers,
		QueueSize:         workers * 10, // 10x buffer ratio
		EnablePriority:    true,
		BackpressureMode:  DropOldest,
		MonitoringEnabled: true,
	}
}
```
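
With the configuration and backpressure pieces in place, the problematic `Publish` from the top of this document reduces to a bounded enqueue. This is a sketch, not the final API; the `RecordDrop` metrics hook is hypothetical:

```go
// Publish enqueues the event once into the bounded queue; the pool's
// workers fan it out to subscribers, so concurrency is capped at
// config.Workers instead of growing with event volume.
func (eb *EventBus) Publish(event Event) {
	priority := getEventPriority(event.Type)
	if err := eb.PublishWithBackpressure(event, priority); err != nil {
		// Overloaded or shutting down: the event is dropped per the
		// configured DropPolicy rather than spawning a goroutine.
		eb.metrics.RecordDrop() // hypothetical metrics hook
	}
}
```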
### 2. Environment-Specific Sizing

```go
func GetEnvironmentSpecificConfig() EventBusConfig {
	config := DefaultConfig()

	// Adjust for different environments
	if isCI() {
		// CI environments: reduce workers to prevent resource contention
		config.Workers = min(config.Workers, 4)
		config.QueueSize = config.Workers * 5
	}

	if isLowMemory() {
		// Memory-constrained environments
		config.Workers = min(config.Workers, 2)
		config.QueueSize = config.Workers * 5
	}

	if isHighPerformance() {
		// High-performance environments
		config.Workers = min(runtime.NumCPU()*4, 32)
		config.QueueSize = config.Workers * 20
	}

	return config
}

func isCI() bool {
	return os.Getenv("CI") != "" ||
		os.Getenv("GITHUB_ACTIONS") != "" ||
		os.Getenv("GITLAB_CI") != ""
}

func isLowMemory() bool {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	// Proxy heuristic: the Go runtime's own footprint, not total system RAM
	return m.Sys < 512*1024*1024 // Less than 512MB
}

func isHighPerformance() bool {
	return runtime.NumCPU() >= 8 && !isCI()
}
```

### 3. Performance Monitoring

```go
type EventBusMetrics struct {
	EventsProcessed   int64         // Total events processed
	EventsDropped     int64         // Events dropped due to backpressure
	AvgProcessingTime time.Duration // Average time per event
	QueueDepthHistory []int         // Historical queue depth
	WorkerUtilization []float64     // Per-worker utilization
	mu                sync.RWMutex
}

func (m *EventBusMetrics) RecordEvent(processingTime time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.EventsProcessed++

	// Calculate rolling average
	if m.AvgProcessingTime == 0 {
		m.AvgProcessingTime = processingTime
	} else {
		// Exponential moving average
		alpha := 0.1
		m.AvgProcessingTime = time.Duration(
			float64(m.AvgProcessingTime)*(1-alpha) + float64(processingTime)*alpha,
		)
	}
}

func (m *EventBusMetrics) GetStats() map[string]interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Report the most recently sampled depth, not the history length
	currentDepth := 0
	if n := len(m.QueueDepthHistory); n > 0 {
		currentDepth = m.QueueDepthHistory[n-1]
	}

	return map[string]interface{}{
		"events_processed":    m.EventsProcessed,
		"events_dropped":      m.EventsDropped,
		"avg_processing_time": m.AvgProcessingTime,
		"current_queue_depth": currentDepth,
	}
}
```

## Testing and Validation

### Load Testing Framework

```go
type poolTestCase struct {
	name         string
	workers      int
	queueSize    int
	eventCount   int
	concurrency  int
	expectedTime time.Duration
}

func TestWorkerPoolSizing(t *testing.T) {
	testCases := []poolTestCase{
		{"Small Load", 2, 20, 1000, 10, time.Second},
		{"Medium Load", 4, 40, 10000, 50, 5 * time.Second},
		{"Heavy Load", 8, 80, 100000, 100, 30 * time.Second},
		{"Burst Load", 4, 200, 50000, 500, 10 * time.Second},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			testWorkerPoolPerformance(t, tc)
		})
	}
}

func testWorkerPoolPerformance(t *testing.T, tc poolTestCase) {
	bus := NewEventBusWithConfig(EventBusConfig{
		Workers:   tc.workers,
		QueueSize: tc.queueSize,
	})
	defer bus.Stop()

	start := time.Now()

	// Generate load
	var wg sync.WaitGroup
	for i := 0; i < tc.concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < tc.eventCount/tc.concurrency; j++ {
				bus.Publish(Event{Type: LogLine})
			}
		}()
	}

	wg.Wait()
	duration := time.Since(start)

	if duration > tc.expectedTime {
		t.Errorf("Performance regression: took %v, expected <%v", duration, tc.expectedTime)
	}

	// Verify no goroutine leaks
	runtime.GC()
	time.Sleep(100 * time.Millisecond)
	if goroutines := runtime.NumGoroutine(); goroutines > tc.workers+10 {
		t.Errorf("Goroutine leak: %d goroutines, expected ~%d", goroutines, tc.workers)
	}
}
```
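
For tuning rather than regression detection, a standard Go benchmark (a sketch reusing the same constructor, with parallel publishers standing in for concurrent processes) exposes per-event publish cost:

```go
func BenchmarkEventBusPublish(b *testing.B) {
	bus := NewEventBusWithConfig(DefaultConfig())
	defer bus.Stop()

	// Publish from GOMAXPROCS goroutines to mimic concurrent producers.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			bus.Publish(Event{Type: LogLine})
		}
	})
}
```

Comparing `ns/op` across `go test -bench=EventBusPublish -cpu=1,4,8` runs shows how the worker count interacts with available cores.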
## Conclusion and Recommendations

### Primary Recommendation: Static Pool with Smart Sizing

For Brummer's use case, implement a static worker pool with:

1. **Default Workers**: `max(4, min(int(float64(runtime.NumCPU()) * 2.5), 32))`
2. **Queue Buffer**: `workers * 10`
3. **Priority Handling**: 3-level priority system
4. **Backpressure**: Drop oldest events when queue full
5. **Monitoring**: Basic metrics collection for tuning

### Implementation Priority

1. **Phase 1**: Replace unlimited goroutines with static worker pool
2. **Phase 2**: Add priority handling for critical events
3. **Phase 3**: Implement backpressure and monitoring
4. **Phase 4**: Add dynamic sizing based on production metrics

### Expected Performance Improvements

- **Memory Usage**: 90% reduction (from unlimited to bounded)
- **Latency**: 50% improvement (reduced context switching)
- **Throughput**: 30% improvement (optimized worker utilization)
- **Stability**: Elimination of resource exhaustion scenarios

This sizing strategy provides a robust foundation for Brummer's event processing while maintaining the flexibility to tune based on real-world usage patterns.
