Skip to main content
Glama
events_test.go (9.81 kB)
package events

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Bounds used while polling for handlers to observe published events.
// Polling replaces fixed sleeps so the tests finish as soon as delivery
// happens and do not fail spuriously on a slow or heavily loaded machine.
const (
	handlerWaitTimeout  = 2 * time.Second
	handlerPollInterval = 2 * time.Millisecond
)

// TestEventBusCreation verifies that NewEventBus returns a non-nil bus whose
// handlers field is initialized.
func TestEventBusCreation(t *testing.T) {
	bus := NewEventBus()

	require.NotNil(t, bus)
	assert.NotNil(t, bus.handlers)
}

// TestEventSubscription verifies that a handler subscribed to ProcessStarted
// receives a published event with its payload intact and with an ID and
// timestamp populated.
func TestEventSubscription(t *testing.T) {
	bus := NewEventBus()

	var mu sync.Mutex
	var receivedEvents []Event

	// Subscribe to ProcessStarted events.
	bus.Subscribe(ProcessStarted, func(event Event) {
		mu.Lock()
		defer mu.Unlock()
		receivedEvents = append(receivedEvents, event)
	})

	// Publish an event.
	bus.Publish(Event{
		Type:      ProcessStarted,
		ProcessID: "test-process",
		Data: map[string]interface{}{
			"command": "echo hello",
			"pid":     12345,
		},
	})

	// Handlers may run asynchronously; poll until the event shows up.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(receivedEvents) >= 1
	}, handlerWaitTimeout, handlerPollInterval, "handler never received the event")

	// Verify the event was received exactly once with the expected content.
	mu.Lock()
	defer mu.Unlock()

	require.Len(t, receivedEvents, 1)
	got := receivedEvents[0]
	assert.Equal(t, ProcessStarted, got.Type)
	assert.Equal(t, "test-process", got.ProcessID)
	assert.Equal(t, "echo hello", got.Data["command"])
	assert.Equal(t, 12345, got.Data["pid"])
	assert.NotEmpty(t, got.ID)              // ID should be auto-generated
	assert.False(t, got.Timestamp.IsZero()) // Timestamp should be set
}

// TestMultipleSubscribers verifies that two handlers subscribed to the same
// event type each receive a published event.
func TestMultipleSubscribers(t *testing.T) {
	bus := NewEventBus()

	var mu1, mu2 sync.Mutex
	var handler1Events []Event
	var handler2Events []Event

	// Both handlers subscribe to the same event type.
	bus.Subscribe(LogLine, func(event Event) {
		mu1.Lock()
		defer mu1.Unlock()
		handler1Events = append(handler1Events, event)
	})
	bus.Subscribe(LogLine, func(event Event) {
		mu2.Lock()
		defer mu2.Unlock()
		handler2Events = append(handler2Events, event)
	})

	// Publish an event.
	bus.Publish(Event{
		Type:      LogLine,
		ProcessID: "test-process",
		Data: map[string]interface{}{
			"line":    "Test log line",
			"isError": false,
		},
	})

	// Poll until both handlers have observed the event. The mutexes are always
	// taken in the same order (mu1, then mu2) here and below.
	require.Eventually(t, func() bool {
		mu1.Lock()
		defer mu1.Unlock()
		mu2.Lock()
		defer mu2.Unlock()
		return len(handler1Events) >= 1 && len(handler2Events) >= 1
	}, handlerWaitTimeout, handlerPollInterval, "not every handler received the event")

	// Both handlers should have received the event exactly once.
	mu1.Lock()
	defer mu1.Unlock()
	mu2.Lock()
	defer mu2.Unlock()

	require.Len(t, handler1Events, 1)
	require.Len(t, handler2Events, 1)
	assert.Equal(t, LogLine, handler1Events[0].Type)
	assert.Equal(t, LogLine, handler2Events[0].Type)
	assert.Equal(t, "Test log line", handler1Events[0].Data["line"])
	assert.Equal(t, "Test log line", handler2Events[0].Data["line"])
}

