Skip to main content
Glama
flow_builder.go (10.5 kB)
package streamflow

import (
	"fmt"
	configstate "lunar/engine/streams/config-state"
	internaltypes "lunar/engine/streams/internal-types"
	"lunar/engine/streams/processors"
	publictypes "lunar/engine/streams/public-types"
	"lunar/engine/streams/resources"
	"lunar/engine/utils"

	"github.com/rs/zerolog/log"
)

// flowBuilder turns flow representations into flows and registers them with
// the filter tree.
//
// foreignRoot is scratch state used while one flow is being incorporated into
// another: connectStreamToProcessor stores the incorporated flow's entry point
// there, and the method that requested the incorporation consumes it.
type flowBuilder struct {
	filterTree         internaltypes.FilterTreeI
	flowReps           map[string]internaltypes.FlowRepI
	foreignRoot        *EntryPoint
	nodeBuilder        *graphNodeBuilder
	processorManager   *processors.ProcessorManager
	resourceManagement *resources.ResourceManagement
}

// newFlowBuilder creates a new instance of a flow builder.
// The node builder is derived from the same flow representations and
// processor manager that are handed to the flow builder itself.
func newFlowBuilder(
	filterTree internaltypes.FilterTreeI,
	flowReps map[string]internaltypes.FlowRepI,
	processorManager *processors.ProcessorManager,
	resourceManagement *resources.ResourceManagement,
) *flowBuilder {
	builder := &flowBuilder{
		filterTree:         filterTree,
		processorManager:   processorManager,
		resourceManagement: resourceManagement,
		flowReps:           flowReps,
	}
	builder.nodeBuilder = newGraphNodeBuilder(builder.flowReps, builder.processorManager)
	return builder
}

// build function builds flows from provided FlowRepresentations.
//
// Every representation is attempted once. A flow whose build fails is retried
// a single time after all other flows have been attempted; only a failure of
// that retry is returned. A nil or unnamed representation aborts immediately.
func (fb *flowBuilder) build() error {
	// Keyed by flow name; the value is the representation that failed, so the
	// retry does not depend on the flowReps map key matching GetName().
	pendingFlows := make(map[string]internaltypes.FlowRepI)

	for _, flowRep := range fb.flowReps {
		// IsInterfaceNil additionally catches a nil pointer wrapped in the interface.
		if flowRep == nil || utils.IsInterfaceNil(flowRep) || flowRep.GetName() == "" {
			return fmt.Errorf("flow representation is invalid")
		}
		if err := fb.buildFlow(flowRep); err != nil {
			// Keep the reason visible: it is lost for good if the retry succeeds.
			log.Debug().Err(err).
				Msgf("Building flow %s failed, will retry", flowRep.GetName())
			pendingFlows[flowRep.GetName()] = flowRep
		}
	}

	// try to build pending flows
	for flowName, flowRep := range pendingFlows {
		if err := fb.buildFlow(flowRep); err != nil {
			return fmt.Errorf("failed to build flow %s: %w", flowName, err)
		}
	}
	return nil
}

