Genkit MCP

Apache 2.0
/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { afterAll, beforeAll, beforeEach, describe, expect, it, jest, } from '@jest/globals'; import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import * as assert from 'assert'; import { Genkit, z } from 'genkit'; import { GenkitBeta, genkit } from 'genkit/beta'; import { appendSpan } from 'genkit/tracing'; import { __forceFlushSpansForTesting, __getSpanExporterForTesting, } from '../src/gcpOpenTelemetry.js'; import { enableGoogleCloudTelemetry } from '../src/index.js'; jest.mock('../src/auth.js', () => { const original = jest.requireActual('../src/auth.js'); return { ...(original || {}), resolveCurrentPrincipal: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', serviceAccountEmail: '', }); }), credentialsFromEnvironment: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', credentials: { client_email: '', private_key: '-----BEGIN PRIVATE KEY-----', }, }); }), }; }); describe('GoogleCloudTracing', () => { let ai: GenkitBeta; beforeAll(async () => { process.env.GCLOUD_PROJECT = 'test'; process.env.GENKIT_ENV = 'dev'; await enableGoogleCloudTelemetry({ projectId: 'test', forceDevExport: false, }); ai = genkit({}); }); beforeEach(async () => { __getSpanExporterForTesting().reset(); }); afterAll(async () => { await ai.stopServers(); }); it('writes traces', async () => { const testFlow = createFlow(ai, 'testFlow'); await testFlow(); const spans = await getExportedSpans(); assert.equal(spans.length, 1); assert.equal(spans[0].name, 'testFlow'); }); it('Adjusts attributes to support GCP trace filtering', async () => { const testFlow = createFlow(ai, 'testFlow'); await testFlow(); const spans = await getExportedSpans(); // Check some common attributes assert.equal(spans[0].attributes['genkit/name'], 'testFlow'); assert.equal(spans[0].attributes['genkit/type'], 'action'); assert.equal(spans[0].attributes['genkit/metadata/subtype'], 'flow'); // Ensure we have no attributes with ':' because these are awkward to use in // Cloud Trace. const spanAttrKeys = Object.entries(spans[0].attributes).map(([k, v]) => k); for (const key in spanAttrKeys) { assert.equal(key.indexOf(':'), -1); } }); it('sub actions are contained within flows', async () => { const testFlow = createFlow(ai, 'testFlow', async () => { return await'subAction', async () => { return await'subAction2', async () => { return 'done'; }); }); }); await testFlow(); const spans = await getExportedSpans(); assert.equal(spans.length, 3); assert.equal(spans[2].name, 'testFlow'); assert.equal(spans[2].parentSpanId, undefined); assert.equal(spans[1].name, 'subAction'); assert.equal(spans[1].parentSpanId, spans[2].spanContext().spanId); assert.equal(spans[0].name, 'subAction2'); assert.equal(spans[0].parentSpanId, spans[1].spanContext().spanId); }); it('different flows run independently', async () => { const testFlow1 = createFlow(ai, 'testFlow1'); const testFlow2 = createFlow(ai, 'testFlow2'); await testFlow1(); await testFlow2(); const spans = await getExportedSpans(); assert.equal(spans.length, 2); assert.equal(spans[0].parentSpanId, undefined); assert.equal(spans[1].parentSpanId, undefined); }); it('labels failed spans', async () => { const testFlow = createFlow(ai, 'badFlow', async () => { return await'badStep', async () => { throw new Error('oh no!'); }); }); try { await testFlow(); } catch (e) {} const spans = await getExportedSpans(); assert.equal(spans.length, 2); assert.equal(spans[0].name, 'badStep'); assert.equal(spans[0].attributes['genkit/failedSpan'], 'badStep'); assert.equal( spans[0].attributes['genkit/failedPath'], '/{badFlow,t:flow}/{badStep,t:flowStep}' ); assert.equal(spans[1].attributes['genkit/isRoot'], true); assert.equal(spans[1].attributes['genkit/rootState'], 'error'); }); it('labels the root feature', async () => { const testFlow = createFlow(ai, 'niceFlow', async () => { return'niceStep', async () => {}); }); await testFlow(); const spans = await getExportedSpans(); assert.equal(spans[0].name, 'niceStep'); assert.equal(spans[0].attributes['genkit/feature'], undefined); assert.equal(spans[1].name, 'niceFlow'); assert.equal(spans[1].attributes['genkit/feature'], 'niceFlow'); assert.equal(spans[1].attributes['genkit/rootState'], 'success'); }); it('marks the root feature failed when it is the failure', async () => { const testFlow = createFlow(ai, 'failingFlow', async () => { await'good step', async () => {}); throw new Error('oops!'); }); try { await testFlow(); } catch (e) {} const spans = await getExportedSpans(); assert.equal(spans.length, 2); assert.equal(spans[1].name, 'failingFlow'); assert.equal(spans[1].attributes['genkit/failedSpan'], 'failingFlow'); assert.equal( spans[1].attributes['genkit/failedPath'], '/{failingFlow,t:flow}' ); assert.equal(spans[1].attributes['genkit/isRoot'], true); assert.equal(spans[1].attributes['genkit/rootState'], 'error'); }); it('adds the genkit/model label for model actions', async () => { const echoModel = ai.defineModel( { name: 'echoModel', }, async (request) => { return { message: { role: 'model', content: [ { text: 'Echo: ' + request.messages .map((m) => => c.text).join()) .join(), }, ], }, finishReason: 'stop', }; } ); const testFlow = createFlow(ai, 'modelFlow', async () => { return'runFlow', async () => { await ai.generate({ model: echoModel, prompt: 'Testing model telemetry', }); }); }); await testFlow(); const spans = await getExportedSpans(); assert.equal(spans[0].name, 'echoModel'); assert.equal(spans[0].attributes['genkit/model'], 'echoModel'); assert.equal(spans[1].name, 'generate'); assert.equal(spans[2].name, 'runFlow'); assert.equal(spans[3].name, 'modelFlow'); }); it('attaches additional span', async () => { await appendSpan( 'trace1', 'parent1', { name: 'span-name', metadata: { metadata_key: 'metadata_value' } }, { ['label_key']: 'label_value' } ); const spans = await getExportedSpans(); const span = spans.find((it) => === 'span-name'); assert.equal(Object.keys(span?.attributes || {}).length, 3); assert.equal(span?.attributes['genkit/name'], 'span-name'); assert.equal(span?.attributes['label_key'], 'label_value'); assert.equal( span?.attributes['genkit/metadata/metadata_key'], 'metadata_value' ); }); it('writes sessionId and threadName for chats', async () => { const testModel = ai.defineModel({ name: 'testModel' }, async () => { return { message: { role: 'user', content: [ { text: 'response', }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const chat =; await chat.send({ model: testModel, prompt: 'Sending test prompt' }); const spans = await getExportedSpans(); // We should get 3 spans from this chat -- "send" which delegates to "generate" which delegates to our "testModel" // Only the top level span will have the sessionId and threadName until we make sessionId more ubiquitous expect(spans).toHaveLength(3); spans.forEach((span) => { if ( === 'send') { expect(span?.attributes['genkit/sessionId']).not.toBeUndefined(); expect(span?.attributes['genkit/threadName']).not.toBeUndefined(); return; } // Once we make the change to have sessionId on all relevant spans, then these should verify they are populated. expect(span?.attributes['genkit/sessionId']).toBeUndefined(); expect(span?.attributes['genkit/threadName']).toBeUndefined(); }); }); /** Helper to create a flow with no inputs or outputs */ function createFlow( ai: Genkit, name: string, fn: () => Promise<any> = async () => {} ) { return ai.defineFlow( { name, inputSchema: z.void(), outputSchema: z.void(), }, fn ); } /** Polls the in memory metric exporter until the genkit scope is found. */ async function getExportedSpans( maxAttempts: number = 200 ): Promise<ReadableSpan[]> { __forceFlushSpansForTesting(); var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 50)); const found = __getSpanExporterForTesting().getFinishedSpans(); if (found.length > 0) { return found; } }`Timed out while waiting for spans to be exported.`); } });