MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
jest,
} from '@jest/globals';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import * as assert from 'assert';
import { GenerateResponseData, Genkit, genkit, z } from 'genkit';
import { SPAN_TYPE_ATTR, appendSpan } from 'genkit/tracing';
import { Writable } from 'stream';
import {
__addTransportStreamForTesting,
__forceFlushSpansForTesting,
__getSpanExporterForTesting,
__useJsonFormatForTesting,
enableGoogleCloudTelemetry,
} from '../src/index.js';
// Replace the two credential lookups in ../src/auth.js with stubs that resolve
// to fixed test values. Everything is built inside the factory because Jest
// hoists `jest.mock` calls above the imports.
jest.mock('../src/auth.js', () => {
  const actualAuth = jest.requireActual('../src/auth.js');

  // Resolves to a fixed principal for project "test".
  const resolveCurrentPrincipal = jest.fn().mockImplementation(() =>
    Promise.resolve({
      projectId: 'test',
      serviceAccountEmail: 'test@test.com',
    })
  );

  // Resolves to fixed placeholder credentials for project "test".
  const credentialsFromEnvironment = jest.fn().mockImplementation(() =>
    Promise.resolve({
      projectId: 'test',
      credentials: {
        client_email: 'test@genkit.com',
        private_key: '-----BEGIN PRIVATE KEY-----',
      },
    })
  );

  // Spread the actual module's exports (if any) first so that only the two
  // functions above are overridden.
  return {
    ...(actualAuth || {}),
    resolveCurrentPrincipal,
    credentialsFromEnvironment,
  };
});
/**
 * Checks the sizes of values found in the JSON-formatted log lines captured
 * from the telemetry logger: flow input/output `content` and the `model` field.
 *
 * NOTE(review): `__useJsonFormatForTesting()` is called in `beforeAll` and is
 * never undone in this file — confirm it does not leak into suites that expect
 * plain-text log lines.
 */
