Skip to main content
Glama
stream.go4.84 kB
package stream import ( "fmt" streamconfig "lunar/engine/streams/config" internaltypes "lunar/engine/streams/internal-types" publictypes "lunar/engine/streams/public-types" streamtypes "lunar/engine/streams/types" "github.com/rs/zerolog/log" ) type ( ProcessorExecuteFunc func() (streamtypes.ProcessorIO, error) MeasureExecutorFunc func(string, publictypes.APIStreamI, ProcessorExecuteFunc) ( streamtypes.ProcessorIO, error) ) type ShortCircuitData struct { Node internaltypes.FlowGraphNodeI IsInternalShortCircuit bool } type Stream struct { getMeasureProcExecFunc func(string) MeasureExecutorFunc Request *streamconfig.RequestStream Response *streamconfig.ResponseStream } func NewStream() *Stream { return &Stream{ Request: &streamconfig.RequestStream{}, Response: &streamconfig.ResponseStream{}, } } func (s *Stream) WithProcExecutionMeasurement(fnMeasure func(string) MeasureExecutorFunc) *Stream { s.getMeasureProcExecFunc = fnMeasure return s } func (s *Stream) GetRequestStream() *streamconfig.RequestStream { return s.Request } func (s *Stream) GetResponseStream() *streamconfig.ResponseStream { return s.Response } func (s *Stream) ExecuteFlow( flow internaltypes.FlowI, apiStream publictypes.APIStreamI, node internaltypes.FlowGraphNodeI, actions *streamconfig.StreamActions, ) (*ShortCircuitData, error) { closureFunc := func() (streamtypes.ProcessorIO, error) { return node.GetProcessor().Execute(flow.GetName(), apiStream) } var err error var shortCircuitData *ShortCircuitData // internaltypes.FlowGraphNodeI var procIO streamtypes.ProcessorIO if s.getMeasureProcExecFunc != nil { measureFunc := s.getMeasureProcExecFunc(node.GetProcessorKey()) procIO, err = measureFunc(flow.GetName(), apiStream, closureFunc) } else { procIO, err = closureFunc() } if err != nil { return shortCircuitData, fmt.Errorf("failed to execute processor %s: %w", node.GetProcessorKey(), err) } log.Debug().Msgf("Executed processor %s. ProcIO: %+v", node.GetProcessorKey(), procIO) if procIO.ShortCircuit != nil { log.Trace().Msgf("Internal short circuit is used for request %s", apiStream.GetName()) // Short circuit is used to stop the flow execution and return the result if procIO.ShortCircuit.ReqAction != nil { // Short circuit is used to stop the flow execution and return the result actions.Request.Actions = append(actions.Request.Actions, procIO.ShortCircuit.ReqAction) } else if procIO.ShortCircuit.RespAction != nil { // Short circuit is used to stop the flow execution and return the result actions.Response.Actions = append(actions.Response.Actions, procIO.ShortCircuit.RespAction) } shortCircuitData = &ShortCircuitData{IsInternalShortCircuit: true} return shortCircuitData, nil } if apiStream.GetActionsType().IsRequestType() { if procIO.IsRequestActionAvailable() { if procIO.ReqAction.IsEarlyReturnType() { // If the request is early response, we should drop the request slot from the quota // to allow other requests to be processed flow.GetResourceManagement().OnRequestDrop(apiStream) } actions.Request.Actions = append(actions.Request.Actions, procIO.ReqAction) } } else if apiStream.GetActionsType().IsResponseType() { if procIO.IsResponseActionAvailable() { actions.Response.Actions = append(actions.Response.Actions, procIO.RespAction) } } else { return shortCircuitData, fmt.Errorf("unknown stream type: %v", apiStream.GetType()) } // TODO: Detach this and use this node only when executing the responses in streams.go if procIO.Type.IsResponseType() && apiStream.GetType().IsRequestType() { // Case of early response. We should perform walk on response flow. // Walk on response flow should be started from node with key equal to the key of current node node, err = flow.GetResponseDirection().GetNode(node.GetProcessorKey()) if err != nil { return shortCircuitData, fmt.Errorf("failed to get response node: %w", err) } shortCircuitData = &ShortCircuitData{Node: node, IsInternalShortCircuit: false} return shortCircuitData, nil } for _, edge := range node.GetEdges() { if !edge.IsNodeAvailable() { // if no node is available, it means node connects to stream, meaning 'end of walk' continue } // Check if the condition is met. procIO.Name - is name of condition. // Condition can be met either if it's defined, of if it's just empty string - // meaning there is no condition defined (procIO.Name is empty). if edge.GetCondition() == procIO.Name { targetNode := edge.GetTargetNode() if shortCircuitData, err = s.ExecuteFlow(flow, apiStream, targetNode, actions); err != nil { return shortCircuitData, fmt.Errorf("failed to execute flow: %w", err) } } } return shortCircuitData, nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server