// TestMultipleEventTypes verifies that handlers subscribed to different event
// types only receive events of the type they subscribed to.
func TestMultipleEventTypes(t *testing.T) {
	bus := NewEventBus()

	var muProcess, muLog, muError sync.Mutex
	var processEvents []Event
	var logEvents []Event
	var errorEvents []Event

	bus.Subscribe(ProcessStarted, func(event Event) {
		muProcess.Lock()
		defer muProcess.Unlock()
		processEvents = append(processEvents, event)
	})
	bus.Subscribe(LogLine, func(event Event) {
		muLog.Lock()
		defer muLog.Unlock()
		logEvents = append(logEvents, event)
	})
	bus.Subscribe(ErrorDetected, func(event Event) {
		muError.Lock()
		defer muError.Unlock()
		errorEvents = append(errorEvents, event)
	})

	// Publish different types of events.
	bus.Publish(Event{Type: ProcessStarted, ProcessID: "proc1", Data: map[string]interface{}{"command": "echo"}})
	bus.Publish(Event{Type: LogLine, ProcessID: "proc1", Data: map[string]interface{}{"line": "output"}})
	bus.Publish(Event{Type: ErrorDetected, ProcessID: "proc1", Data: map[string]interface{}{"error": "test error"}})
	bus.Publish(Event{Type: LogLine, ProcessID: "proc1", Data: map[string]interface{}{"line": "more output"}})

	// Poll until every handler has seen the number of events published for
	// its type.
	require.Eventually(t, func() bool {
		muProcess.Lock()
		defer muProcess.Unlock()
		muLog.Lock()
		defer muLog.Unlock()
		muError.Lock()
		defer muError.Unlock()
		return len(processEvents) >= 1 && len(logEvents) >= 2 && len(errorEvents) >= 1
	}, handlerWaitTimeout, handlerPollInterval, "handlers did not receive all published events")

	// Verify each handler only received its event type.
	muProcess.Lock()
	defer muProcess.Unlock()
	muLog.Lock()
	defer muLog.Unlock()
	muError.Lock()
	defer muError.Unlock()

	// require (not assert) so the indexing below cannot panic on a short slice.
	require.Len(t, processEvents, 1)
	require.Len(t, logEvents, 2)
	require.Len(t, errorEvents, 1)

	assert.Equal(t, ProcessStarted, processEvents[0].Type)
	assert.Equal(t, LogLine, logEvents[0].Type)
	assert.Equal(t, LogLine, logEvents[1].Type)
	assert.Equal(t, ErrorDetected, errorEvents[0].Type)
}

// TestEventMetadata verifies that an event published without an ID or
// timestamp reaches the handler with both populated, and with the original
// fields preserved.
func TestEventMetadata(t *testing.T) {
	bus := NewEventBus()

	var mu sync.Mutex
	var receivedEvent Event
	var received bool

	bus.Subscribe(BuildEvent, func(event Event) {
		mu.Lock()
		defer mu.Unlock()
		receivedEvent = event
		received = true
	})

	// Publish event without ID or timestamp.
	originalEvent := Event{
		Type:      BuildEvent,
		ProcessID: "build-process",
		Data:      map[string]interface{}{"buildID": 123},
	}

	publishTime := time.Now()
	bus.Publish(originalEvent)

	// Poll until the handler has run.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return received
	}, handlerWaitTimeout, handlerPollInterval, "handler never received the event")

	mu.Lock()
	defer mu.Unlock()

	require.True(t, received)

	// Verify metadata was automatically added; the timestamp must fall within
	// one second of the moment Publish was called.
	assert.NotEmpty(t, receivedEvent.ID)
	assert.False(t, receivedEvent.Timestamp.IsZero())
	assert.True(t, receivedEvent.Timestamp.After(publishTime.Add(-1*time.Second)))
	assert.True(t, receivedEvent.Timestamp.Before(publishTime.Add(1*time.Second)))

	// Original data should be preserved.
	assert.Equal(t, BuildEvent, receivedEvent.Type)
	assert.Equal(t, "build-process", receivedEvent.ProcessID)
	assert.Equal(t, 123, receivedEvent.Data["buildID"])
}

// TestConcurrentPublishing publishes from several goroutines at once and
// verifies that every event is delivered and that all delivered events carry
// distinct IDs.
func TestConcurrentPublishing(t *testing.T) {
	bus := NewEventBus()

	var mu sync.Mutex
	var receivedEvents []Event

	bus.Subscribe(TestPassed, func(event Event) {
		mu.Lock()
		defer mu.Unlock()
		receivedEvents = append(receivedEvents, event)
	})

	// Publish events concurrently from multiple goroutines.
	const (
		numPublishers      = 10
		eventsPerPublisher = 5
		expectedCount      = numPublishers * eventsPerPublisher
	)

	var wg sync.WaitGroup
	for i := 0; i < numPublishers; i++ {
		wg.Add(1)
		go func(publisherID int) {
			defer wg.Done()
			for j := 0; j < eventsPerPublisher; j++ {
				bus.Publish(Event{
					Type:      TestPassed,
					ProcessID: "test-process",
					Data: map[string]interface{}{
						"publisherID": publisherID,
						"eventID":     j,
					},
				})
			}
		}(i)
	}
	wg.Wait()

	// All Publish calls have returned; poll until the handler has caught up.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(receivedEvents) >= expectedCount
	}, handlerWaitTimeout, handlerPollInterval, "handler did not receive all published events")

	mu.Lock()
	defer mu.Unlock()

	// Should have received all published events.
	assert.Len(t, receivedEvents, expectedCount)

	// Verify all events have unique IDs.
	idSet := make(map[string]bool, len(receivedEvents))
	for _, event := range receivedEvents {
		assert.False(t, idSet[event.ID], "Duplicate event ID found: %s", event.ID)
		idSet[event.ID] = true
		assert.Equal(t, TestPassed, event.Type)
	}
}