describe('GoogleCloudLogs with truncation', () => {
  /** Length the tests expect for an oversized `content` field. */
  const EXPECTED_CONTENT_LENGTH = 128_000;
  /** Length the tests expect for an oversized model name. */
  const EXPECTED_MODEL_NAME_LENGTH = 1024;

  // Everything written to `logStream` is accumulated here as text.
  let logLines = '';
  const logStream = new Writable();
  logStream._write = (chunk, encoding, next) => {
    logLines += chunk.toString();
    next();
  };
  let ai: Genkit;

  /**
   * Defines a model called `name` whose single response part is `text`,
   * together with a fixed set of usage numbers.
   */
  function defineTestModel(name: string, text: string) {
    return createModel(ai, name, async () => ({
      message: {
        role: 'user',
        content: [{ text }],
      },
      finishReason: 'stop',
      usage: {
        inputTokens: 10,
        outputTokens: 14,
        inputCharacters: 8,
        outputCharacters: 16,
        inputImages: 1,
        outputImages: 3,
      },
    }));
  }

  /**
   * Defines `testFlow`, which calls `ai.generate` with `model` from inside two
   * nested `ai.run` steps (`sub1` > `sub2`).
   */
  function defineTestFlow(model: ReturnType<typeof createModel>) {
    return createFlowWithInput(ai, 'testFlow', async (input) => {
      return await ai.run('sub1', async () => {
        return await ai.run('sub2', async () => {
          return await ai.generate({
            model,
            prompt: `${input} prompt`,
            config: {
              temperature: 1.0,
              topK: 3,
              topP: 5,
              maxOutputTokens: 7,
            },
          });
        });
      });
    });
  }

  /** Waits for exported spans, then parses every captured log line as JSON. */
  async function readStructuredLogs() {
    await getExportedSpans();
    const logMessages = await getLogs(1, 100, logLines);
    return logMessages.map((l) => JSON.parse(l as string));
  }

  /**
   * Asserts that a log entry with the given `message` exists and that every
   * such entry has `content` of exactly `EXPECTED_CONTENT_LENGTH` characters.
   */
  function expectTruncatedContent(logObjects: any[], message: string) {
    const logObjectMessages = logObjects.map(
      (structuredLog) => structuredLog.message
    );
    expect(logObjectMessages).toContain(message);
    logObjects
      .filter((structuredLog) => structuredLog.message === message)
      .forEach((structuredLog) => {
        expect(structuredLog.content.length).toBe(EXPECTED_CONTENT_LENGTH);
      });
  }

  beforeAll(async () => {
    process.env.GCLOUD_PROJECT = 'test';
    process.env.GENKIT_ENV = 'dev';
    __useJsonFormatForTesting();
    __addTransportStreamForTesting(logStream);
    await enableGoogleCloudTelemetry({
      projectId: 'test',
      forceDevExport: false,
      metricExportIntervalMillis: 100,
      metricExportTimeoutMillis: 100,
    });
    ai = genkit({
      // Force GCP Plugin to use in-memory metrics exporter
      plugins: [],
    });
    await waitForLogsInit(ai, logLines);
  });

  beforeEach(async () => {
    // Start every test with an empty log buffer and span exporter.
    logLines = '';
    __getSpanExporterForTesting().reset();
  });

  afterAll(async () => {
    await ai.stopServers();
  });

  it('truncates large output logs', async () => {
    const testModel = defineTestModel('testModel', 'r'.repeat(130_000));
    const testFlow = defineTestFlow(testModel);

    await testFlow('test');

    expectTruncatedContent(
      await readStructuredLogs(),
      'Output[testFlow, testFlow]'
    );
  });

  it('truncates large input logs', async () => {
    const testModel = defineTestModel('testModel', 'response');
    const testFlow = defineTestFlow(testModel);

    await testFlow('t'.repeat(130_000));

    expectTruncatedContent(
      await readStructuredLogs(),
      'Input[testFlow, testFlow]'
    );
  });

  // Plain `it` rather than `it.only`: a focused test makes Jest skip every
  // other test in this file.
  it('truncates large model names', async () => {
    const testModel = defineTestModel('m'.repeat(2046), 'response');
    const testFlow = defineTestFlow(testModel);

    await testFlow('test');

    const logObjects = await readStructuredLogs();
    const logObjectModels = logObjects.map(
      (structuredLog) => structuredLog.model
    );
    expect(logObjectModels).toContain('m'.repeat(EXPECTED_MODEL_NAME_LENGTH));
  });
});
/**
 * Checks which `[level] message` lines are captured from the telemetry logger
 * for flows, errors, generate calls, user-engagement spans and tools.
 */
