Skip to main content
Glama
handling_data_manager.go18.5 kB
package routing import ( "encoding/json" "fmt" "lunar/engine/communication" "lunar/engine/config" "lunar/engine/doctor" "lunar/engine/failsafe" "lunar/engine/metrics" "lunar/engine/runner" "lunar/engine/services" "lunar/engine/streams" stream_config "lunar/engine/streams/config" configstate "lunar/engine/streams/config-state" internal_types "lunar/engine/streams/internal-types" lunar_context "lunar/engine/streams/lunar-context" stream_types "lunar/engine/streams/types" "lunar/engine/streams/validation" "lunar/engine/utils" "lunar/engine/utils/environment" "lunar/engine/utils/writers" "lunar/toolkit-core/logging" "lunar/toolkit-core/network" "lunar/toolkit-core/otel" "net/http" "sync" "time" shared_config "lunar/shared-model/config" shared_discovery "lunar/shared-model/discovery" context_manager "lunar/toolkit-core/context-manager" "github.com/rs/zerolog/log" ) const ( lunarEngine string = "lunar-engine" syslogExporterEndpoint string = "127.0.0.1:5140" ) type PoliciesData struct { diagnosisWorker *runner.DiagnosisWorker configBuildResult config.BuildResult diagnosisWatcher *failsafe.StateChangeWatcher } type StreamsData struct { stream *streams.Stream flowValidator *validation.Validator } type HandlingDataManager struct { PoliciesData StreamsData handlingLock sync.Mutex isStreamsEnabled bool lunarHub *communication.HubCommunication writer writers.Writer proxyTimeout time.Duration policiesServices *services.PoliciesServices metricManager *metrics.MetricManager legacyMetricManager *metrics.LegacyMetricManager doctor *doctor.Doctor shutdown func() areMetricsInitialized bool } func NewHandlingDataManager( proxyTimeout time.Duration, hubComm *communication.HubCommunication, ) *HandlingDataManager { ctxMng := context_manager.Get() data := &HandlingDataManager{ proxyTimeout: proxyTimeout, lunarHub: hubComm, writer: writers.Dial("tcp", syslogExporterEndpoint, ctxMng.GetClock()), } context_manager.Get().WithFileExporter(data.writer) return data } func (rd *HandlingDataManager) Setup(telemetryWriter *logging.LunarLogger) error { rd.initializeOtel() err := rd.initializeDoctor(telemetryWriter) if err != nil { log.Warn().Err(err).Msg("Failed to initialize doctor") } if environment.IsStreamsEnabled() { rd.isStreamsEnabled = true rd.doctor.WithStreams(rd.GetLoadedStreamsConfig) err := rd.initializeStreams() if err != nil { return err } rd.metricManager, err = metrics.NewMetricManager() if err != nil { return fmt.Errorf("failed to initialize metric manager: %w", err) } rd.metricManager.UpdateMetricsProviderForFlow(rd.stream) return nil } rd.doctor.WithPolicies(rd.GetTxnPoliciesAccessor) return rd.initializePolicies() } func (rd *HandlingDataManager) GetMetricManager() *metrics.MetricManager { return rd.metricManager } func (rd *HandlingDataManager) RunDiagnosisWorker() { if rd.diagnosisWorker == nil { return } rd.diagnosisWorker.Run( rd.configBuildResult.Accessor, &rd.policiesServices.Diagnosis, &rd.policiesServices.Exporters, ) } func (rd *HandlingDataManager) StopDiagnosisWorker() { if rd.diagnosisWorker != nil { rd.diagnosisWorker.Stop() } } func (rd *HandlingDataManager) GetTxnPoliciesAccessor() *config.TxnPoliciesAccessor { return rd.configBuildResult.Accessor } func (rd *HandlingDataManager) GetLoadedStreamsConfig() *network.ConfigurationData { if rd.isStreamsEnabled && rd.stream != nil { f := rd.stream.GetLoadedConfig() return &f } return nil } func (rd *HandlingDataManager) IsStreamsEnabled() bool { return rd.isStreamsEnabled } func (rd *HandlingDataManager) Shutdown() { if rd.shutdown != nil { rd.shutdown() } } func (rd *HandlingDataManager) SetHandleRoutes(mux *http.ServeMux) { if rd.isStreamsEnabled { mux.HandleFunc( "/on_haproxy_error", rd.handleOnError(), ) mux.HandleFunc( "/load_flows", rd.handleFlowsLoading(), ) mux.HandleFunc( "/validate_flows", rd.handleFlowsValidation(), ) mux.HandleFunc( "/apply_flows", rd.handleApplyFlows(), ) mux.HandleFunc( "/configuration", rd.handleConfiguration(), ) } else { mux.HandleFunc( "/apply_policies", HandleApplyPolicies(rd.configBuildResult.Accessor, rd.writer), ) mux.HandleFunc( "/validate_policies", HandleValidatePolicies(), ) mux.HandleFunc( "/revert_to_diagnosis_free", HandleRevertToDiagnosisFree(rd.configBuildResult.Accessor, rd.writer), ) mux.HandleFunc( "/revert_to_last_loaded", HandleRevertToLastLoaded(rd.configBuildResult.Accessor, rd.writer), ) } mux.HandleFunc( "/doctor", HandleDoctorRequest(rd.doctor), ) mux.HandleFunc( "/discover", HandleJSONFileRead(environment.GetDiscoveryStateLocation()), ) mux.HandleFunc( "/remedy_stats", HandleJSONFileRead(environment.GetRemedyStateLocation()), ) mux.HandleFunc( "/handshake", HandleHandshake(), ) } func (rd *HandlingDataManager) initializeOtel() { rd.shutdown = otel.InitProvider(lunarEngine) go otel.ServeMetrics() rd.areMetricsInitialized = true } func (rd *HandlingDataManager) initializeStreamsForDryRun() error { log.Info().Msg("Validating flows for Lunar Engine") rd.flowValidator = validation.NewValidator() return rd.flowValidator.Validate() } func (rd *HandlingDataManager) initializeStreams() (err error) { statusMsg := context_manager.Get().GetStatusMessage() statusMsg.AddMessage(lunarEngine, "Engine: Lunar Flows") _ = lunar_context.NewSharedState[int64]() // For Redis initialization var previousHaProxyReq *config.HAProxyEndpointsRequest if rd.stream != nil { previousHaProxyReq = rd.buildHAProxyFlowsEndpointsRequest() } stream, err := streams.NewStream() if err != nil { return fmt.Errorf("failed to create stream: %w", err) } rd.stream = stream rd.stream.WithHub(rd.lunarHub) if err = rd.stream.Initialize(); err != nil { return fmt.Errorf("failed to initialize streams: %w", err) } rd.stream.InitializeHubCommunication() if err = config.WaitForProxyHealthcheck(); err != nil { return fmt.Errorf("failed to wait for HAProxy healthcheck: %w", err) } newHAProxyEndpoints := rd.buildHAProxyFlowsEndpointsRequest() err = config.ManageHAProxyEndpoints(newHAProxyEndpoints) if err != nil { return fmt.Errorf("failed to initialize HAProxy endpoints: %v", err) } // Unmanaging HAProxy endpoints should occur after all possible transactions have reached Engine if previousHaProxyReq != nil && len(previousHaProxyReq.ManagedEndpoints) > 0 { haproxyEndpointsToRemove := config.GetEndpointsDiffToRemove(previousHaProxyReq, newHAProxyEndpoints) config.ScheduleUnmanageHAProxyEndpoints(haproxyEndpointsToRemove) } return nil } func (rd *HandlingDataManager) processFlowsValidation() error { err := rd.initializeStreamsForDryRun() if err == nil { return nil } err = utils.LastErrorWithUnwrappedDepth(err, 1) return fmt.Errorf("💔 Validation failed: %v", err.Error()) } func (rd *HandlingDataManager) handleFlowsLoading() func(http.ResponseWriter, *http.Request) { return func(writer http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodPost: if err := rd.reloadFlows(); err != nil { handleError(writer, fmt.Sprintf("%v", err), http.StatusBadRequest, err) return } SuccessResponse(writer, "✅ Successfully loaded flows") default: http.Error( writer, "Unsupported Method", http.StatusMethodNotAllowed, ) } } } func (rd *HandlingDataManager) handleOnError() func(http.ResponseWriter, *http.Request) { return func(writer http.ResponseWriter, req *http.Request) { if req.Method != http.MethodPut { http.Error( writer, "Unsupported Method for handling errors", http.StatusMethodNotAllowed, ) } failedTransactions := shared_discovery.OnError{} if err := json.NewDecoder(req.Body).Decode(&failedTransactions); err != nil { handleError(writer, "Failed to decode incoming data", http.StatusBadRequest, err) return } for failedTransactionID := range failedTransactions.FailedTransactions { rd.stream.OnError(failedTransactionID) } SuccessResponse(writer, "Error logged successfully") } } func (rd *HandlingDataManager) handleApplyFlows() func(http.ResponseWriter, *http.Request) { return func(writer http.ResponseWriter, req *http.Request) { if !rd.handlingLock.TryLock() { handleError(writer, "Failed to decode incoming data", http.StatusIMUsed, fmt.Errorf("already handling another apply flows request")) return } defer rd.handlingLock.Unlock() if req.Method != http.MethodPut { http.Error( writer, "Unsupported Method for applying flows", http.StatusMethodNotAllowed, ) } incomingData := stream_config.NewConfigurationPayload() if err := json.NewDecoder(req.Body).Decode(&incomingData); err != nil { handleError(writer, "Failed to decode incoming data", http.StatusBadRequest, err) return } if incomingData == nil { handleError(writer, "No data provided", http.StatusBadRequest, nil) return } if err := incomingData.ParsePayload(); err != nil { handleError(writer, "Failed to parse incoming data", http.StatusBadRequest, err) return } configState := configstate.Get() endTxn := configState.StartTransaction() defer endTxn() if err := configState.Backup(); err != nil { log.Error().Err(err).Msg("Failed to backup config") handleError(writer, "Failed to backup config", http.StatusInternalServerError, err) return } if err := configState.Clean(); err != nil { handleError(writer, "Failed to clean up", http.StatusInternalServerError, err) if err = configState.RestoreNewest(); err != nil { log.Error().Err(err).Msg("Failed to restore file system operations") } return } if err := incomingData.SavePayloadContentToDisk(); err != nil { handleError(writer, "Failed to save payload content", http.StatusInternalServerError, err) if err = configState.RestoreNewest(); err != nil { log.Error().Err(err).Msg("Failed to restore file system operations") } return } if err := rd.reloadFlows(); err != nil { handleError(writer, err.Error(), http.StatusUnprocessableEntity, err) if err = configState.RestoreNewest(); err != nil { log.Error().Err(err).Msg("Failed to restore file system operations") } return } SuccessResponse(writer, "Lunar Gateway config is being updated...") } } func (rd *HandlingDataManager) handleConfiguration() func(http.ResponseWriter, *http.Request) { return func(writer http.ResponseWriter, req *http.Request) { if !rd.handlingLock.TryLock() { handleError(writer, "Failed to decode incoming data", http.StatusIMUsed, fmt.Errorf("already handling another configuration request")) return } defer rd.handlingLock.Unlock() if req.Method != http.MethodPost { http.Error( writer, "Unsupported Method for configuration", http.StatusMethodNotAllowed, ) } incomingData := stream_config.NewContractPayload() if err := json.NewDecoder(req.Body).Decode(incomingData); err != nil { handleError(writer, "Failed to decode incoming data", http.StatusBadRequest, err) return } if incomingData == nil || !incomingData.IsDataProvided() { handleError(writer, "No data provided", http.StatusBadRequest, nil) return } if err := incomingData.ParsePayload(); err != nil { handleError(writer, "Failed to parse incoming data", http.StatusBadRequest, err) return } configState := configstate.Get() endTxn := configState.StartTransaction() defer endTxn() respPayload, err := incomingData.Operation.Apply() if err != nil { handleError(writer, "Failed to apply incoming data", http.StatusInternalServerError, err) } if !incomingData.Operation.IsGetOperation() { if err = rd.reloadFlows(); err != nil { handleError(writer, err.Error(), http.StatusUnprocessableEntity, err) if err = configState.RestoreNewest(); err != nil { log.Error().Err(err).Msg("Failed to restore file system operations") } else if err = rd.reloadFlows(); err != nil { log.Error().Err(err).Msg("Failed to reload flows after restore") } return } } jsonData, err := json.Marshal(respPayload) if err != nil { handleError(writer, "Failed to marshal response payload", http.StatusInternalServerError, err) return } handleJSONResponse(writer, jsonData) } } func (rd *HandlingDataManager) handleFlowsValidation() func(http.ResponseWriter, *http.Request) { return func(writer http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodPost: if err := rd.processFlowsValidation(); err != nil { handleError(writer, fmt.Sprintf("%v", err), http.StatusUnprocessableEntity, err) return } SuccessResponse(writer, "✅ Validation succeeded") default: http.Error( writer, "Unsupported Method", http.StatusMethodNotAllowed, ) } } } func (rd *HandlingDataManager) initializePolicies() error { statusMsg := context_manager.Get().GetStatusMessage() statusMsg.AddMessage(lunarEngine, "Engine: Lunar Policies") shared_config.Validate.RegisterStructValidation( config.ValidateStructLevel, shared_config.Remedy{}, //nolint: exhaustruct shared_config.Diagnosis{}, //nolint: exhaustruct shared_config.PoliciesConfig{}, //nolint: exhaustruct ) err := shared_config.Validate.RegisterValidation( "validateInt", config.ValidateInt, ) if err != nil { return fmt.Errorf("failed to register config validation: %w", err) } configBuildResult, err := config.BuildInitialFromFile() if err != nil { return fmt.Errorf("failed to build initial config: %w", err) } rd.configBuildResult = configBuildResult rd.diagnosisWorker = runner.NewDiagnosisWorker() exporters := rd.configBuildResult.Initial.Config.Exporters rd.policiesServices, err = services.Initialize(rd.writer, rd.proxyTimeout, exporters) if err != nil { return fmt.Errorf("failed to initialize services: %w", err) } legacyMetricsCollectedByPlugin := false for _, dig := range rd.configBuildResult.Initial.Config.Global.Diagnosis { if dig.Config.MetricsCollector != nil { legacyMetricsCollectedByPlugin = true log.Info().Msg("Legacy metrics will be collected by plugin") break } } if !legacyMetricsCollectedByPlugin { log.Info().Msg("Legacy metrics will be collected by Lunar Engine") rd.legacyMetricManager, err = metrics.NewLegacyMetricManager(exporters) if err != nil { log.Error().Err(err).Msg("Failed to initialize legacy metric manager") } } ctxMng := context_manager.Get() watcher, err := failsafe.NewDiagnosisFailsafeStateChangeWatcher( rd.GetTxnPoliciesAccessor(), ctxMng.GetClock(), ) rd.diagnosisWatcher = watcher if err != nil { log.Panic(). Stack(). Err(err). Msg("Could not create diagnosis failsafe state change watcher") } rd.diagnosisWatcher.RunInBackground() return nil } func (rd *HandlingDataManager) buildHAProxyFlowsEndpointsRequest() *config.HAProxyEndpointsRequest { if !rd.isStreamsEnabled { return &config.HAProxyEndpointsRequest{} } manageAll := false bodyMessageForAll := false reqCaptureForAll := false managedEndpoints := []*config.HAProxyEndpointData{} for _, filters := range rd.stream.GetSupportedFilters() { if len(filters) == 0 { continue } requirements := &stream_types.ProcessorRequirement{} for _, filter := range filters { adminFilter := filter.(internal_types.FlowFilterI) // If any filter requires body, we will set the flag to true filterRequirements := adminFilter.GetRequirements() requirements.IsBodyRequired = requirements.IsBodyRequired || filterRequirements.IsBodyRequired requirements.IsReqCaptureRequired = requirements.IsReqCaptureRequired || filterRequirements.IsReqCaptureRequired manageAll = manageAll || filter.IsAnyURLAccepted() bodyMessageForAll = bodyMessageForAll || (manageAll && requirements.IsBodyRequired) reqCaptureForAll = reqCaptureForAll || (manageAll && requirements.IsReqCaptureRequired) } for _, method := range filters[0].GetSupportedMethods() { for _, url := range filters[0].GetURLs() { managedEndpoints = append(managedEndpoints, config.HaproxyEndpointFormat(method, url, requirements)) } } } return &config.HAProxyEndpointsRequest{ ManageAll: manageAll, BodyNeededForAll: bodyMessageForAll, ReqCaptureForAll: reqCaptureForAll, ManagedEndpoints: managedEndpoints, } } func (rd *HandlingDataManager) initializeDoctor( telemetryWriter *logging.LunarLogger, ) error { ctxManager := context_manager.Get() getLastSuccessfulHubCommunication := func() *time.Time { return nil } if rd.lunarHub != nil { getLastSuccessfulHubCommunication = func() *time.Time { return rd.lunarHub.LastSuccessfulCommunication() } } doctorInstance, err := doctor.NewDoctor( ctxManager, getLastSuccessfulHubCommunication, log.Logger, ) statusMsg := context_manager.Get().GetStatusMessage() if err != nil { statusMsg.AddMessage(lunarEngine, "Doctor: Initialization failed") return err } // Set-up periodic reports doctorReportInterval, err := environment.GetDoctorReportInterval() if err != nil { log.Warn(). Stack(). Err(err). Msgf("Could not get doctor report interval, will use default of %v", environment.DoctorReportIntervalMinDefault) doctorReportInterval = environment.DoctorReportIntervalMinDefault } if doctorReportInterval > doctor.MaxDoctorReportInterval { log.Warn(). Msgf("Doctor report interval (%v) is too high, setting it to %v", doctorReportInterval, doctor.MaxDoctorReportInterval) doctorReportInterval = doctor.MaxDoctorReportInterval } doctor.ReportPeriodicallyInBackground( doctorInstance, doctorReportInterval, telemetryWriter, ctxManager.GetClock(), ) statusMsg.AddMessage(lunarEngine, "Doctor: Initialized") rd.doctor = doctorInstance return nil } func (rd *HandlingDataManager) reloadFlows() error { if err := rd.processFlowsValidation(); err != nil { return err } err := rd.initializeStreams() if err != nil { return fmt.Errorf("💔 Failed to load flows: %v", err) } err = rd.metricManager.ReloadMetricsConfig() if err != nil { return fmt.Errorf("failed to load metrics config: %v", err) } rd.metricManager.UpdateMetricsProviderForFlow(rd.stream) return nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server