Skip to main content
Glama
streams_test.go (18.4 kB)
package streams

import (
	"errors"
	"fmt"
	lunar_messages "lunar/engine/messages"
	stream_config "lunar/engine/streams/config"
	test_processors "lunar/engine/streams/flow/test-processors"
	internal_types "lunar/engine/streams/internal-types"
	lunar_context "lunar/engine/streams/lunar-context"
	metrics_data "lunar/engine/streams/metrics-data"
	"lunar/engine/streams/processors"
	filter_processor "lunar/engine/streams/processors/filter-processor"
	public_types "lunar/engine/streams/public-types"
	stream_types "lunar/engine/streams/types"
	"lunar/engine/utils/environment"
	context_manager "lunar/toolkit-core/context-manager"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// sharedState is the in-memory state handed to every APIStream built by the
// tests in this file.
var sharedState = lunar_context.NewMemoryState[[]byte]()

// TestMain points the processors directory at flow/test-processors, enables
// the mock clock on the shared context manager, runs the tests, and restores
// the previous processors directory before exiting with the tests' exit code.
func TestMain(m *testing.M) {
	prevVal := environment.SetProcessorsDirectory(filepath.Join("flow", "test-processors"))
	context_manager.Get().SetMockClock()

	// Run the tests
	code := m.Run()

	// Restore the directory explicitly: os.Exit does not run deferred calls.
	environment.SetProcessorsDirectory(prevVal)

	// Exit with the code from the tests
	os.Exit(code)
}

// setFlowRepDirectory sets the streams flows directory to path and returns
// whatever environment.SetStreamsFlowsDirectory returns (used by callers as
// the value to restore later).
func setFlowRepDirectory(path string) string {
	return environment.SetStreamsFlowsDirectory(path)
}

// revertFlowRepDirectory sets the streams flows directory back to path. It is
// meant to be paired with setFlowRepDirectory in a defer statement:
//
//	defer revertFlowRepDirectory(setFlowRepDirectory(dir))
func revertFlowRepDirectory(path string) {
	environment.SetStreamsFlowsDirectory(path)
}

// TestNewStream verifies that NewStream returns a stream whose apiStreams and
// filterTree fields are populated.
func TestNewStream(t *testing.T) {
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	require.NotNil(t, stream, "stream is nil")
	require.NotNil(t, stream.apiStreams, "APIStreams is nil")
	require.NotNil(t, stream.filterTree, "filterTree is nil")
}

// TestRequestBodyFromResponseStream stores a request through a request-type
// APIStream and checks that a response-type APIStream carrying the same
// sequence ID exposes both the stored request body and its own response body.
func TestRequestBodyFromResponseStream(t *testing.T) {
	reqBody := "request body"
	resBody := "response body"
	seqID := "1234"

	apiStreamReq := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	apiStreamReq.SetRequest(stream_types.NewRequest(lunar_messages.OnRequest{
		Method:     "GET",
		Scheme:     "https",
		SequenceID: seqID,
		URL:        "maps.googleapis.com/maps/api/geocode/json",
		Headers:    map[string]string{},
		RawBody:    []byte(reqBody),
	}))
	apiStreamReq.StoreRequest()

	apiStreamRes := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeResponse, sharedState)
	apiStreamRes.SetResponse(stream_types.NewResponse(lunar_messages.OnResponse{
		Method:     "GET",
		SequenceID: seqID,
		URL:        "maps.googleapis.com/maps/api/geocode/json",
		Headers:    map[string]string{},
		RawBody:    []byte(resBody),
	}))

	req := apiStreamRes.GetRequest()
	// Fail with a readable message instead of panicking on req.GetBody() below.
	require.NotNil(t, req, "Request was not found for the response stream")
	fmt.Printf("Request body: %+v\n", req)
	fmt.Printf("Request: %s\n", req.GetBody())

	require.Equal(t, reqBody, apiStreamRes.GetRequest().GetBody(), "Request body is not correct")
	require.Equal(t, resBody, apiStreamRes.GetResponse().GetBody(), "Response body is not correct")
}

