Skip to main content
Glama
watchdog.go5.12 kB
//go:build pro package handlers import ( context_manager "lunar/toolkit-core/context-manager" protocol_async "lunar/toolkit-core/network/protocols/async" redis_client "lunar/toolkit-core/redis-client" "strings" "sync" "time" "github.com/rs/zerolog/log" ) const ( idleWaitMin = 5 initBachSize = 200 initAvgRedisLatency = 5 * time.Millisecond targetBachTime = 40 * time.Millisecond ) type Watchdog struct { done chan bool protocol *protocol_async.Protocol mutex sync.RWMutex batchSize int avgRedisLatency time.Duration } func NewWatchdog(protocol *protocol_async.Protocol) *Watchdog { return &Watchdog{ protocol: protocol, done: make(chan bool), batchSize: initBachSize, avgRedisLatency: initAvgRedisLatency, } } func (wd *Watchdog) Start() { go wd.validateRequests() } func (wd *Watchdog) Stop() { wd.done <- true } func (wd *Watchdog) validateRequests() { clock := context_manager.Get().GetClock() for { select { case <-clock.After(time.Duration(idleWaitMin) * time.Minute): wd.validatePendingRequests() case <-wd.done: log.Debug().Msg("Received done signal, exiting") return } } } func (wd *Watchdog) validatePendingRequests() { pendingQueueSize := wd.protocol.GetQueueSize(protocol_async.QueuePending) if pendingQueueSize == 0 { return } availableQueues, err := wd.protocol.GetAvailableEngineQueues() if err != nil { log.Debug().Err(err).Msg("Failed to get available queues") return } availableQueuesKeys := make([]redis_client.Key, len(availableQueues)) for _, queueRawKey := range availableQueues { parsedKey := strings.Split(queueRawKey, redis_client.Delimiter) if len(parsedKey) < 2 { log.Debug().Msgf("Invalid queue key format: %v", queueRawKey) continue } queueKey := redis_client.NewKey() for i := 1; i < len(parsedKey)-1; i++ { queueKey.Append(redis_client.UnhashedKeyPart(parsedKey[i])) } availableQueuesKeys = append(availableQueuesKeys, queueKey) } batchSize := wd.getBatchSize() if batchSize <= 0 { batchSize = 1 } numBatches := int(pendingQueueSize) / batchSize if int(pendingQueueSize)%batchSize != 0 { numBatches++ } for i := 0; i < numBatches; i++ { log.Trace().Msgf("Processing batch %d/%d", i+1, numBatches) start := int64(i * batchSize) end := start + int64(batchSize) - 1 if end >= pendingQueueSize { end = pendingQueueSize - 1 } wd.processBatch(start, end, availableQueuesKeys) } } func (wd *Watchdog) processBatch(start, end int64, availableQueues []redis_client.Key) { clock := context_manager.Get().GetClock() startTime := clock.Now() result, err := wd.protocol.ZRangeFromToOnQueue(protocol_async.QueuePending, start, end) redisCallDuration := clock.Since(startTime) if err != nil || len(result) == 0 { return } for _, member := range result { asyncReq, err := protocol_async.NewAsyncRequestData(nil, &member) if err != nil { log.Debug().Err(err).Msg("Failed to create async request data") continue } if asyncReq == nil { log.Debug().Msg("Async request data is nil") continue } reqValid := false for _, queue := range availableQueues { found := false found, err = wd.protocol.IsRequestInEngineQueue(asyncReq, queue) if err != nil { log.Debug().Err(err).Msg("Failed to check if request is in engine queue") continue } if found { log.Trace().Msgf("Request %s is valid in queue %v", asyncReq.ID, queue) reqValid = true break } } if !reqValid { log.Debug().Msgf("Request %s is not valid, adding to Idle queue", asyncReq.ID) err = wd.protocol.RemoveRequestFromPendingQueue(asyncReq) if err != nil { log.Debug().Err(err).Msg("Failed to remove request from pending queue") } err = wd.protocol.AddRequestToIdleQueue(asyncReq) if err != nil { log.Debug().Err(err).Msg("Failed to add request to Idle queue") } } } batchDuration := clock.Since(startTime) wd.adjustBatchSize(redisCallDuration, batchDuration, len(result)) } func (wd *Watchdog) getBatchSize() int { wd.mutex.RLock() defer wd.mutex.RUnlock() return wd.batchSize } func (wd *Watchdog) adjustBatchSize( redisCallDuration time.Duration, batchDuration time.Duration, itemsCount int, ) { wd.mutex.Lock() defer wd.mutex.Unlock() // Update average Redis Latency wd.avgRedisLatency = (wd.avgRedisLatency*9 + redisCallDuration) / 10 // Calculate Desired Batch Size base on time desiredBatchSize := int(float64(targetBachTime) / float64(wd.avgRedisLatency) * float64(itemsCount)) // Adjust Batch Size if batchDuration > targetBachTime { // Batch took too long -> decrease wd.batchSize = int(float64(wd.batchSize) * 0.9) if wd.batchSize < 100 { wd.batchSize = 100 } log.Debug().Msgf("Decreasing batch size to %d", wd.batchSize) } else if batchDuration < targetBachTime/2 && desiredBatchSize > wd.batchSize { // Batch was fast & Desired Batch is bigger -> Increase wd.batchSize = int(float64(desiredBatchSize) * 1.1) log.Debug().Msgf("Increasing batch size to %d", wd.batchSize) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server