MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
ExporterOptions,
MetricExporter,
} from '@google-cloud/opentelemetry-cloud-monitoring-exporter';
import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter';
import { GcpDetectorSync } from '@google-cloud/opentelemetry-resource-util';
import { Span, SpanStatusCode, TraceFlags } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import {
ExportResult,
hrTimeDuration,
hrTimeToMilliseconds,
} from '@opentelemetry/core';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { PinoInstrumentation } from '@opentelemetry/instrumentation-pino';
import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston';
import { Resource } from '@opentelemetry/resources';
import {
AggregationTemporality,
DefaultAggregation,
ExponentialHistogramAggregation,
InMemoryMetricExporter,
InstrumentType,
PeriodicExportingMetricReader,
PushMetricExporter,
ResourceMetrics,
} from '@opentelemetry/sdk-metrics';
import { NodeSDKConfiguration } from '@opentelemetry/sdk-node';
import {
BatchSpanProcessor,
InMemorySpanExporter,
ReadableSpan,
SpanExporter,
} from '@opentelemetry/sdk-trace-base';
import { GENKIT_VERSION } from 'genkit';
import { logger } from 'genkit/logging';
import { PathMetadata } from 'genkit/tracing';
import { actionTelemetry } from './telemetry/action.js';
import { engagementTelemetry } from './telemetry/engagement.js';
import { featuresTelemetry } from './telemetry/feature.js';
import { generateTelemetry } from './telemetry/generate.js';
import { pathsTelemetry } from './telemetry/path.js';
import { GcpTelemetryConfig } from './types.js';
import {
extractErrorName,
metricsDenied,
metricsDeniedHelpText,
tracingDenied,
tracingDeniedHelpText,
} from './utils.js';
// Handles to the components built by GcpOpenTelemetry.getConfig(). They are
// kept at module scope so the __*ForTesting helpers can reach them.
let spanExporter: AdjustingTraceExporter;
let spanProcessor: BatchSpanProcessor;
let metricExporter: PushMetricExporter;
/**
 * Provides a {TelemetryConfig} for exporting OpenTelemetry data (Traces,
 * Metrics, and Logs) to the Google Cloud Operations Suite.
 *
 * Calling `getConfig()` also stores the span exporter, span processor and
 * metric exporter it creates in module-level variables.
 */
export class GcpOpenTelemetry {
  private readonly resource: Resource;

  constructor(private readonly config: GcpTelemetryConfig) {
    // Start from a generic 'global' resource and merge in whatever the GCP
    // resource detector reports for the current environment.
    const detected = new GcpDetectorSync().detect();
    this.resource = new Resource({ type: 'global' }).merge(detected);
  }

  /**
   * Log hook (used by the Winston and Pino instrumentations) that writes
   * trace and span metadata onto log records in the format GCP expects.
   * Fields already present on the record are never overwritten.
   */
  private gcpTraceLogHook = (span: Span, record: any) => {
    const { traceId, spanId, traceFlags } = span.spanContext();
    const sampled = (traceFlags & TraceFlags.SAMPLED) !== 0;
    const projectId = this.config.projectId;

    record['logging.googleapis.com/trace'] ??=
      `projects/${projectId}/traces/${traceId}`;
    record['logging.googleapis.com/trace_sampled'] ??= sampled ? '1' : '0';
    record['logging.googleapis.com/spanId'] ??= spanId;

    // Drop the duplicate trace/span fields from the log metadata. They are
    // wrong for logs written at span-export time, because those logs are
    // written after the span has fully executed; such logs are tied to the
    // correct span in createCommonLogAttributes in utils.ts instead.
    for (const key of ['span_id', 'trace_id', 'trace_flags']) {
      delete record[key];
    }
  };

  /**
   * Builds the NodeSDK configuration: resource, batch span processor,
   * sampler, instrumentations and periodic metric reader.
   */
  async getConfig(): Promise<Partial<NodeSDKConfiguration>> {
    const exporter = await this.createSpanExporter();
    spanProcessor = new BatchSpanProcessor(exporter);
    const instrumentations = this.getInstrumentations();
    const metricReader = await this.createMetricReader();
    return {
      resource: this.resource,
      spanProcessor,
      sampler: this.config.sampler,
      instrumentations,
      metricReader,
    };
  }

