Skip to main content
Glama
pipeline.go4.49 kB
package pipeline import ( "sync" "github.com/babelcloud/gbox/packages/cli/internal/device_connect/core" "github.com/babelcloud/gbox/packages/cli/internal/util" ) // Pipeline manages video/audio sample distribution. type Pipeline struct { mu sync.RWMutex spsPps []byte // Video subscribers videoSubs map[string]chan core.VideoSample // Audio subscribers audioSubs map[string]chan core.AudioSample } // NewPipeline creates a new pipeline. func NewPipeline() *Pipeline { return &Pipeline{ videoSubs: make(map[string]chan core.VideoSample), audioSubs: make(map[string]chan core.AudioSample), } } // CacheSpsPps caches SPS/PPS data for H.264 streams. func (p *Pipeline) CacheSpsPps(spsPps []byte) { p.mu.Lock() defer p.mu.Unlock() p.spsPps = spsPps util.GetLogger().Debug("Pipeline SPS/PPS cached", "size", len(spsPps)) } // GetSpsPps returns cached SPS/PPS data. func (p *Pipeline) GetSpsPps() []byte { p.mu.RLock() defer p.mu.RUnlock() return p.spsPps } // SubscribeVideo adds a video subscriber. func (p *Pipeline) SubscribeVideo(id string, bufferSize int) <-chan core.VideoSample { p.mu.Lock() defer p.mu.Unlock() ch := make(chan core.VideoSample, bufferSize) p.videoSubs[id] = ch util.GetLogger().Debug("Video subscriber added", "id", id, "total", len(p.videoSubs)) return ch } // UnsubscribeVideo removes a video subscriber. func (p *Pipeline) UnsubscribeVideo(id string) { p.mu.Lock() defer p.mu.Unlock() if ch, exists := p.videoSubs[id]; exists { close(ch) delete(p.videoSubs, id) util.GetLogger().Info("Video subscriber removed", "id", id, "total", len(p.videoSubs)) } } // PublishVideo publishes a video sample to all subscribers. func (p *Pipeline) PublishVideo(sample core.VideoSample) { p.mu.RLock() defer p.mu.RUnlock() for id, ch := range p.videoSubs { select { case ch <- sample: // Sample sent successfully default: // Channel is full, skip util.GetLogger().Warn("Video channel full, dropping sample", "subscriber", id) } } } // SubscribeAudio adds an audio subscriber. func (p *Pipeline) SubscribeAudio(id string, bufferSize int) <-chan core.AudioSample { p.mu.Lock() defer p.mu.Unlock() ch := make(chan core.AudioSample, bufferSize) p.audioSubs[id] = ch util.GetLogger().Debug("Audio subscriber added", "id", id, "total", len(p.audioSubs)) return ch } // UnsubscribeAudio removes an audio subscriber. func (p *Pipeline) UnsubscribeAudio(id string) { p.mu.Lock() defer p.mu.Unlock() if ch, exists := p.audioSubs[id]; exists { close(ch) delete(p.audioSubs, id) util.GetLogger().Info("Audio subscriber removed", "id", id, "total", len(p.audioSubs)) } } // PublishAudio publishes an audio sample to all subscribers. func (p *Pipeline) PublishAudio(sample core.AudioSample) { p.mu.RLock() defer p.mu.RUnlock() if len(p.audioSubs) == 0 { util.GetLogger().Debug("🎵 No audio subscribers, dropping sample", "size", len(sample.Data)) return } // Track successful sends successfulSends := 0 totalSubscribers := len(p.audioSubs) // Create a list of subscribers to remove (dead channels) var deadSubscribers []string for id, ch := range p.audioSubs { select { case ch <- sample: util.GetLogger().Debug("🎵 Audio sample sent to subscriber", "subscriber", id, "size", len(sample.Data)) successfulSends++ default: // Channel is full, check if it's a dead channel select { case <-ch: // Channel was closed, mark for removal deadSubscribers = append(deadSubscribers, id) util.GetLogger().Warn("🎵 Dead audio subscriber detected, marking for removal", "subscriber", id) default: // Channel is just full, not dead util.GetLogger().Warn("🎵 Audio channel full, dropping sample", "subscriber", id) } } } // Remove dead subscribers if len(deadSubscribers) > 0 { p.mu.RUnlock() // Release read lock p.mu.Lock() // Acquire write lock for _, id := range deadSubscribers { if ch, exists := p.audioSubs[id]; exists { close(ch) delete(p.audioSubs, id) util.GetLogger().Info("🎵 Dead audio subscriber removed", "id", id, "total", len(p.audioSubs)) } } p.mu.Unlock() // Release write lock p.mu.RLock() // Re-acquire read lock for consistency } // If no subscribers could accept the sample, log a warning if successfulSends == 0 { util.GetLogger().Warn("🎵 All audio channels full, dropping sample", "total_subscribers", totalSubscribers, "sample_size", len(sample.Data)) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/babelcloud/gru-sandbox'

If you have feedback or need assistance with the MCP directory API, please join our Discord server