path.go•5.38 kB
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
package googlecloud
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/firebase/genkit/go/internal"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
// PathTelemetry implements telemetry collection for error/failure path tracking
type PathTelemetry struct {
// Note: uses feature namespace for path metrics
pathCounter *MetricCounter // genkit/feature/path/requests
pathLatencies *MetricHistogram // genkit/feature/path/latency
}
// NewPathTelemetry creates a new path telemetry module with required metrics
func NewPathTelemetry() *PathTelemetry {
n := func(name string) string { return internalMetricNamespaceWrap("feature", name) }
return &PathTelemetry{
pathCounter: NewMetricCounter(n("path/requests"), MetricCounterOptions{
Description: "Tracks unique flow paths per flow.",
Unit: "1",
}),
pathLatencies: NewMetricHistogram(n("path/latency"), MetricHistogramOptions{
Description: "Latencies per flow path.",
Unit: "ms",
}),
}
}
// Tick processes a span for path telemetry
func (p *PathTelemetry) Tick(span sdktrace.ReadOnlySpan, logInputOutput bool, projectID string) {
ctx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
attributes := span.Attributes()
path := extractStringAttribute(attributes, "genkit:path")
isFailureSource := extractBoolAttribute(attributes, "genkit:isFailureSource")
state := extractStringAttribute(attributes, "genkit:state")
if path == "" || !isFailureSource || state != "error" {
return
}
sessionID := extractStringAttribute(attributes, "genkit:sessionId")
threadName := extractStringAttribute(attributes, "genkit:threadName")
errorType := "Error"
errorMessage := p.extractErrorMessage(span)
if errorMessage == "" {
if span.Status().Code == codes.Error {
errorMessage = span.Status().Description
}
if errorMessage == "" {
errorMessage = "unknown error"
}
}
errorStack := p.extractErrorStack(span)
latencyMs := p.calculateLatencyMs(span)
pathDimensions := map[string]interface{}{
"featureName": extractOuterFeatureNameFromPath(path),
"status": "failure",
"error": errorType,
"path": path,
"source": "go",
"sourceVersion": internal.Version,
}
p.pathCounter.Add(1, pathDimensions)
p.pathLatencies.Record(latencyMs, pathDimensions)
displayPath := toDisplayPath(path)
sharedMetadata := createCommonLogAttributes(span, projectID)
logData := map[string]interface{}{
"path": displayPath,
"qualifiedPath": path,
"name": errorType,
"stack": errorStack,
"source": "go",
"sourceVersion": internal.Version,
}
if sessionID != "" {
logData["sessionId"] = sessionID
}
if threadName != "" {
logData["threadName"] = threadName
}
for k, v := range sharedMetadata {
logData[k] = v
}
logMessage := fmt.Sprintf("[genkit] Error[%s, %s] %s", displayPath, errorType, errorMessage)
slog.ErrorContext(ctx, logMessage, MetadataKey, logData)
}
// Helper functions
// extractSimplePathFromQualified extracts simple path name from qualified path
// /{simple-error-test,t:flow} -> simple-error-test
func extractSimplePathFromQualified(qualifiedPath string) string {
if qualifiedPath == "" {
return ""
}
path := strings.TrimPrefix(qualifiedPath, "/")
if strings.HasPrefix(path, "{") && strings.Contains(path, "}") {
end := strings.Index(path, "}")
if end > 1 {
content := path[1:end]
if parts := strings.Split(content, ","); len(parts) > 0 {
return parts[0]
}
}
}
return qualifiedPath
}
// calculateLatencyMs calculates the latency in milliseconds from span start/end times
func (p *PathTelemetry) calculateLatencyMs(span sdktrace.ReadOnlySpan) float64 {
startTime := span.StartTime()
endTime := span.EndTime()
if endTime.IsZero() {
endTime = time.Now()
}
duration := endTime.Sub(startTime)
return float64(duration.Nanoseconds()) / 1e6
}
// extractErrorMessage extracts error message from span events
func (p *PathTelemetry) extractErrorMessage(span sdktrace.ReadOnlySpan) string {
for _, event := range span.Events() {
if event.Name == "exception" {
for _, attr := range event.Attributes {
if string(attr.Key) == "exception.message" {
return attr.Value.AsString()
}
}
}
}
return ""
}
// extractErrorStack extracts error stack trace from span events
func (p *PathTelemetry) extractErrorStack(span sdktrace.ReadOnlySpan) string {
for _, event := range span.Events() {
if event.Name == "exception" {
for _, attr := range event.Attributes {
if string(attr.Key) == "exception.message" {
return attr.Value.AsString()
}
}
}
}
return ""
}