  /**
   * Wraps either a Cloud Trace exporter (when trace export is enabled) or an
   * in-memory exporter in an AdjustingTraceExporter, and records it in the
   * module-level `spanExporter`.
   */
  private async createSpanExporter(): Promise<SpanExporter> {
    const delegate: SpanExporter = this.shouldExportTraces()
      ? new TraceExporter({
          // provided projectId should take precedence over env vars, etc
          projectId: this.config.projectId,
          // creds for non-GCP environments, in lieu of using ADC.
          credentials: this.config.credentials,
        })
      : new InMemorySpanExporter();
    const onError = getErrorHandler(
      (err) => tracingDenied(err),
      await tracingDeniedHelpText()
    );
    spanExporter = new AdjustingTraceExporter(
      delegate,
      this.config.exportInputAndOutput,
      this.config.projectId,
      onError
    );
    return spanExporter;
  }

  /**
   * Creates a {MetricReader} for pushing metrics out to GCP via OpenTelemetry.
   * The exporter it uses is also stored in the module-level `metricExporter`.
   */
  private async createMetricReader(): Promise<PeriodicExportingMetricReader> {
    const exporter = await this.buildMetricExporter();
    metricExporter = exporter;
    return new PeriodicExportingMetricReader({
      exporter,
      exportIntervalMillis: this.config.metricExportIntervalMillis,
      exportTimeoutMillis: this.config.metricExportTimeoutMillis,
    });
  }

  /**
   * Gets all open telemetry instrumentations as configured by the plugin:
   * optional Node auto-instrumentations, then the logging instrumentations,
   * then any user-supplied ones.
   */
  private getInstrumentations() {
    const auto: Instrumentation[] = this.config.autoInstrumentation
      ? getNodeAutoInstrumentations(this.config.autoInstrumentationConfig)
      : [];
    return auto.concat(
      this.getDefaultLoggingInstrumentations(),
      this.config.instrumentations ?? []
    );
  }

  /** Traces go to GCP only when export is on and traces are not disabled. */
  private shouldExportTraces(): boolean {
    const { export: exportEnabled, disableTraces } = this.config;
    return exportEnabled && !disableTraces;
  }

  /** Metrics go to GCP only when export is on and metrics are not disabled. */
  private shouldExportMetrics(): boolean {
    const { export: exportEnabled, disableMetrics } = this.config;
    return exportEnabled && !disableMetrics;
  }

  /** Always configure the Pino and Winston instrumentations */
  private getDefaultLoggingInstrumentations(): Instrumentation[] {
    const logHook = this.gcpTraceLogHook;
    const winston = new WinstonInstrumentation({ logHook });
    const pino = new PinoInstrumentation({ logHook });
    return [winston, pino];
  }

  /**
   * Returns a GCP-backed metric exporter when metric export is enabled,
   * otherwise an in-memory exporter using DELTA temporality.
   */
  private async buildMetricExporter(): Promise<PushMetricExporter> {
    if (!this.shouldExportMetrics()) {
      return new InMemoryMetricExporter(AggregationTemporality.DELTA);
    }
    const options: ExporterOptions = {
      userAgent: {
        product: 'genkit',
        version: GENKIT_VERSION,
      },
      // provided projectId should take precedence over env vars, etc
      projectId: this.config.projectId,
      // creds for non-GCP environments, in lieu of using ADC.
      credentials: this.config.credentials,
    };
    const onError = getErrorHandler(
      (err) => metricsDenied(err),
      await metricsDeniedHelpText()
    );
    return new MetricExporterWrapper(options, onError);
  }
}
/**
 * Rewrites the export method to include an error handler which logs
 * helpful information about how to set up metrics/telemetry in GCP.
 *
 * Exports are chained through `promise`: each export waits for the previously
 * started one to report back, and forceFlush/shutdown wait for the latest.
 * Histograms use exponential aggregation; every instrument uses DELTA
 * temporality.
 */
class MetricExporterWrapper extends MetricExporter {
  private promise = new Promise<void>((resolve) => resolve());

  constructor(
    options?: ExporterOptions,
    private errorHandler?: (error: Error) => void
  ) {
    super(options);
  }

