MCP Terminal Server

/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import axios, { AxiosError } from 'axios';
import chokidar from 'chokidar';
import EventEmitter from 'events';
import fs from 'fs/promises';
import path from 'path';
import {
  Action,
  RunActionResponse,
  RunActionResponseSchema,
} from '../types/action';
import * as apis from '../types/apis';
import { GenkitErrorData } from '../types/error';
import { TraceData } from '../types/trace';
import { logger } from '../utils/logger';
import {
  checkServerHealth,
  findRuntimesDir,
  projectNameFromGenkitFilePath,
  retriable,
} from '../utils/utils';
import {
  GenkitToolsError,
  RuntimeEvent,
  RuntimeInfo,
  StreamingCallback,
} from './types';

/** Separator between JSON chunks in a streamed `runAction` response. */
const STREAM_DELIMITER = '\n';
/** Delay, in milliseconds, between two rounds of runtime health checks. */
const HEALTH_CHECK_INTERVAL = 5000;

export const GENKIT_REFLECTION_API_SPEC_VERSION = 1;

interface RuntimeManagerOptions {
  /** URL of the telemetry server. */
  telemetryServerUrl?: string;
  /** Whether to clean up unhealthy runtimes. */
  manageHealth?: boolean;
}

/**
 * Keeps track of the runtimes described by the files in the runtimes
 * directory and forwards requests to the reflection server of the most recent
 * runtime and to the telemetry server.
 *
 * Instances are created through {@link RuntimeManager.create}.
 */
export class RuntimeManager {
  /** Runtime info keyed by the base name of the runtime file. */
  private filenameToRuntimeMap: Record<string, RuntimeInfo> = {};
  /** Runtime file base name keyed by runtime ID. */
  private idToFileMap: Record<string, string> = {};
  private eventEmitter = new EventEmitter();

  private constructor(
    readonly telemetryServerUrl?: string,
    private manageHealth: boolean = true
  ) {}

  /**
   * Creates a new runtime manager.
   *
   * Sets up the runtimes directory watcher and, when `manageHealth` is enabled
   * (the default), schedules a health check every `HEALTH_CHECK_INTERVAL` ms.
   *
   * @param options telemetry server URL and health management flag.
   * @returns the initialized manager.
   */
  static async create(options: RuntimeManagerOptions) {
    const manager = new RuntimeManager(
      options.telemetryServerUrl,
      options.manageHealth ?? true
    );
    await manager.setupRuntimesWatcher();
    if (manager.manageHealth) {
      setInterval(async () => {
        // A rejection escaping a timer callback would be unhandled, so log it
        // here and let the next tick try again.
        try {
          await manager.performHealthChecks();
        } catch (error) {
          logger.error(`Failed to perform runtime health checks: ${error}`);
        }
      }, HEALTH_CHECK_INTERVAL);
    }
    return manager;
  }

  /**
   * Lists all active runtimes.
   *
   * @returns the known runtimes, keyed by runtime ID.
   */
  listRuntimes(): Record<string, RuntimeInfo> {
    return Object.fromEntries(
      Object.values(this.filenameToRuntimeMap).map((runtime) => [
        runtime.id,
        runtime,
      ])
    );
  }

  /**
   * Gets the runtime info for a given ID.
   *
   * @param id the runtime ID.
   * @returns the runtime info, or `undefined` when the ID is not known.
   */
  getRuntimeById(id: string): RuntimeInfo | undefined {
    const fileName = this.idToFileMap[id];
    return fileName ? this.filenameToRuntimeMap[fileName] : undefined;
  }

  /**
   * Gets the runtime that was started most recently.
   *
   * @returns the runtime with the latest `timestamp`, or `undefined` when no
   *     runtime is known.
   */
  getMostRecentRuntime(): RuntimeInfo | undefined {
    const runtimes = Object.values(this.filenameToRuntimeMap);
    return runtimes.length === 0
      ? undefined
      : runtimes.reduce((a, b) =>
          new Date(a.timestamp) > new Date(b.timestamp) ? a : b
        );
  }

  /**
   * Subscribe to changes to the available runtimes. e.g.) whenever a new
   * runtime is added or removed.
   *
   * The `listener` will be called with the `eventType` that occurred and the
   * `runtime` to which it applies.
   *
   * @param listener the callback function.
   */
  onRuntimeEvent(
    listener: (eventType: RuntimeEvent, runtime: RuntimeInfo) => void
  ) {
    Object.values(RuntimeEvent).forEach((event) =>
      this.eventEmitter.on(event, (rt) => listener(event, rt))
    );
  }

