Skip to main content
Glama

Genkit MCP

Official
by firebase
path_test.go (12.8 kB)
// Copyright 2025 Google LLC // SPDX-License-Identifier: Apache-2.0 package googlecloud import ( "context" "testing" "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestNewPathTelemetry(t *testing.T) { pathTel := NewPathTelemetry() assert.NotNil(t, pathTel) assert.NotNil(t, pathTel.pathCounter) assert.NotNil(t, pathTel.pathLatencies) } // TestPathTelemetry_PipelineIntegration verifies that path telemetry // processes failing spans correctly in the full pipeline func TestPathTelemetry_PipelineIntegration(t *testing.T) { // This test verifies that path telemetry works correctly in the full pipeline, // only processing failing spans that are failure sources pathTel := NewPathTelemetry() f := newTestFixture(t, true, pathTel) // Enable logging for path telemetry tests // Set up log capture logBuf := setupLogCapture(t) // Create span using the TracerProvider - this triggers the full pipeline ctx := context.Background() _, span := f.tracer.Start(ctx, "test-path-span") span.SetAttributes( attribute.String("genkit:type", "flow"), // Required for telemetry processing attribute.String("genkit:path", "/{testFlow,t:flow}/{myAction,t:action}"), attribute.Bool("genkit:isFailureSource", true), attribute.String("genkit:state", "error"), attribute.String("genkit:sessionId", "session-123"), attribute.String("genkit:threadName", "thread-456"), ) span.SetStatus(codes.Error, "Test error") span.End() // This triggers the pipeline // Get captured logs logOutput := logBuf.String() // Verify path telemetry processed the failing span assert.Contains(t, logOutput, "Error[") // Verify the span was exported spans := f.waitAndGetSpans() assert.Len(t, spans, 1) } func TestPathTelemetry_MetricCapture(t *testing.T) { // Test that verifies we can capture and verify metric calls using OTel's built-in test 
reader testCases := []struct { name string attrs map[string]interface{} spanStatus codes.Code expectCounterMetrics bool expectHistogramMetrics bool expectedFeatureName string expectedError string }{ { name: "failing span captures metrics correctly", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{chatFlow,t:flow}/{generateResponse,t:action}", "genkit:isFailureSource": true, "genkit:state": "error", "genkit:sessionId": "session-123", "genkit:threadName": "thread-456", }, spanStatus: codes.Error, expectCounterMetrics: true, expectHistogramMetrics: true, expectedFeatureName: "chatFlow", expectedError: "Error", }, { name: "non-failure-source span captures no metrics", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{testFlow,t:flow}/{myAction,t:action}", "genkit:isFailureSource": false, "genkit:state": "error", }, spanStatus: codes.Error, expectCounterMetrics: false, expectHistogramMetrics: false, }, { name: "success span captures no metrics", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{testFlow,t:flow}/{myAction,t:action}", "genkit:isFailureSource": true, "genkit:state": "success", }, spanStatus: codes.Ok, expectCounterMetrics: false, expectHistogramMetrics: false, }, { name: "span without path captures no metrics", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:isFailureSource": true, "genkit:state": "error", }, spanStatus: codes.Error, expectCounterMetrics: false, expectHistogramMetrics: false, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Create a fresh ManualReader for each test case to avoid accumulation reader := sdkmetric.NewManualReader() // Create a MeterProvider with the test reader testMeterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) // Set the global meter provider temporarily for the test originalProvider := otel.GetMeterProvider() otel.SetMeterProvider(testMeterProvider) defer 
otel.SetMeterProvider(originalProvider) // Create path telemetry (it will use the global meter provider) pathTel := NewPathTelemetry() f := newTestFixture(t, true, pathTel) // Enable logging for telemetry processing f.mockExporter.Reset() // Create span using the TracerProvider - this will flow through path telemetry ctx := context.Background() _, span := f.tracer.Start(ctx, "test-path-span") for key, value := range tc.attrs { switch v := value.(type) { case string: span.SetAttributes(attribute.String(key, v)) case bool: span.SetAttributes(attribute.Bool(key, v)) case int: span.SetAttributes(attribute.Int(key, v)) case int64: span.SetAttributes(attribute.Int64(key, v)) case float64: span.SetAttributes(attribute.Float64(key, v)) } } span.SetStatus(tc.spanStatus, "Test error") span.End() // This triggers the pipeline including path telemetry // Wait for span to be processed spans := f.waitAndGetSpans() assert.Len(t, spans, 1) // Collect metrics using the manual reader var resourceMetrics metricdata.ResourceMetrics err := reader.Collect(ctx, &resourceMetrics) assert.NoError(t, err) // Verify counter metrics if tc.expectCounterMetrics { counterMetric := findMetric(&resourceMetrics, "genkit/feature/path/requests") assert.NotNil(t, counterMetric, "Expected counter metric to be recorded") if counterMetric != nil { expectedAttrs := map[string]interface{}{ "featureName": tc.expectedFeatureName, "status": "failure", "source": "go", } if tc.expectedError != "" { expectedAttrs["error"] = tc.expectedError } verifyCounterMetric(t, counterMetric, expectedAttrs) } } // Verify histogram metrics if tc.expectHistogramMetrics { histogramMetric := findMetric(&resourceMetrics, "genkit/feature/path/latency") assert.NotNil(t, histogramMetric, "Expected histogram metric to be recorded") if histogramMetric != nil { expectedAttrs := map[string]interface{}{ "featureName": tc.expectedFeatureName, "status": "failure", "source": "go", } if tc.expectedError != "" { expectedAttrs["error"] = 
tc.expectedError } verifyHistogramMetric(t, histogramMetric, expectedAttrs) } } if !tc.expectCounterMetrics && !tc.expectHistogramMetrics { // Should have no path metrics counterMetric := findMetric(&resourceMetrics, "genkit/feature/path/requests") histogramMetric := findMetric(&resourceMetrics, "genkit/feature/path/latency") assert.Nil(t, counterMetric, "Should not have counter metrics") assert.Nil(t, histogramMetric, "Should not have histogram metrics") } }) } } func TestPathTelemetry_ComprehensiveScenarios(t *testing.T) { // Test multiple path telemetry scenarios using the proper pipeline integration pathTel := NewPathTelemetry() f := newTestFixture(t, true, pathTel) // Enable logging for path telemetry tests testCases := []struct { name string attrs map[string]interface{} spanStatus codes.Code expectLog bool expectedText string }{ { name: "failing span with failure source", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{chatFlow,t:flow}/{generateResponse,t:action}", "genkit:isFailureSource": true, "genkit:state": "error", "genkit:sessionId": "session-123", "genkit:threadName": "thread-456", }, spanStatus: codes.Error, expectLog: true, expectedText: "Error[", }, { name: "failing span without failure source", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{testFlow,t:flow}/{myAction,t:action}", "genkit:isFailureSource": false, "genkit:state": "error", }, spanStatus: codes.Error, expectLog: false, expectedText: "", }, { name: "success span skipped", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:path": "/{testFlow,t:flow}/{myAction,t:action}", "genkit:isFailureSource": true, "genkit:state": "success", }, spanStatus: codes.Ok, expectLog: false, expectedText: "", }, { name: "span without path skipped", attrs: map[string]interface{}{ "genkit:type": "flow", "genkit:isFailureSource": true, "genkit:state": "error", }, spanStatus: codes.Error, expectLog: false, expectedText: "", }, } for _, tc := range 
testCases { t.Run(tc.name, func(t *testing.T) { f.mockExporter.Reset() // Set up log capture logBuf := setupLogCapture(t) // Create span using the TracerProvider - this triggers the full pipeline ctx := context.Background() _, span := f.tracer.Start(ctx, "test-path-span") for key, value := range tc.attrs { switch v := value.(type) { case string: span.SetAttributes(attribute.String(key, v)) case bool: span.SetAttributes(attribute.Bool(key, v)) case int: span.SetAttributes(attribute.Int(key, v)) case int64: span.SetAttributes(attribute.Int64(key, v)) case float64: span.SetAttributes(attribute.Float64(key, v)) } } span.SetStatus(tc.spanStatus, "Test error") span.End() // This triggers the pipeline including path telemetry // Get captured logs logOutput := logBuf.String() // Verify spans were processed spans := f.waitAndGetSpans() assert.Len(t, spans, 1) // Check logging behavior if tc.expectLog { assert.Contains(t, logOutput, tc.expectedText, "Expected log containing %q but got: %q", tc.expectedText, logOutput) } else { assert.NotContains(t, logOutput, "Error[", "Should not log errors for non-qualifying spans") } }) } } func TestPathTelemetry_LatencyVerification(t *testing.T) { // Specific test to verify that latency measurement actually works correctly reader := sdkmetric.NewManualReader() testMeterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) originalProvider := otel.GetMeterProvider() otel.SetMeterProvider(testMeterProvider) defer otel.SetMeterProvider(originalProvider) pathTel := NewPathTelemetry() f := newTestFixture(t, true, pathTel) // Enable logging for telemetry processing ctx := context.Background() _, span := f.tracer.Start(ctx, "latency-test-span") span.SetAttributes( attribute.String("genkit:type", "flow"), // Required for telemetry processing attribute.String("genkit:path", "/{testFlow,t:flow}/{latencyTest,t:action}"), attribute.Bool("genkit:isFailureSource", true), attribute.String("genkit:state", "error"), 
attribute.String("genkit:sessionId", "session-123"), attribute.String("genkit:threadName", "thread-456"), ) span.SetStatus(codes.Error, "Test latency error") // Add a small delay to ensure measurable latency time.Sleep(2 * time.Millisecond) span.End() // Collect metrics var resourceMetrics metricdata.ResourceMetrics err := reader.Collect(ctx, &resourceMetrics) assert.NoError(t, err) // Verify latency histogram histogramMetric := findMetric(&resourceMetrics, "genkit/feature/path/latency") assert.NotNil(t, histogramMetric, "Expected latency histogram metric") if histogramMetric != nil { histogram, ok := histogramMetric.Data.(metricdata.Histogram[float64]) assert.True(t, ok, "Expected histogram type") if len(histogram.DataPoints) > 0 { dp := histogram.DataPoints[0] // More specific latency assertions assert.Equal(t, uint64(1), dp.Count, "Should have one measurement") assert.GreaterOrEqual(t, dp.Sum, 2.0, "Should have at least 2ms latency due to sleep") assert.Less(t, dp.Sum, 100.0, "Should be less than 100ms for test span") // Verify histogram has reasonable structure assert.NotEmpty(t, dp.BucketCounts, "Should have histogram buckets") assert.NotEmpty(t, dp.Bounds, "Should have bucket boundaries") // At least one bucket should contain our measurement hasNonZeroBucket := false for _, count := range dp.BucketCounts { if count > 0 { hasNonZeroBucket = true break } } assert.True(t, hasNonZeroBucket, "At least one bucket should contain the measurement") } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.