// TestExecuteFlows builds streams from the 2-flows and 3-flows test cases and
// checks that ExecuteFlow succeeds for request and response stream types.
func TestExecuteFlows(t *testing.T) {
	procNames := []string{
		"removePII",
		"readCache",
		"checkLimit",
		"generateResponse",
		"globalStream",
		"writeCache",
		"LogAPM",
		"readXXX",
		"writeXXX",
	}

	procMng := createTestProcessorManager(t, procNames)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	flowReps := createFlowRepresentation(t, "2-flows-test*")
	// The set happens now (defer arguments are evaluated immediately); the
	// deferred revert restores the directory that was active on entry, which
	// also undoes the second setFlowRepDirectory call further down.
	defer revertFlowRepDirectory(setFlowRepDirectory(filepath.Join("flow", "test-cases", "2-flows-test-case")))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	err = stream.createFlows(flowReps)
	require.NoError(t, err, "Failed to create flows")

	apiStream := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	apiStream.SetResponse(stream_types.NewResponse(lunar_messages.OnResponse{
		Status: 200,
	}))
	apiStream.SetRequest(stream_types.NewRequest(lunar_messages.OnRequest{
		Method:  "GET",
		Scheme:  "https",
		URL:     "maps.googleapis.com/maps/api/geocode/json",
		Headers: map[string]string{},
	}))
	flowActions := &stream_config.StreamActions{
		Request:  &stream_config.RequestStream{},
		Response: &stream_config.ResponseStream{},
	}

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	apiStream.SetType(public_types.StreamTypeResponse)
	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	// Test for 3 flows
	procMng = createTestProcessorManager(t, procNames)
	stream, err = NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	flowReps = createFlowRepresentation(t, "3-flows*")
	setFlowRepDirectory(filepath.Join("flow", "test-cases", "3-flows-test-case"))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	err = stream.createFlows(flowReps)
	require.NoError(t, err, "Failed to create flows")

	apiStream = stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	apiStream.SetResponse(stream_types.NewResponse(lunar_messages.OnResponse{
		Status: 200,
	}))
	apiStream.SetRequest(stream_types.NewRequest(lunar_messages.OnRequest{
		Method:  "GET",
		Scheme:  "https",
		URL:     "www.whatever.com/blabla",
		Headers: map[string]string{},
	}))

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")
}

// TestCreateFlows checks that Initialize and createFlows succeed for both the
// 2-flows and the 3-flows test cases.
func TestCreateFlows(t *testing.T) {
	procNames := []string{
		"removePII",
		"readCache",
		"checkLimit",
		"generateResponse",
		"globalStream",
		"writeCache",
		"LogAPM",
		"readXXX",
		"writeXXX",
	}

	procMng := createTestProcessorManager(t, procNames)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	flowReps := createFlowRepresentation(t, "2-flows-test*")
	// The deferred revert restores the directory that was active on entry,
	// covering the second setFlowRepDirectory call below as well.
	defer revertFlowRepDirectory(setFlowRepDirectory(filepath.Join("flow", "test-cases", "2-flows-test-case")))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	err = stream.createFlows(flowReps)
	require.NoError(t, err, "Failed to create flows")

	procMng = createTestProcessorManager(t, procNames)
	stream, err = NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	flowReps = createFlowRepresentation(t, "3-flows*")
	setFlowRepDirectory(filepath.Join("flow", "test-cases", "3-flows-test-case"))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	err = stream.createFlows(flowReps)
	require.NoError(t, err, "Failed to create flows")
}

// TestCreateFlowsWithSameProcessorsName checks that Initialize and createFlows
// succeed for the 2-flows-same-processor-key test case. The processor name
// list intentionally repeats "readCache" and "writeCache".
func TestCreateFlowsWithSameProcessorsName(t *testing.T) {
	procMng := createTestProcessorManager(
		t,
		[]string{
			"removePII",
			"readCache",
			"checkLimit",
			"generateResponse",
			"globalStream",
			"writeCache",
			"LogAPM",
			"readCache",
			"writeCache",
		},
	)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	flowReps := createFlowRepresentation(t, "2-flows-same-processor-key-test-case")
	defer revertFlowRepDirectory(
		setFlowRepDirectory(filepath.Join("flow", "test-cases", "2-flows-same-processor-key-test-case")),
	)
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	err = stream.createFlows(flowReps)
	require.NoError(t, err, "Failed to create flows")
}

// TestEarlyResponseFlow runs the early-response test case three times and
// asserts the processor execution order recorded in the global context under
// test_processors.GlobalKeyExecutionOrder: once with the cache-hit flag set,
// once with it cleared, and once with the stream switched to response type.
func TestEarlyResponseFlow(t *testing.T) {
	// One factory per processor name, matched by position.
	procMng := createTestProcessorManagerWithFactories(
		t,
		[]string{"readCache", "writeCache", "generateResponse", "LogAPM"},
		test_processors.NewMockProcessorUsingCache,
		test_processors.NewMockProcessor,
		test_processors.NewMockGenerateResponseProcessor,
		test_processors.NewMockProcessor,
	)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	defer revertFlowRepDirectory(setFlowRepDirectory(filepath.Join("flow", "test-cases", "early-response-test-case")))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	contextManager := lunar_context.NewContextManager()
	globalContext := contextManager.GetGlobalContext()

	apiStream := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	apiStream.SetResponse(stream_types.NewResponse(lunar_messages.OnResponse{
		Method: "GET",
		Status: 200,
		URL:    "maps.googleapis.com/maps/api/geocode/json",
	}))
	apiStream.SetRequest(stream_types.NewRequest(lunar_messages.OnRequest{
		Method:  "GET",
		Scheme:  "https",
		URL:     "maps.googleapis.com/maps/api/geocode/json",
		Headers: map[string]string{},
	}))
	flowActions := &stream_config.StreamActions{
		Request:  &stream_config.RequestStream{},
		Response: &stream_config.ResponseStream{},
	}

	// simulate early response
	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")
	err = globalContext.Set(test_processors.GlobalKeyCacheHit, true)
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err := globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"readCache", "generateResponse", "LogAPM"}, execOrder, "Execution order is not correct")

	// simulate regular execution
	apiStream.SetType(public_types.StreamTypeRequest)
	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")
	err = globalContext.Set(test_processors.GlobalKeyCacheHit, false)
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err = globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"readCache"}, execOrder, "Execution order is not correct")

	// simulate API provider response
	apiStream.SetType(public_types.StreamTypeResponse)
	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err = globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"writeCache"}, execOrder, "Execution order is not correct")
}