  async export(
    metrics: ResourceMetrics,
    resultCallback: (result: ExportResult) => void
  ): Promise<void> {
    await this.promise;
    this.modifyStartTimes(metrics);
    this.promise = new Promise<void>((resolve) => {
      super.export(metrics, (result) => {
        try {
          if (this.errorHandler && result.error) {
            this.errorHandler(result.error);
          }
        } finally {
          // Release the export chain and report the result even if the
          // error handler throws, so the reader is never left waiting.
          resolve();
          resultCallback(result);
        }
      });
    });
  }

  selectAggregation(instrumentType: InstrumentType) {
    if (instrumentType === InstrumentType.HISTOGRAM) {
      return new ExponentialHistogramAggregation();
    }
    return new DefaultAggregation();
  }

  selectAggregationTemporality(instrumentType: InstrumentType) {
    return AggregationTemporality.DELTA;
  }

  /**
   * Modify the start times of each data point to ensure no
   * overlap with previous exports.
   *
   * Cloud metrics do not support delta metrics for custom metrics
   * and will convert any DELTA aggregations to CUMULATIVE ones on
   * export. There is implicit overlap in the start/end times that
   * the Metric reader is sending -- the end_time of the previous
   * export will become the start_time of the current export. The
   * overlap in times means that only one of those records will
   * persist and the other will be overwritten. This method adds a
   * thousandth of a second to ensure discrete export timeframes.
   *
   * Several data points can share the very same startTime array (the SDK
   * reuses the previous collection time for DELTA data points), so each
   * array is shifted only once. Nanoseconds are carried into seconds so the
   * HrTime stays normalized ([seconds, nanos] with nanos < 1e9).
   */
  private modifyStartTimes(metrics: ResourceMetrics): void {
    const offsetNanos = 1_000_000; // 1 ms
    const nanosPerSecond = 1_000_000_000;
    const shifted = new Set<object>();
    for (const scopeMetric of metrics.scopeMetrics) {
      for (const metric of scopeMetric.metrics) {
        for (const dataPoint of metric.dataPoints) {
          const startTime = dataPoint.startTime;
          if (shifted.has(startTime)) {
            continue;
          }
          shifted.add(startTime);
          const nanos = startTime[1] + offsetNanos;
          startTime[0] += Math.floor(nanos / nanosPerSecond);
          startTime[1] = nanos % nanosPerSecond;
        }
      }
    }
  }

  async shutdown(): Promise<void> {
    return await this.forceFlush();
  }

  async forceFlush(): Promise<void> {
    await this.promise;
  }
}
/**
 * Adjusts spans before exporting to GCP. Redacts model input
 * and output content, and augments span attributes before sending to GCP.
 *
 * Each exported batch is also used to report Genkit telemetry (feature,
 * path, generate, action and engagement ticks) for the spans it contains.
 */
class AdjustingTraceExporter implements SpanExporter {
  /**
   * @param exporter Exporter that receives the adjusted spans.
   * @param logInputAndOutput Forwarded to every telemetry tick.
   * @param projectId Forwarded to every telemetry tick.
   * @param errorHandler Called with `result.error` when an export fails.
   */
  constructor(
    private exporter: SpanExporter,
    private logInputAndOutput: boolean,
    private projectId?: string,
    private errorHandler?: (error: Error) => void
  ) {}

  export(
    spans: ReadableSpan[],
    resultCallback: (result: ExportResult) => void
  ): void {
    this.exporter.export(this.adjust(spans), (result) => {
      try {
        if (this.errorHandler && result.error) {
          this.errorHandler(result.error);
        }
      } finally {
        // Report back even if the error handler throws, so the span
        // processor is never left waiting on this batch.
        resultCallback(result);
      }
    });
  }

  shutdown(): Promise<void> {
    return this.exporter.shutdown();
  }

  getExporter(): SpanExporter {
    return this.exporter;
  }

  forceFlush(): Promise<void> {
    // forceFlush is optional on the SpanExporter interface.
    return this.exporter.forceFlush
      ? this.exporter.forceFlush()
      : Promise.resolve();
  }

