Skip to main content
Glama
dispacher.worker.go (3.7 kB)
//go:build pro package handlers import ( "lunar/async-service/utils" stream_types "lunar/engine/streams/types" protocol_async "lunar/toolkit-core/network/protocols/async" "net/http" "github.com/rs/zerolog" ) type worker struct { protocol *protocol_async.Protocol logger zerolog.Logger } func (w *worker) Process(asyncReq *protocol_async.RequestData) { var err error IDLogger := w.logger.With().Str("request_id", asyncReq.ID).Logger() if !asyncReq.Initialized { IDLogger.Trace().Msg("asyncReq not initialized") return } defer func() { if err = w.protocol.RemoveRequestFromProcessingQueue(asyncReq); err != nil { IDLogger.Debug().Err(err).Msg("Error removing request from processing queue") } }() result := w.processJob(asyncReq, IDLogger) switch result { case addToIdle: if err = w.protocol.AddRequestToIdleQueue(asyncReq); err != nil { IDLogger.Trace().Err(err).Msg("Error adding request to idle queue") } case addToPending: if err = w.protocol.AddRequestToPendingQueue(asyncReq); err != nil { IDLogger.Trace().Err(err).Msg("Error adding request to pending queue") } case addToErrors: // TODO: do we want to have an error queue? 
// w.protocol.AddRequestToErrorQueue(asyncReq) return case noOperation: return case addResponse: return } } func (w *worker) processJob( asyncReq *protocol_async.RequestData, IDLogger zerolog.Logger, ) workerResult { request := w.prepareRequest(asyncReq, IDLogger) response, err := utils.MakeRequest(request) if err != nil { IDLogger.Trace().Err(err).Msg("Error making request") return addToIdle } if response == nil { IDLogger.Trace().Msg("Response is nil") return addToErrors } else if response.ID == "" { response.ID = asyncReq.ID response.SequenceID = asyncReq.ID } operation := w.getOperationBasedOnResponse(response, IDLogger) if operation == addResponse { err = w.onResponse(asyncReq, response) if err != nil { IDLogger.Trace().Err(err).Msg("Error on response") return addToErrors } } IDLogger.Trace().Msg("Finished processing.") if flowName, found := response.Headers[asyncServiceFlowIndicatorHeaderName]; found { request.Headers[asyncServiceFlowIndicatorHeaderName] = flowName if err := w.protocol.StoreRequest(request, asyncReq); err != nil { IDLogger.Debug().Err(err).Msg("Error updating request with flow name") } } return operation } func (w *worker) onResponse( asyncReq *protocol_async.RequestData, response *stream_types.OnResponse, ) error { if err := w.protocol.StoreResponse(response); err != nil { return err } return w.protocol.RemoveRequestFromStorage(asyncReq.ID) } func (w *worker) prepareRequest( asyncReq *protocol_async.RequestData, IDLogger zerolog.Logger, ) *stream_types.OnRequest { request, err := w.protocol.GetRequestByID(asyncReq.ID) if err != nil { IDLogger.Trace().Err(err).Msgf("Error getting request by ID: %s", asyncReq.ID) return nil } if request == nil { IDLogger.Trace().Msg("Request not found") return nil } return request } func (w *worker) getOperationBasedOnResponse( response *stream_types.OnResponse, IDLogger zerolog.Logger, ) workerResult { if response.Status != http.StatusAccepted { return addResponse } headerVal, found := 
response.Headers[asyncServiceResponseNotAllowedHeaderName] if !found { return addResponse } switch headerVal { case asyncServiceResponseRegister: return addToPending case asyncServiceResponseBlocked: return addToPending case asyncServiceResponseError: return addToIdle case asyncServiceResponseRetry: return addToIdle default: IDLogger.Debug().Msgf("Unknown header value: %s", headerVal) return noOperation } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.