// TestEarlyResponseFromAnotherFlow runs the cross-flow-processor-use test case
// with the cache-hit flag set and asserts that only "readCache" is recorded in
// the execution order.
func TestEarlyResponseFromAnotherFlow(t *testing.T) {
	procMng := createTestProcessorManager(
		t,
		[]string{
			"removePII",
			"readCache",
			"checkLimit",
			"generateResponse",
			"globalStream",
			"writeCache",
			"LogAPM",
			"readCache",
			"writeCache",
		},
	)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	defer revertFlowRepDirectory(setFlowRepDirectory(filepath.Join("flow", "test-cases", "cross-flow-processor-use")))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	contextManager := lunar_context.NewContextManager()
	globalContext := contextManager.GetGlobalContext()

	apiStream := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	apiStream.SetResponse(stream_types.NewResponse(lunar_messages.OnResponse{
		Status: 200,
		URL:    "maps.googleapis.com/maps/api/geocode/json",
	}))
	apiStream.SetRequest(stream_types.NewRequest(lunar_messages.OnRequest{
		Method:  "GET",
		Scheme:  "https",
		URL:     "maps.googleapis.com/maps/api/geocode/json",
		Headers: map[string]string{},
	}))
	flowActions := &stream_config.StreamActions{
		Request:  &stream_config.RequestStream{},
		Response: &stream_config.ResponseStream{},
	}

	// simulate early response
	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")
	err = globalContext.Set(test_processors.GlobalKeyCacheHit, true)
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err := globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"readCache"}, execOrder, "Execution order is not correct")
}

// TestFilterProcessorFlow runs the filter-processor test case with the
// "x-group" request header set to "production", "staging", and "development"
// in turn, and asserts the execution order recorded for each value.
func TestFilterProcessorFlow(t *testing.T) {
	// One factory per processor name, matched by position.
	procMng := createTestProcessorManagerWithFactories(t,
		[]string{"Filter", "generateResponse", "LogAPM"},
		filter_processor.NewProcessor,
		test_processors.NewMockGenerateResponseProcessor,
		test_processors.NewMockProcessor,
	)
	stream, err := NewStream()
	require.NoError(t, err, "Failed to create stream")
	stream.processorsManager = procMng

	// The returned map is unused; the call still runs one subtest per YAML
	// file that asserts the file can be read.
	_ = createFlowRepresentation(t, "filter*")
	defer revertFlowRepDirectory(setFlowRepDirectory(filepath.Join("flow", "test-cases", "filter-processor-test-case")))
	err = stream.Initialize()
	require.NoError(t, err, "Failed to create flows")

	contextManager := lunar_context.NewContextManager()
	globalContext := contextManager.GetGlobalContext()

	apiStream := stream_types.NewAPIStream("APIStreamName", public_types.StreamTypeRequest, sharedState)
	request := lunar_messages.OnRequest{
		Method:  "GET",
		Scheme:  "https",
		URL:     "maps.googleapis.com/maps/api/geocode/json",
		Headers: map[string]string{"x-group": "production"},
	}
	apiStream.SetRequest(stream_types.NewRequest(request))

	// execution for production environment
	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")

	flowActions := &stream_config.StreamActions{
		Request:  &stream_config.RequestStream{},
		Response: &stream_config.ResponseStream{},
	}
	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err := globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"LogAPM"}, execOrder, "Execution order is not correct")

	// execution for staging environment
	request.Headers["x-group"] = "staging"
	apiStream.SetRequest(stream_types.NewRequest(request))

	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err = globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"GenerateResponseTooManyRequests"}, execOrder, "Execution order is not correct")

	// execution for development environment
	request.Headers["x-group"] = "development"
	apiStream.SetRequest(stream_types.NewRequest(request))

	err = globalContext.Set(test_processors.GlobalKeyExecutionOrder, []string{})
	require.NoError(t, err, "Failed to set global context value")

	err = stream.ExecuteFlow(apiStream, flowActions)
	require.NoError(t, err, "Failed to execute flow")

	execOrder, err = globalContext.Get(test_processors.GlobalKeyExecutionOrder)
	require.NoError(t, err, "Failed to get global context value")
	require.Equal(t, []string{"LogAPM"}, execOrder, "Execution order is not correct")
}

