// MCP Terminal Server
// by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { randomUUID } from 'crypto';
import { getDatasetStore, getEvalStore } from '.';
import { RuntimeManager } from '../manager/manager';
import {
Action,
CandidateData,
Dataset,
DatasetSchema,
EvalInput,
EvalKeyAugments,
EvalRun,
EvalRunKey,
GenerateResponseSchema,
RunNewEvaluationRequest,
SpanData,
} from '../types';
import {
evaluatorName,
generateTestCaseId,
getEvalExtractors,
getModelInput,
hasAction,
isEvaluator,
logger,
stackTraceSpans,
} from '../utils';
import { enrichResultsWithScoring, extractMetricsMetadata } from './parser';
/** A dataset sample once its required fields are known to be present. */
interface FullInferenceSample {
  testCaseId: string;
  input: any;
  reference?: any;
}
/**
 * Per-sample bookkeeping produced while running inference: the original
 * sample plus, depending on the outcome, the run's trace id, the action's
 * result, or the error message captured when the action failed.
 */
interface InferenceRunState extends FullInferenceSample {
  traceId?: string;
  response?: any;
  evalError?: string;
}
/** Action types, matched as `/<type>` key prefixes, that inference supports. */
const SUPPORTED_ACTION_TYPES = ['flow', 'model'] as const;
/**
 * Starts a new evaluation run. Intended to be used via the reflection API.
 *
 * The inference dataset comes either from the dataset store (`datasetId`) or
 * from inline `data`; inline samples without a `testCaseId` get a generated
 * one. Inference is run against `actionRef`, the requested evaluators score
 * the results, and the resulting run is saved by `runEvaluation`.
 *
 * @param manager Runtime manager used to look up and run actions.
 * @param request Data source, target action, evaluator names and options.
 * @returns The key of the saved evaluation run.
 * @throws If neither `data` nor `datasetId` is given, if the target action
 *   cannot be found, or if the referenced dataset is empty.
 */
export async function runNewEvaluation(
  manager: RuntimeManager,
  request: RunNewEvaluationRequest
): Promise<EvalRunKey> {
  const { dataSource, actionRef, evaluators, options } = request;
  const { datasetId, data } = dataSource;
  if (!(datasetId || data)) {
    throw new Error(`Either 'data' or 'datasetId' must be provided`);
  }
  if (!(await hasAction({ manager, actionRef }))) {
    throw new Error(`Cannot find action ${actionRef}.`);
  }

  let inferenceDataset: Dataset;
  let metadata = {};
  if (!datasetId) {
    // Inline data: make sure every sample carries a test case id.
    const samplesWithIds = data!.map((sample) => ({
      ...sample,
      testCaseId: sample.testCaseId ?? generateTestCaseId(),
    }));
    inferenceDataset = DatasetSchema.parse(samplesWithIds);
  } else {
    const store = await getDatasetStore();
    logger.info(`Fetching dataset ${datasetId}...`);
    const rows = await store.getDataset(datasetId);
    if (rows.length === 0) {
      throw new Error(`Dataset ${datasetId} is empty`);
    }
    inferenceDataset = DatasetSchema.parse(rows);
    // Record which dataset (and which version of it) this run used.
    const allMetadata = await store.listDatasets();
    const datasetVersion = allMetadata.find(
      (entry) => entry.datasetId === datasetId
    )?.version;
    metadata = { datasetId, datasetVersion };
  }

  logger.info('Running inference...');
  const evalDataset = await runInference({
    manager,
    actionRef,
    inferenceDataset,
    context: options?.context,
    actionConfig: options?.actionConfig,
  });
  const evaluatorActions = await getMatchingEvaluatorActions(
    manager,
    evaluators
  );
  const { key } = await runEvaluation({
    manager,
    evaluatorActions,
    evalDataset,
    augments: {
      ...metadata,
      actionRef,
      actionConfig: options?.actionConfig,
    },
  });
  return key;
}
/**
 * Handles the Inference part of the Inference-Evaluation cycle.
 *
 * @param params.manager Runtime manager used to run the target action.
 * @param params.actionRef Key of the action to run; must start with `/flow`
 *   or `/model`.
 * @param params.inferenceDataset Samples to run the action on.
 * @param params.context JSON-encoded context, forwarded to flow runs.
 * @param params.actionConfig Config forwarded to model runs.
 * @returns The evaluation inputs built from each sample's run.
 * @throws If `actionRef` is neither a flow nor a model.
 */