  /**
   * Retrieves all runnable actions from the most recent runtime.
   *
   * @returns the actions reported by the runtime's reflection server.
   * @throws Error when no runtime is known.
   * @throws GenkitToolsError when the request fails.
   */
  async listActions(): Promise<Record<string, Action>> {
    // TODO: Allow selecting a runtime by pid.
    const runtime = this.getMostRecentRuntime();
    if (!runtime) {
      throw new Error('No runtimes found');
    }
    const response = await axios
      .get(`${runtime.reflectionServerUrl}/api/actions`)
      .catch((err) => this.httpErrorHandler(err, 'Error listing actions.'));

    return response.data as Record<string, Action>;
  }

  /**
   * Runs an action on the most recent runtime.
   *
   * When `streamingCallback` is provided the action is run in streaming mode:
   * each delimiter-terminated JSON chunk is passed to the callback and the
   * text remaining when the stream ends is parsed as the final response.
   *
   * @param input the action request.
   * @param streamingCallback optional callback invoked for each streamed chunk.
   * @returns the parsed action response.
   * @throws Error when no runtime is known.
   * @throws GenkitToolsError when the request fails or the runtime reports an
   *     error.
   */
  async runAction(
    input: apis.RunActionRequest,
    streamingCallback?: StreamingCallback<any>
  ): Promise<RunActionResponse> {
    // TODO: Allow selecting a runtime by pid.
    const runtime = this.getMostRecentRuntime();
    if (!runtime) {
      throw new Error('No runtimes found');
    }
    if (streamingCallback) {
      const response = await axios
        .post(
          `${runtime.reflectionServerUrl}/api/runAction?stream=true`,
          input,
          {
            headers: {
              'Content-Type': 'application/json',
            },
            responseType: 'stream',
          }
        )
        .catch((err) =>
          this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
        );
      const genkitVersion: string | undefined =
        response.headers['x-genkit-version'];

      const stream = response.data;
      // Decode in the stream so that a multi-byte character split across two
      // chunks is not corrupted by per-chunk string conversion.
      if (typeof stream.setEncoding === 'function') {
        stream.setEncoding('utf8');
      }

      let buffer = '';
      stream.on('data', (data: string) => {
        buffer += data;
        let delimiterIndex = buffer.indexOf(STREAM_DELIMITER);
        while (delimiterIndex !== -1) {
          try {
            streamingCallback(JSON.parse(buffer.substring(0, delimiterIndex)));
          } catch (err) {
            // Leave the offending chunk in the buffer and stop draining.
            logger.error(`Bad stream: ${err}`);
            break;
          }
          buffer = buffer.substring(delimiterIndex + STREAM_DELIMITER.length);
          delimiterIndex = buffer.indexOf(STREAM_DELIMITER);
        }
      });

      return new Promise<RunActionResponse>((resolve, reject) => {
        stream.on('end', () => {
          // Everything here runs inside an event handler: a throw would be an
          // uncaught exception and would leave the promise pending forever, so
          // any parsing failure is turned into a rejection instead.
          try {
            const parsedBuffer = JSON.parse(buffer);
            if (parsedBuffer.error) {
              const err = new GenkitToolsError(
                `Error running action key='${input.key}'.`
              );
              // massage the error into a shape dev ui expects
              err.data = {
                ...parsedBuffer.error,
                stack: parsedBuffer.error.details?.stack,
                data: {
                  genkitErrorMessage: parsedBuffer.error.message,
                  genkitErrorDetails: parsedBuffer.error.details,
                },
              };
              reject(err);
              return;
            }
            const actionResponse = RunActionResponseSchema.parse(parsedBuffer);
            if (genkitVersion) {
              actionResponse.genkitVersion = genkitVersion;
            }
            resolve(actionResponse);
          } catch (err) {
            reject(err);
          }
        });
        stream.on('error', (err: Error) => {
          reject(err);
        });
      });
    } else {
      const response = await axios
        .post(`${runtime.reflectionServerUrl}/api/runAction`, input, {
          headers: {
            'Content-Type': 'application/json',
          },
        })
        .catch((err) =>
          this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
        );
      const resp = RunActionResponseSchema.parse(response.data);
      if (response.headers['x-genkit-version']) {
        resp.genkitVersion = response.headers['x-genkit-version'];
      }
      return resp;
    }
  }

