MCP Terminal Server

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { afterAll, beforeAll, beforeEach, describe, it, jest, } from '@jest/globals'; import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import * as assert from 'assert'; import { GenerateResponseData, Genkit, genkit, z } from 'genkit'; import { Writable } from 'stream'; import { __addTransportStreamForTesting, __forceFlushSpansForTesting, __getSpanExporterForTesting, enableGoogleCloudTelemetry, } from '../src/index.js'; jest.mock('../src/auth.js', () => { const original = jest.requireActual('../src/auth.js'); return { ...(original || {}), resolveCurrentPrincipal: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', serviceAccountEmail: 'test@test.com', }); }), credentialsFromEnvironment: jest.fn().mockImplementation(() => { return Promise.resolve({ projectId: 'test', credentials: { client_email: 'test@genkit.com', private_key: '-----BEGIN PRIVATE KEY-----', }, }); }), }; }); describe('GoogleCloudLogs no I/O', () => { let logLines = ''; const logStream = new Writable(); logStream._write = (chunk, encoding, next) => { logLines = logLines += chunk.toString(); next(); }; let ai: Genkit; beforeAll(async () => { process.env.GCLOUD_PROJECT = 'test'; process.env.GENKIT_ENV = 'dev'; __addTransportStreamForTesting(logStream); await enableGoogleCloudTelemetry({ projectId: 'test', forceDevExport: false, metricExportIntervalMillis: 100, metricExportTimeoutMillis: 100, disableLoggingInputAndOutput: true, }); ai = genkit({}); // Wait for the telemetry plugin to be initialized await waitForLogsInit(ai, logLines); }); beforeEach(async () => { logLines = ''; __getSpanExporterForTesting().reset(); }); afterAll(async () => { await ai.stopServers(); }); it('writes path logs', async () => { const testFlow = createFlow(ai, 'testFlow'); await testFlow(); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal(logMessages.includes('[info] Paths[testFlow]'), true); }); it('writes error logs', async () => { const testFlow = createFlow(ai, 'testFlow', async () => { const nothing: { missing?: any } = { missing: 1 }; delete nothing.missing; return nothing.missing.explode; }); assert.rejects(async () => { await testFlow(); }); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal( logMessages.includes( "[error] Error[testFlow, TypeError] Cannot read properties of undefined (reading 'explode')" ), true ); }, 10000); //timeout it('writes generate logs', async () => { const testModel = createModel(ai, 'testModel', async () => { return { message: { role: 'user', content: [ { text: 'response', }, ], }, finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 14, inputCharacters: 8, outputCharacters: 16, inputImages: 1, outputImages: 3, }, }; }); const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { return await ai.run('sub1', async () => { return await ai.run('sub2', async () => { return await ai.generate({ model: testModel, prompt: `${input} prompt`, config: { temperature: 1.0, topK: 3, topP: 5, maxOutputTokens: 7, }, }); }); }); }); await testFlow('test'); await getExportedSpans(); const logMessages = await getLogs(1, 100, logLines); assert.equal( logMessages.includes( '[info] Config[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), true ); assert.equal( logMessages.includes( '[info] Input[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), false ); assert.equal( logMessages.includes( '[info] Output[testFlow > sub1 > sub2 > generate > testModel, testModel]' ), false ); assert.equal( logMessages.includes('[info] Input[testFlow, testModel]'), false ); assert.equal( logMessages.includes('[info] Output[testFlow, testModel]'), false ); }); }); /** Helper to create a flow with no inputs or outputs */ function createFlow( ai: Genkit, name: string, fn: () => Promise<any> = async () => {} ) { return ai.defineFlow( { name, inputSchema: z.void(), outputSchema: z.void(), }, fn ); } function createFlowWithInput( ai: Genkit, name: string, fn: (input: string) => Promise<any> ) { return ai.defineFlow( { name, inputSchema: z.string(), outputSchema: z.any(), }, fn ); } /** * Helper to create a model that returns the value produced by the given * response function. */ function createModel( genkit: Genkit, name: string, respFn: () => Promise<GenerateResponseData> ) { return genkit.defineModel({ name }, (req) => respFn()); } async function waitForLogsInit(genkit: Genkit, logLines: any) { await import('winston'); const testFlow = createFlow(genkit, 'testFlow'); await testFlow(); await getLogs(1, 100, logLines); } async function getLogs( logCount: number, maxAttempts: number, logLines: string ): Promise<String[]> { var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 100)); const found = logLines .trim() .split('\n') .map((l) => l.trim()); if (found.length >= logCount) { return found; } } assert.fail(`Waiting for logs, but none have been written.`); } /** Polls the in memory metric exporter until the genkit scope is found. */ async function getExportedSpans( maxAttempts: number = 200 ): Promise<ReadableSpan[]> { __forceFlushSpansForTesting(); var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 50)); const found = __getSpanExporterForTesting().getFinishedSpans(); if (found.length > 0) { return found; } } assert.fail(`Timed out while waiting for spans to be exported.`); }