# Synchronized Architecture Design for Brummer

## Executive Summary

This document defines the comprehensive synchronized architecture for Brummer that eliminates race conditions while maintaining high performance and backward compatibility. The design implements a layered approach with specific synchronization patterns for each component, prioritizing critical fixes while enabling incremental rollout.

## Architecture Overview

### Current State Analysis

Based on code analysis, the following race conditions have been identified:

1. **EventBus**: Unlimited goroutine spawning in `Publish()` method
2. **TUI Model**: Value receivers causing concurrent access issues
3. **Process Manager**: Mixed RWMutex usage and concurrent map access
4. **Log Store**: Hybrid async/sync pattern with timeout fallbacks
5. **Proxy Server**: Multiple mutexes and concurrent request handling
6. **MCP Connection Manager**: Already well-designed with channels

### Design Principles

1. **Lock Hierarchy**: Clear ordering to prevent deadlocks
2. **Bounded Resources**: All goroutines and channels have limits
3. **Channel-First Design**: Prefer channels over mutexes where appropriate
4. **Copy-on-Write**: Use `atomic.Value` for read-heavy data structures
5. **Progressive Enhancement**: Enable incremental rollout with fallbacks

## Core Synchronization Patterns

### 1. EventBus Worker Pool Architecture

**Current Issue**: Unlimited goroutine spawning

```go
// Current problematic pattern
for _, handler := range handlers {
	go handler(event) // Creates unlimited goroutines
}
```

**New Architecture**: Bounded worker pool with priority queues

```go
type EventBus struct {
	handlers atomic.Value // map[EventType][]Handler (copy-on-write)
	updateMu sync.Mutex   // Only for handler registration

	// Worker pool
	workers     int
	eventQueues [3]chan eventJob // Priority levels: High, Medium, Low
	workerPool  *sync.WaitGroup
	stopCh      chan struct{}

	// Configuration
	config  EventBusConfig
	metrics *EventBusMetrics
}

type eventJob struct {
	Event     events.Event
	Handlers  []Handler
	Priority  int
	Submitted time.Time
}

type EventBusConfig struct {
	Workers           int    // Default: CPU cores * 2.5
	QueueSizes        [3]int // [High, Medium, Low] queue sizes
	BackpressureMode  BackpressurePolicy
	MonitoringEnabled bool
}
```

**Worker Pool Implementation**:

- **Worker Count**: `max(4, min(runtime.NumCPU() * 2.5, 32))`
- **Queue Strategy**: Priority-based with separate channels
- **Backpressure**: Configurable drop policies (oldest, newest, low-priority)
- **Error Isolation**: Handler panics don't crash workers
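To make the worker pool concrete, the following is a minimal sketch of the worker loop and dispatch step built on the types above. It assumes `Handler` is a `func(events.Event)`; the `startWorkers` and `dispatch` names are illustrative, not existing Brummer APIs. The outer `select` biases workers toward the high-priority queue before falling back to all three.

```go
// Sketch: start a fixed number of workers that drain the priority queues.
func (eb *EventBus) startWorkers() {
	for i := 0; i < eb.workers; i++ {
		eb.workerPool.Add(1)
		go func() {
			defer eb.workerPool.Done()
			for {
				select {
				case job := <-eb.eventQueues[0]: // prefer high priority
					eb.dispatch(job)
				case <-eb.stopCh:
					return
				default:
					select {
					case job := <-eb.eventQueues[0]:
						eb.dispatch(job)
					case job := <-eb.eventQueues[1]:
						eb.dispatch(job)
					case job := <-eb.eventQueues[2]:
						eb.dispatch(job)
					case <-eb.stopCh:
						return
					}
				}
			}
		}()
	}
}

// dispatch runs every handler for a job, isolating panics so a misbehaving
// handler cannot take down the worker goroutine.
func (eb *EventBus) dispatch(job eventJob) {
	for _, h := range job.Handlers {
		func() {
			defer func() {
				if r := recover(); r != nil {
					// Record the panic via metrics/events; the worker keeps running.
				}
			}()
			h(job.Event)
		}()
	}
}
```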
### 2. TUI Model Synchronization Pattern

**Current Issue**: Value receivers with concurrent access

```go
func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) // Value receiver
```

**New Architecture**: Pointer receivers with RWMutex protection

```go
type Model struct {
	mu sync.RWMutex // Protects all mutable fields

	// UI State (protected by mu)
	currentView   View
	processes     []*process.Process
	logs          []logs.LogEntry
	selectedIndex int

	// Immutable or atomic fields
	eventBus       *events.EventBus // Immutable after init
	processManager *process.Manager // Immutable after init

	// UI Components (have internal synchronization)
	viewport  viewport.Model
	list      list.Model
	textInput textinput.Model
}

// All methods converted to pointer receivers
func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Safe concurrent access to all fields
	return m.updateInternal(msg)
}

func (m *Model) View() string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// Safe read access to all fields
	return m.renderView()
}
```

**Key Changes**:

- All 60+ methods converted to pointer receivers
- Single RWMutex protects all mutable state
- Read-heavy operations use RLock
- Update operations use Lock
- Atomic operations for simple counters

### 3. Process Manager Thread-Safe Architecture

**Current Issue**: Mixed mutex patterns and concurrent map access

```go
type Manager struct {
	processes map[string]*Process
	mu        sync.RWMutex // Inconsistent locking patterns
}
```

**New Architecture**: Consistent synchronization with atomic status

```go
type Manager struct {
	// Process registry (protected by processMu)
	processes map[string]*Process
	processMu sync.RWMutex

	// Callbacks (protected by callbackMu)
	logCallbacks []LogCallback
	callbackMu   sync.RWMutex

	// Configuration (immutable after init)
	packageJSON *parser.PackageJSON
	workDir     string
	eventBus    *events.EventBus

	// Cleanup coordination
	shutdownOnce sync.Once
	shutdownCh   chan struct{}
}

type Process struct {
	// Immutable fields
	ID        string
	Name      string
	Script    string
	StartTime time.Time

	// Atomic fields (lock-free access)
	status   int32 // ProcessStatus as int32
	exitCode int32 // Exit code
	endTime  int64 // Unix timestamp

	// Protected fields (use processMu from Manager)
	Cmd    *exec.Cmd
	cancel context.CancelFunc
}
```

**Synchronization Strategy**:

- **Process Map**: RWMutex for process registry operations
- **Process Status**: Atomic operations for status, exit code, and end time
- **Callbacks**: Separate RWMutex for callback management
- **Lock Hierarchy**: Never hold multiple locks simultaneously
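The atomic fields above allow status reads without touching the Manager's mutex. A minimal sketch of the accessors follows; `ProcessStatus`, `StatusExited`, and the method names are illustrative assumptions, not Brummer's actual API.

```go
// Sketch: lock-free status accessors for the Process type above.
func (p *Process) SetStatus(s ProcessStatus) {
	atomic.StoreInt32(&p.status, int32(s))
}

func (p *Process) Status() ProcessStatus {
	return ProcessStatus(atomic.LoadInt32(&p.status))
}

// markExited records the exit code and end time without taking any lock,
// so readers (TUI refresh, MCP queries) never block on a finishing process.
func (p *Process) markExited(code int) {
	atomic.StoreInt32(&p.exitCode, int32(code))
	atomic.StoreInt64(&p.endTime, time.Now().Unix())
	atomic.StoreInt32(&p.status, int32(StatusExited)) // StatusExited is assumed
}
```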
### 4. Log Store Single-Path Architecture

**Current Issue**: Hybrid async/sync pattern with timeout fallbacks

```go
// Problematic timeout-based fallback
select {
case s.addChan <- req:
	// Wait for result with timeout
case <-time.After(100 * time.Millisecond):
	return s.addSync(...) // Race condition potential
}
```

**New Architecture**: Single channel-based path with bounded buffers

```go
type Store struct {
	// Single-path async architecture
	addCh    chan addRequest
	searchCh chan searchRequest
	getCh    chan getRequest

	// Worker state (owned by single goroutine)
	entries   []LogEntry
	byProcess map[string][]int
	errors    []LogEntry
	urls      []URLEntry

	// Configuration
	config LogStoreConfig

	// Control
	stopCh   chan struct{}
	workerWG sync.WaitGroup
}

type LogStoreConfig struct {
	MaxEntries     int           // Default: 10000
	BatchSize      int           // Default: 100
	BufferSize     int           // Default: 1000
	WorkerTimeout  time.Duration // Default: 50ms
	EnableBatching bool          // Default: true
}

type addRequest struct {
	processID   string
	processName string
	content     string
	isError     bool
	responseCh  chan *LogEntry
}
```

**Key Features**:

- **Single Worker**: All operations serialized through one goroutine
- **Bounded Buffers**: Request channels have size limits
- **Batch Processing**: Multiple entries processed together
- **No Timeouts**: Eliminates the sync fallback path entirely

### 5. Proxy Server Unified Mutex Architecture

**Current Issue**: Multiple mutexes with potential deadlock

```go
type Server struct {
	mu      sync.RWMutex // For requests
	wsMutex sync.RWMutex // For WebSocket clients
	// Different locks for different data
}
```

**New Architecture**: Single mutex with clear sections

```go
type Server struct {
	// Single mutex for all shared state
	mu sync.RWMutex

	// State protected by mu
	requests    []Request
	urlMap      map[string]string
	urlMappings map[string]*URLMapping
	wsClients   map[*websocket.Conn]bool

	// Atomic fields (lock-free)
	totalRequests    int64
	successRequests  int64
	failedRequests   int64
	bytesTransferred int64

	// Immutable after startup
	port      int
	mode      ProxyMode
	eventBus  *events.EventBus
	telemetry *TelemetryStore
}
```

**Atomic Metrics Pattern**:

```go
func (s *Server) recordRequest(req Request) {
	// Atomic counters (lock-free)
	atomic.AddInt64(&s.totalRequests, 1)
	if req.IsError {
		atomic.AddInt64(&s.failedRequests, 1)
	} else {
		atomic.AddInt64(&s.successRequests, 1)
	}
	atomic.AddInt64(&s.bytesTransferred, req.Size)

	// Structured data (requires lock)
	s.mu.Lock()
	s.requests = append(s.requests, req)
	if len(s.requests) > 1000 {
		s.requests = s.requests[1:]
	}
	s.mu.Unlock()
}
```

## Resource Management Architecture

### 1. Goroutine Pool Management

**Global Goroutine Budget**:

```go
type ResourceManager struct {
	maxGoroutines    int32 // CPU cores * 8
	activeGoroutines int32

	// Per-component allocations
	eventBusWorkers int
	processWorkers  int
	logWorkers      int
	proxyWorkers    int
}

// AllocateGoroutine reserves a slot in the global budget. The compare-and-swap
// is retried so that losing a race against another caller does not reject an
// allocation that is still within the limit.
func (rm *ResourceManager) AllocateGoroutine(component string) bool {
	for {
		current := atomic.LoadInt32(&rm.activeGoroutines)
		if current >= rm.maxGoroutines {
			return false
		}
		if atomic.CompareAndSwapInt32(&rm.activeGoroutines, current, current+1) {
			return true
		}
	}
}
```

### 2. Memory Management

**Bounded Collections**:

```go
type BoundedConfig struct {
	MaxLogEntries     int // 10,000
	MaxProcessHistory int // 1,000
	MaxRequestHistory int // 1,000
	MaxEventQueue     int // 5,000
	MaxErrorEntries   int // 500
}
```

**Memory Monitoring**:

```go
type MemoryMonitor struct {
	thresholds struct {
		warning  uint64 // 80% of available
		critical uint64 // 95% of available
	}
	checkInterval time.Duration
	stopCh        chan struct{}
}
```
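A minimal sketch of the monitor's check loop is shown below, using heap allocation from `runtime.ReadMemStats` as the measured quantity. The `run` method and the `onWarning`/`onCritical` callbacks are illustrative assumptions; in practice the monitor could publish an event or trigger log-store trimming instead.

```go
// Sketch: periodic threshold check for the MemoryMonitor above.
func (mm *MemoryMonitor) run(onWarning, onCritical func(heapBytes uint64)) {
	ticker := time.NewTicker(mm.checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			var ms runtime.MemStats
			runtime.ReadMemStats(&ms)
			switch {
			case ms.HeapAlloc >= mm.thresholds.critical:
				onCritical(ms.HeapAlloc)
			case ms.HeapAlloc >= mm.thresholds.warning:
				onWarning(ms.HeapAlloc)
			}
		case <-mm.stopCh:
			return
		}
	}
}
```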
### 3. Channel Management

**Channel Configuration**:

```go
type ChannelConfig struct {
	EventBusQueues [3]int // [100, 500, 1000] High/Med/Low priority
	LogStore       int    // 1000
	ProcessMgr     int    // 100
	ProxyRequests  int    // 500
}
```

## Configuration Schema Integration

### Concurrency Configuration

```go
type ConcurrencyConfig struct {
	// EventBus configuration
	EventBus struct {
		Workers           int    `toml:"worker_pool_size"`
		QueueSizes        [3]int `toml:"queue_sizes"`
		BackpressureMode  string `toml:"backpressure_strategy"`
		MonitoringEnabled bool   `toml:"enable_monitoring"`
	} `toml:"eventbus"`

	// Process Manager configuration
	ProcessManager struct {
		MaxConcurrentStarts int    `toml:"max_concurrent_starts"`
		StatusReadStrategy  string `toml:"status_read_strategy"`
		WorkerTimeout       string `toml:"worker_timeout"`
	} `toml:"process_manager"`

	// Log Store configuration
	LogStore struct {
		BufferSize  int `toml:"buffer_size"`
		BatchSize   int `toml:"batch_size"`
		WorkerCount int `toml:"worker_count"`
		TimeoutMs   int `toml:"timeout_ms"`
	} `toml:"log_store"`

	// Proxy Server configuration
	ProxyServer struct {
		RequestBufferSize   int `toml:"request_buffer_size"`
		ConnectionTimeoutMs int `toml:"connection_timeout_ms"`
		MaxWSClients        int `toml:"max_ws_clients"`
	} `toml:"proxy_server"`

	// MCP Manager configuration (already good)
	MCPManager struct {
		HealthCheckIntervalMs int `toml:"health_check_interval_ms"`
		SessionTimeoutMs      int `toml:"session_timeout_ms"`
		MaxRetryAttempts      int `toml:"max_retry_attempts"`
	} `toml:"mcp_manager"`

	// Global resource limits
	Resources struct {
		MaxGoroutines      int `toml:"max_goroutines"`
		MemoryLimitMB      int `toml:"memory_limit_mb"`
		MonitoringInterval int `toml:"monitoring_interval_ms"`
	} `toml:"resources"`
}
```

### Default Configuration

```go
func DefaultConcurrencyConfig() ConcurrencyConfig {
	cpuCores := runtime.NumCPU()

	return ConcurrencyConfig{
		EventBus: struct {
			Workers           int    `toml:"worker_pool_size"`
			QueueSizes        [3]int `toml:"queue_sizes"`
			BackpressureMode  string `toml:"backpressure_strategy"`
			MonitoringEnabled bool   `toml:"enable_monitoring"`
		}{
			Workers:           max(4, min(cpuCores*5/2, 32)), // ~2.5x CPU cores, capped at 32
			QueueSizes:        [3]int{100, 500, 1000},
			BackpressureMode:  "drop_oldest",
			MonitoringEnabled: true,
		},
		ProcessManager: struct {
			MaxConcurrentStarts int    `toml:"max_concurrent_starts"`
			StatusReadStrategy  string `toml:"status_read_strategy"`
			WorkerTimeout       string `toml:"worker_timeout"`
		}{
			MaxConcurrentStarts: 10,
			StatusReadStrategy:  "atomic",
			WorkerTimeout:       "5s",
		},
		LogStore: struct {
			BufferSize  int `toml:"buffer_size"`
			BatchSize   int `toml:"batch_size"`
			WorkerCount int `toml:"worker_count"`
			TimeoutMs   int `toml:"timeout_ms"`
		}{
			BufferSize:  10000,
			BatchSize:   100,
			WorkerCount: 1,
			TimeoutMs:   50,
		},
		ProxyServer: struct {
			RequestBufferSize   int `toml:"request_buffer_size"`
			ConnectionTimeoutMs int `toml:"connection_timeout_ms"`
			MaxWSClients        int `toml:"max_ws_clients"`
		}{
			RequestBufferSize:   1000,
			ConnectionTimeoutMs: 5000,
			MaxWSClients:        50,
		},
		MCPManager: struct {
			HealthCheckIntervalMs int `toml:"health_check_interval_ms"`
			SessionTimeoutMs      int `toml:"session_timeout_ms"`
			MaxRetryAttempts      int `toml:"max_retry_attempts"`
		}{
			HealthCheckIntervalMs: 30000,
			SessionTimeoutMs:      300000,
			MaxRetryAttempts:      3,
		},
		Resources: struct {
			MaxGoroutines      int `toml:"max_goroutines"`
			MemoryLimitMB      int `toml:"memory_limit_mb"`
			MonitoringInterval int `toml:"monitoring_interval_ms"`
		}{
			MaxGoroutines:      cpuCores * 8,
			MemoryLimitMB:      512,
			MonitoringInterval: 10000,
		},
	}
}
```
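To show how the configuration feeds the components, here is a minimal sketch of wiring the EventBus from `ConcurrencyConfig`. The `NewEventBus` constructor is an illustrative name, and it reuses the `startWorkers` sketch from the EventBus section rather than any existing Brummer API.

```go
// Sketch: construct the EventBus with bounded, configuration-sized queues.
func NewEventBus(cfg ConcurrencyConfig) *EventBus {
	eb := &EventBus{
		workers:    cfg.EventBus.Workers,
		workerPool: &sync.WaitGroup{},
		stopCh:     make(chan struct{}),
	}
	// One bounded channel per priority level, sized from the config.
	for i, size := range cfg.EventBus.QueueSizes {
		eb.eventQueues[i] = make(chan eventJob, size)
	}
	// Empty copy-on-write handler map, matching the struct's atomic.Value comment.
	eb.handlers.Store(make(map[EventType][]Handler))
	eb.startWorkers()
	return eb
}
```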
## Error Handling and Recovery

### 1. Panic Recovery

```go
type PanicRecovery struct {
	component string
	eventBus  *events.EventBus
}

func (pr *PanicRecovery) WrapHandler(handler func()) func() {
	return func() {
		defer func() {
			if r := recover(); r != nil {
				pr.eventBus.Publish(events.Event{
					Type: events.ErrorDetected,
					Data: map[string]interface{}{
						"component": pr.component,
						"panic":     r,
						"stack":     debug.Stack(),
					},
				})
			}
		}()
		handler()
	}
}
```

### 2. Graceful Degradation

```go
type GracefulDegradation struct {
	components map[string]ComponentHealth
	mu         sync.RWMutex
}

type ComponentHealth struct {
	Status         HealthStatus
	LastError      error
	ErrorCount     int
	LastHealthTime time.Time
}

type HealthStatus int

const (
	Healthy HealthStatus = iota
	Degraded
	Failing
	Failed
)
```

## Performance Monitoring

### 1. Metrics Collection

```go
type SynchronizationMetrics struct {
	// Lock contention metrics
	LockContentionCount atomic.Int64
	LockWaitTime        atomic.Int64

	// Channel metrics
	ChannelBlockedSends atomic.Int64
	ChannelBufferUsage  map[string]*atomic.Int64

	// Goroutine metrics
	ActiveGoroutines atomic.Int64
	GoroutineLeaks   atomic.Int64

	// Performance metrics
	EventProcessingTime atomic.Int64
	LogProcessingRate   atomic.Int64
	ProxyThroughput     atomic.Int64
}
```

### 2. Performance Thresholds

```go
type PerformanceThresholds struct {
	MaxEventProcessingTime time.Duration // 100ms
	MaxLockWaitTime        time.Duration // 10ms
	MaxChannelBlockTime    time.Duration // 50ms
	MaxGoroutineCount      int           // CPU cores * 8
	MaxMemoryUsageMB       int           // 512MB
}
```

## Implementation Strategy

### Phase 1: Critical Components (Week 1)

1. **EventBus Worker Pool**: Replace unlimited goroutines
2. **TUI Model Pointers**: Convert to pointer receivers
3. **Process Manager Status**: Implement atomic status operations

### Phase 2: Data Consistency (Week 2)

1. **Log Store Refactor**: Single-channel architecture
2. **Proxy Server Cleanup**: Unified mutex pattern
3. **Configuration Integration**: Concurrency settings

### Phase 3: Performance & Monitoring (Week 3)

1. **Resource Management**: Goroutine and memory limits
2. **Metrics Collection**: Performance monitoring
3. **Graceful Degradation**: Error recovery patterns

### Rollback Strategy

Each component maintains backward compatibility:

```go
type ComponentConfig struct {
	EnableNewSync bool `toml:"enable_new_sync"`
	FallbackMode  bool `toml:"fallback_mode"`
}
```

## Testing Strategy

### 1. Race Detection

```bash
# Continuous race testing
go test -race -timeout 30s ./internal/...
go test -race -run TestConcurrency ./...
```

### 2. Load Testing

```go
func TestConcurrentLoad(t *testing.T) {
	const (
		goroutines = 100
		operations = 1000
		duration   = 30 * time.Second
	)

	// Test each component under load
	// Verify no deadlocks or race conditions
	// Check performance stays within thresholds
}
```
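As one concrete instance of the skeleton above, here is a sketch of a load test for the worker-pool EventBus, intended to run under `go test -race`. The `NewEventBus`, `Subscribe`, `Publish`, and `Stop` calls mirror the illustrative API sketched in earlier sections and may differ from Brummer's actual event package.

```go
func TestEventBusConcurrentPublish(t *testing.T) {
	eb := NewEventBus(DefaultConcurrencyConfig())
	defer eb.Stop() // Stop is assumed to close stopCh and wait for workers

	var handled atomic.Int64
	eb.Subscribe(events.ErrorDetected, func(e events.Event) {
		handled.Add(1)
	})

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // 100 concurrent publishers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				eb.Publish(events.Event{Type: events.ErrorDetected})
			}
		}()
	}
	wg.Wait()

	// With a drop-oldest backpressure policy some events may be shed under
	// load, so assert progress rather than an exact count.
	if handled.Load() == 0 {
		t.Fatal("no events were handled")
	}
}
```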
### 3. Stress Testing

```go
func TestResourceExhaustion(t *testing.T) {
	// Test behavior when limits are reached
	// Verify graceful degradation
	// Check recovery after resource release
}
```

## Compatibility Guarantees

### API Compatibility

- All public interfaces maintain existing signatures
- New configuration options are optional with sensible defaults
- Existing client code works without modification

### Performance Compatibility

- No more than 10% performance degradation on existing workloads
- Better performance under high concurrency
- Reduced memory usage through bounded collections

### Behavioral Compatibility

- Event ordering preserved where semantically required
- Error handling maintains existing behavior
- Log output format unchanged

## Conclusion

This synchronized architecture eliminates race conditions through:

1. **Bounded Resources**: No unlimited goroutines or unbounded queues
2. **Clear Synchronization**: Well-defined lock hierarchies and ownership
3. **Lock-Free Optimizations**: Atomic operations where appropriate
4. **Progressive Rollout**: Safe incremental implementation
5. **Comprehensive Monitoring**: Performance and health tracking

The design prioritizes the critical race conditions (EventBus, TUI Model) while providing a coherent overall architecture that can be implemented incrementally with full rollback capability.
