Skip to main content
Glama
diagnosis_worker.go4.52 kB
package runner import ( "lunar/engine/config" lunarMessages "lunar/engine/messages" "lunar/engine/services" "lunar/engine/utils" sharedConfig "lunar/shared-model/config" contextmanager "lunar/toolkit-core/context-manager" "runtime" "strings" "github.com/rs/zerolog/log" ) type DiagnosisTask struct { Request lunarMessages.OnRequest Response lunarMessages.OnResponse } type DiagnosisWorker struct { diagnosisCache utils.Cache[string, DiagnosisTask] diagnosisData chan string } const ( // cacheTTL: The time that the data will be stored in the cache // before being deleted. The default will be 2 minutes. cacheTTL float64 = 60 * 2 // channelBufferSize: The size of the channel buffer, // the number of messages that can be stored without blocking the flow. channelBufferSize int = 2048 ) func NewDiagnosisWorker() *DiagnosisWorker { return &DiagnosisWorker{ diagnosisCache: utils.NewMemoryCache[string, DiagnosisTask]( contextmanager.Get().GetClock(), ), diagnosisData: make(chan string, channelBufferSize), } } func (worker *DiagnosisWorker) AddRequestToTask(onRequest lunarMessages.OnRequest) { var emptyResponse lunarMessages.OnResponse cacheKey := strings.Clone(onRequest.ID) log.Trace().Msgf("Adding request data to the cache with key: %v", cacheKey) err := worker.diagnosisCache.Set( cacheKey, DiagnosisTask{Request: onRequest.DeepCopy(), Response: emptyResponse}, cacheTTL) if err != nil { log.Debug(). Msgf("Failed to cache key: %v, cache: %+v. %+v", cacheKey, worker.diagnosisCache, err) return } log.Trace().Msgf("Cache after adding request: %+v", worker.diagnosisCache) } func (worker *DiagnosisWorker) AddResponseToTask( onResponse lunarMessages.OnResponse, ) { cacheKey := strings.Clone(onResponse.ID) task, found := worker.diagnosisCache.Get(cacheKey) if !found { log.Debug(). Msgf("Failed to find transaction for key: %v, cache: %+v", cacheKey, worker.diagnosisCache) return } task.Response = onResponse.DeepCopy() log.Trace().Msgf( "Adding response data to the cache with key: %v, value: %+v", cacheKey, task.Response, ) err := worker.diagnosisCache.Set(cacheKey, task, cacheTTL) if err != nil { log.Debug(). Msgf("Failed to cache key: %v, cache: %+v. %+v", cacheKey, worker.diagnosisCache, err) return } log.Trace().Msgf("Cache after adding response: %+v", worker.diagnosisCache) } func (worker *DiagnosisWorker) NotifyTaskReady(transactionID string) { // This is executed in a separate goroutine // to avoid blocking the flow if the channel is full. log.Trace().Msgf( "Scheduling goroutine to send %v to diagnosis worker", transactionID) copyOfTransactionID := strings.Clone(transactionID) go func(transactionID string) { log.Trace().Msgf("Sending %v to diagnosis worker", transactionID) worker.diagnosisData <- transactionID }(copyOfTransactionID) } func (worker *DiagnosisWorker) Run( policiesAccessor config.PoliciesAccessor, plugins *services.DiagnosisPlugins, exporters *services.Exporters, ) { go worker.diagnosisWorker( worker.diagnosisData, policiesAccessor, plugins, exporters, ) } func (worker *DiagnosisWorker) Stop() { log.Trace().Msg("Stopping the Diagnosis Worker...") close(worker.diagnosisData) } func (worker *DiagnosisWorker) diagnosisWorker( diagnosisTasks <-chan string, policiesAccessor config.PoliciesAccessor, plugins *services.DiagnosisPlugins, exporters *services.Exporters, ) { sublogger := log.With(). Str("component", "diagnosis-worker"). Logger() // TODO: How to share the logger down the call stack? for taskKey := range diagnosisTasks { task, found := worker.diagnosisCache.Get(taskKey) if !found { sublogger.Debug().Msgf( "Failed to find transaction for key: %v", taskKey, ) } policiesData := policiesAccessor.GetTxnPoliciesData( config.TxnID(taskKey), ) RunTask( task, &policiesData.EndpointPolicyTree, policiesData.Config.Global.Diagnosis, plugins, exporters, ) // Set the function as low priority to give more runtime to the remedy types. runtime.Gosched() } } func RunTask( task DiagnosisTask, policyTree *config.EndpointPolicyTree, globalDiagnoses []sharedConfig.Diagnosis, plugins *services.DiagnosisPlugins, exporters *services.Exporters, ) { diagnoses := getDiagnoses( task.Request.Method, task.Request.URL, policyTree, globalDiagnoses) runOnTransaction( task.Request, task.Response, diagnoses, plugins, exporters, policyTree, ) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server