describe('GoogleCloudLogs', () => {
  // Everything written to `logStream` is accumulated here as text.
  let logLines = '';
  const logStream = new Writable();
  logStream._write = (chunk, encoding, next) => {
    logLines += chunk.toString();
    next();
  };
  let ai: Genkit;

  /**
   * Waits for exported spans, then returns the captured log lines, requiring
   * at least `minCount` of them.
   */
  async function readLogLines(minCount = 1) {
    await getExportedSpans();
    return getLogs(minCount, 100, logLines);
  }

  beforeAll(async () => {
    process.env.GCLOUD_PROJECT = 'test';
    process.env.GENKIT_ENV = 'dev';
    __addTransportStreamForTesting(logStream);
    await enableGoogleCloudTelemetry({
      projectId: 'test',
      forceDevExport: false,
      metricExportIntervalMillis: 100,
      metricExportTimeoutMillis: 100,
    });
    ai = genkit({
      // Force GCP Plugin to use in-memory metrics exporter
      plugins: [],
    });
    await waitForLogsInit(ai, logLines);
  });

  beforeEach(async () => {
    // Start every test with an empty log buffer and span exporter.
    logLines = '';
    __getSpanExporterForTesting().reset();
  });

  afterAll(async () => {
    await ai.stopServers();
  });

  it('writes path logs', async () => {
    const testFlow = createFlow(ai, 'testFlow');

    await testFlow();

    const logMessages = await readLogLines();
    assert.ok(
      logMessages.includes('[info] Paths[testFlow]'),
      'missing log line: [info] Paths[testFlow]'
    );
  });

  it('writes error logs', async () => {
    const testFlow = createFlow(ai, 'testFlow', async () => {
      // Reading a property of a deleted member throws a TypeError at runtime.
      const nothing: { missing?: any } = { missing: 1 };
      delete nothing.missing;
      return nothing.missing.explode;
    });

    // Awaited so the rejection is actually asserted and the flow has failed
    // before the logs are inspected.
    await assert.rejects(async () => {
      await testFlow();
    });

    const logMessages = await readLogLines();
    const expected =
      "[error] Error[testFlow, TypeError] Cannot read properties of undefined (reading 'explode')";
    assert.ok(logMessages.includes(expected), `missing log line: ${expected}`);
  }, 10000); //timeout

  it('writes generate logs', async () => {
    const testModel = createModel(ai, 'testModel', async () => {
      return {
        message: {
          role: 'user',
          content: [
            {
              text: 'response',
            },
          ],
        },
        finishReason: 'stop',
        usage: {
          inputTokens: 10,
          outputTokens: 14,
          inputCharacters: 8,
          outputCharacters: 16,
          inputImages: 1,
          outputImages: 3,
        },
      };
    });
    const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => {
      return await ai.run('sub1', async () => {
        return await ai.run('sub2', async () => {
          return await ai.generate({
            model: testModel,
            prompt: `${input} prompt`,
            config: {
              temperature: 1.0,
              topK: 3,
              topP: 5,
              maxOutputTokens: 7,
            },
          });
        });
      });
    });

    await testFlow('test');

    const logMessages = await readLogLines();
    const expectedLines = [
      '[info] Config[testFlow > sub1 > sub2 > generate > testModel, testModel]',
      '[info] Input[testFlow > sub1 > sub2 > generate > testModel, testModel]',
      '[info] Output[testFlow > sub1 > sub2 > generate > testModel, testModel]',
      '[info] Input[testFlow, testFlow]',
      '[info] Output[testFlow, testFlow]',
    ];
    for (const expected of expectedLines) {
      assert.ok(
        logMessages.includes(expected),
        `missing log line: ${expected}`
      );
    }
  });

  it('writes user feedback log', async () => {
    await appendSpan(
      'trace1',
      'parent1',
      {
        name: 'user-feedback',
        path: '/{flowName}',
        metadata: {
          subtype: 'userFeedback',
          feedbackValue: 'negative',
          textFeedback: 'terrible',
        },
      },
      { [SPAN_TYPE_ATTR]: 'userEngagement' }
    );

    const logMessages = await readLogLines();
    assert.ok(
      logMessages.includes('[info] UserFeedback[flowName]'),
      'missing log line: [info] UserFeedback[flowName]'
    );
  });

  it('writes user acceptance log', async () => {
    await appendSpan(
      'trace1',
      'parent1',
      {
        name: 'user-acceptance',
        path: '/{flowName}',
        metadata: { subtype: 'userAcceptance', acceptanceValue: 'rejected' },
      },
      { [SPAN_TYPE_ATTR]: 'userEngagement' }
    );

    const logMessages = await readLogLines();
    assert.ok(
      logMessages.includes('[info] UserAcceptance[flowName]'),
      'missing log line: [info] UserAcceptance[flowName]'
    );
  });

  it('writes tool input and output logs', async () => {
    const echoTool = ai.defineTool(
      { name: 'echoTool', description: 'echo' },
      async (input) => input
    );

    await echoTool('Helllooooo!');

    // Both an input line and an output line are expected for the tool call.
    const logMessages = await readLogLines(2);
    assert.ok(logMessages.includes('[info] Input[echoTool, echoTool]'));
    assert.ok(logMessages.includes('[info] Output[echoTool, echoTool]'));
  });
});
/**
 * Registers a flow on `ai` whose input and output schemas are both `z.void()`.
 *
 * @param ai Genkit instance the flow is defined on.
 * @param name Name given to the flow.
 * @param fn Flow body; defaults to an async function that does nothing.
 * @returns Whatever `ai.defineFlow` returns for the new flow.
 */