  /**
   * Ticks telemetry for every span, then returns the spans with input/output
   * redacted, error/failure/feature/model markers added and ':' in attribute
   * keys replaced by '/'.
   */
  private adjust(spans: ReadableSpan[]): ReadableSpan[] {
    const allLeafPaths = this.collectLeafPaths(spans);
    this.markRootOnlyFailure(spans);
    return spans.map((span) => {
      this.tickTelemetry(span, allLeafPaths);
      span = this.redactInputOutput(span);
      span = this.markErrorSpanAsError(span);
      span = this.markFailedSpan(span);
      span = this.markGenkitFeature(span);
      span = this.markGenkitModel(span);
      span = this.normalizeLabels(span);
      return span;
    });
  }

  /**
   * Summarizes each span carrying `genkit:path` as PathMetadata and keeps the
   * leaf paths: those for which no other path extends them (as a string
   * prefix) with the same status.
   */
  private collectLeafPaths(spans: ReadableSpan[]): Set<PathMetadata> {
    const allPaths = spans
      .filter((span) => span.attributes['genkit:path'])
      .map(
        (span) =>
          ({
            path: span.attributes['genkit:path'] as string,
            status:
              (span.attributes['genkit:state'] as string) === 'error'
                ? 'failure'
                : 'success',
            error: extractErrorName(span.events),
            latency: hrTimeToMilliseconds(
              hrTimeDuration(span.startTime, span.endTime)
            ),
          }) as PathMetadata
      );
    return new Set<PathMetadata>(
      allPaths.filter((leafPath) =>
        allPaths.every(
          (path) =>
            path.path === leafPath.path ||
            !path.path.startsWith(leafPath.path) ||
            path.status !== leafPath.status
        )
      )
    );
  }

  /**
   * When the root span failed but no other span in the batch did, record the
   * root span itself as the failed span and failed path.
   */
  private markRootOnlyFailure(spans: ReadableSpan[]): void {
    const rootSpan = spans.find((s) => 'genkit:isRoot' in s.attributes);
    if (!rootSpan) {
      return;
    }
    const rootSpanFailed = rootSpan.attributes['genkit:state'] === 'error';
    const anotherSpanFailed = spans.some(
      (s) =>
        !('genkit:isRoot' in s.attributes) &&
        s.attributes['genkit:state'] === 'error'
    );
    if (rootSpanFailed && !anotherSpanFailed) {
      rootSpan.attributes['genkit:failedSpan'] =
        rootSpan.attributes['genkit:name'];
      rootSpan.attributes['genkit:failedPath'] =
        rootSpan.attributes['genkit:path'];
    }
  }

  /** Dispatches the span to the telemetry reporters matching its type. */
  private tickTelemetry(span: ReadableSpan, paths: Set<PathMetadata>) {
    const attributes = span.attributes;
    if (!('genkit:type' in attributes)) {
      return;
    }

    const type = attributes['genkit:type'] as string;
    const subtype = attributes['genkit:metadata:subtype'] as string;
    const isRoot = !!span.attributes['genkit:isRoot'];
    const unused: Set<PathMetadata> = new Set();

    if (isRoot) {
      // Report top level feature request and latency only for root spans
      // Log input to and output from to the feature
      featuresTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
      // Report executions and latency for all flow paths only on the root span
      pathsTelemetry.tick(span, paths, this.logInputAndOutput, this.projectId);
      // Set root status explicitly
      span.attributes['genkit:rootState'] = span.attributes['genkit:state'];
    }

    if (type === 'action' && subtype === 'model') {
      // Report generate metrics () for all model actions
      generateTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }

    // TODO: Report input and output for tool actions
    // (type === 'action' && subtype === 'tool').

    if (type === 'action' || type === 'flow' || type === 'flowStep') {
      // Report request and latency metrics for all actions
      actionTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }

    if (type === 'userEngagement') {
      // Report user acceptance and feedback metrics
      engagementTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }
  }

  /**
   * Replaces `genkit:input` and `genkit:output` with '<redacted>'. Both keys
   * are written whenever either one is present on the span.
   */
  private redactInputOutput(span: ReadableSpan): ReadableSpan {
    const hasInput = 'genkit:input' in span.attributes;
    const hasOutput = 'genkit:output' in span.attributes;

    return !hasInput && !hasOutput
      ? span
      : {
          ...span,
          spanContext: span.spanContext,
          attributes: {
            ...span.attributes,
            'genkit:input': '<redacted>',
            'genkit:output': '<redacted>',
          },
        };
  }

