Skip to main content
Glama
dispacher.worker_pool.go2.26 kB
//go:build pro package handlers import ( "lunar/async-service/config" context_manager "lunar/toolkit-core/context-manager" protocol_async "lunar/toolkit-core/network/protocols/async" "sync/atomic" "github.com/rs/zerolog/log" ) type WorkerPool struct { numWorkers int64 runningWorkers atomic.Int64 protocol *protocol_async.Protocol done chan bool } func NewWorkerPool(numWorkers int64, protocolHandler *protocol_async.Protocol) *WorkerPool { return &WorkerPool{ numWorkers: numWorkers, protocol: protocolHandler, done: make(chan bool), } } func (wp *WorkerPool) Start() { go wp.start() } func (wp *WorkerPool) Stop() { wp.done <- true log.Debug().Msgf("Worker pool stopped.") } func (wp *WorkerPool) start() { clock := context_manager.Get().GetClock() asyncServiceIdle := config.GetAsyncServiceIdle() for { select { case <-clock.After(asyncServiceIdle): wp.processRequests() case <-wp.done: log.Debug().Msg("Received done signal, exiting") return } } } func (wp *WorkerPool) processRequests() { for wp.protocol.GetQueueSize(protocol_async.QueueRequired) > 0 { if wp.runningWorkers.Load() >= wp.numWorkers { return } // Gets the request from the required queue and moves it to processing asyncReq, err := wp.protocol.MoveHead(protocol_async.QueueRequired, protocol_async.QueueProcessing) if err == nil { if !wp.executeTask(asyncReq) { return } } else { break } } for wp.protocol.GetQueueSize(protocol_async.QueueIdle) > 0 { if wp.runningWorkers.Load() >= wp.numWorkers { return } // Gets the registered request from the idle queue in order to register it with the Engine. asyncReq, err := wp.protocol.MoveHead(protocol_async.QueueIdle, protocol_async.QueueProcessing) if err == nil { if !wp.executeTask(asyncReq) { return } } else { break } } } func (wp *WorkerPool) executeTask(asyncReq *protocol_async.RequestData) bool { wp.runningWorkers.Add(1) workerInstance := &worker{ protocol: wp.protocol, logger: log.With().Str("request_id", asyncReq.ID).Logger(), } go func(workerInstance *worker) { defer wp.runningWorkers.Add(-1) workerInstance.Process(asyncReq) }(workerInstance) return true }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server