Skip to main content
Glama
race_test.go11.8 kB
package integration import ( "context" "fmt" "runtime" "sync" "sync/atomic" "testing" "time" "github.com/standardbeagle/brummer/internal/logs" "github.com/standardbeagle/brummer/internal/mcp" "github.com/standardbeagle/brummer/internal/process" "github.com/standardbeagle/brummer/internal/proxy" "github.com/standardbeagle/brummer/pkg/events" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // TestSystemWideRaceConditions validates that all race conditions have been eliminated func TestSystemWideRaceConditions(t *testing.T) { t.Run("EventBusWorkerPoolStress", func(t *testing.T) { eb := events.NewEventBus() defer eb.Shutdown() const numPublishers = 50 const eventsPerPublisher = 200 var processedCount int64 var handlerExecutions int64 // Subscribe multiple handlers eb.Subscribe(events.LogLine, func(event events.Event) { atomic.AddInt64(&handlerExecutions, 1) time.Sleep(time.Microsecond) // Simulate work }) eb.Subscribe(events.ProcessStarted, func(event events.Event) { atomic.AddInt64(&handlerExecutions, 1) time.Sleep(time.Microsecond) }) // Flood with events from multiple goroutines var wg sync.WaitGroup for i := 0; i < numPublishers; i++ { wg.Add(1) go func(publisherID int) { defer wg.Done() for j := 0; j < eventsPerPublisher; j++ { eventType := events.LogLine if j%2 == 0 { eventType = events.ProcessStarted } eb.Publish(events.Event{ Type: eventType, ProcessID: fmt.Sprintf("stress-test-%d", publisherID), Data: map[string]interface{}{ "publisher": publisherID, "sequence": j, }, }) atomic.AddInt64(&processedCount, 1) } }(i) } wg.Wait() time.Sleep(100 * time.Millisecond) // Allow processing to complete finalProcessed := atomic.LoadInt64(&processedCount) finalExecutions := atomic.LoadInt64(&handlerExecutions) assert.Equal(t, int64(numPublishers*eventsPerPublisher), finalProcessed) assert.Equal(t, finalProcessed, finalExecutions) // Each event hits one handler t.Logf("EventBus processed %d events with %d handler executions", finalProcessed, finalExecutions) }) t.Run("ProcessManagerConcurrentOperations", func(t *testing.T) { eb := events.NewEventBus() defer eb.Shutdown() mgr, err := process.NewManager("/tmp", eb, false) require.NoError(t, err) defer mgr.Cleanup() const numWorkers = 10 const operationsPerWorker = 20 var processesStarted int64 var processesQueried int64 var errors int64 var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for j := 0; j < operationsPerWorker; j++ { // Start a process proc, err := mgr.StartCommand( fmt.Sprintf("worker-%d-op-%d", workerID, j), "echo", []string{fmt.Sprintf("hello from worker %d operation %d", workerID, j)}, ) if err != nil { atomic.AddInt64(&errors, 1) continue } atomic.AddInt64(&processesStarted, 1) // Query the process status (tests thread-safe getters) status := proc.GetStatus() if status != process.StatusPending && status != process.StatusRunning { t.Logf("Unexpected status: %v", status) } // Query from manager (tests concurrent map access) _, exists := mgr.GetProcess(proc.ID) if exists { atomic.AddInt64(&processesQueried, 1) } // Small delay to increase concurrency pressure time.Sleep(time.Millisecond) } }(i) } wg.Wait() time.Sleep(500 * time.Millisecond) // Allow processes to complete finalStarted := atomic.LoadInt64(&processesStarted) finalQueried := atomic.LoadInt64(&processesQueried) finalErrors := atomic.LoadInt64(&errors) assert.Greater(t, finalStarted, int64(0), "Should have started some processes") assert.Equal(t, finalStarted, finalQueried, "All started processes should be queryable") assert.Equal(t, int64(0), finalErrors, "Should have no errors") t.Logf("Started %d processes, queried %d, errors: %d", finalStarted, finalQueried, finalErrors) }) t.Run("LogStoreConcurrentWrites", func(t *testing.T) { eb := events.NewEventBus() defer eb.Shutdown() store := logs.NewStore(10000, eb) defer store.Close() const numWriters = 25 const logsPerWriter = 100 var logsWritten int64 var wg sync.WaitGroup for i := 0; i < numWriters; i++ { wg.Add(1) go func(writerID int) { defer wg.Done() for j := 0; j < logsPerWriter; j++ { entry := store.Add( fmt.Sprintf("process-%d", writerID), fmt.Sprintf("writer-%d", writerID), fmt.Sprintf("Log message %d from writer %d", j, writerID), j%5 == 0, // Every 5th message is an error ) if entry != nil { atomic.AddInt64(&logsWritten, 1) } } }(i) } wg.Wait() time.Sleep(100 * time.Millisecond) // Allow async processing finalWritten := atomic.LoadInt64(&logsWritten) storedLogs := store.GetAll() // Due to fire-and-forget async pattern, some logs might be dropped under pressure assert.Greater(t, finalWritten, int64(0), "Should have written some logs") assert.LessOrEqual(t, len(storedLogs), int(finalWritten), "Stored logs should not exceed written") t.Logf("Wrote %d logs, stored %d", finalWritten, len(storedLogs)) }) t.Run("ProxyServerConcurrentRequests", func(t *testing.T) { eb := events.NewEventBus() defer eb.Shutdown() server := proxy.NewServerWithMode(0, proxy.ProxyModeReverse, eb) // Don't start the server to avoid hanging - just test the concurrent map operations const numRequesters = 10 const requestsPerRequester = 5 var requestsProcessed int64 var mappingsCreated int64 var wg sync.WaitGroup for i := 0; i < numRequesters; i++ { wg.Add(1) go func(requesterID int) { defer wg.Done() for j := 0; j < requestsPerRequester; j++ { // Register URL mappings (tests concurrent map operations) proxyURL := server.RegisterURL( fmt.Sprintf("http://localhost:%d", 3000+requesterID), fmt.Sprintf("requester-%d", requesterID), ) if proxyURL != "" { atomic.AddInt64(&mappingsCreated, 1) } // Get proxy requests (tests concurrent slice access) requests := server.GetRequests() atomic.AddInt64(&requestsProcessed, int64(len(requests))) time.Sleep(time.Millisecond) } }(i) } wg.Wait() finalMappings := atomic.LoadInt64(&mappingsCreated) finalRequests := atomic.LoadInt64(&requestsProcessed) assert.Greater(t, finalMappings, int64(0), "Should have created some mappings") t.Logf("Created %d mappings, processed %d requests", finalMappings, finalRequests) }) t.Run("MCPConnectionManagerConcurrentSessions", func(t *testing.T) { cm := mcp.NewConnectionManager() defer cm.Stop() const numSessions = 20 const operationsPerSession = 10 var connectSuccesses int64 var disconnectSuccesses int64 var errors int64 var wg sync.WaitGroup for i := 0; i < numSessions; i++ { wg.Add(1) go func(sessionID int) { defer wg.Done() sessionName := fmt.Sprintf("session-%d", sessionID) instanceID := fmt.Sprintf("instance-%d", sessionID%5) // 5 instances for j := 0; j < operationsPerSession; j++ { // Connect session err := cm.ConnectSession(sessionName, instanceID) if err == nil { atomic.AddInt64(&connectSuccesses, 1) } else { atomic.AddInt64(&errors, 1) } // Disconnect session err = cm.DisconnectSession(sessionName) if err == nil { atomic.AddInt64(&disconnectSuccesses, 1) } else { atomic.AddInt64(&errors, 1) } time.Sleep(time.Millisecond) } }(i) } wg.Wait() finalConnects := atomic.LoadInt64(&connectSuccesses) finalDisconnects := atomic.LoadInt64(&disconnectSuccesses) finalErrors := atomic.LoadInt64(&errors) // Note: Connections will fail because instances aren't registered, // but the important thing is no race conditions t.Logf("Connects: %d, Disconnects: %d, Errors: %d", finalConnects, finalDisconnects, finalErrors) }) } // TestGoroutineLeakPrevention ensures proper cleanup of goroutines func TestGoroutineLeakPrevention(t *testing.T) { initialGoroutines := runtime.NumGoroutine() t.Run("EventBusCleanup", func(t *testing.T) { for i := 0; i < 10; i++ { eb := events.NewEventBus() // Use the EventBus eb.Subscribe(events.LogLine, func(event events.Event) { // No-op handler }) for j := 0; j < 100; j++ { eb.Publish(events.Event{ Type: events.LogLine, ProcessID: "test", }) } // Proper shutdown eb.Shutdown() } }) t.Run("LogStoreCleanup", func(t *testing.T) { eb := events.NewEventBus() defer eb.Shutdown() for i := 0; i < 10; i++ { store := logs.NewStore(100, eb) // Use the store for j := 0; j < 50; j++ { store.Add("test", "test", fmt.Sprintf("message %d", j), false) } // Proper shutdown store.Close() } }) // Force garbage collection and wait for cleanup runtime.GC() time.Sleep(100 * time.Millisecond) finalGoroutines := runtime.NumGoroutine() // Allow for some variance in goroutine count assert.LessOrEqual(t, finalGoroutines, initialGoroutines+5, "Should not leak significant goroutines") t.Logf("Goroutines: %d -> %d", initialGoroutines, finalGoroutines) } // TestSystemStressWithRaceDetection validates system behavior under extreme load func TestSystemStressWithRaceDetection(t *testing.T) { if testing.Short() { t.Skip("Skipping stress test in short mode") } eb := events.NewEventBus() defer eb.Shutdown() store := logs.NewStore(50000, eb) defer store.Close() mgr, err := process.NewManager("/tmp", eb, false) require.NoError(t, err) defer mgr.Cleanup() // Note: ProxyServer stress testing would require server start which can hang in tests // The concurrent map operations are already tested in the ProxyServerConcurrentRequests subtest cm := mcp.NewConnectionManager() defer cm.Stop() // Stress test all components simultaneously const duration = 5 * time.Second const numWorkers = 20 ctx, cancel := context.WithTimeout(context.Background(), duration) defer cancel() var operations int64 var errors int64 var wg sync.WaitGroup // Event flood for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for { select { case <-ctx.Done(): return default: eb.Publish(events.Event{ Type: events.LogLine, ProcessID: fmt.Sprintf("stress-%d", workerID), }) atomic.AddInt64(&operations, 1) } } }(i) } // Log flood for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for { select { case <-ctx.Done(): return default: store.Add( fmt.Sprintf("stress-%d", workerID), "stress-test", fmt.Sprintf("Message from worker %d", workerID), false, ) atomic.AddInt64(&operations, 1) } } }(i) } // MCP session operations for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() sessionID := fmt.Sprintf("stress-session-%d", workerID) for { select { case <-ctx.Done(): return default: // These will fail but test concurrent channel operations cm.ConnectSession(sessionID, "nonexistent") cm.DisconnectSession(sessionID) atomic.AddInt64(&operations, 1) } } }(i) } wg.Wait() finalOps := atomic.LoadInt64(&operations) finalErrors := atomic.LoadInt64(&errors) assert.Greater(t, finalOps, int64(1000), "Should have processed many operations") t.Logf("Stress test completed: %d operations, %d errors in %v", finalOps, finalErrors, duration) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server