Skip to main content
Glama

Genkit MCP

Official
by firebase
generate.go16.9 kB
// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 package googlecloud import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "log/slog" "regexp" "strings" "github.com/firebase/genkit/go/ai" "github.com/firebase/genkit/go/internal" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" ) // GenerateTelemetry implements telemetry collection for model generate actions type GenerateTelemetry struct { actionCounter metric.Int64Counter // genkit/ai/generate/requests latencies metric.Int64Histogram // genkit/ai/generate/latency inputCharacters metric.Int64Counter // genkit/ai/generate/input/characters inputTokens metric.Int64Counter // genkit/ai/generate/input/tokens inputImages metric.Int64Counter // genkit/ai/generate/input/images inputVideos metric.Int64Counter // genkit/ai/generate/input/videos inputAudio metric.Int64Counter // genkit/ai/generate/input/audio outputCharacters metric.Int64Counter // genkit/ai/generate/output/characters outputTokens metric.Int64Counter // genkit/ai/generate/output/tokens thinkingTokens metric.Int64Counter // genkit/ai/generate/thinking/tokens outputImages metric.Int64Counter // genkit/ai/generate/output/images outputVideos metric.Int64Counter // genkit/ai/generate/output/videos outputAudio metric.Int64Counter // genkit/ai/generate/output/audio } // NewGenerateTelemetry creates a new generate telemetry module with all required metrics func NewGenerateTelemetry() *GenerateTelemetry { meter := otel.Meter("genkit") actionCounter, _ := meter.Int64Counter("genkit/ai/generate/requests", metric.WithDescription("Counts calls to genkit generate actions."), metric.WithUnit("1")) latencies, _ := meter.Int64Histogram("genkit/ai/generate/latency", metric.WithDescription("Latencies when interacting with a Genkit model."), metric.WithUnit("ms")) inputCharacters, _ := meter.Int64Counter("genkit/ai/generate/input/characters", metric.WithDescription("Counts input characters to any Genkit model."), metric.WithUnit("1")) inputTokens, _ := meter.Int64Counter("genkit/ai/generate/input/tokens", metric.WithDescription("Counts input tokens to a Genkit model."), metric.WithUnit("1")) inputImages, _ := meter.Int64Counter("genkit/ai/generate/input/images", metric.WithDescription("Counts input images to a Genkit model."), metric.WithUnit("1")) inputVideos, _ := meter.Int64Counter("genkit/ai/generate/input/videos", metric.WithDescription("Counts input videos to a Genkit model."), metric.WithUnit("1")) inputAudio, _ := meter.Int64Counter("genkit/ai/generate/input/audio", metric.WithDescription("Counts input audio files to a Genkit model."), metric.WithUnit("1")) outputCharacters, _ := meter.Int64Counter("genkit/ai/generate/output/characters", metric.WithDescription("Counts output characters from a Genkit model."), metric.WithUnit("1")) outputTokens, _ := meter.Int64Counter("genkit/ai/generate/output/tokens", metric.WithDescription("Counts output tokens from a Genkit model."), metric.WithUnit("1")) thinkingTokens, _ := meter.Int64Counter("genkit/ai/generate/thinking/tokens", metric.WithDescription("Counts thinking tokens from a Genkit model."), metric.WithUnit("1")) outputImages, _ := meter.Int64Counter("genkit/ai/generate/output/images", metric.WithDescription("Count output images from a Genkit model."), metric.WithUnit("1")) outputVideos, _ := meter.Int64Counter("genkit/ai/generate/output/videos", metric.WithDescription("Count output videos from a Genkit model."), metric.WithUnit("1")) outputAudio, _ := meter.Int64Counter("genkit/ai/generate/output/audio", metric.WithDescription("Count output audio files from a Genkit model."), metric.WithUnit("1")) return &GenerateTelemetry{ actionCounter: actionCounter, latencies: latencies, inputCharacters: inputCharacters, inputTokens: inputTokens, inputImages: inputImages, inputVideos: inputVideos, inputAudio: inputAudio, outputCharacters: outputCharacters, outputTokens: outputTokens, thinkingTokens: thinkingTokens, outputImages: outputImages, outputVideos: outputVideos, outputAudio: outputAudio, } } // Tick processes a span for generate telemetry func (g *GenerateTelemetry) Tick(span sdktrace.ReadOnlySpan, logInputOutput bool, projectID string) { attributes := span.Attributes() modelName := truncate(extractStringAttribute(attributes, "genkit:name"), 1024) path := extractStringAttribute(attributes, "genkit:path") inputStr := extractStringAttribute(attributes, "genkit:input") outputStr := extractStringAttribute(attributes, "genkit:output") var input ai.GenerateActionOptions var output ai.ModelResponse if inputStr != "" { json.Unmarshal([]byte(inputStr), &input) } if outputStr != "" { json.Unmarshal([]byte(outputStr), &output) } errName := g.extractErrorName(span) featureName := truncate(g.extractFeatureName(attributes, path)) sessionId := extractStringAttribute(attributes, "genkit:sessionId") threadName := extractStringAttribute(attributes, "genkit:threadName") if input.Config != nil { g.recordGenerateActionConfigLogs(span, modelName, featureName, path, &input, projectID, sessionId, threadName) } if inputStr != "" && logInputOutput { g.recordGenerateActionInputLogs(span, modelName, featureName, path, &input, projectID, sessionId, threadName) } if outputStr != "" && logInputOutput { g.recordGenerateActionOutputLogs(span, modelName, featureName, path, &output, projectID, sessionId, threadName) } if featureName == "" || featureName == "<unknown>" { featureName = "generate" } if inputStr != "" { g.recordGenerateActionMetrics(modelName, featureName, path, &output, errName) } } // recordGenerateActionMetrics records all metrics for a generate action func (g *GenerateTelemetry) recordGenerateActionMetrics(modelName, featureName, path string, output *ai.ModelResponse, errName string) { status := "success" if errName != "" { status = "failure" } attrs := []attribute.KeyValue{ attribute.String("modelName", modelName), attribute.String("featureName", featureName), attribute.String("path", path), attribute.String("status", status), attribute.String("source", "go"), attribute.String("sourceVersion", internal.Version), } errorAttrs := attrs if errName != "" { errorAttrs = append(attrs, attribute.String("error", errName)) } g.actionCounter.Add(context.Background(), 1, metric.WithAttributes(errorAttrs...)) if output != nil && output.LatencyMs > 0 { g.latencies.Record(context.Background(), int64(output.LatencyMs), metric.WithAttributes(attrs...)) } if usage := output.Usage; usage != nil { opt := metric.WithAttributes(attrs...) g.inputTokens.Add(context.Background(), int64(usage.InputTokens), opt) g.inputCharacters.Add(context.Background(), int64(usage.InputCharacters), opt) g.inputImages.Add(context.Background(), int64(usage.InputImages), opt) g.inputVideos.Add(context.Background(), int64(usage.InputVideos), opt) g.inputAudio.Add(context.Background(), int64(usage.InputAudioFiles), opt) g.outputTokens.Add(context.Background(), int64(usage.OutputTokens), opt) g.outputCharacters.Add(context.Background(), int64(usage.OutputCharacters), opt) g.thinkingTokens.Add(context.Background(), int64(usage.ThoughtsTokens), opt) g.outputImages.Add(context.Background(), int64(usage.OutputImages), opt) g.outputVideos.Add(context.Background(), int64(usage.OutputVideos), opt) g.outputAudio.Add(context.Background(), int64(usage.OutputAudioFiles), opt) } else { slog.Warn("GenerateTelemetry.Tick: No usage data available", "output_is_nil", output == nil) } } // recordGenerateActionConfigLogs logs configuration information func (g *GenerateTelemetry) recordGenerateActionConfigLogs(span sdktrace.ReadOnlySpan, model, featureName, qualifiedPath string, input *ai.GenerateActionOptions, projectID, sessionID, threadName string) { ctx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) path := truncatePath(toDisplayPath(qualifiedPath)) sharedMetadata := createCommonLogAttributes(span, projectID) logData := map[string]interface{}{ "model": model, "path": path, "qualifiedPath": qualifiedPath, "featureName": featureName, "source": "go", "sourceVersion": internal.Version, } if sessionID != "" { logData["sessionId"] = sessionID } if threadName != "" { logData["threadName"] = threadName } for k, v := range sharedMetadata { logData[k] = v } if input.Config != nil { if configMap, ok := input.Config.(map[string]interface{}); ok { if maxTokens, exists := configMap["maxOutputTokens"]; exists { logData["maxOutputTokens"] = maxTokens } if stopSeqs, exists := configMap["stopSequences"]; exists { logData["stopSequences"] = stopSeqs } } } logData["source"] = "go" logData["sourceVersion"] = internal.Version message := fmt.Sprintf("[genkit] Config[%s, %s]", path, model) slog.InfoContext(ctx, message, "data", logData) } // recordGenerateActionInputLogs logs input information func (g *GenerateTelemetry) recordGenerateActionInputLogs(span sdktrace.ReadOnlySpan, model, featureName, qualifiedPath string, input *ai.GenerateActionOptions, projectID, sessionID, threadName string) { if input.Messages == nil { return } ctx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) path := truncatePath(toDisplayPath(qualifiedPath)) sharedMetadata := createCommonLogAttributes(span, projectID) baseLogData := map[string]interface{}{ "model": model, "path": path, "qualifiedPath": qualifiedPath, "featureName": featureName, } if sessionID != "" { baseLogData["sessionId"] = sessionID } if threadName != "" { baseLogData["threadName"] = threadName } for k, v := range sharedMetadata { baseLogData[k] = v } messages := len(input.Messages) for msgIdx, msg := range input.Messages { parts := len(msg.Content) for partIdx, part := range msg.Content { logData := make(map[string]interface{}) for k, v := range baseLogData { logData[k] = v } partCounts := g.toPartCounts(partIdx, parts, msgIdx, messages) logData["content"] = g.toPartLogContent(part) logData["role"] = msg.Role logData["partIndex"] = partIdx logData["totalParts"] = parts logData["messageIndex"] = msgIdx logData["totalMessages"] = messages message := fmt.Sprintf("[genkit] Input[%s, %s] %s", path, model, partCounts) slog.InfoContext(ctx, message, MetadataKey, logData) } } } // recordGenerateActionOutputLogs logs output information func (g *GenerateTelemetry) recordGenerateActionOutputLogs(span sdktrace.ReadOnlySpan, model, featureName, qualifiedPath string, output *ai.ModelResponse, projectID, sessionID, threadName string) { ctx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) path := truncatePath(toDisplayPath(qualifiedPath)) sharedMetadata := createCommonLogAttributes(span, projectID) baseLogData := map[string]interface{}{ "model": model, "path": path, "qualifiedPath": qualifiedPath, "featureName": featureName, } if sessionID != "" { baseLogData["sessionId"] = sessionID } if threadName != "" { baseLogData["threadName"] = threadName } for k, v := range sharedMetadata { baseLogData[k] = v } var message *ai.Message if output.Message != nil { message = output.Message } if message != nil && message.Content != nil { parts := len(message.Content) for partIdx, part := range message.Content { logData := make(map[string]interface{}) for k, v := range baseLogData { logData[k] = v } partCounts := g.toPartCounts(partIdx, parts, 0, 1) if output.FinishMessage != "" { logData["finishMessage"] = truncate(output.FinishMessage) } logData["content"] = g.toPartLogContent(part) logData["role"] = message.Role logData["partIndex"] = partIdx logData["totalParts"] = parts logData["candidateIndex"] = 0 logData["totalCandidates"] = 1 logData["messageIndex"] = 0 logData["finishReason"] = output.FinishReason message := fmt.Sprintf("[genkit] Output[%s, %s] %s", path, model, partCounts) slog.InfoContext(ctx, message, MetadataKey, logData) } } } // Helper functions func (g *GenerateTelemetry) extractErrorName(span sdktrace.ReadOnlySpan) string { if span.Status().Code == codes.Error { return span.Status().Description } for _, event := range span.Events() { if event.Name == "exception" { for _, attr := range event.Attributes { if string(attr.Key) == "exception.type" { return attr.Value.AsString() } } } } return "" } func (g *GenerateTelemetry) extractFeatureName(attributes []attribute.KeyValue, path string) string { flowName := extractStringAttribute(attributes, "genkit:metadata:flow:name") if flowName != "" { return flowName } pathFeature := extractOuterFeatureNameFromPath(path) return pathFeature } func (g *GenerateTelemetry) toPartCounts(partOrdinal, parts, msgOrdinal, messages int) string { if parts > 1 && messages > 1 { return fmt.Sprintf("(part %s in message %s)", g.xOfY(partOrdinal, parts), g.xOfY(msgOrdinal, messages)) } if parts > 1 { return fmt.Sprintf("(part %s)", g.xOfY(partOrdinal, parts)) } if messages > 1 { return fmt.Sprintf("(message %s)", g.xOfY(msgOrdinal, messages)) } return "" } func (g *GenerateTelemetry) xOfY(x, y int) string { return fmt.Sprintf("%d of %d", x+1, y) } // toPartLogContent processes different part types correctly based on Part.Kind func (g *GenerateTelemetry) toPartLogContent(part *ai.Part) string { switch part.Kind { case ai.PartText: return truncate(part.Text) case ai.PartData: return truncate(part.Text) case ai.PartMedia: return g.toPartLogMedia(part) case ai.PartCustom: if part.Custom != nil { data, _ := json.Marshal(part.Custom) return truncate(string(data)) } case ai.PartToolRequest: if part.ToolRequest != nil { return g.toPartLogToolRequest(part.ToolRequest) } case ai.PartToolResponse: if part.ToolResponse != nil { return g.toPartLogToolResponse(part.ToolResponse) } } return "<unknown format>" } func (g *GenerateTelemetry) toPartLogMedia(part *ai.Part) string { // For media parts, the content is stored in the Text field // and ContentType indicates the media type if strings.HasPrefix(part.Text, "data:") { splitIdx := strings.Index(part.Text, "base64,") if splitIdx < 0 { return "<unknown media format>" } prefix := part.Text[:splitIdx+7] hasher := sha256.New() hasher.Write([]byte(part.Text[splitIdx+7:])) hashedContent := hex.EncodeToString(hasher.Sum(nil)) return fmt.Sprintf("%s<sha256(%s)>", prefix, hashedContent) } return truncate(part.Text) } func (g *GenerateTelemetry) toPartLogToolRequest(tool *ai.ToolRequest) string { var inputText string if str, ok := tool.Input.(string); ok { inputText = str } else { data, _ := json.Marshal(tool.Input) inputText = string(data) } return truncate(fmt.Sprintf("Tool request: %s, ref: %s, input: %s", tool.Name, tool.Ref, inputText)) } func (g *GenerateTelemetry) toPartLogToolResponse(tool *ai.ToolResponse) string { var outputText string if str, ok := tool.Output.(string); ok { outputText = str } else { data, _ := json.Marshal(tool.Output) outputText = string(data) } return truncate(fmt.Sprintf("Tool response: %s, ref: %s, output: %s", tool.Name, tool.Ref, outputText)) } // Utility functions // toDisplayPath converts qualified paths to display paths // Converts /{name1,t:type}/{name2,t:type} to "name1 > name2" func toDisplayPath(qualifiedPath string) string { if qualifiedPath == "" { return "<unknown>" } re := regexp.MustCompile(`\{([^,}]+),[^}]+\}`) matches := re.FindAllStringSubmatch(qualifiedPath, -1) if len(matches) == 0 { return qualifiedPath } var names []string for _, match := range matches { if len(match) > 1 { names = append(names, match[1]) } } return strings.Join(names, " > ") }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server