Genkit MCP

Apache 2.0
/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { afterAll, beforeAll, beforeEach, describe, expect, it, jest, } from '@jest/globals'; import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import * as assert from 'assert'; import { GenerateResponseData, Genkit, genkit, z } from 'genkit'; import { SPAN_TYPE_ATTR, appendSpan } from 'genkit/tracing'; import { Writable } from 'stream'; import { __addTransportStreamForTesting, __forceFlushSpansForTesting, __getSpanExporterForTesting, __useJsonFormatForTesting, enableGoogleCloudTelemetry, } from '../src/index.js'; jest.mock('../src/auth.js', () => { const original = jest.requireActual('../src/auth.js'); return { ...(original || {}), resolveCurrentPrincipal: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', serviceAccountEmail: '', }); }), credentialsFromEnvironment: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', credentials: { client_email: '', private_key: '-----BEGIN PRIVATE KEY-----', }, }); }), }; }); describe('GoogleCloudLogs with truncation', () => { let logLines = ''; const logStream = new Writable(); logStream._write = (chunk, encoding, next) => { logLines = logLines += chunk.toString(); next(); }; let ai: Genkit; beforeAll(async () => { process.env.GCLOUD_PROJECT = 'test'; process.env.GENKIT_ENV = 'dev'; __useJsonFormatForTesting(); __addTransportStreamForTesting(logStream); await enableGoogleCloudTelemetry({ projectId: 'test', forceDevExport: false, metricExportIntervalMillis: 100, metricExportTimeoutMillis: 100, }); ai = genkit({ // Force GCP Plugin to use in-memory metrics exporter plugins: [], }); await waitForLogsInit(ai, logLines); }); beforeEach(async () => { logLines = ''; __getSpanExporterForTesting().reset(); }); afterAll(async () => { await ai.stopServers(); }); it('truncates large output logs', async () => { const testModel = createModel(ai, 'testModel', async () => { return { message: { role: 'user', content: [ { text: 'r'.repeat(130_000), }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { return await'sub1', async () => { return await'sub2', async () => { return await ai.generate({ model: testModel, prompt: `${input} prompt`, config: { temperature: 1.0, topK: 3, topP: 5, maxOutputTokens: 7, }, }); }); }); }); await testFlow('test'); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); const logObjects = => JSON.parse(l as string)); const logObjectMessages = (structuredLog) => structuredLog.message ); expect(logObjectMessages).toContain('Output[testFlow, testFlow]'); => { if (structuredLog.message === 'Output[testFlow, testFlow]') { expect(structuredLog.content.length).toBe(128_000); } }); }); it('truncates large input logs', async () => { const testModel = createModel(ai, 'testModel', async () => { return { message: { role: 'user', content: [ { text: 'response', }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { return await'sub1', async () => { return await'sub2', async () => { return await ai.generate({ model: testModel, prompt: `${input} prompt`, config: { temperature: 1.0, topK: 3, topP: 5, maxOutputTokens: 7, }, }); }); }); }); await testFlow('t'.repeat(130_000)); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); const logObjects = => JSON.parse(l as string)); const logObjectMessages = (structuredLog) => structuredLog.message ); expect(logObjectMessages).toContain('Input[testFlow, testFlow]'); => { if (structuredLog.message === 'Input[testFlow, testFlow]') { expect(structuredLog.content.length).toBe(128_000); } }); }); it.only('truncates large model names', async () => { const testModel = createModel(ai, 'm'.repeat(2046), async () => { return { message: { role: 'user', content: [ { text: 'response', }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { return await'sub1', async () => { return await'sub2', async () => { return await ai.generate({ model: testModel, prompt: `${input} prompt`, config: { temperature: 1.0, topK: 3, topP: 5, maxOutputTokens: 7, }, }); }); }); }); await testFlow('test'); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); const logObjects = => JSON.parse(l as string)); const logObjectModels = (structuredLog) => structuredLog.model ); expect(logObjectModels).toContain('m'.repeat(1024)); }); }); describe('GoogleCloudLogs', () => { let logLines = ''; const logStream = new Writable(); logStream._write = (chunk, encoding, next) => { logLines = logLines += chunk.toString(); next(); }; let ai: Genkit; beforeAll(async () => { process.env.GCLOUD_PROJECT = 'test'; process.env.GENKIT_ENV = 'dev'; __addTransportStreamForTesting(logStream); await enableGoogleCloudTelemetry({ projectId: 'test', forceDevExport: false, metricExportIntervalMillis: 100, metricExportTimeoutMillis: 100, }); ai = genkit({ // Force GCP Plugin to use in-memory metrics exporter plugins: [], }); await waitForLogsInit(ai, logLines); }); beforeEach(async () => { logLines = ''; __getSpanExporterForTesting().reset(); }); afterAll(async () => { await ai.stopServers(); }); it('writes path logs', async () => { const testFlow = createFlow(ai, 'testFlow'); await testFlow(); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal(logMessages.includes('[info] Paths[testFlow]'), true); }); it('writes error logs', async () => { const testFlow = createFlow(ai, 'testFlow', async () => { const nothing: { missing?: any } = { missing: 1 }; delete nothing.missing; return nothing.missing.explode; }); assert.rejects(async () => { await testFlow(); }); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal( logMessages.includes( "[error] Error[testFlow, TypeError] Cannot read properties of undefined (reading 'explode')" ), true ); }, 10000); //timeout it('writes generate logs', async () => { const testModel = createModel(ai, 'testModel', async () => { return { message: { role: 'user', content: [ { text: 'response', }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { return await'sub1', async () => { return await'sub2', async () => { return await ai.generate({ model: testModel, prompt: `${input} prompt`, config: { temperature: 1.0, topK: 3, topP: 5, maxOutputTokens: 7, }, }); }); }); }); await testFlow('test'); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal( logMessages.includes( '[info] Config[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), true ); assert.equal( logMessages.includes( '[info] Input[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), true ); assert.equal( logMessages.includes( '[info] Output[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), true ); assert.equal( logMessages.includes('[info] Input[testFlow, testFlow]'), true ); assert.equal( logMessages.includes('[info] Output[testFlow, testFlow]'), true ); }); it('writes user feedback log', async () => { await appendSpan( 'trace1', 'parent1', { name: 'user-feedback', path: '/{flowName}', metadata: { subtype: 'userFeedback', feedbackValue: 'negative', textFeedback: 'terrible', }, }, { [SPAN_TYPE_ATTR]: 'userEngagement' } ); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal(logMessages.includes('[info] UserFeedback[flowName]'), true); }); it('writes user acceptance log', async () => { await appendSpan( 'trace1', 'parent1', { name: 'user-acceptance', path: '/{flowName}', metadata: { subtype: 'userAcceptance', acceptanceValue: 'rejected' }, }, { [SPAN_TYPE_ATTR]: 'userEngagement' } ); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal(logMessages.includes('[info] UserAcceptance[flowName]'), true); }); it('writes tool input and output logs', async () => { const echoTool = ai.defineTool( { name: 'echoTool', description: 'echo' }, async (input) => input ); await echoTool('Helllooooo!'); await getExportedSpans(); const logMessages = await getLogs(2, 100, logLines); assert.ok(logMessages.includes('[info] Input[echoTool, echoTool]')); assert.ok(logMessages.includes('[info] Output[echoTool, echoTool]')); }); }); /** Helper to create a flow with no inputs or outputs */ function createFlow( ai: Genkit, name: string, fn: () => Promise<any> = async () => {} ) { return ai.defineFlow( { name, inputSchema: z.void(), outputSchema: z.void(), }, fn ); } function createFlowWithInput( ai: Genkit, name: string, fn: (input: string) => Promise<any> ) { return ai.defineFlow( { name, inputSchema: z.string(), outputSchema: z.any(), }, fn ); } /** * Helper to create a model that returns the value produced by the given * response function. */ function createModel( genkit: Genkit, name: string, respFn: () => Promise<GenerateResponseData> ) { return genkit.defineModel({ name }, (req) => respFn()); } async function waitForLogsInit(genkit: Genkit, logLines: any) { await import('winston'); const testFlow = createFlow(genkit, 'testFlow'); await testFlow(); await getLogs(1, 100, logLines); } async function getLogs( logCount: number, maxAttempts: number, logLines: string ): Promise<String[]> { var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 100)); const found = logLines .trim() .split('\n') .map((l) => l.trim()); if (found.length >= logCount) { return found; } }`Waiting for logs, but none have been written.`); } /** Polls the in memory metric exporter until the genkit scope is found. */ async function getExportedSpans( maxAttempts: number = 200 ): Promise<ReadableSpan[]> { __forceFlushSpansForTesting(); var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 50)); const found = __getSpanExporterForTesting().getFinishedSpans(); if (found.length > 0) { return found; } }`Timed out while waiting for spans to be exported.`); }