  /**
   * Retrieves all traces from the telemetry server.
   *
   * @param input optional `limit` and `continuationToken` for paging.
   * @returns the parsed list response.
   * @throws GenkitToolsError when the request fails.
   */
  async listTraces(
    input: apis.ListTracesRequest
  ): Promise<apis.ListTracesResponse> {
    const { limit, continuationToken } = input;
    let query = '';
    if (limit) {
      query += `limit=${limit}`;
    }
    if (continuationToken) {
      if (query !== '') {
        query += '&';
      }
      // Encode the token so reserved characters cannot alter the query string.
      query += `continuationToken=${encodeURIComponent(continuationToken)}`;
    }

    const response = await axios
      .get(`${this.telemetryServerUrl}/api/traces?${query}`)
      .catch((err) =>
        this.httpErrorHandler(err, `Error listing traces for query='${query}'.`)
      );

    return apis.ListTracesResponseSchema.parse(response.data);
  }

  /**
   * Retrieves a trace for a given ID from the telemetry server.
   *
   * @param input request holding the `traceId`.
   * @returns the response payload, returned as-is without schema validation.
   * @throws GenkitToolsError when the request fails.
   */
  async getTrace(input: apis.GetTraceRequest): Promise<TraceData> {
    const { traceId } = input;
    const response = await axios
      .get(
        `${this.telemetryServerUrl}/api/traces/${encodeURIComponent(traceId)}`
      )
      .catch((err) =>
        this.httpErrorHandler(
          err,
          `Error getting trace for traceId='${traceId}'`
        )
      );

    return response.data as TraceData;
  }

  /**
   * Notifies the runtime of dependencies it may need (e.g. telemetry server URL).
   *
   * Failures are logged and not propagated.
   */
  private async notifyRuntime(runtime: RuntimeInfo) {
    try {
      await axios.post(`${runtime.reflectionServerUrl}/api/notify`, {
        telemetryServerUrl: this.telemetryServerUrl,
        reflectionApiSpecVersion: GENKIT_REFLECTION_API_SPEC_VERSION,
      });
    } catch (error) {
      logger.error(`Failed to notify runtime ${runtime.id}: ${error}`);
    }
  }

  /**
   * Sets up a watcher for the runtimes directory.
   *
   * Creates the directory if needed, registers the add handler (and the unlink
   * handler when health management is enabled), then processes the files that
   * are already present. Failures are logged and not propagated.
   */
  private async setupRuntimesWatcher() {
    try {
      const runtimesDir = await findRuntimesDir();
      await fs.mkdir(runtimesDir, { recursive: true });
      const watcher = chokidar.watch(runtimesDir, {
        persistent: true,
        ignoreInitial: false,
      });
      watcher.on('add', (filePath) => this.handleNewRuntime(filePath));
      if (this.manageHealth) {
        watcher.on('unlink', (filePath) => this.handleRemovedRuntime(filePath));
      }
      // eagerly check existing runtimes on first load.
      for (const runtime of await fs.readdir(runtimesDir)) {
        await this.handleNewRuntime(path.resolve(runtimesDir, runtime));
      }
    } catch (error) {
      logger.error('Failed to set up runtimes watcher:', error);
    }
  }