export async function runInference(params: {
  manager: RuntimeManager;
  actionRef: string;
  inferenceDataset: Dataset;
  context?: string;
  actionConfig?: any;
}): Promise<EvalInput[]> {
  if (!isSupportedActionRef(params.actionRef)) {
    throw new Error('Inference is only supported on flows and models');
  }
  return await bulkRunAction({
    manager: params.manager,
    actionRef: params.actionRef,
    inferenceDataset: params.inferenceDataset,
    context: params.context,
    actionConfig: params.actionConfig,
  });
}
/**
 * Handles the Evaluation part of the Inference-Evaluation cycle.
 *
 * Each evaluator action is run, in order, over the rows of `evalDataset`
 * that have no `error`. The scores are then combined with the full dataset
 * via `enrichResultsWithScoring`, and the run is saved to the eval store.
 *
 * @param params.manager Runtime manager used to run the evaluator actions.
 * @param params.evaluatorActions Evaluators to run.
 * @param params.evalDataset Evaluation inputs produced by inference.
 * @param params.augments Extra fields merged into the run key.
 * @returns The saved evaluation run.
 * @throws If `evalDataset` is empty.
 */
export async function runEvaluation(params: {
  manager: RuntimeManager;
  evaluatorActions: Action[];
  evalDataset: EvalInput[];
  augments?: EvalKeyAugments;
}): Promise<EvalRun> {
  const { manager, evaluatorActions, evalDataset, augments } = params;
  if (evalDataset.length === 0) {
    throw new Error('Cannot run evaluation, no data provided');
  }
  const evalRunId = randomUUID();
  // Rows whose inference failed are never sent to evaluators. This does not
  // depend on the evaluator, so compute it once instead of once per loop.
  const scorableRows = evalDataset.filter((row) => !row.error);
  const scores: Record<string, any> = {};
  logger.info('Running evaluation...');
  for (const action of evaluatorActions) {
    const name = evaluatorName(action);
    const response = await manager.runAction({
      key: name,
      input: {
        dataset: scorableRows,
        evalRunId,
      },
    });
    scores[name] = response.result;
  }
  const scoredResults = enrichResultsWithScoring(scores, evalDataset);
  const metadata = extractMetricsMetadata(evaluatorActions);
  const evalRun = {
    key: {
      evalRunId,
      createdAt: new Date().toISOString(),
      ...augments,
    },
    results: scoredResults,
    metricsMetadata: metadata,
  };
  logger.info('Finished evaluation, writing key...');
  const evalStore = getEvalStore();
  await evalStore.save(evalRun);
  return evalRun;
}
/**
 * Lists every registered action whose key `isEvaluator` accepts.
 *
 * @param manager Runtime manager whose actions are listed.
 * @returns The matching evaluator actions.
 */
export async function getAllEvaluatorActions(
  manager: RuntimeManager
): Promise<Action[]> {
  const allActions = await manager.listActions();
  // Object.entries visits only the object's own enumerable keys. A `for...in`
  // loop would also walk inherited ones.
  return Object.entries(allActions)
    .filter(([key]) => isEvaluator(key))
    .map(([, action]) => action);
}
/**
 * Resolves the requested evaluator keys to installed evaluator actions.
 *
 * @param manager Runtime manager used to list installed actions.
 * @param evaluators Evaluator action keys to select. When omitted, no
 *   evaluators are selected and nothing is looked up.
 * @returns The installed evaluator actions whose key is in `evaluators`.
 *   This may be empty when none of the requested keys match.
 * @throws If evaluators were requested but no evaluators are installed.
 */
export async function getMatchingEvaluatorActions(
  manager: RuntimeManager,
  evaluators?: string[]
): Promise<Action[]> {
  if (!evaluators) {
    return [];
  }
  const installed = await getAllEvaluatorActions(manager);
  // With nothing installed nothing can match, so report that explicitly.
  if (installed.length === 0) {
    throw new Error('No evaluators installed');
  }
  return installed.filter(({ key }) => evaluators.includes(key));
}
/**
 * Runs the target action once per dataset sample, then builds an `EvalInput`
 * from each run's outcome.
 *
 * Samples are run one after another. Model actions (keys starting with
 * `/model`) receive `actionConfig` as their model config; all other actions
 * are run as flows with `context`. Evaluation inputs are gathered only after
 * every inference run has finished.
 *
 * @throws If `inferenceDataset` is empty.
 */
