MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
jest,
} from '@jest/globals';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import * as assert from 'assert';
import { Genkit, z } from 'genkit';
import { GenkitBeta, genkit } from 'genkit/beta';
import { appendSpan } from 'genkit/tracing';
import {
__forceFlushSpansForTesting,
__getSpanExporterForTesting,
} from '../src/gcpOpenTelemetry.js';
import { enableGoogleCloudTelemetry } from '../src/index.js';
jest.mock('../src/auth.js', () => {
const original = jest.requireActual('../src/auth.js');
return {
...(original || {}),
resolveCurrentPrincipal: jest.fn().mockImplementation(() => {
return Promise.resolve({
projectId: 'test',
serviceAccountEmail: 'test@test.com',
});
}),
credentialsFromEnvironment: jest.fn().mockImplementation(() => {
return Promise.resolve({
projectId: 'test',
credentials: {
client_email: 'test@genkit.com',
private_key: '-----BEGIN PRIVATE KEY-----',
},
});
}),
};
});
describe('GoogleCloudTracing', () => {
let ai: GenkitBeta;
beforeAll(async () => {
process.env.GCLOUD_PROJECT = 'test';
process.env.GENKIT_ENV = 'dev';
await enableGoogleCloudTelemetry({
projectId: 'test',
forceDevExport: false,
});
ai = genkit({});
});
beforeEach(async () => {
__getSpanExporterForTesting().reset();
});
afterAll(async () => {
await ai.stopServers();
});
it('writes traces', async () => {
const testFlow = createFlow(ai, 'testFlow');
await testFlow();
const spans = await getExportedSpans();
assert.equal(spans.length, 1);
assert.equal(spans[0].name, 'testFlow');
});
it('Adjusts attributes to support GCP trace filtering', async () => {
const testFlow = createFlow(ai, 'testFlow');
await testFlow();
const spans = await getExportedSpans();
// Check some common attributes
assert.equal(spans[0].attributes['genkit/name'], 'testFlow');
assert.equal(spans[0].attributes['genkit/type'], 'action');
assert.equal(spans[0].attributes['genkit/metadata/subtype'], 'flow');
// Ensure we have no attributes with ':' because these are awkward to use in
// Cloud Trace.
const spanAttrKeys = Object.entries(spans[0].attributes).map(([k, v]) => k);
for (const key in spanAttrKeys) {
assert.equal(key.indexOf(':'), -1);
}
});
it('sub actions are contained within flows', async () => {
const testFlow = createFlow(ai, 'testFlow', async () => {
return await ai.run('subAction', async () => {
return await ai.run('subAction2', async () => {
return 'done';
});
});
});
await testFlow();
const spans = await getExportedSpans();
assert.equal(spans.length, 3);
assert.equal(spans[2].name, 'testFlow');
assert.equal(spans[2].parentSpanId, undefined);
assert.equal(spans[1].name, 'subAction');
assert.equal(spans[1].parentSpanId, spans[2].spanContext().spanId);
assert.equal(spans[0].name, 'subAction2');
assert.equal(spans[0].parentSpanId, spans[1].spanContext().spanId);
});
it('different flows run independently', async () => {
const testFlow1 = createFlow(ai, 'testFlow1');
const testFlow2 = createFlow(ai, 'testFlow2');
await testFlow1();
await testFlow2();
const spans = await getExportedSpans();
assert.equal(spans.length, 2);
assert.equal(spans[0].parentSpanId, undefined);
assert.equal(spans[1].parentSpanId, undefined);
});
it('labels failed spans', async () => {
const testFlow = createFlow(ai, 'badFlow', async () => {
return await ai.run('badStep', async () => {
throw new Error('oh no!');
});
});
try {
await testFlow();
} catch (e) {}
const spans = await getExportedSpans();
assert.equal(spans.length, 2);
assert.equal(spans[0].name, 'badStep');
assert.equal(spans[0].attributes['genkit/failedSpan'], 'badStep');
assert.equal(
spans[0].attributes['genkit/failedPath'],
'/{badFlow,t:flow}/{badStep,t:flowStep}'
);
assert.equal(spans[1].attributes['genkit/isRoot'], true);
assert.equal(spans[1].attributes['genkit/rootState'], 'error');
});
it('labels the root feature', async () => {
const testFlow = createFlow(ai, 'niceFlow', async () => {
return ai.run('niceStep', async () => {});
});
await testFlow();
const spans = await getExportedSpans();
assert.equal(spans[0].name, 'niceStep');
assert.equal(spans[0].attributes['genkit/feature'], undefined);
assert.equal(spans[1].name, 'niceFlow');
assert.equal(spans[1].attributes['genkit/feature'], 'niceFlow');
assert.equal(spans[1].attributes['genkit/rootState'], 'success');
});
it('marks the root feature failed when it is the failure', async () => {
const testFlow = createFlow(ai, 'failingFlow', async () => {
await ai.run('good step', async () => {});
throw new Error('oops!');
});
try {
await testFlow();
} catch (e) {}
const spans = await getExportedSpans();
assert.equal(spans.length, 2);
assert.equal(spans[1].name, 'failingFlow');
assert.equal(spans[1].attributes['genkit/failedSpan'], 'failingFlow');
assert.equal(
spans[1].attributes['genkit/failedPath'],
'/{failingFlow,t:flow}'
);
assert.equal(spans[1].attributes['genkit/isRoot'], true);
assert.equal(spans[1].attributes['genkit/rootState'], 'error');
});
it('adds the genkit/model label for model actions', async () => {
const echoModel = ai.defineModel(
{
name: 'echoModel',
},
async (request) => {
return {
message: {
role: 'model',
content: [
{
text:
'Echo: ' +
request.messages
.map((m) => m.content.map((c) => c.text).join())
.join(),
},
],
},
finishReason: 'stop',
};
}
);
const testFlow = createFlow(ai, 'modelFlow', async () => {
return ai.run('runFlow', async () => {
await ai.generate({
model: echoModel,
prompt: 'Testing model telemetry',
});
});
});
await testFlow();
const spans = await getExportedSpans();
assert.equal(spans[0].name, 'echoModel');
assert.equal(spans[0].attributes['genkit/model'], 'echoModel');
assert.equal(spans[1].name, 'generate');
assert.equal(spans[2].name, 'runFlow');
assert.equal(spans[3].name, 'modelFlow');
});
it('attaches additional span', async () => {
await appendSpan(
'trace1',
'parent1',
{ name: 'span-name', metadata: { metadata_key: 'metadata_value' } },
{ ['label_key']: 'label_value' }
);
const spans = await getExportedSpans();
const span = spans.find((it) => it.name === 'span-name');
assert.equal(Object.keys(span?.attributes || {}).length, 3);
assert.equal(span?.attributes['genkit/name'], 'span-name');
assert.equal(span?.attributes['label_key'], 'label_value');
assert.equal(
span?.attributes['genkit/metadata/metadata_key'],
'metadata_value'
);
});
it('writes sessionId and threadName for chats', async () => {
const testModel = ai.defineModel({ name: 'testModel' }, async () => {
return {
message: {
role: 'user',
content: [
{
text: 'response',
},
],
},
finishReason: 'stop',
usage: {
inputTokens: 10,
outputTokens: 14,
inputCharacters: 8,
outputCharacters: 16,
inputImages: 1,
outputImages: 3,
},
};
});
const chat = ai.chat();
await chat.send({ model: testModel, prompt: 'Sending test prompt' });
const spans = await getExportedSpans();
// We should get 3 spans from this chat -- "send" which delegates to "generate" which delegates to our "testModel"
// Only the top level span will have the sessionId and threadName until we make sessionId more ubiquitous
expect(spans).toHaveLength(3);
spans.forEach((span) => {
if (span.name === 'send') {
expect(span?.attributes['genkit/sessionId']).not.toBeUndefined();
expect(span?.attributes['genkit/threadName']).not.toBeUndefined();
return;
}
// Once we make the change to have sessionId on all relevant spans, then these should verify they are populated.
expect(span?.attributes['genkit/sessionId']).toBeUndefined();
expect(span?.attributes['genkit/threadName']).toBeUndefined();
});
});
/** Helper to create a flow with no inputs or outputs */
function createFlow(
ai: Genkit,
name: string,
fn: () => Promise<any> = async () => {}
) {
return ai.defineFlow(
{
name,
inputSchema: z.void(),
outputSchema: z.void(),
},
fn
);
}
/** Polls the in memory metric exporter until the genkit scope is found. */
async function getExportedSpans(
maxAttempts: number = 200
): Promise<ReadableSpan[]> {
__forceFlushSpansForTesting();
var attempts = 0;
while (attempts++ < maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, 50));
const found = __getSpanExporterForTesting().getFinishedSpans();
if (found.length > 0) {
return found;
}
}
assert.fail(`Timed out while waiting for spans to be exported.`);
}
});