MCP Terminal Server

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import bodyParser from 'body-parser'; import cors, { CorsOptions } from 'cors'; import express from 'express'; import { Action, ActionContext, Flow, runWithStreamingCallback, z, } from 'genkit'; import { ContextProvider, RequestData, getCallableJSON, getHttpStatus, } from 'genkit/context'; import { logger } from 'genkit/logging'; import { Server } from 'http'; const streamDelimiter = '\n\n'; /** * Exposes provided flow or an action as express handler. */ export function expressHandler< C extends ActionContext = ActionContext, I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, >( action: Action<I, O, S>, opts?: { contextProvider?: ContextProvider<C, I>; } ): express.RequestHandler { return async ( request: express.Request, response: express.Response ): Promise<void> => { const { stream } = request.query; let input = request.body.data as z.infer<I>; let context: Record<string, any>; try { context = (await opts?.contextProvider?.({ method: request.method as RequestData['method'], headers: Object.fromEntries( Object.entries(request.headers).map(([key, value]) => [ key.toLowerCase(), Array.isArray(value) ? value.join(' ') : String(value), ]) ), input, })) || {}; } catch (e: any) { logger.error( `Auth policy failed with error: ${(e as Error).message}\n${(e as Error).stack}` ); response.status(getHttpStatus(e)).json(getCallableJSON(e)).end(); return; } if (request.get('Accept') === 'text/event-stream' || stream === 'true') { response.writeHead(200, { 'Content-Type': 'text/plain', 'Transfer-Encoding': 'chunked', }); try { const onChunk = (chunk: z.infer<S>) => { response.write( 'data: ' + JSON.stringify({ message: chunk }) + streamDelimiter ); }; const result = await runWithStreamingCallback( action.__registry, onChunk, () => action.run(input, { onChunk, context, }) ); response.write( 'data: ' + JSON.stringify({ result: result.result }) + streamDelimiter ); response.end(); } catch (e) { logger.error( `Streaming request failed with error: ${(e as Error).message}\n${(e as Error).stack}` ); response.write( `error: ${JSON.stringify({ error: getCallableJSON(e) })}${streamDelimiter}` ); response.end(); } } else { try { const result = await action.run(input, { context }); response.setHeader('x-genkit-trace-id', result.telemetry.traceId); response.setHeader('x-genkit-span-id', result.telemetry.spanId); // Responses for non-streaming flows are passed back with the flow result stored in a field called "result." response .status(200) .json({ result: result.result, }) .end(); } catch (e) { // Errors for non-streaming flows are passed back as standard API errors. logger.error( `Non-streaming request failed with error: ${(e as Error).message}\n${(e as Error).stack}` ); response.status(getHttpStatus(e)).json(getCallableJSON(e)).end(); } } }; } /** * A wrapper object containing a flow with its associated auth policy. */ export type FlowWithContextProvider< C extends ActionContext = ActionContext, I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, > = { flow: Flow<I, O, S>; context: ContextProvider<C, I>; }; /** * Adds an auth policy to the flow. */ export function withContextProvider< C extends ActionContext = ActionContext, I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, >( flow: Flow<I, O, S>, context: ContextProvider<C, I> ): FlowWithContextProvider<C, I, O, S> { return { flow, context, }; } /** * Options to configure the flow server. */ export interface FlowServerOptions { /** List of flows to expose via the flow server. */ flows: (Flow<any, any, any> | FlowWithContextProvider<any, any, any>)[]; /** Port to run the server on. Defaults to env.PORT or 3400. */ port?: number; /** CORS options for the server. */ cors?: CorsOptions; /** HTTP method path prefix for the exposed flows. */ pathPrefix?: string; /** JSON body parser options. */ jsonParserOptions?: bodyParser.OptionsJson; } /** * Starts an express server with the provided flows and options. */ export function startFlowServer(options: FlowServerOptions): FlowServer { const server = new FlowServer(options); server.start(); return server; } /** * Flow server exposes registered flows as HTTP endpoints. * * This is for use in production environments. * * @hidden */ export class FlowServer { /** List of all running servers needed to be cleaned up on process exit. */ private static RUNNING_SERVERS: FlowServer[] = []; /** Options for the flow server configured by the developer. */ private options: FlowServerOptions; /** Port the server is actually running on. This may differ from `options.port` if the original was occupied. Null is server is not running. */ private port: number | null = null; /** Express server instance. Null if server is not running. */ private server: Server | null = null; constructor(options: FlowServerOptions) { this.options = { ...options, }; } /** * Starts the server and adds it to the list of running servers to clean up on exit. */ async start() { const server = express(); server.use(bodyParser.json(this.options.jsonParserOptions)); server.use(cors(this.options.cors)); logger.debug('Running flow server with flow paths:'); const pathPrefix = this.options.pathPrefix ?? ''; this.options.flows?.forEach((flow) => { if ('context' in flow) { const flowPath = `/${pathPrefix}${flow.flow.__action.name}`; logger.debug(` - ${flowPath}`); server.post( flowPath, expressHandler(flow.flow, { contextProvider: flow.context }) ); } else { const flowPath = `/${pathPrefix}${flow.__action.name}`; logger.debug(` - ${flowPath}`); server.post(flowPath, expressHandler(flow)); } }); this.port = this.options?.port || (process.env.PORT ? parseInt(process.env.PORT) : 0) || 3400; this.server = server.listen(this.port, () => { logger.debug(`Flow server running on http://localhost:${this.port}`); FlowServer.RUNNING_SERVERS.push(this); }); } /** * Stops the server and removes it from the list of running servers to clean up on exit. */ async stop(): Promise<void> { if (!this.server) { return; } return new Promise<void>((resolve, reject) => { this.server!.close((err) => { if (err) { logger.error( `Error shutting down flow server on port ${this.port}: ${err}` ); reject(err); } const index = FlowServer.RUNNING_SERVERS.indexOf(this); if (index > -1) { FlowServer.RUNNING_SERVERS.splice(index, 1); } logger.debug( `Flow server on port ${this.port} has successfully shut down.` ); this.port = null; this.server = null; resolve(); }); }); } /** * Stops all running servers. */ static async stopAll() { return Promise.all( FlowServer.RUNNING_SERVERS.map((server) => server.stop()) ); } }