  /**
   * Handles a new runtime file.
   *
   * Reads and parses the file (with retries), validates it, checks that the
   * runtime's reflection server is healthy and compares reflection API spec
   * versions before registering the runtime and emitting `RuntimeEvent.ADD`.
   * Exits the process when the runtime reports a newer spec version than this
   * tool supports. Other failures are logged and not propagated.
   */
  private async handleNewRuntime(filePath: string) {
    try {
      const { content, runtimeInfo } = await retriable(
        async () => {
          const content = await fs.readFile(filePath, 'utf-8');
          const runtimeInfo = JSON.parse(content) as RuntimeInfo;
          runtimeInfo.projectName = projectNameFromGenkitFilePath(filePath);
          return { content, runtimeInfo };
        },
        { maxRetries: 10, delayMs: 500 }
      );

      if (isValidRuntimeInfo(runtimeInfo)) {
        const fileName = path.basename(filePath);
        if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) {
          // Loose comparison is kept on purpose: the value comes from a JSON
          // file and is not validated by isValidRuntimeInfo.
          if (
            runtimeInfo.reflectionApiSpecVersion !=
            GENKIT_REFLECTION_API_SPEC_VERSION
          ) {
            if (
              !runtimeInfo.reflectionApiSpecVersion ||
              runtimeInfo.reflectionApiSpecVersion <
                GENKIT_REFLECTION_API_SPEC_VERSION
            ) {
              logger.warn(
                'Genkit CLI is newer than runtime library. Some feature may not be supported. ' +
                  'Consider upgrading your runtime library version (debug info: expected ' +
                  `${GENKIT_REFLECTION_API_SPEC_VERSION}, got ${runtimeInfo.reflectionApiSpecVersion}).`
              );
            } else {
              logger.error(
                'Genkit CLI version is outdated. Please update `genkit-cli` to the latest version.'
              );
              process.exit(1);
            }
          }
          this.filenameToRuntimeMap[fileName] = runtimeInfo;
          this.idToFileMap[runtimeInfo.id] = fileName;
          this.eventEmitter.emit(RuntimeEvent.ADD, runtimeInfo);
          await this.notifyRuntime(runtimeInfo);
          logger.debug(
            `Added runtime with ID ${runtimeInfo.id} at URL: ${runtimeInfo.reflectionServerUrl}`
          );
        } else {
          await this.removeRuntime(fileName);
        }
      } else {
        logger.error(`Unexpected file in the runtimes directory: ${content}`);
      }
    } catch (error) {
      logger.error(`Error processing file ${filePath}:`, error);
    }
  }

  /**
   * Handles a removed runtime file.
   *
   * Drops the runtime from both lookup maps and emits `RuntimeEvent.REMOVE`.
   * Does nothing when the file is not a known runtime.
   */
  private handleRemovedRuntime(filePath: string) {
    const fileName = path.basename(filePath);
    if (fileName in this.filenameToRuntimeMap) {
      const runtime = this.filenameToRuntimeMap[fileName];
      delete this.filenameToRuntimeMap[fileName];
      delete this.idToFileMap[runtime.id];
      this.eventEmitter.emit(RuntimeEvent.REMOVE, runtime);
      logger.debug(`Removed runtime with id ${runtime.id}.`);
    }
  }

  /**
   * Handles an HTTP error by converting it into a `GenkitToolsError`.
   *
   * This method always throws; the `any` return type only lets it be used
   * directly as a `.catch` handler.
   *
   * @param error the error raised by axios.
   * @param message context for the error; defaults to 'Internal Error'.
   * @throws GenkitToolsError carrying the response payload when the server
   *     answered, or the original cause otherwise.
   */
  private httpErrorHandler(error: AxiosError, message?: string): any {
    const newError = new GenkitToolsError(message || 'Internal Error');

    if (error.response) {
      // The body may be empty (null/undefined), so guard the property access.
      const responseMessage = (error.response.data as any)?.message;
      if (responseMessage) {
        newError.message = responseMessage;
      }
      // we got a non-200 response; copy the payload and rethrow
      newError.data = error.response.data as GenkitErrorData;
      throw newError;
    }

    // We actually have an exception; wrap it and re-throw.
    throw new GenkitToolsError(message || 'Internal Error', {
      cause: error.cause,
    });
  }

  /**
   * Performs health checks on all runtimes.
   *
   * Every known runtime is checked concurrently; those whose reflection server
   * is not healthy are passed to `removeRuntime`.
   */
  private async performHealthChecks() {
    const healthCheckPromises = Object.entries(this.filenameToRuntimeMap).map(
      async ([fileName, runtime]) => {
        if (!(await checkServerHealth(runtime.reflectionServerUrl))) {
          await this.removeRuntime(fileName);
        }
      }
    );
    return Promise.all(healthCheckPromises);
  }

  /**
   * Removes the runtime file which will trigger the removal watcher.
   *
   * Only acts on file names already registered with the manager. A failure to
   * delete the file is logged at debug level and not propagated.
   */
  private async removeRuntime(fileName: string) {
    const runtime = this.filenameToRuntimeMap[fileName];
    if (runtime) {
      try {
        const runtimesDir = await findRuntimesDir();
        const runtimeFilePath = path.join(runtimesDir, fileName);
        await fs.unlink(runtimeFilePath);
      } catch (error) {
        logger.debug(`Failed to delete runtime file: ${error}`);
      }
      logger.debug(
        `Removed unhealthy runtime with ID ${runtime.id} from manager.`
      );
    }
  }
}

/**
 * Checks if the runtime file is valid.
 *
 * @param data the parsed content of a runtime file.
 * @returns `true` when `data` is a non-null object with a string `id`, a
 *     numeric `pid`, a string `reflectionServerUrl` and a string `timestamp`
 *     that `Date.parse` accepts.
 */
function isValidRuntimeInfo(data: any): data is RuntimeInfo {
  return (
    typeof data === 'object' &&
    // `typeof null` is 'object'; without this guard the property reads throw.
    data !== null &&
    typeof data.id === 'string' &&
    typeof data.pid === 'number' &&
    typeof data.reflectionServerUrl === 'string' &&
    typeof data.timestamp === 'string' &&
    !isNaN(Date.parse(data.timestamp))
  );
}