// TestMeasureFlowExecutionTime measures five 50ms callbacks under the
// "test flow" key, checks the reported average is about 50 (±10), and then
// verifies that a callback returning an error propagates that error and leaves
// the average unchanged (±0.05).
func TestMeasureFlowExecutionTime(t *testing.T) {
	metrics := metrics_data.NewFlowMetricsData()

	for i := 0; i < 5; i++ {
		err := metrics.MeasureFlowExecutionTime("test flow", func() error {
			time.Sleep(50 * time.Millisecond) // Simulate some work
			return nil
		})
		require.NoError(t, err)
	}

	avgTimeData := metrics.GetAvgFlowExecutionTime()
	require.Greater(t, avgTimeData.AvgFlowExecutionTime["test flow"], 0.0)

	// Ensure that the average is within the expected range
	// Allowing some delta (±10ms) due to possible variations in execution time
	require.InDelta(t, 50, avgTimeData.AvgFlowExecutionTime["test flow"], 10)

	// Call the measured function with an error to check that it doesn't affect the average
	err := metrics.MeasureFlowExecutionTime("test flow", func() error {
		return errors.New("test error")
	})
	require.Error(t, err)
	require.Equal(t, "test error", err.Error())

	// average execution time should be still the same after the error
	avgTimeAfterError := metrics.GetAvgFlowExecutionTime()
	require.InDelta(
		t,
		avgTimeData.AvgFlowExecutionTime["test flow"],
		avgTimeAfterError.AvgFlowExecutionTime["test flow"],
		5e-2, // allow ±0.05
		"AvgFlowExecutionTime should match within 0.05ms",
	)
}

// createTestProcessorManager returns an initialized ProcessorManager in which
// every name in processorNames is backed by test_processors.NewMockProcessor.
func createTestProcessorManager(t *testing.T, processorNames []string) *processors.ProcessorManager {
	t.Helper()
	return createTestProcessorManagerWithFactories(t, processorNames, test_processors.NewMockProcessor)
}

// createTestProcessorManagerWithFactories returns an initialized
// ProcessorManager with a factory registered for each name in processorNames.
//
// When exactly one factory is given it is used for every processor. When more
// than one is given, factories are matched to processorNames by position, so
// there must be at least as many factories as names. Violations fail the test
// with a descriptive message instead of an index-out-of-range panic.
func createTestProcessorManagerWithFactories(
	t *testing.T,
	processorNames []string,
	factories ...processors.ProcessorFactory,
) *processors.ProcessorManager {
	t.Helper()

	perProcessor := len(factories) > 1
	if perProcessor {
		require.GreaterOrEqual(t, len(factories), len(processorNames),
			"each processor name needs a matching factory")
	} else if len(processorNames) > 0 {
		require.NotEmpty(t, factories, "at least one processor factory is required")
	}

	processorMng := processors.NewProcessorManager(nil)
	for i, procName := range processorNames {
		factory := factories[0]
		if perProcessor {
			factory = factories[i]
		}
		processorMng.SetFactory(procName, factory)
	}

	err := processorMng.Init()
	require.NoError(t, err)

	return processorMng
}

// createFlowRepresentation reads every *.yaml file under
// flow/test-cases/<testCase>/ (testCase may itself contain glob characters)
// and returns the parsed flows keyed by flow name. Each file is read inside
// its own subtest named after the file's base name.
func createFlowRepresentation(t *testing.T, testCase string) map[string]internal_types.FlowRepI {
	t.Helper()

	pattern := filepath.Join("flow", "test-cases", testCase, "*.yaml")
	files, fileErr := filepath.Glob(pattern)
	require.NoError(t, fileErr, "Failed to find YAML files")

	flowReps := make(map[string]internal_types.FlowRepI)
	for _, file := range files {
		t.Run(filepath.Base(file), func(t *testing.T) {
			flowRep, err := stream_config.ReadStreamFlowConfig(file)
			require.NoError(t, err, "Failed to read YAML file")

			// TODO: We should do it more strict that we load the processor keys
			for key, proc := range flowRep.Processors {
				proc.Key = key
				flowRep.Processors[key] = proc
			}
			flowReps[flowRep.Name] = flowRep
		})
	}
	return flowReps
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.