async function bulkRunAction(params: {
  manager: RuntimeManager;
  actionRef: string;
  inferenceDataset: Dataset;
  context?: string;
  actionConfig?: any;
}): Promise<EvalInput[]> {
  const { manager, actionRef, inferenceDataset, context, actionConfig } =
    params;
  const isModelAction = actionRef.startsWith('/model');
  if (inferenceDataset.length === 0) {
    throw new Error('Cannot run inference, no data provided');
  }
  // `input` is required in the `Dataset` type, but ZodAny also admits
  // `undefined` in TS checks. This explicit conversion works around that.
  const samples = inferenceDataset as FullInferenceSample[];

  const runStates: InferenceRunState[] = [];
  for (const sample of samples) {
    logger.info(`Running inference '${actionRef}' ...`);
    const runState = isModelAction
      ? await runModelAction({
          manager,
          actionRef,
          sample,
          modelConfig: actionConfig,
        })
      : await runFlowAction({
          manager,
          actionRef,
          sample,
          context,
        });
    runStates.push(runState);
  }

  logger.info('Gathering evalInputs...');
  const gathered: EvalInput[] = [];
  for (const state of runStates) {
    gathered.push(await gatherEvalInput({ manager, actionRef, state }));
  }
  return gathered;
}
/**
 * Runs a flow on a single sample's input.
 *
 * Failures are not thrown. Any error raised here, including a `context` that
 * is not valid JSON, is recorded in the returned state's `evalError`. The
 * trace id is then read from the error's `data.details.traceId`, if present.
 *
 * @param params.context JSON-encoded context for the flow; parsed per call.
 * @returns The sample extended with either the trace id and result, or the
 *   trace id (if any) and the error message.
 */
async function runFlowAction(params: {
  manager: RuntimeManager;
  actionRef: string;
  sample: FullInferenceSample;
  context?: any;
}): Promise<InferenceRunState> {
  const { manager, actionRef, sample, context } = params;
  try {
    const flowResponse = await manager.runAction({
      key: actionRef,
      input: sample.input,
      context: context ? JSON.parse(context) : undefined,
    });
    return {
      ...sample,
      traceId: flowResponse.telemetry?.traceId,
      response: flowResponse.result,
    };
  } catch (e: any) {
    return {
      ...sample,
      traceId: e?.data?.details?.traceId,
      evalError: `Error when running inference. Details: ${e?.message ?? e}`,
    };
  }
}
/**
 * Runs a model on a single sample's input.
 *
 * The model request is built with `getModelInput` from the sample input and
 * `modelConfig`. Failures are not thrown, including errors raised while
 * building that request. They are recorded in the returned state's
 * `evalError`, and the trace id is read from the error's
 * `data.details.traceId`, if present.
 *
 * @returns The sample extended with either the trace id and result, or the
 *   trace id (if any) and the error message.
 */
async function runModelAction(params: {
  manager: RuntimeManager;
  actionRef: string;
  sample: FullInferenceSample;
  modelConfig?: any;
}): Promise<InferenceRunState> {
  const { manager, actionRef, modelConfig, sample } = params;
  try {
    const modelResponse = await manager.runAction({
      key: actionRef,
      input: getModelInput(sample.input, modelConfig),
    });
    return {
      ...sample,
      traceId: modelResponse.telemetry?.traceId,
      response: modelResponse.result,
    };
  } catch (e: any) {
    return {
      ...sample,
      traceId: e?.data?.details?.traceId,
      evalError: `Error when running inference. Details: ${e?.message ?? e}`,
    };
  }
}
/**
 * Builds the `EvalInput` for one inference run.
 *
 * - With no trace id, the run state is returned with its own `testCaseId`,
 *   its inference error (if any) as `error`, and no trace ids.
 * - Otherwise the trace is fetched, and input, output and context are pulled
 *   out with the action's eval extractors. Model actions always keep the
 *   sample's original input.
 * - When no spans can be extracted, or the span from `stackTraceSpans` is in
 *   the `error` state, an error row is returned without output or context.
 * - An error captured while running inference is never dropped. It is
 *   reported even when the trace itself does not show the failure.
 */
