MCP Terminal Server

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { Channel } from '@genkit-ai/core/async'; const __flowStreamDelimiter = '\n\n'; /** * Invoke and stream response from a deployed flow. * * For example: * * ```js * import { streamFlow } from 'genkit/beta/client'; * * const response = streamFlow({ * url: 'https://my-flow-deployed-url', * input: 'foo', * }); * for await (const chunk of response.stream) { * console.log(chunk); * } * console.log(await response.output); * ``` */ export function streamFlow<O = any, S = any>({ url, input, headers, }: { /** URL of the deployed flow. */ url: string; /** Flow input. */ input?: any; /** A map of HTTP headers to be added to the HTTP call. */ headers?: Record<string, string>; }): { readonly output: Promise<O>; readonly stream: AsyncIterable<S>; } { const channel = new Channel<S>(); const operationPromise = __flowRunEnvelope({ url, input, sendChunk: (c) => channel.send(c), headers, }); operationPromise.then( () => channel.close(), (err) => channel.error(err) ); return { output: operationPromise, stream: channel, }; } async function __flowRunEnvelope({ url, input, sendChunk, headers, }: { url: string; input: any; sendChunk: (chunk: any) => void; headers?: Record<string, string>; }) { const response = await fetch(url, { method: 'POST', body: JSON.stringify({ data: input, }), headers: { Accept: 'text/event-stream', 'Content-Type': 'application/json', ...headers, }, }); if (response.status !== 200) { throw new Error( `Server returned: ${response.status}: ${await response.text()}` ); } if (!response.body) { throw new Error('Response body is empty'); } var reader = response.body.getReader(); var decoder = new TextDecoder(); let buffer = ''; while (true) { const result = await reader.read(); const decodedValue = decoder.decode(result.value); if (decodedValue) { buffer += decodedValue; } // If buffer includes the delimiter that means we are still recieving chunks. while (buffer.includes(__flowStreamDelimiter)) { const chunk = JSON.parse( buffer .substring(0, buffer.indexOf(__flowStreamDelimiter)) .substring('data: '.length) ); if (chunk.hasOwnProperty('message')) { sendChunk(chunk.message); } else if (chunk.hasOwnProperty('result')) { return chunk.result; } else if (chunk.hasOwnProperty('error')) { throw new Error( `${chunk.error.status}: ${chunk.error.message}\n${chunk.error.details}` ); } else { throw new Error('unkown chunk format: ' + JSON.stringify(chunk)); } buffer = buffer.substring( buffer.indexOf(__flowStreamDelimiter) + __flowStreamDelimiter.length ); } } throw new Error('stream did not terminate correctly'); } /** * Invoke a deployed flow over HTTP(s). * * For example: * * ```js * import { runFlow } from 'genkit/beta/client'; * * const response = await runFlow({ * url: 'https://my-flow-deployed-url', * input: 'foo', * }); * console.log(await response); * ``` */ export async function runFlow<O = any>({ url, input, headers, }: { /** URL of the deployed flow. */ url: string; /** Flow input. */ input?: any; /** A map of HTTP headers to be added to the HTTP call. */ headers?: Record<string, string>; }): Promise<O> { const response = await fetch(url, { method: 'POST', body: JSON.stringify({ data: input, }), headers: { 'Content-Type': 'application/json', ...headers, }, }); if (response.status !== 200) { throw new Error( `Server returned: ${response.status}: ${await response.text()}` ); } const wrappedResult = (await response.json()) as | { result: O } | { error: unknown }; if ('error' in wrappedResult) { if (typeof wrappedResult.error === 'string') { throw new Error(wrappedResult.error); } // TODO: The callable protocol defines an HttpError that has a JSON format of // details?: string // httpErrorCode: { canonicalName: string } // message: string // Should we create a new error class that parses this and exposes it as fields? throw new Error(JSON.stringify(wrappedResult.error)); } return wrappedResult.result; }