MCP Terminal Server

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { ValueType } from '@opentelemetry/api'; import { hrTimeDuration, hrTimeToMilliseconds } from '@opentelemetry/core'; import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import { GENKIT_VERSION } from 'genkit'; import { logger } from 'genkit/logging'; import { PathMetadata, toDisplayPath } from 'genkit/tracing'; import { MetricCounter, MetricHistogram, Telemetry, internalMetricNamespaceWrap, } from '../metrics.js'; import { createCommonLogAttributes, extractErrorMessage, extractErrorName, extractErrorStack, truncatePath, } from '../utils.js'; class PathsTelemetry implements Telemetry { /** * Wraps the declared metrics in a Genkit-specific, internal namespace. */ private _N = internalMetricNamespaceWrap.bind(null, 'feature'); private pathCounter = new MetricCounter(this._N('path/requests'), { description: 'Tracks unique flow paths per flow.', valueType: ValueType.INT, }); private pathLatencies = new MetricHistogram(this._N('path/latency'), { description: 'Latencies per flow path.', ValueType: ValueType.DOUBLE, unit: 'ms', }); tick( span: ReadableSpan, paths: Set<PathMetadata>, logInputAndOutput: boolean, projectId?: string ): void { const attributes = span.attributes; const name = attributes['genkit:name'] as string; const path = attributes['genkit:path'] as string; const sessionId = attributes['genkit:sessionId'] as string; const threadName = attributes['genkit:threadName'] as string; const latencyMs = hrTimeToMilliseconds( hrTimeDuration(span.startTime, span.endTime) ); const state = attributes['genkit:state'] as string; if (state === 'success') { this.writePathSuccess( span, paths!, name, path, latencyMs, projectId, sessionId, threadName ); return; } if (state === 'error') { const errorName = extractErrorName(span.events) || '<unknown>'; const errorMessage = extractErrorMessage(span.events) || '<unknown>'; const errorStack = extractErrorStack(span.events) || ''; this.writePathFailure( span, paths!, name, path, latencyMs, errorName, projectId, sessionId, threadName ); this.recordError( span, path, errorName, errorMessage, errorStack, projectId, sessionId, threadName ); return; } logger.warn(`Unknown state; ${state}`); } private recordError( span: ReadableSpan, path: string, errorName: string, errorMessage: string, errorStack: string, projectId?: string, sessionId?: string, threadName?: string ) { const displayPath = truncatePath(toDisplayPath(path)); logger.logStructuredError(`Error[${displayPath}, ${errorName}]`, { ...createCommonLogAttributes(span, projectId), path: displayPath, qualifiedPath: path, name: errorName, message: errorMessage, stack: errorStack, source: 'ts', sourceVersion: GENKIT_VERSION, sessionId, threadName, }); } private writePathSuccess( span: ReadableSpan, paths: Set<PathMetadata>, featureName: string, path: string, latencyMs: number, projectId?: string, sessionId?: string, threadName?: string ) { this.writePathMetrics( span, path, paths, featureName, latencyMs, undefined, projectId, sessionId, threadName ); } private writePathFailure( span: ReadableSpan, paths: Set<PathMetadata>, featureName: string, path: string, latencyMs: number, errorName: string, projectId?: string, sessionId?: string, threadName?: string ) { this.writePathMetrics( span, path, paths, featureName, latencyMs, errorName, projectId, sessionId, threadName ); } /** Writes all path-level metrics stored in the current flow execution. */ private writePathMetrics( span: ReadableSpan, rootPath: string, paths: Set<PathMetadata>, featureName: string, latencyMs: number, err?: string, projectId?: string, sessionId?: string, threadName?: string ) { const flowPaths = Array.from(paths).filter((meta) => meta.path.includes(featureName) ); if (flowPaths) { logger.logStructured(`Paths[${featureName}]`, { ...createCommonLogAttributes(span, projectId), flowName: featureName, sessionId, threadName, paths: flowPaths.map((p) => truncatePath(toDisplayPath(p.path))), }); flowPaths.forEach((p) => this.writePathMetric(featureName, p)); // If we're writing a failure, but none of the stored paths have failed, // this means the root flow threw the error. if (err && !flowPaths.some((p) => p.status === 'failure')) { this.writePathMetric(featureName, { status: 'failure', path: rootPath, error: err, latency: latencyMs, }); } } } /** Writes metrics for a single PathMetadata */ private writePathMetric(featureName: string, meta: PathMetadata) { const pathDimensions = { featureName: featureName, status: meta.status, error: meta.error, path: meta.path, source: 'ts', sourceVersion: GENKIT_VERSION, }; this.pathCounter.add(1, pathDimensions); this.pathLatencies.record(meta.latency, pathDimensions); } } const pathsTelemetry = new PathsTelemetry(); export { pathsTelemetry };