async function gatherEvalInput(params: {
  manager: RuntimeManager;
  actionRef: string;
  state: InferenceRunState;
}): Promise<EvalInput> {
  const { manager, actionRef, state } = params;
  const extractors = await getEvalExtractors(actionRef);
  const traceId = state.traceId;
  if (!traceId) {
    logger.warn('No traceId available...');
    // The spread keeps the sample's own testCaseId, so this row can still be
    // matched to its dataset entry. Replacing it with a random id would
    // break that correlation.
    return {
      ...state,
      error: state.evalError,
      traceIds: [],
    };
  }
  const trace = await manager.getTrace({
    traceId,
  });
  const isModelAction = actionRef.startsWith('/model');
  // Always use original input for models.
  const input = isModelAction ? state.input : extractors.input(trace);
  const nestedSpan = stackTraceSpans(trace);
  if (!nestedSpan) {
    return {
      testCaseId: state.testCaseId,
      input,
      error: `Unable to extract any spans from trace ${traceId}`,
      reference: state.reference,
      traceIds: [traceId],
    };
  }
  if (nestedSpan.attributes['genkit:state'] === 'error') {
    return {
      testCaseId: state.testCaseId,
      input,
      // Prefer the span's own exception message. Fall back to the error
      // captured while running inference before using a generic message.
      error:
        getSpanErrorMessage(nestedSpan) ??
        state.evalError ??
        `Unknown error in trace ${traceId}`,
      reference: state.reference,
      traceIds: [traceId],
    };
  }
  const output = extractors.output(trace);
  const context = extractors.context(trace);
  // If the inference call itself failed, report that error even though the
  // span was not marked as failed. Only otherwise inspect the model response.
  const error =
    state.evalError ??
    (isModelAction ? getErrorFromModelResponse(output) : undefined);
  return {
    // TODO Replace this with unified trace class
    testCaseId: state.testCaseId,
    input,
    output,
    error,
    context: Array.isArray(context) ? context : [context],
    reference: state.reference,
    traceIds: [traceId],
  };
}
/**
 * Returns the message of the first `exception` event on an error-status span.
 *
 * Events without an annotation, or annotations without attributes, are
 * tolerated rather than throwing.
 *
 * @param span Span to inspect.
 * @returns The first exception's message. Returns `'Error'` when the span
 *   has error status (code 2) but no exception message, and `undefined`
 *   when the span does not have error status.
 */
function getSpanErrorMessage(span: SpanData): string | undefined {
  if (span?.status?.code !== 2 /* SpanStatusCode.ERROR */) {
    return undefined;
  }
  // It's possible for a trace to have multiple exception events,
  // however we currently only expect and display the first one.
  const event = span.timeEvents?.timeEvent?.find(
    (e) => e?.annotation?.description === 'exception'
  );
  const message = event?.annotation?.attributes?.['exception.message'];
  return (message as string | undefined) ?? 'Error';
}
/**
 * Derives an evaluation error from a model action's output, if there is one.
 *
 * Only the first candidate is inspected.
 *
 * @param obj Model output extracted from the trace.
 * @returns An error message in three cases: the output does not match
 *   `GenerateResponseSchema`, it has no candidates, or its first candidate
 *   was blocked. Otherwise returns `undefined`.
 */
function getErrorFromModelResponse(obj: any): string | undefined {
  // Use safeParse rather than parse so that malformed output is reported as
  // an error on this row instead of throwing and aborting the whole run.
  const parsed = GenerateResponseSchema.safeParse(obj);
  if (!parsed.success) {
    return `Unable to parse the model response. Details: ${parsed.error.message}`;
  }
  const response = parsed.data;
  if (!response || !response.candidates || response.candidates.length === 0) {
    return `No response was extracted from the output. '${JSON.stringify(obj)}'`;
  }
  // We currently only support the first candidate
  const candidate = response.candidates[0] as CandidateData;
  if (candidate.finishReason === 'blocked') {
    return candidate.finishMessage || `Generation was blocked by the model.`;
  }
  return undefined;
}
/**
 * Reports whether inference can run on an action key. This is the case when
 * the key starts with `/` followed by one of `SUPPORTED_ACTION_TYPES`.
 */
function isSupportedActionRef(actionRef: string) {
  for (const actionType of SUPPORTED_ACTION_TYPES) {
    if (actionRef.startsWith(`/${actionType}`)) {
      return true;
    }
  }
  return false;
}