MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { LoggingWinston } from '@google-cloud/logging-winston';
import { getCurrentEnv } from 'genkit';
import { logger } from 'genkit/logging';
import { Writable } from 'stream';
import { GcpTelemetryConfig } from './types.js';
import { loggingDenied, loggingDeniedHelpText } from './utils.js';
/**
* Additional streams for writing log data to. Useful for unit testing.
*/
let additionalStream: Writable;
let useJsonFormatOverride: boolean = false;
/**
* Provides a logger for exporting Genkit debug logs to GCP Cloud
* logs.
*/
export class GcpLogger {
constructor(private readonly config: GcpTelemetryConfig) {}
async getLogger(env: string) {
// Dynamically importing winston here more strictly controls
// the import order relative to registering instrumentation
// with OpenTelemetry. Incorrect import order will trigger
// an internal OT warning and will result in logs not being
// associated with correct spans/traces.
const winston = await import('winston');
const format =
useJsonFormatOverride || this.shouldExport(env)
? { format: winston.format.json() }
: {
format: winston.format.printf((info): string => {
return `[${info.level}] ${info.message}`;
}),
};
let transports: any[] = [];
transports.push(
this.shouldExport(env)
? new LoggingWinston({
labels: { module: 'genkit' },
prefix: 'genkit',
logName: 'genkit_log',
projectId: this.config.projectId,
credentials: this.config.credentials,
autoRetry: true,
defaultCallback: await this.getErrorHandler(),
})
: new winston.transports.Console()
);
if (additionalStream) {
transports.push(
new winston.transports.Stream({ stream: additionalStream })
);
}
return winston.createLogger({
transports: transports,
...format,
exceptionHandlers: [new winston.transports.Console()],
});
}
private async getErrorHandler(): Promise<(err: Error | null) => void> {
// only log the first time
let instructionsLogged = false;
let helpInstructions = await loggingDeniedHelpText();
return async (err: Error | null) => {
// Use the defaultLogger so that logs don't get swallowed by
// the open telemetry exporter
const defaultLogger = logger.defaultLogger;
if (err && loggingDenied(err)) {
if (!instructionsLogged) {
instructionsLogged = true;
defaultLogger.error(
`Unable to send logs to Google Cloud: ${err.message}\n\n${helpInstructions}\n`
);
}
} else if (err) {
defaultLogger.error(`Unable to send logs to Google Cloud: ${err}`);
}
if (err) {
// Assume the logger is compromised, and we need a new one
// Reinitialize the genkit logger with a new instance with the same config
logger.init(
await new GcpLogger(this.config).getLogger(getCurrentEnv())
);
defaultLogger.info('Initialized a new GcpLogger.');
}
};
}
private shouldExport(env?: string) {
return this.config.export;
}
}
/** @hidden */
export function __addTransportStreamForTesting(stream: Writable) {
additionalStream = stream;
}
/** @hidden */
export function __useJsonFormatForTesting() {
useJsonFormatOverride = true;
}