// TestConcurrentSubscription registers handlers from several goroutines at
// once and verifies that a subsequently published event reaches all of them.
func TestConcurrentSubscription(t *testing.T) {
	bus := NewEventBus()

	var mu sync.Mutex
	var totalReceived int64

	// Add subscribers concurrently.
	const numSubscribers = 5

	var wg sync.WaitGroup
	for i := 0; i < numSubscribers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bus.Subscribe(TestFailed, func(event Event) {
				mu.Lock()
				defer mu.Unlock()
				totalReceived++
			})
		}()
	}
	wg.Wait()

	// Publish a single event.
	bus.Publish(Event{
		Type:      TestFailed,
		ProcessID: "test-process",
		Data:      map[string]interface{}{"test": "concurrent subscription"},
	})

	// Poll until every subscriber has been invoked.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return totalReceived >= numSubscribers
	}, handlerWaitTimeout, handlerPollInterval, "not every subscriber received the event")

	mu.Lock()
	defer mu.Unlock()

	// All subscribers should have received the event.
	assert.Equal(t, int64(numSubscribers), totalReceived)
}

// TestEventTypeConstants subscribes to and publishes one event for each event
// type constant listed below, then verifies each type was delivered.
func TestEventTypeConstants(t *testing.T) {
	eventTypes := []EventType{
		ProcessStarted,
		ProcessExited,
		LogLine,
		ErrorDetected,
		BuildEvent,
		TestFailed,
		TestPassed,
		MCPActivity,
		MCPConnected,
		MCPDisconnected,
	}

	bus := NewEventBus()

	var mu sync.Mutex
	var receivedTypes []EventType

	// Subscribe to all event types; every handler records the delivered type.
	for _, eventType := range eventTypes {
		bus.Subscribe(eventType, func(event Event) {
			mu.Lock()
			defer mu.Unlock()
			receivedTypes = append(receivedTypes, event.Type)
		})
	}

	// Publish events of all types.
	for i, eventType := range eventTypes {
		bus.Publish(Event{
			Type:      eventType,
			ProcessID: "test-process",
			Data:      map[string]interface{}{"index": i},
		})
	}

	// Poll until one delivery per published event has been recorded.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(receivedTypes) >= len(eventTypes)
	}, handlerWaitTimeout, handlerPollInterval, "handlers did not receive all event types")

	mu.Lock()
	defer mu.Unlock()

	// Should have received all event types.
	assert.Len(t, receivedTypes, len(eventTypes))

	// Verify all types were received.
	receivedSet := make(map[EventType]bool, len(receivedTypes))
	for _, eventType := range receivedTypes {
		receivedSet[eventType] = true
	}
	for _, expectedType := range eventTypes {
		assert.True(t, receivedSet[expectedType], "Event type %s was not received", expectedType)
	}
}

// TestEmptyEventHandling verifies that an event carrying only a Type is still
// delivered, with an empty ProcessID, nil Data, and an ID and timestamp
// populated.
func TestEmptyEventHandling(t *testing.T) {
	bus := NewEventBus()

	var mu sync.Mutex
	var receivedEvent Event
	var received bool

	bus.Subscribe(MCPActivity, func(event Event) {
		mu.Lock()
		defer mu.Unlock()
		receivedEvent = event
		received = true
	})

	// Publish event with minimal data.
	bus.Publish(Event{
		Type: MCPActivity,
		// ProcessID is empty
		// Data is nil
	})

	// Poll until the handler has run.
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return received
	}, handlerWaitTimeout, handlerPollInterval, "handler never received the event")

	mu.Lock()
	defer mu.Unlock()

	require.True(t, received)
	assert.Equal(t, MCPActivity, receivedEvent.Type)
	assert.Empty(t, receivedEvent.ProcessID)
	assert.Nil(t, receivedEvent.Data)
	assert.NotEmpty(t, receivedEvent.ID)
	assert.False(t, receivedEvent.Timestamp.IsZero())
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.