generate_test.go•19.3 kB
// Copyright 2025 Google LLC
// SPDX-License-Identifier: Apache-2.0
package googlecloud
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// TestGenerateTelemetry_PipelineIntegration verifies that generate telemetry
// works correctly in the full pipeline with realistic model generation spans
func TestGenerateTelemetry_PipelineIntegration(t *testing.T) {
genTel := NewGenerateTelemetry()
f := newTestFixture(t, false, genTel)
// Set up test
// Create realistic generate input/output JSON for span attributes
inputJSON := `{"model":"googleai/gemini-2.5-flash","config":{"maxOutputTokens":100,"temperature":0.7},"messages":[{"content":[{"text":"What is the capital of France?"}],"role":"user"}]}`
outputJSON := `{"message":{"content":[{"text":"The capital of France is Paris."}],"role":"model"},"usage":{"inputTokens":8,"outputTokens":7,"inputCharacters":32,"outputCharacters":32,"inputImages":0,"outputImages":0},"latencyMs":245.6}`
// Create span using the TracerProvider - this triggers the full pipeline
ctx := context.Background()
_, span := f.tracer.Start(ctx, "googleai/gemini-2.0-pro-flash")
span.SetAttributes(
attribute.String("genkit:name", "googleai/gemini-2.0-pro-flash"),
attribute.String("genkit:metadata:subtype", "model"),
attribute.String("genkit:path", "/{chatFlow,t:flow}/{generate,t:action}"),
attribute.String("genkit:input", inputJSON),
attribute.String("genkit:output", outputJSON),
attribute.String("genkit:sessionId", "session-123"),
attribute.String("genkit:threadName", "main-thread"),
)
span.End() // This triggers the pipeline
// Verify the span was exported
spans := f.waitAndGetSpans()
assert.Len(t, spans, 1)
}
// TestGenerateTelemetry_MetricCapture runs model and non-model spans through
// the pipeline and inspects the metrics collected by a per-case ManualReader:
// the request counter, the per-modality usage counters and the latency
// histogram. Usage counters and latency are only checked when the expected
// value is greater than zero.
func TestGenerateTelemetry_MetricCapture(t *testing.T) {
	testCases := []struct {
		name                 string
		attrs                map[string]string
		inputJSON            string
		outputJSON           string
		expectMetrics        bool
		expectedStatus       string
		expectedLatency      float64
		expectedInputChars   int64
		expectedOutputChars  int64
		expectedInputTokens  int64
		expectedOutputTokens int64
		expectedInputImages  int64
		expectedOutputImages int64
		expectedInputVideos  int64
		expectedOutputVideos int64
		expectedInputAudio   int64
		expectedOutputAudio  int64
	}{
		{
			name: "successful generation captures all metrics",
			attrs: map[string]string{
				"genkit:type":             "action",
				"genkit:name":             "googleai/gemini-2.0-pro-flash",
				"genkit:metadata:subtype": "model",
				"genkit:path":             "/{chatFlow,t:flow}/{generate,t:action}",
			},
			inputJSON:            `{"model":"googleai/gemini-2.5-flash","messages":[{"content":[{"text":"Hello world"}],"role":"user"}]}`,
			outputJSON:           `{"message":{"content":[{"text":"Hello! How can I help you today?"}],"role":"model"},"usage":{"inputTokens":12,"outputTokens":8,"inputCharacters":11,"outputCharacters":33,"inputImages":1,"outputImages":0},"latencyMs":342.5}`,
			expectMetrics:        true,
			expectedStatus:       "success",
			expectedLatency:      342.5,
			expectedInputChars:   11,
			expectedOutputChars:  33,
			expectedInputTokens:  12,
			expectedOutputTokens: 8,
			expectedInputImages:  1,
			// Remaining expectations stay at their zero value.
		},
		{
			name: "failed generation captures error metrics",
			attrs: map[string]string{
				"genkit:type":             "action",
				"genkit:name":             "googleai/gemini-2.0-pro-flash",
				"genkit:metadata:subtype": "model",
				"genkit:path":             "/{errorFlow,t:flow}/{generate,t:action}",
			},
			inputJSON:          `{"model":"googleai/gemini-2.5-flash","messages":[{"content":[{"text":"Invalid prompt"}],"role":"user"}]}`,
			outputJSON:         `{"usage":{"inputCharacters":14}}`,
			expectMetrics:      true,
			expectedStatus:     "failure",
			expectedInputChars: 14, // len("Invalid prompt")
		},
		{
			name: "generation with video and audio captures multimedia metrics",
			attrs: map[string]string{
				"genkit:type":             "action",
				"genkit:name":             "googleai/gemini-2.0-pro-flash",
				"genkit:metadata:subtype": "model",
				"genkit:path":             "/{multimediaFlow,t:flow}/{generate,t:action}",
			},
			inputJSON:            `{"model":"googleai/gemini-2.5-flash","messages":[{"content":[{"text":"Analyze this video and audio"}],"role":"user"}]}`,
			outputJSON:           `{"message":{"content":[{"text":"Analysis complete"}],"role":"model"},"usage":{"inputTokens":5,"outputTokens":3,"inputCharacters":26,"outputCharacters":17,"inputImages":0,"outputImages":1,"inputVideos":2,"outputVideos":0,"inputAudioFiles":1,"outputAudioFiles":0},"latencyMs":1250.3}`,
			expectMetrics:        true,
			expectedStatus:       "success",
			expectedLatency:      1250.3,
			expectedInputChars:   26,
			expectedOutputChars:  17,
			expectedInputTokens:  5,
			expectedOutputTokens: 3,
			expectedOutputImages: 1,
			expectedInputVideos:  2,
			expectedInputAudio:   1,
		},
		{
			name: "non-model span captures no metrics",
			attrs: map[string]string{
				"genkit:type":             "action",
				"genkit:name":             "myTool",
				"genkit:metadata:subtype": "tool",
				"genkit:path":             "/{testFlow,t:flow}/{myTool,t:action}",
			},
			inputJSON:     `{"model":"googleai/gemini-2.5-flash"}`,
			outputJSON:    `{}`,
			expectMetrics: false,
		},
	}

	// Common prefix of every generate metric name looked up below.
	const metricPrefix = "genkit/ai/generate/"

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// A dedicated reader per case keeps data points from one case
			// from accumulating into the next.
			reader := sdkmetric.NewManualReader()
			provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

			// Swap the global meter provider for this case and restore it
			// when the subtest function returns.
			previous := otel.GetMeterProvider()
			otel.SetMeterProvider(provider)
			defer otel.SetMeterProvider(previous)

			// The telemetry is created only after the swap above.
			f := newTestFixture(t, false, NewGenerateTelemetry())
			f.mockExporter.Reset()

			ctx := context.Background()
			_, span := f.tracer.Start(ctx, "test-span")

			// Gather all attributes first, then attach them in one call.
			kvs := make([]attribute.KeyValue, 0, len(tc.attrs)+2)
			for k, v := range tc.attrs {
				kvs = append(kvs, attribute.String(k, v))
			}
			if tc.inputJSON != "" {
				kvs = append(kvs, attribute.String("genkit:input", tc.inputJSON))
			}
			if tc.outputJSON != "" {
				kvs = append(kvs, attribute.String("genkit:output", tc.outputJSON))
			}
			span.SetAttributes(kvs...)

			// Failure cases are modelled by an error status on the span.
			if tc.expectedStatus == "failure" {
				span.SetStatus(codes.Error, "Test error")
			}
			span.End() // Ending the span triggers the pipeline.

			assert.Len(t, f.waitAndGetSpans(), 1)

			// Pull whatever was recorded out of the manual reader.
			var collected metricdata.ResourceMetrics
			assert.NoError(t, reader.Collect(ctx, &collected))

			requests := findMetric(&collected, metricPrefix+"requests")
			if !tc.expectMetrics {
				assert.Nil(t, requests, "Should not have generate metrics for non-model spans")
				return
			}

			// Request counter and its attributes.
			assert.NotNil(t, requests, "Expected generate/requests metric")
			if requests != nil {
				verifyCounterMetric(t, requests, map[string]any{
					"modelName": tc.attrs["genkit:name"],
					"status":    tc.expectedStatus,
					"source":    "go",
				})
			}

			// Usage counters, checked in a fixed order and only when a
			// positive value is expected.
			counters := []struct {
				suffix string
				want   int64
			}{
				{"input/characters", tc.expectedInputChars},
				{"output/characters", tc.expectedOutputChars},
				{"input/tokens", tc.expectedInputTokens},
				{"output/tokens", tc.expectedOutputTokens},
				{"input/images", tc.expectedInputImages},
				{"output/images", tc.expectedOutputImages},
				{"input/videos", tc.expectedInputVideos},
				{"output/videos", tc.expectedOutputVideos},
				{"input/audio", tc.expectedInputAudio},
				{"output/audio", tc.expectedOutputAudio},
			}
			for _, c := range counters {
				if c.want <= 0 {
					continue
				}
				m := findMetric(&collected, metricPrefix+c.suffix)
				assert.NotNil(t, m, "Expected "+c.suffix+" metric")
				if m != nil {
					verifyCounterMetricValue(t, m, c.want)
				}
			}

			// Latency histogram.
			if tc.expectedLatency > 0 {
				latency := findMetric(&collected, metricPrefix+"latency")
				assert.NotNil(t, latency, "Expected latency metric")
				if latency != nil {
					verifyHistogramMetricValue(t, latency, tc.expectedLatency)
				}
			}
		})
	}
}
func TestGenerateTelemetry_FilteringLogic(t *testing.T) {
// Test that GenerateTelemetry only processes model subtypes
testCases := []struct {
name string
subtype string
expectProcessing bool
}{
{
name: "model subtype gets processed",
subtype: "model",
expectProcessing: true,
},
{
name: "tool subtype gets skipped",
subtype: "tool",
expectProcessing: false,
},
{
name: "flow subtype gets skipped",
subtype: "flow",
expectProcessing: false,
},
{
name: "embedder subtype gets skipped",
subtype: "embedder",
expectProcessing: false,
},
{
name: "empty subtype gets skipped",
subtype: "",
expectProcessing: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
genTel := NewGenerateTelemetry()
f := newTestFixture(t, false, genTel)
// Create minimal valid input for processing
inputJSON := `{"model":"googleai/gemini-2.5-flash"}`
outputJSON := `{"usage":{}}`
ctx := context.Background()
_, span := f.tracer.Start(ctx, "test-span")
span.SetAttributes(
attribute.String("genkit:name", "test-model"),
attribute.String("genkit:metadata:subtype", tc.subtype),
attribute.String("genkit:path", "/{testFlow,t:flow}/{generate,t:action}"),
attribute.String("genkit:input", inputJSON),
attribute.String("genkit:output", outputJSON),
)
span.End()
// Verify span was processed by pipeline
spans := f.waitAndGetSpans()
assert.Len(t, spans, 1)
})
}
}
func TestGenerateTelemetry_JSONParsingEdgeCases(t *testing.T) {
// Test behavior with malformed JSON, missing data, etc.
testCases := []struct {
name string
inputJSON string
outputJSON string
expectLogs bool
expectPanic bool
}{
{
name: "valid JSON processes normally",
inputJSON: `{"model":"googleai/gemini-2.5-flash","messages":[]}`,
outputJSON: `{"response":{"candidates":[]},"usage":{"inputTokens":5}}`,
expectLogs: true,
expectPanic: false,
},
{
name: "malformed input JSON handled gracefully",
inputJSON: `{"model":"gemini","invalid":}`,
outputJSON: `{"response":{"candidates":[]}}`,
expectLogs: true,
expectPanic: false,
},
{
name: "malformed output JSON handled gracefully",
inputJSON: `{"model":"googleai/gemini-2.5-flash"}`,
outputJSON: `{"response":{"invalid":}`,
expectLogs: true,
expectPanic: false,
},
{
name: "empty JSON strings handled gracefully",
inputJSON: "",
outputJSON: "",
expectLogs: false, // No input means no metrics
expectPanic: false,
},
{
name: "null JSON handled gracefully",
inputJSON: "null",
outputJSON: "null",
expectLogs: true,
expectPanic: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
genTel := NewGenerateTelemetry()
f := newTestFixture(t, false, genTel)
ctx := context.Background()
_, span := f.tracer.Start(ctx, "test-span")
span.SetAttributes(
attribute.String("genkit:name", "test-model"),
attribute.String("genkit:metadata:subtype", "model"),
attribute.String("genkit:path", "/{testFlow,t:flow}/{generate,t:action}"),
)
// Only set JSON attributes if they're not empty
if tc.inputJSON != "" {
span.SetAttributes(attribute.String("genkit:input", tc.inputJSON))
}
if tc.outputJSON != "" {
span.SetAttributes(attribute.String("genkit:output", tc.outputJSON))
}
// Function that should not panic
testFunc := func() {
span.End()
spans := f.waitAndGetSpans()
assert.Len(t, spans, 1)
}
if tc.expectPanic {
assert.Panics(t, testFunc, "Expected panic for malformed data")
} else {
assert.NotPanics(t, testFunc, "Should handle malformed JSON gracefully")
}
})
}
}
func TestGenerateTelemetry_FeatureNameExtraction(t *testing.T) {
// Test feature name extraction from paths and flow context
testCases := []struct {
name string
path string
flowName string
expectedFeature string
}{
{
name: "extracts feature from flow context",
path: "/{chatFlow,t:flow}/{generate,t:action}",
flowName: "chatFlow",
expectedFeature: "chatFlow",
},
{
name: "falls back to path extraction",
path: "/{assistantFlow,t:flow}/{step1,t:flowStep}/{generate,t:action}",
flowName: "",
expectedFeature: "assistantFlow",
},
{
name: "uses fallback for empty path",
path: "",
flowName: "",
expectedFeature: "generate",
},
{
name: "uses fallback for unknown path",
path: "<unknown>",
flowName: "",
expectedFeature: "generate",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
genTel := NewGenerateTelemetry()
f := newTestFixture(t, false, genTel)
// Create minimal valid input for processing
inputJSON := `{"model":"googleai/gemini-2.5-flash"}`
ctx := context.Background()
_, span := f.tracer.Start(ctx, "test-span")
attrs := []attribute.KeyValue{
attribute.String("genkit:name", "test-model"),
attribute.String("genkit:metadata:subtype", "model"),
attribute.String("genkit:path", tc.path),
attribute.String("genkit:input", inputJSON),
}
// Add flow name if provided
if tc.flowName != "" {
attrs = append(attrs, attribute.String("genkit:metadata:flow:name", tc.flowName))
}
span.SetAttributes(attrs...)
span.End()
// Verify span was processed
spans := f.waitAndGetSpans()
assert.Len(t, spans, 1)
})
}
}
// Helper functions for metric verification
// verifyCounterMetricValue asserts that metric holds a Sum[int64] with exactly
// one data point whose value equals expectedValue.
//
// It is marked as a test helper so failures are attributed to the calling
// test line. metric must be non-nil; callers in this file check that first.
func verifyCounterMetricValue(t *testing.T, metric *metricdata.Metrics, expectedValue int64) {
	t.Helper()

	sum, ok := metric.Data.(metricdata.Sum[int64])
	if !ok {
		// Report the actual type and stop: the data-point checks below would
		// only add a misleading second failure on a zero-value Sum.
		t.Errorf("Expected metric to be a Sum[int64], got %T", metric.Data)
		return
	}

	assert.Len(t, sum.DataPoints, 1, "Expected exactly one data point")
	if len(sum.DataPoints) > 0 {
		assert.Equal(t, expectedValue, sum.DataPoints[0].Value, "Metric value mismatch")
	}
}
// verifyHistogramMetricValue asserts that metric holds a histogram with
// exactly one data point, a count of 1, and a sum equal to expectedValue.
//
// Both Histogram[int64] and Histogram[float64] are accepted. For the int64
// variant, expectedValue is converted with int64(), which truncates toward
// zero, before comparison. Any other data type fails the test.
//
// It is marked as a test helper so failures are attributed to the calling
// test line. metric must be non-nil; callers in this file check that first.
func verifyHistogramMetricValue(t *testing.T, metric *metricdata.Metrics, expectedValue float64) {
	t.Helper()

	switch hist := metric.Data.(type) {
	case metricdata.Histogram[int64]:
		assert.Len(t, hist.DataPoints, 1, "Expected exactly one data point")
		if len(hist.DataPoints) > 0 {
			dp := hist.DataPoints[0]
			assert.Equal(t, int64(expectedValue), dp.Sum, "Histogram sum mismatch")
			assert.Equal(t, uint64(1), dp.Count, "Histogram count should be 1")
		}
	case metricdata.Histogram[float64]:
		assert.Len(t, hist.DataPoints, 1, "Expected exactly one data point")
		if len(hist.DataPoints) > 0 {
			dp := hist.DataPoints[0]
			assert.Equal(t, expectedValue, dp.Sum, "Histogram sum mismatch")
			assert.Equal(t, uint64(1), dp.Count, "Histogram count should be 1")
		}
	default:
		t.Errorf("Expected metric to be a Histogram[int64] or Histogram[float64], got %T", metric.Data)
	}
}