  // This is a workaround for GCP Trace to mark a span with a red
  // exclamation mark indicating that it is an error.
  private markErrorSpanAsError(span: ReadableSpan): ReadableSpan {
    return span.status.code !== SpanStatusCode.ERROR
      ? span
      : {
          ...span,
          spanContext: span.spanContext,
          attributes: {
            ...span.attributes,
            '/http/status_code': '599',
          },
        };
  }

  /** Returns a copy of the span with every ':' in attribute keys replaced by '/'. */
  private normalizeLabels(span: ReadableSpan): ReadableSpan {
    const normalized = {} as Record<string, any>;
    for (const [key, value] of Object.entries(span.attributes)) {
      normalized[key.replace(/:/g, '/')] = value;
    }
    return {
      ...span,
      spanContext: span.spanContext,
      attributes: normalized,
    };
  }

  /** Marks a failed, non-root span as the failed span/path. */
  private markFailedSpan(span: ReadableSpan): ReadableSpan {
    if (
      span.attributes['genkit:state'] === 'error' &&
      !span.attributes['genkit:isRoot']
    ) {
      if (span.attributes['genkit:name']) {
        span.attributes['genkit:failedSpan'] = span.attributes['genkit:name'];
      }
      if (span.attributes['genkit:path']) {
        span.attributes['genkit:failedPath'] = span.attributes['genkit:path'];
      }
    }
    return span;
  }

  /** Tags a named root span with its name as the Genkit feature. */
  private markGenkitFeature(span: ReadableSpan): ReadableSpan {
    if (span.attributes['genkit:isRoot'] && span.attributes['genkit:name']) {
      span.attributes['genkit:feature'] = span.attributes['genkit:name'];
    }
    return span;
  }

  /** Tags a named model-action span with its name as the Genkit model. */
  private markGenkitModel(span: ReadableSpan): ReadableSpan {
    if (
      span.attributes['genkit:metadata:subtype'] === 'model' &&
      span.attributes['genkit:name']
    ) {
      span.attributes['genkit:model'] = span.attributes['genkit:name'];
    }
    return span;
  }
}
/**
 * Builds an export error handler.
 *
 * Errors accepted by `shouldLogFn` are reported together with `helpText`,
 * but only for the first such error. Any other error is logged every time.
 * Falsy errors are ignored.
 *
 * @param shouldLogFn Decides whether an error warrants the help text.
 * @param helpText Setup instructions appended to the first matching error.
 */
function getErrorHandler(
  shouldLogFn: (err: Error) => boolean,
  helpText: string
): (err: Error) => void {
  let helpShown = false;
  return (err) => {
    // Use the defaultLogger so that logs don't get swallowed by the open
    // telemetry exporter
    const { defaultLogger } = logger;
    if (!err) {
      return;
    }
    if (!shouldLogFn(err)) {
      defaultLogger.error(`Unable to send telemetry to Google Cloud: ${err}`);
      return;
    }
    if (helpShown) {
      return;
    }
    helpShown = true;
    defaultLogger.error(
      `Unable to send telemetry to Google Cloud: ${err.message}\n\n${helpText}\n`
    );
  };
}
/**
 * Returns the metric exporter created by the last `getConfig()` call, cast
 * (unchecked) to the in-memory exporter used when metric export is disabled.
 *
 * @hidden
 */
export function __getMetricExporterForTesting(): InMemoryMetricExporter {
  const exporter = metricExporter;
  return exporter as InMemoryMetricExporter;
}
/**
 * Returns the exporter wrapped by the module's AdjustingTraceExporter, cast
 * (unchecked) to the in-memory exporter used when trace export is disabled.
 *
 * @hidden
 */
export function __getSpanExporterForTesting(): InMemorySpanExporter {
  const inner = spanExporter.getExporter();
  return inner as InMemorySpanExporter;
}
/**
 * Flushes the spans buffered by the module's BatchSpanProcessor.
 *
 * @returns The processor's flush promise, so tests can await completion.
 * @hidden
 */
export function __forceFlushSpansForTesting(): Promise<void> {
  return spanProcessor.forceFlush();
}