// buildFlow builds a flow based on the provided FlowRepresentation.
func (fb *flowBuilder) buildFlow(flowRep internaltypes.FlowRepI) error { log.Info().Msgf("Building flow %s", flowRep.GetName()) flow := NewFlow(fb.nodeBuilder, flowRep, fb.resourceManagement) // process request and response connections if err := fb.buildConnections( flowRep.GetName(), flow.request, flowRep.GetFlow().GetRequest()); err != nil { return fmt.Errorf("failed to build connections for request flow: %w", err) } if err := fb.buildConnections( flowRep.GetName(), flow.response, flowRep.GetFlow().GetResponse()); err != nil { return fmt.Errorf("failed to build connections for response flow: %w", err) } // validate the flow if err := validateFlow(flow); err != nil { return err } // add the flow to the filter tree log.Info().Msgf("Adding %s with filter on %v to filter tree", flowRep.GetName(), flowRep.GetFilter().GetURLs(), ) if err := fb.filterTree.AddFlow(flow); err != nil { return fmt.Errorf("failed to add flow %s to filter tree: %w", flowRep.GetName(), err) } configstate.Get().AddFlow(flow) return nil } // buildConnections builds connections for the provided FlowDirection. func (fb *flowBuilder) buildConnections( currentFlowName string, flowDir *FlowDirection, connections []internaltypes.FlowConnRepI, ) error { for _, conn := range connections { if err := fb.buildConnection(currentFlowName, flowDir, conn); err != nil { return err } } return nil } // validateCondition validates the condition for a processor. func (fb *flowBuilder) validateCondition( streamType publictypes.StreamType, flowName string, procRef internaltypes.ProcessorRefI, ) error { procDef := fb.processorManager.GetProcessorDefinitionByKey(flowName, procRef) if procDef == nil { return nil } return procDef.CheckCondition(procRef.GetCondition(), streamType) } // buildConnection builds a connection between two nodes. 
func (fb *flowBuilder) buildConnection( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { if !utils.IsInterfaceNil(conn.GetFrom().GetProcessor()) { procRef := conn.GetFrom().GetProcessor() if err := fb.validateCondition(flowDir.flowType, currentFlowName, procRef); err != nil { return fmt.Errorf("invalid condition for processor %s: %w", procRef.GetName(), err) } } // connections to Processor if !utils.IsInterfaceNil(conn.GetTo().GetProcessor()) { // Processor -> Processor if !utils.IsInterfaceNil(conn.GetFrom().GetProcessor()) { return fb.connectProcessors(currentFlowName, flowDir, conn) } // Stream -> Processor if !utils.IsInterfaceNil(conn.GetFrom().GetStream()) && conn.GetFrom().GetStream().GetAt() == publictypes.StreamStart { return fb.connectStreamToProcessor(currentFlowName, flowDir, conn) } // Flow -> Processor if !utils.IsInterfaceNil(conn.GetFrom().GetFlow()) && conn.GetFrom().GetFlow().GetAt() == internaltypes.FlowEnd { return fb.connectFlowToProcessor(currentFlowName, flowDir, conn) } } // connections from Processor if !utils.IsInterfaceNil(conn.GetFrom().GetProcessor()) { // Processor -> Stream if !utils.IsInterfaceNil(conn.GetTo().GetStream()) && conn.GetTo().GetStream().GetAt() == publictypes.StreamEnd { return fb.connectProcessorToStream(currentFlowName, flowDir, conn) } // Processor -> Flow if !utils.IsInterfaceNil(conn.GetTo().GetFlow()) && conn.GetTo().GetFlow().GetAt() == internaltypes.FlowStart { return fb.connectProcessorToFlow(currentFlowName, flowDir, conn) } } if !utils.IsInterfaceNil(conn.GetFrom().GetStream()) && !utils.IsInterfaceNil(conn.GetTo().GetStream()) { // Stream -> Stream return nil } return fmt.Errorf("invalid connection configuration") } // connectProcessorToFlow adds a connection between a processor and a flow. 
func (fb *flowBuilder) connectProcessorToFlow( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { sourceNode, err := flowDir.getOrCreateNode( currentFlowName, conn.GetFrom().GetProcessor()) if err != nil { return err } targetFlowName := conn.GetTo().GetFlow().GetName() // will incorporate nodes/connections from the target flow into the current flow if err := fb.incorporateFlow(targetFlowName, flowDir); err != nil { return fmt.Errorf("failed to incorporate flow %s: %w", targetFlowName, err) } // calling includeFlow had to define root of the flow being incorporated. if fb.foreignRoot == nil { return fmt.Errorf("foreign root node not found") } // setting root of the flow being incorporated as our connection edge := NewConnectionEdge(conn.GetFrom().GetProcessor().GetCondition()) edge.node = fb.foreignRoot.node fb.foreignRoot = nil sourceNode.addEdge(edge) return nil } // connectFlowToProcessor adds a connection between a flow and a processor. func (fb *flowBuilder) connectFlowToProcessor( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { targetNode, err := flowDir.getOrCreateNode(currentFlowName, conn.GetTo().GetProcessor()) if err != nil { return err } root := NewEntryPoint(targetNode) root.flow = conn.GetFrom().GetFlow() flowDir.setAsRoot(root) // flow from which we have connection to this processor sourceFlowName := conn.GetFrom().GetFlow().GetName() if err := fb.incorporateFlow(sourceFlowName, flowDir); err != nil { return fmt.Errorf("failed to incorporate flow %s: %w", sourceFlowName, err) } // calling includeFlow had to define root of the flow being incorporated. 
if fb.foreignRoot == nil { return fmt.Errorf("foreign root node not found") } // setting root of the flow being incorporated as our root flowDir.setAsRoot(fb.foreignRoot) fb.foreignRoot = nil return nil } // incorporateFlow gets processors and connections from source flow // and incorporates them into specified FlowDirection func (fb *flowBuilder) incorporateFlow(flowName string, targetFlowDir *FlowDirection) error { flowRep, exists := fb.flowReps[flowName] if !exists { return fmt.Errorf("flow '%s' not found", flowName) } // build connections from the source flow and add all to target FlowDirection connections := flowRep.GetFlow().GetFlowConnections(targetFlowDir.flowType) if err := fb.buildConnections(flowName, targetFlowDir, connections); err != nil { return fmt.Errorf("failed to build connections for flow %s: %w", flowName, err) } return nil } // connectProcessorToStream adds a connection between a processor and a stream. func (fb *flowBuilder) connectProcessorToStream( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { sourceNode, err := flowDir.getOrCreateNode( currentFlowName, conn.GetFrom().GetProcessor()) if err != nil { return err } edge := NewConnectionEdge(conn.GetFrom().GetProcessor().GetCondition()) if flowDir.flowType.IsRequestType() && sourceNode.flowGraphName != flowDir.flowName { // case when processor is really connected to another flow and not to stream // in this case, we will connect this node to the root node of the connected flow if flowDir.root == nil { return fmt.Errorf("root node not found for flow %s", flowDir.flowName) } edge.node = flowDir.root.node } else { edge.stream = conn.GetTo().GetStream() } sourceNode.addEdge(edge) return nil } // addStreamConnection adds a connection between a stream and a processor. 
func (fb *flowBuilder) connectStreamToProcessor( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { targetNode, err := flowDir.getOrCreateNode(currentFlowName, conn.GetTo().GetProcessor()) if err != nil { return err } root := NewEntryPoint(targetNode) root.stream = conn.GetFrom().GetStream() // root will be set if node is from the same flow (and not being incorporated from another flow) if flowDir.flowName == targetNode.flowGraphName { flowDir.setAsRoot(root) } else { fb.foreignRoot = root } return nil } // addProcessorConnection adds a connection between processors. func (fb *flowBuilder) connectProcessors( currentFlowName string, flowDir *FlowDirection, conn internaltypes.FlowConnRepI, ) error { sourceNode, err := flowDir.getOrCreateNode( currentFlowName, conn.GetFrom().GetProcessor()) if err != nil { return err } targetNode, err := flowDir.getOrCreateNode(currentFlowName, conn.GetTo().GetProcessor()) if err != nil { return err } edge := NewConnectionEdge(conn.GetFrom().GetProcessor().GetCondition()) edge.node = targetNode sourceNode.addEdge(edge) return nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.