Skip to main content
Glama
streams.go20.7 kB
package streams import ( "fmt" "lunar/engine/communication" lunar_messages "lunar/engine/messages" "lunar/engine/metrics" streamconfig "lunar/engine/streams/config" streamfilter "lunar/engine/streams/filter" streamflow "lunar/engine/streams/flow" internaltypes "lunar/engine/streams/internal-types" lunar_context "lunar/engine/streams/lunar-context" metrics_data "lunar/engine/streams/metrics-data" "lunar/engine/streams/processors" publictypes "lunar/engine/streams/public-types" "lunar/engine/streams/resources" "lunar/engine/streams/stream" stream_types "lunar/engine/streams/types" "lunar/engine/utils" "lunar/engine/utils/environment" "lunar/toolkit-core/network" "github.com/rs/zerolog/log" ) var _ metrics.FlowMetricsProviderI = &Stream{} // This struct is used to store the node that caused the short circuit // So when we execute the response flows, we can start from the node that caused the short circuit // as intended by the user type shortCircuitOperation struct { node internaltypes.FlowGraphNodeI flow internaltypes.FlowI } type Stream struct { apiStreams *stream.Stream filterTree internaltypes.FilterTreeI processorsManager *processors.ProcessorManager resources *resources.ResourceManagement supportedFilters map[publictypes.ComparableFilter][]publictypes.FilterI loadedConfig network.ConfigurationData lunarHub *communication.HubCommunication metricsData *metrics_data.FlowMetricsData validationMode bool // if true - any error will stop initialization validationPath string } func NewStream() (*Stream, error) { resources, err := resources.NewResourceManagement() if err != nil { log.Err(err).Msg("Failed to create resources") return nil, err } return newStream(resources, metrics_data.NewFlowMetricsData()), nil } func NewValidationStream(dir string) (*Stream, error) { resources, err := resources.NewValidationResourceManagement(dir) if err != nil { log.Err(err).Msg("Failed to create resources for validation") return nil, err } return newStream(resources, metrics_data.NewFlowMetricsData()).WithValidationPath(dir), nil } func (s *Stream) OnError(transactionID string) { onResponse := lunar_messages.OnResponse{LunarName: "OnTransactionError"} //nolint:exhaustruct onResponse.ID = transactionID onResponse.SequenceID = transactionID apiStream := stream_types.NewResponseAPIStream(onResponse, lunar_context.NewMemoryState[[]byte]()) s.resources.OnRequestDrop(apiStream) } func (s *Stream) GetLoadedConfig() network.ConfigurationData { return s.loadedConfig } func (s *Stream) RegisterFlowInvocationsObserver(obs func(*metrics.MetricData)) { s.metricsData.RegisterFlowInvocationsObserver(obs) } func (s *Stream) RegisterRequestsThroughFlowsObserver(obs func(*metrics.MetricData)) { s.metricsData.RegisterRequestsThroughFlowsObserver(obs) } func (s *Stream) GetActiveFlows() *metrics.MetricData { return s.metricsData.GetActiveFlows() } func (s *Stream) GetAvgFlowExecutionTime() *metrics.MetricData { return s.metricsData.GetAvgFlowExecutionTime() } func (s *Stream) GetProcessorExecutionData() *metrics.MetricData { return s.metricsData.GetProcessorExecutionData() } func (s *Stream) WithHub(hub *communication.HubCommunication) *Stream { s.lunarHub = hub return s } // WithValidationMode sets the stream engine to validation mode. // In validation mode, any error will stop initialization. // Used for validation purposes. func (s *Stream) WithValidationMode() *Stream { s.validationMode = true return s } // WithValidationPath sets the path to the validation file. func (s *Stream) WithValidationPath(validationPath string) *Stream { s.validationMode = true s.validationPath = validationPath return s } // Initialize initializes the stream engine by creating flows from the stream config. func (s *Stream) Initialize() error { log.Info().Msg("Initializing stream engine") flowsDefinition, err := s.getFlows() if err != nil { return err } var userFlows []string for key := range flowsDefinition { userFlows = append(userFlows, key) } err = s.attachSystemFlows(flowsDefinition) if err != nil { return fmt.Errorf("failed to attach system flows: %w", err) } for key := range flowsDefinition { log.Info().Msgf("Adding flow %v to filter tree", key) } // Get all supported filters s.supportedFilters = map[publictypes.ComparableFilter][]publictypes.FilterI{} for key, resource := range s.resources.GetFlowsData() { s.supportedFilters[key] = append(s.supportedFilters[key], resource.GetFilter()) } if s.lunarHub != nil { for _, flow := range flowsDefinition { if flow.GetData().IsDataSet() { s.loadedConfig.Data = append(s.loadedConfig.Data, flow.GetData()) } else { log.Trace().Msgf("Empty configuration payload for flow: %s", flow.GetName()) } } } filterToFileName := make(map[publictypes.ComparableFilter]string) for _, flow := range flowsDefinition { s.supportedFilters[flow.GetFilter().ToComparable()] = append( s.supportedFilters[flow.GetFilter().ToComparable()], flow.GetFilter(), ) filterToFileName[flow.GetFilter().ToComparable()] = flow.GetData().FileName } log.Info().Msgf("Supported filters: %+v", s.supportedFilters) // Set path params for all supported filters to be used by the aggregation output plugin for comparableFilter, filters := range s.supportedFilters { for _, filter := range filters { for _, url := range filter.GetURLs() { err = s.resources.SetPathParams(url) if err != nil { fileName := filterToFileName[comparableFilter] return fmt.Errorf("while parsing file %s duplication found: %w."+ " Please fix the error and restart the container", fileName, err) } } } } err = s.resources.GeneratePathParamConfFile() if err != nil { log.Warn().Err(err).Msg("Failed to generate path params configuration file") } log.Trace().Msg("Creating processors") if err = s.processorsManager.Init(); err != nil { return fmt.Errorf("failed to initialize processors: %w", err) } for _, flow := range flowsDefinition { for processorKey, processorData := range flow.GetProcessors() { processor, errCreation := s.processorsManager.CreateProcessor(flow.GetName(), processorData) if errCreation != nil { return fmt.Errorf("failed to create processor %s: %w", processorKey, errCreation) } // Set the processors requirements to the filter. adminFilter := flow.GetFilter().(internaltypes.FlowFilterI) filterRequirements := adminFilter.GetRequirements() procRequirements := processor.GetRequirement() isBodyRequired := filterRequirements.IsBodyRequired || procRequirements.IsBodyRequired isReqCaptureRequired := filterRequirements.IsReqCaptureRequired || procRequirements.IsReqCaptureRequired adminFilter.SetBodyRequired(isBodyRequired) adminFilter.SetReqCaptureRequired(isReqCaptureRequired) } } err = s.createFlows(flowsDefinition) if err != nil { return fmt.Errorf("failed to create flows: %w", err) } s.metricsData.SetActiveFlows(userFlows) return nil } // InitializeHubCommunication notifies the hub about the loaded config of the stream engine func (s *Stream) InitializeHubCommunication() { s.notifyHub() } func (s *Stream) ExecuteFlow( apiStream publictypes.APIStreamI, actions *streamconfig.StreamActions, ) error { log.Trace().Msgf("Executing flow for APIStream %v", apiStream.GetName()) // resetting apiStream instance before flow execution flowsToExecute, found := s.filterTree.GetFlow(apiStream) if !found { log.Debug().Msgf("No flow found for %v", apiStream.GetURL()) return nil } s.apiStreams = stream.NewStream(). WithProcExecutionMeasurement(s.metricsData.GetProcMeasureExecFunc) var err error if apiStream.GetType().IsRequestType() { s.metricsData.IncrementRequestsThroughFlows(apiStream) err = s.executeReq(flowsToExecute, apiStream, actions) } else if apiStream.GetType().IsResponseType() { err = s.executeRes(flowsToExecute, apiStream, actions, nil) } return err } // getFlows gets the flows from the flows directory. // It can be either the directory defined by ENV var or the validation directory. func (s *Stream) getFlows() (map[string]internaltypes.FlowRepI, error) { var flowsDir string if s.validationPath != "" { flowsDir = environment.GetCustomFlowsDirectory(s.validationPath) } if flowsDir == "" { flowsDir = environment.GetStreamsFlowsDirectory() } flowsDefinition, err := streamconfig.GetFlows(flowsDir) if err != nil { if s.validationMode { return nil, fmt.Errorf("failed to get flows: %w", err) } if len(flowsDefinition) > 0 { log.Warn().Err(err).Msg("Part of flows have errors and have been skipped") } else { return nil, fmt.Errorf("failed to get flows: %w", err) } } return flowsDefinition, nil } func (s *Stream) executeReq( flowsToExecute internaltypes.FilterTreeResultI, apiStream publictypes.APIStreamI, actions *streamconfig.StreamActions, ) error { var err error var ShortCircuit *shortCircuitOperation // Execute System Flows if systemStart, found := flowsToExecute.GetSystemFlowStart(); found { for _, systemFlow := range systemStart { if reqFlow := systemFlow.GetRequestDirection(); utils.IsInterfaceNil(reqFlow) { continue } log.Trace().Msgf("Executing system start request flow %v", systemFlow.GetName()) defer systemFlow.CleanExecution() _, err = s.executeFlow(systemFlow, apiStream, actions, nil) if err != nil { return fmt.Errorf("failed to execute system flow: %w", err) } } } // Execute User Flow if userFlows, found := flowsToExecute.GetUserFlow(); found { for _, userFlow := range userFlows { s.metricsData.IncrementFlowInvocations(userFlow.GetName(), apiStream) log.Debug().Msgf("Executing request flow %v", userFlow.GetName()) defer userFlow.CleanExecution() var shortCircuitData *stream.ShortCircuitData shortCircuitData, err = s.executeFlow(userFlow, apiStream, actions, nil) if err != nil { return fmt.Errorf("failed to execute flow: %w", err) } if shortCircuitData != nil { if shortCircuitData.IsInternalShortCircuit { return nil } ShortCircuit = &shortCircuitOperation{ node: shortCircuitData.Node, flow: userFlow, } break } } } // Execute System Flows if systemFlowEnd, found := flowsToExecute.GetSystemFlowEnd(); found { for _, systemFlow := range systemFlowEnd { log.Trace().Msgf("Executing system end request flow %v", systemFlow.GetName()) defer systemFlow.CleanExecution() _, err = s.executeFlow(systemFlow, apiStream, actions, nil) if err != nil { return fmt.Errorf("failed to execute system flow: %w", err) } } } // This is a short circuit, we need to handle the response flows (as it wont be executed otherwise) if ShortCircuit != nil { apiStream.SetType(publictypes.StreamTypeResponse) flowsToExecute, found := s.filterTree.GetFlow(apiStream) if !found { log.Trace().Msgf("No flow found for %v", apiStream.GetURL()) return nil } return s.executeRes(flowsToExecute, apiStream, actions, ShortCircuit) } return nil } func (s *Stream) executeRes( flowsToExecute internaltypes.FilterTreeResultI, apiStream publictypes.APIStreamI, actions *streamconfig.StreamActions, shortCircuit *shortCircuitOperation, ) error { var err error // Execute System Flows if systemFlows, found := flowsToExecute.GetSystemFlowStart(); found { for flowIndex := len(systemFlows) - 1; flowIndex >= 0; flowIndex-- { systemFlow := systemFlows[flowIndex] log.Trace().Msgf("Executing system start response flow %v", systemFlow.GetName()) defer systemFlow.CleanExecution() _, err = s.executeFlow(systemFlow, apiStream, actions, nil) if err != nil { return fmt.Errorf("failed to execute system flow: %w", err) } } } if userFlows, found := flowsToExecute.GetUserFlow(); found { for flowIndex := len(userFlows) - 1; flowIndex >= 0; flowIndex-- { userFlow := userFlows[flowIndex] log.Debug().Msgf("Executing userFlow response flow %v", userFlow.GetName()) defer userFlow.CleanExecution() if shortCircuit != nil && shortCircuit.flow.GetName() == userFlow.GetName() { _, err = s.executeFlow(userFlow, apiStream, actions, shortCircuit.node) } else { _, err = s.executeFlow(userFlow, apiStream, actions, nil) } if err != nil { return fmt.Errorf("failed to execute user flow: %w", err) } } } // Execute System Flows if systemFlows, found := flowsToExecute.GetSystemFlowEnd(); found { for flowIndex := len(systemFlows) - 1; flowIndex >= 0; flowIndex-- { systemFlow := systemFlows[flowIndex] log.Trace().Msgf("Executing system end response flow %v", systemFlow.GetName()) defer systemFlow.CleanExecution() _, err = s.executeFlow(systemFlow, apiStream, actions, nil) if err != nil { return fmt.Errorf("failed to execute system flow: %w", err) } } } s.resources.OnResponseFinish(apiStream) return nil } func (s *Stream) executeFlow( flow internaltypes.FlowI, apiStream publictypes.APIStreamI, actions *streamconfig.StreamActions, startFromNode internaltypes.FlowGraphNodeI, ) (*stream.ShortCircuitData, error) { var shortCircuitData *stream.ShortCircuitData if utils.IsInterfaceNil(flow) { log.Trace().Msgf("No flow found for %v", apiStream.GetURL()) return shortCircuitData, nil } apiStream.SetContext(flow.GetExecutionContext()) log.Trace().Msgf("Flow %v found for %v", flow.GetName(), apiStream.GetURL()) flowDirection := flow.GetDirection(apiStream.GetType()) if !flowDirection.IsDefined() { return shortCircuitData, nil } // TODO: Handle the case where the root is not set. // we need to create the globalStream nodes and set them as default root. // If needed we could replace them with the needed root. start, _ := flowDirection.GetRoot() if utils.IsInterfaceNil(start) { return shortCircuitData, nil } node := start.GetNode() if !utils.IsInterfaceNil(startFromNode) { // If we have a short circuit, we need to start from the node that caused it // We assume that the node (GenerateResponse) has only one edge (one target node) if len(startFromNode.GetEdges()) != 0 { edge := startFromNode.GetEdges()[0] if !edge.IsNodeAvailable() { // if no node is available, it means node connects to stream, meaning 'end of walk' return shortCircuitData, nil } node = edge.GetTargetNode() } else { log.Debug().Msgf("Short circuit node %v has no target node", startFromNode.GetProcessorKey()) } } var err error closureFunc := func() error { shortCircuitData, err = s.apiStreams.ExecuteFlow(flow, apiStream, node, actions) return err } return shortCircuitData, s.metricsData.MeasureFlowExecutionTime(flow.GetName(), closureFunc) } func (s *Stream) GetAPIStreams() *stream.Stream { return s.apiStreams } func (s *Stream) createFlows(flowReps map[string]internaltypes.FlowRepI) error { return streamflow.BuildFlows(s.filterTree, flowReps, s.processorsManager, s.resources) } func (s *Stream) GetSupportedFilters() map[publictypes.ComparableFilter][]publictypes.FilterI { return s.supportedFilters } func (s *Stream) notifyHub() { if s.lunarHub == nil { log.Debug().Msg("No Hub communication, skipping notification") return } log.Debug().Msg("Notifying Hub about loaded config") s.loadedConfig.Data = append(s.loadedConfig.Data, s.resources.GetLoadedConfig()...) s.loadedConfig.Data = append(s.loadedConfig.Data, s.processorsManager.GetLoadedConfig()...) if s.loadedConfig.Data == nil { log.Debug().Msg("No configuration loaded, skipping notification to Hub") return } sent := s.lunarHub.SendDataToHub(&network.ConfigurationMessage{ Event: network.WebSocketEventConfigurationLoad, Data: s.loadedConfig, }) if !sent && !s.lunarHub.IsConnected() { log.Info().Msg( "Failed to send configuration to Hub, will wait for connection and send again in the background", ) go s.notifyHubWhenAvailable() } } func (s *Stream) notifyHubWhenAvailable() { <-s.lunarHub.ConnectionEstablishedChannel() sent := s.lunarHub.SendDataToHub(&network.ConfigurationMessage{ Event: network.WebSocketEventConfigurationLoad, Data: s.loadedConfig, }) if !sent { log.Warn(). Msg("Failed to send configuration to Hub after receiving connection established notification") } else { log.Info().Msg("Configuration sent to Hub after connection established") } } // The following functions are patch functions to disable the quota processor logic. // This is a fast delivery to disable the quota processor logic until we fix the infrastructure. func (s *Stream) getQuotaReferences( flowReps map[string]internaltypes.FlowRepI, ) map[string]struct{} { result := make(map[string]struct{}) for _, flow := range flowReps { for _, processor := range flow.GetProcessors() { for key, value := range processor.ParamMap() { if key == "quota_id" { result[value.GetString()] = struct{}{} s.addParentsQuotaReferences(value, result) continue } } } } return result } func (s *Stream) addParentsQuotaReferences( value *publictypes.ParamValue, result map[string]struct{}, ) { quotaResource, err := s.resources.GetQuota(value.GetString(), "") if err != nil { // No parent quota found! return } parentQuotaID := quotaResource.GetParentID() for parentQuotaID != "" { result[parentQuotaID] = struct{}{} quotaResource, err = s.resources.GetQuota(parentQuotaID, "") if err != nil { // No parent quota found! return } parentQuotaID = quotaResource.GetParentID() } } // The following functions are patch functions to disable the quota processor logic. // This is a fast delivery to disable the quota processor logic until we fix the infrastructure. func (s *Stream) disableQuotaProcessorLogic( quotaFlow internaltypes.FlowRepI, relevantQuotas map[string]struct{}, ) { getParamIndex := func(params []*publictypes.KeyValue, key string) int { for i, param := range params { if param.Key == key { return i } } return -1 // Not found } for _, processor := range quotaFlow.GetProcessors() { params := processor.ParamList() quotaIDIndex := getParamIndex(params, "quota_id") if quotaIDIndex == -1 { continue } // This is a patch to disable the quota processor logic. shouldApplyLogicKey := "should_apply_logic" applyLogicIndex := getParamIndex(params, shouldApplyLogicKey) quotaParam := params[quotaIDIndex] quotaParamValue := quotaParam.GetParamValue() quotaID := quotaParamValue.GetString() if _, ok := relevantQuotas[quotaID]; ok { newKeyValue := &publictypes.KeyValue{ Key: shouldApplyLogicKey, Value: false, } if applyLogicIndex == -1 { // No apply logic param found, add it processor.AddParam(newKeyValue) } else { // Apply logic param found, update it if err := processor.UpdateParam(applyLogicIndex, newKeyValue); err != nil { log.Debug().Msgf("Failed to update param %v for processor %v", newKeyValue.Key, processor.GetName()) } } } } } func (s *Stream) attachSystemFlows( flowReps map[string]internaltypes.FlowRepI, ) error { log.Info().Msg("Attaching standalone system flows") // Here we take all references to quotas from the flows. quotaReferences := s.getQuotaReferences(flowReps) for _, systemFlowRepresentation := range s.resources.GetUnReferencedFlowData() { systemFlowStart := systemFlowRepresentation.GenerateSystemFlowStart() // Here we call to disable if needed the Inc processor logic (System start) s.disableQuotaProcessorLogic(systemFlowStart, quotaReferences) // We did not disable the Dec processor logic (System end) as it is not needed (for now) systemFlowEnd := systemFlowRepresentation.GenerateSystemFlowEnd() if systemFlowStart != nil { log.Info().Msgf("Attaching standalone system flow %s: %v", systemFlowStart.GetType().String(), systemFlowStart.GetName()) flowReps[systemFlowStart.GetName()] = systemFlowStart } if systemFlowEnd != nil { log.Info().Msgf("Attaching standalone system flow %s: %v", systemFlowEnd.GetType().String(), systemFlowEnd.GetName()) flowReps[systemFlowEnd.GetName()] = systemFlowEnd } } return nil } func newStream( resources *resources.ResourceManagement, metricData *metrics_data.FlowMetricsData, ) *Stream { return &Stream{ loadedConfig: network.ConfigurationData{}, apiStreams: stream.NewStream(). WithProcExecutionMeasurement(metricData.GetProcMeasureExecFunc), filterTree: streamfilter.NewFilterTree(), processorsManager: processors.NewProcessorManager(resources), resources: resources, metricsData: metricData, } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server