Skip to main content
Glama
06-event-system.md23.1 kB
# Task: Event System Extensions for AI Coders **Generated from Master Planning**: 2025-01-28 **Context Package**: `/requests/agentic-ai-coders/context/` **Next Phase**: [subtasks-execute.md](../subtasks-execute.md) ## Task Sizing Assessment **File Count**: 3 files - Within target range (3-7 files) **Estimated Time**: 18 minutes - Within target (15-30min) **Token Estimate**: 75k tokens - Within target (<150k) **Complexity Level**: 2 (Moderate) - Event system extension with async patterns **Parallelization Benefit**: MEDIUM - Requires core service completion **Atomicity Assessment**: ✅ ATOMIC - Complete event system extension **Boundary Analysis**: ✅ CLEAR - Extends existing event infrastructure cleanly ## Persona Assignment **Persona**: Software Engineer (Event Systems/Async) **Expertise Required**: Event-driven architecture, Go channels, async patterns **Worktree**: `~/work/worktrees/agentic-ai-coders/06-event-system/` ## Context Summary **Risk Level**: MEDIUM (async complexity, event coordination) **Integration Points**: Event bus, AI coder service, TUI, MCP tools **Architecture Pattern**: Event Extension Pattern (from existing event system) **Similar Reference**: `pkg/events/events.go` - Event bus and handler patterns ### Codebase Context (from master analysis) **Files in Scope**: ```yaml read_files: [pkg/events/events.go, internal/logs/store.go] modify_files: [pkg/events/events.go] create_files: [ /pkg/events/ai_coder_events.go, /pkg/events/ai_coder_handlers.go ] # Total: 3 files (1 modify, 2 create) - focused event extension ``` **Existing Patterns to Follow**: - `pkg/events/events.go` - Event bus architecture and registration patterns - Event handler signature: `func(data interface{})` - Async event processing with worker pools **Dependencies Context**: - Integration with Task 01 (Core Service) - AI coder event emission - Extension of existing event bus infrastructure - Event coordination for all AI coder operations ### Task Scope Boundaries **MODIFY Zone** (Direct Changes): ```yaml primary_files: - /pkg/events/events.go # Add AI coder event types - /pkg/events/ai_coder_events.go # AI coder event definitions - /pkg/events/ai_coder_handlers.go # Specialized event handlers direct_dependencies: - /internal/aicoder/manager.go # Event emission from AI coder service ``` **REVIEW Zone** (Check for Impact): ```yaml check_integration: - /internal/tui/model.go # TUI event subscription patterns - /internal/process/manager.go # Process event coordination - /internal/mcp/server.go # MCP event integration check_documentation: - /docs/events.md # Event system documentation ``` **IGNORE Zone** (Do Not Touch): ```yaml ignore_completely: - /internal/proxy/ # Proxy events separate system - /internal/discovery/ # Discovery events separate - /internal/logs/ # Log events separate handling ignore_search_patterns: - "**/testdata/**" # Test data files - "**/vendor/**" # Third-party code - "**/node_modules/**" # JavaScript dependencies ``` **Boundary Analysis Results**: - **Usage Count**: Limited to event subsystem extension - **Scope Assessment**: LIMITED scope - extends well-defined event patterns - **Impact Radius**: 1 core file to modify, 2 new files for event types ### External Context Sources (from master research) **Primary Documentation**: - [Event-Driven Architecture](https://martinfowler.com/articles/201701-event-driven.html) - Event design patterns - [Go Concurrency Patterns](https://blog.golang.org/pipelines) - Async event processing - [Observer Pattern](https://refactoring.guru/design-patterns/observer) - Event notification patterns **Standards Applied**: - Event naming: `ai_coder_*` prefix for consistency - Async processing with buffered channels - Type-safe event data structures **Reference Implementation**: - Existing event types and handler patterns in `pkg/events/events.go` - Event subscription and emission patterns - Worker pool patterns for async processing ## Task Requirements **Objective**: Extend event system with comprehensive AI coder event types and specialized handlers **Success Criteria**: - [ ] AI coder event types integrated with existing event bus - [ ] Specialized event handlers for AI coder state transitions - [ ] Event aggregation and filtering for AI coder operations - [ ] Integration with TUI for real-time AI coder status updates - [ ] Integration with MCP for external event access - [ ] Event persistence for AI coder operation history - [ ] Performance optimization for high-frequency AI coder events **Event Types to Implement**: 1. **Lifecycle Events** - Created, started, paused, completed, failed, deleted 2. **Progress Events** - Task progress updates, milestone completions 3. **Workspace Events** - File changes, workspace operations 4. **Provider Events** - API calls, rate limiting, errors 5. **Resource Events** - Memory usage, CPU consumption, disk operations **Validation Commands**: ```bash # Event System Integration Verification grep -q "ai_coder_created" pkg/events/events.go # Event types registered go build ./pkg/events # Events package compiles go test ./pkg/events -v # Event tests pass ./brum --events | grep -i "ai.*coder" # Events appear in debug output ``` ## Implementation Specifications ### Event Type Definitions ```go // pkg/events/ai_coder_events.go import ( "time" ) // AI Coder Event Types const ( // Lifecycle events EventAICoderCreated = "ai_coder_created" EventAICoderStarted = "ai_coder_started" EventAICoderPaused = "ai_coder_paused" EventAICoderResumed = "ai_coder_resumed" EventAICoderCompleted = "ai_coder_completed" EventAICoderFailed = "ai_coder_failed" EventAICoderStopped = "ai_coder_stopped" EventAICoderDeleted = "ai_coder_deleted" // Progress events EventAICoderProgress = "ai_coder_progress" EventAICoderMilestone = "ai_coder_milestone" EventAICoderOutput = "ai_coder_output" // Workspace events EventAICoderFileCreated = "ai_coder_file_created" EventAICoderFileModified = "ai_coder_file_modified" EventAICoderFileDeleted = "ai_coder_file_deleted" EventAICoderWorkspaceSync = "ai_coder_workspace_sync" // Provider events EventAICoderAPICall = "ai_coder_api_call" EventAICoderAPIError = "ai_coder_api_error" EventAICoderRateLimit = "ai_coder_rate_limit" // Resource events EventAICoderResourceUsage = "ai_coder_resource_usage" EventAICoderResourceLimit = "ai_coder_resource_limit" ) // Core AI Coder Event Structure type AICoderEvent struct { Type string `json:"type"` CoderID string `json:"coder_id"` CoderName string `json:"coder_name"` Timestamp time.Time `json:"timestamp"` Data map[string]interface{} `json:"data"` } // Lifecycle Event Data type AICoderLifecycleEvent struct { AICoderEvent PreviousStatus string `json:"previous_status"` CurrentStatus string `json:"current_status"` Reason string `json:"reason,omitempty"` ErrorMessage string `json:"error_message,omitempty"` } // Progress Event Data type AICoderProgressEvent struct { AICoderEvent Progress float64 `json:"progress"` Stage string `json:"stage"` Description string `json:"description"` Milestone string `json:"milestone,omitempty"` } // Workspace Event Data type AICoderWorkspaceEvent struct { AICoderEvent Operation string `json:"operation"` FilePath string `json:"file_path"` FileSize int64 `json:"file_size,omitempty"` ContentHash string `json:"content_hash,omitempty"` } // Provider Event Data type AICoderProviderEvent struct { AICoderEvent Provider string `json:"provider"` Model string `json:"model"` TokensUsed int `json:"tokens_used,omitempty"` Duration time.Duration `json:"duration,omitempty"` ErrorCode string `json:"error_code,omitempty"` ErrorMessage string `json:"error_message,omitempty"` } // Resource Event Data type AICoderResourceEvent struct { AICoderEvent MemoryMB int64 `json:"memory_mb"` CPUPercent float64 `json:"cpu_percent"` DiskUsageMB int64 `json:"disk_usage_mb"` NetworkBytes int64 `json:"network_bytes"` FileCount int `json:"file_count"` } // Event Factory Functions func NewAICoderLifecycleEvent(coderID, coderName, eventType, prevStatus, currStatus, reason string) *AICoderLifecycleEvent { return &AICoderLifecycleEvent{ AICoderEvent: AICoderEvent{ Type: eventType, CoderID: coderID, CoderName: coderName, Timestamp: time.Now(), }, PreviousStatus: prevStatus, CurrentStatus: currStatus, Reason: reason, } } func NewAICoderProgressEvent(coderID, coderName string, progress float64, stage, description string) *AICoderProgressEvent { return &AICoderProgressEvent{ AICoderEvent: AICoderEvent{ Type: EventAICoderProgress, CoderID: coderID, CoderName: coderName, Timestamp: time.Now(), }, Progress: progress, Stage: stage, Description: description, } } func NewAICoderWorkspaceEvent(coderID, coderName, operation, filePath string) *AICoderWorkspaceEvent { return &AICoderWorkspaceEvent{ AICoderEvent: AICoderEvent{ Type: getWorkspaceEventType(operation), CoderID: coderID, CoderName: coderName, Timestamp: time.Now(), }, Operation: operation, FilePath: filePath, } } func NewAICoderProviderEvent(coderID, coderName, provider, model string) *AICoderProviderEvent { return &AICoderProviderEvent{ AICoderEvent: AICoderEvent{ Type: EventAICoderAPICall, CoderID: coderID, CoderName: coderName, Timestamp: time.Now(), }, Provider: provider, Model: model, } } func NewAICoderResourceEvent(coderID, coderName string, memMB int64, cpuPercent float64, diskMB int64) *AICoderResourceEvent { return &AICoderResourceEvent{ AICoderEvent: AICoderEvent{ Type: EventAICoderResourceUsage, CoderID: coderID, CoderName: coderName, Timestamp: time.Now(), }, MemoryMB: memMB, CPUPercent: cpuPercent, DiskUsageMB: diskMB, } } // Helper functions func getWorkspaceEventType(operation string) string { switch operation { case "create": return EventAICoderFileCreated case "modify": return EventAICoderFileModified case "delete": return EventAICoderFileDeleted default: return EventAICoderWorkspaceSync } } ``` ### Specialized Event Handlers ```go // pkg/events/ai_coder_handlers.go import ( "context" "encoding/json" "fmt" "log" "sync" "time" ) // AI Coder Event Aggregator type AICoderEventAggregator struct { events []AICoderEvent mu sync.RWMutex maxEvents int eventBus *EventBus // Event statistics stats AICoderEventStats statsMu sync.RWMutex } type AICoderEventStats struct { TotalEvents int64 `json:"total_events"` EventsByType map[string]int64 `json:"events_by_type"` EventsByCoder map[string]int64 `json:"events_by_coder"` LastEvent time.Time `json:"last_event"` EventsPerMinute float64 `json:"events_per_minute"` } func NewAICoderEventAggregator(eventBus *EventBus, maxEvents int) *AICoderEventAggregator { aggregator := &AICoderEventAggregator{ events: make([]AICoderEvent, 0, maxEvents), maxEvents: maxEvents, eventBus: eventBus, stats: AICoderEventStats{ EventsByType: make(map[string]int64), EventsByCoder: make(map[string]int64), }, } // Register handlers for all AI coder event types aggregator.registerHandlers() return aggregator } // Register event handlers func (a *AICoderEventAggregator) registerHandlers() { eventTypes := []string{ EventAICoderCreated, EventAICoderStarted, EventAICoderPaused, EventAICoderResumed, EventAICoderCompleted, EventAICoderFailed, EventAICoderStopped, EventAICoderDeleted, EventAICoderProgress, EventAICoderMilestone, EventAICoderOutput, EventAICoderFileCreated, EventAICoderFileModified, EventAICoderFileDeleted, EventAICoderWorkspaceSync, EventAICoderAPICall, EventAICoderAPIError, EventAICoderRateLimit, EventAICoderResourceUsage, EventAICoderResourceLimit, } for _, eventType := range eventTypes { a.eventBus.Subscribe(eventType, a.handleAICoderEvent) } } // Handle AI coder events func (a *AICoderEventAggregator) handleAICoderEvent(data interface{}) { event, ok := data.(AICoderEvent) if !ok { // Try to convert from interface{} with type assertion if eventMap, ok := data.(map[string]interface{}); ok { event = a.mapToAICoderEvent(eventMap) } else { log.Printf("Invalid AI coder event data type: %T", data) return } } // Add to event history a.addEvent(event) // Update statistics a.updateStats(event) // Handle specialized processing a.processSpecializedEvent(event) } // Add event to history with size limit func (a *AICoderEventAggregator) addEvent(event AICoderEvent) { a.mu.Lock() defer a.mu.Unlock() // Add new event a.events = append(a.events, event) // Trim if over limit if len(a.events) > a.maxEvents { // Remove oldest events a.events = a.events[len(a.events)-a.maxEvents:] } } // Update event statistics func (a *AICoderEventAggregator) updateStats(event AICoderEvent) { a.statsMu.Lock() defer a.statsMu.Unlock() a.stats.TotalEvents++ a.stats.EventsByType[event.Type]++ a.stats.EventsByCoder[event.CoderID]++ a.stats.LastEvent = event.Timestamp // Calculate events per minute (simple moving average) if a.stats.TotalEvents > 1 { duration := event.Timestamp.Sub(time.Now().Add(-time.Minute)) if duration > 0 { a.stats.EventsPerMinute = float64(a.stats.TotalEvents) / duration.Minutes() } } } // Process specialized event handling func (a *AICoderEventAggregator) processSpecializedEvent(event AICoderEvent) { switch event.Type { case EventAICoderFailed: a.handleFailureEvent(event) case EventAICoderCompleted: a.handleCompletionEvent(event) case EventAICoderRateLimit: a.handleRateLimitEvent(event) case EventAICoderResourceLimit: a.handleResourceLimitEvent(event) } } // Handle failure events func (a *AICoderEventAggregator) handleFailureEvent(event AICoderEvent) { // Log failure for debugging log.Printf("AI Coder %s failed: %v", event.CoderID, event.Data) // Emit aggregated failure alert if multiple failures failureCount := a.getRecentFailureCount(event.CoderID) if failureCount >= 3 { a.eventBus.Emit("ai_coder_failure_alert", map[string]interface{}{ "coder_id": event.CoderID, "failure_count": failureCount, "time": event.Timestamp, }) } } // Handle completion events func (a *AICoderEventAggregator) handleCompletionEvent(event AICoderEvent) { // Calculate completion time from creation creationTime := a.getCreationTime(event.CoderID) if !creationTime.IsZero() { duration := event.Timestamp.Sub(creationTime) // Emit completion metrics a.eventBus.Emit("ai_coder_completion_metrics", map[string]interface{}{ "coder_id": event.CoderID, "duration": duration, "time": event.Timestamp, }) } } // Handle rate limit events func (a *AICoderEventAggregator) handleRateLimitEvent(event AICoderEvent) { // Emit system-wide rate limit warning a.eventBus.Emit("ai_coder_system_warning", map[string]interface{}{ "type": "rate_limit", "coder_id": event.CoderID, "message": "AI provider rate limit reached", "time": event.Timestamp, }) } // Handle resource limit events func (a *AICoderEventAggregator) handleResourceLimitEvent(event AICoderEvent) { // Emit resource warning a.eventBus.Emit("ai_coder_system_warning", map[string]interface{}{ "type": "resource_limit", "coder_id": event.CoderID, "message": "AI coder resource limit exceeded", "time": event.Timestamp, }) } // Query methods for event history func (a *AICoderEventAggregator) GetEvents(filter AICoderEventFilter) []AICoderEvent { a.mu.RLock() defer a.mu.RUnlock() var filtered []AICoderEvent for _, event := range a.events { if filter.matches(event) { filtered = append(filtered, event) } } return filtered } func (a *AICoderEventAggregator) GetStats() AICoderEventStats { a.statsMu.RLock() defer a.statsMu.RUnlock() return a.stats } // Event Filter type AICoderEventFilter struct { CoderID string EventType string Since time.Time Until time.Time } func (f AICoderEventFilter) matches(event AICoderEvent) bool { if f.CoderID != "" && event.CoderID != f.CoderID { return false } if f.EventType != "" && event.Type != f.EventType { return false } if !f.Since.IsZero() && event.Timestamp.Before(f.Since) { return false } if !f.Until.IsZero() && event.Timestamp.After(f.Until) { return false } return true } // Helper methods func (a *AICoderEventAggregator) getRecentFailureCount(coderID string) int { a.mu.RLock() defer a.mu.RUnlock() count := 0 cutoff := time.Now().Add(-10 * time.Minute) // Recent = last 10 minutes for _, event := range a.events { if event.CoderID == coderID && event.Type == EventAICoderFailed && event.Timestamp.After(cutoff) { count++ } } return count } func (a *AICoderEventAggregator) getCreationTime(coderID string) time.Time { a.mu.RLock() defer a.mu.RUnlock() for _, event := range a.events { if event.CoderID == coderID && event.Type == EventAICoderCreated { return event.Timestamp } } return time.Time{} } // Convert map to AICoderEvent func (a *AICoderEventAggregator) mapToAICoderEvent(eventMap map[string]interface{}) AICoderEvent { event := AICoderEvent{ Data: make(map[string]interface{}), } if eventType, ok := eventMap["type"].(string); ok { event.Type = eventType } if coderID, ok := eventMap["coder_id"].(string); ok { event.CoderID = coderID } if coderName, ok := eventMap["coder_name"].(string); ok { event.CoderName = coderName } if timestamp, ok := eventMap["timestamp"].(time.Time); ok { event.Timestamp = timestamp } else { event.Timestamp = time.Now() } // Copy remaining data for k, v := range eventMap { if k != "type" && k != "coder_id" && k != "coder_name" && k != "timestamp" { event.Data[k] = v } } return event } ``` ### Event Bus Integration ```go // Addition to pkg/events/events.go // Add AI coder event types to existing event bus // Register AI coder event aggregator func (eb *EventBus) RegisterAICoderAggregator(maxEvents int) *AICoderEventAggregator { return NewAICoderEventAggregator(eb, maxEvents) } // AI coder specific subscription helper func (eb *EventBus) SubscribeAICoderEvents(coderID string, handler EventHandler) { // Subscribe to all AI coder events for specific coder eventTypes := []string{ EventAICoderCreated, EventAICoderStarted, EventAICoderPaused, EventAICoderResumed, EventAICoderCompleted, EventAICoderFailed, EventAICoderStopped, EventAICoderDeleted, EventAICoderProgress, } for _, eventType := range eventTypes { eb.Subscribe(eventType, func(data interface{}) { if event, ok := data.(AICoderEvent); ok && event.CoderID == coderID { handler(data) } }) } } // Emit AI coder event with validation func (eb *EventBus) EmitAICoderEvent(event AICoderEvent) { if event.Type == "" { log.Printf("Warning: AI coder event missing type") return } if event.CoderID == "" { log.Printf("Warning: AI coder event missing coder ID") return } if event.Timestamp.IsZero() { event.Timestamp = time.Now() } eb.Emit(event.Type, event) } ``` ## Risk Mitigation (from master analysis) **Medium-Risk Mitigations**: - Async complexity - Use established event bus patterns with worker pools - Testing: Concurrent event processing tests - Event coordination - Implement event ordering and deduplication - Recovery: Event replay mechanism for failures - Memory management - Bounded event history with LRU eviction - Monitoring: Event memory usage tracking **Context Validation**: - [ ] Event bus patterns from `pkg/events/events.go` successfully extended - [ ] Event processing maintains system responsiveness - [ ] Event aggregation provides useful insights without performance impact ## Integration with Other Tasks **Dependencies**: Task 01 (Core Service) - Requires AI coder event emission **Integration Points**: - Task 04 (TUI Integration) will subscribe to events for real-time updates - Task 05 (Process Integration) will coordinate events between systems - Task 02 (MCP Tools) will access event history via tools **Shared Context**: Event system becomes the nervous system for all AI coder operations ## Execution Notes - **Start Pattern**: Use existing event bus patterns from `pkg/events/events.go` - **Key Context**: Focus on event aggregation and intelligent filtering - **Performance Focus**: Implement bounded event history and efficient filtering - **Review Focus**: Event handler performance and memory usage patterns This task creates a comprehensive event system that provides real-time visibility into all AI coder operations while maintaining high performance and system responsiveness.

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server