function createFlow(
  ai: Genkit,
  name: string,
  fn: () => Promise<any> = async () => {}
) {
  const flowConfig = {
    name,
    inputSchema: z.void(),
    outputSchema: z.void(),
  };
  return ai.defineFlow(flowConfig, fn);
}
/**
 * Registers a flow on `ai` that takes a string input (`z.string()`) and may
 * return anything (`z.any()`).
 *
 * @param ai Genkit instance the flow is defined on.
 * @param name Name given to the flow.
 * @param fn Flow body, called with the string input.
 * @returns Whatever `ai.defineFlow` returns for the new flow.
 */
function createFlowWithInput(
  ai: Genkit,
  name: string,
  fn: (input: string) => Promise<any>
) {
  const flowConfig = {
    name,
    inputSchema: z.string(),
    outputSchema: z.any(),
  };
  return ai.defineFlow(flowConfig, fn);
}
/**
 * Defines a model called `name` on the given Genkit instance. The request
 * passed to the model is ignored; each call simply returns what `respFn`
 * produces.
 *
 * @param genkit Genkit instance the model is defined on.
 * @param name Name given to the model.
 * @param respFn Produces the response data for every call.
 * @returns Whatever `genkit.defineModel` returns for the new model.
 */
function createModel(
  genkit: Genkit,
  name: string,
  respFn: () => Promise<GenerateResponseData>
) {
  const modelOptions = { name };
  return genkit.defineModel(modelOptions, (_request) => {
    return respFn();
  });
}
/**
 * Warm-up step used by the suites' `beforeAll` hooks: dynamically imports
 * `winston`, runs an empty flow named `testFlow`, then waits on `getLogs`.
 * Presumably this ensures the logger is initialised before the first test —
 * TODO confirm.
 *
 * @param genkit Genkit instance the warm-up flow is defined on.
 * @param logLines Captured log text, handed to `getLogs` as-is.
 */
async function waitForLogsInit(genkit: Genkit, logLines: any) {
  await import('winston');

  const warmUpFlow = createFlow(genkit, 'testFlow');
  await warmUpFlow();

  await getLogs(1, 100, logLines);
}
/**
 * Splits captured log text into trimmed lines and returns them once at least
 * `logCount` lines are present.
 *
 * NOTE: `logLines` is a string, so this function sees a snapshot taken at call
 * time; text written to the caller's buffer afterwards is not observed here.
 * An empty snapshot still yields one (empty) line, because `''.split('\n')`
 * is `['']`.
 *
 * @param logCount Minimum number of lines required.
 * @param maxAttempts Number of 100 ms waits before giving up.
 * @param logLines Captured log text, one entry per line.
 * @returns The trimmed lines of `logLines`.
 * @throws AssertionError (via `assert.fail`) when fewer than `logCount` lines
 *   are available after `maxAttempts` attempts.
 */
async function getLogs(
  logCount: number,
  maxAttempts: number,
  logLines: string
): Promise<string[]> {
  // The snapshot cannot change, so split it once rather than on every attempt.
  const found = logLines
    .trim()
    .split('\n')
    .map((l) => l.trim());

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    if (found.length >= logCount) {
      return found;
    }
  }
  assert.fail(`Waiting for logs, but none have been written.`);
}
/**
 * Calls `__forceFlushSpansForTesting()` and then polls the testing span
 * exporter every 50 ms until it reports at least one finished span.
 *
 * @param maxAttempts Number of polls before giving up (default 200).
 * @returns The finished spans reported by the exporter.
 * @throws AssertionError (via `assert.fail`) when no span shows up in time.
 */
async function getExportedSpans(
  maxAttempts: number = 200
): Promise<ReadableSpan[]> {
  __forceFlushSpansForTesting();

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    await new Promise((resolve) => setTimeout(resolve, 50));

    const finishedSpans = __getSpanExporterForTesting().getFinishedSpans();
    if (finishedSpans.length > 0) {
      return finishedSpans;
    }
  }

  assert.fail(`Timed out while waiting for spans to be exported.`);
}