MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Channel } from '@genkit-ai/core/async';
const __flowStreamDelimiter = '\n\n';
/**
* Invoke and stream response from a deployed flow.
*
* For example:
*
* ```js
* import { streamFlow } from 'genkit/beta/client';
*
* const response = streamFlow({
* url: 'https://my-flow-deployed-url',
* input: 'foo',
* });
* for await (const chunk of response.stream) {
* console.log(chunk);
* }
* console.log(await response.output);
* ```
*/
export function streamFlow<O = any, S = any>({
url,
input,
headers,
}: {
/** URL of the deployed flow. */
url: string;
/** Flow input. */
input?: any;
/** A map of HTTP headers to be added to the HTTP call. */
headers?: Record<string, string>;
}): {
readonly output: Promise<O>;
readonly stream: AsyncIterable<S>;
} {
const channel = new Channel<S>();
const operationPromise = __flowRunEnvelope({
url,
input,
sendChunk: (c) => channel.send(c),
headers,
});
operationPromise.then(
() => channel.close(),
(err) => channel.error(err)
);
return {
output: operationPromise,
stream: channel,
};
}
async function __flowRunEnvelope({
url,
input,
sendChunk,
headers,
}: {
url: string;
input: any;
sendChunk: (chunk: any) => void;
headers?: Record<string, string>;
}) {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify({
data: input,
}),
headers: {
Accept: 'text/event-stream',
'Content-Type': 'application/json',
...headers,
},
});
if (response.status !== 200) {
throw new Error(
`Server returned: ${response.status}: ${await response.text()}`
);
}
if (!response.body) {
throw new Error('Response body is empty');
}
var reader = response.body.getReader();
var decoder = new TextDecoder();
let buffer = '';
while (true) {
const result = await reader.read();
const decodedValue = decoder.decode(result.value);
if (decodedValue) {
buffer += decodedValue;
}
// If buffer includes the delimiter that means we are still recieving chunks.
while (buffer.includes(__flowStreamDelimiter)) {
const chunk = JSON.parse(
buffer
.substring(0, buffer.indexOf(__flowStreamDelimiter))
.substring('data: '.length)
);
if (chunk.hasOwnProperty('message')) {
sendChunk(chunk.message);
} else if (chunk.hasOwnProperty('result')) {
return chunk.result;
} else if (chunk.hasOwnProperty('error')) {
throw new Error(
`${chunk.error.status}: ${chunk.error.message}\n${chunk.error.details}`
);
} else {
throw new Error('unkown chunk format: ' + JSON.stringify(chunk));
}
buffer = buffer.substring(
buffer.indexOf(__flowStreamDelimiter) + __flowStreamDelimiter.length
);
}
}
throw new Error('stream did not terminate correctly');
}
/**
* Invoke a deployed flow over HTTP(s).
*
* For example:
*
* ```js
* import { runFlow } from 'genkit/beta/client';
*
* const response = await runFlow({
* url: 'https://my-flow-deployed-url',
* input: 'foo',
* });
* console.log(await response);
* ```
*/
export async function runFlow<O = any>({
url,
input,
headers,
}: {
/** URL of the deployed flow. */
url: string;
/** Flow input. */
input?: any;
/** A map of HTTP headers to be added to the HTTP call. */
headers?: Record<string, string>;
}): Promise<O> {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify({
data: input,
}),
headers: {
'Content-Type': 'application/json',
...headers,
},
});
if (response.status !== 200) {
throw new Error(
`Server returned: ${response.status}: ${await response.text()}`
);
}
const wrappedResult = (await response.json()) as
| { result: O }
| { error: unknown };
if ('error' in wrappedResult) {
if (typeof wrappedResult.error === 'string') {
throw new Error(wrappedResult.error);
}
// TODO: The callable protocol defines an HttpError that has a JSON format of
// details?: string
// httpErrorCode: { canonicalName: string }
// message: string
// Should we create a new error class that parses this and exposes it as fields?
throw new Error(JSON.stringify(wrappedResult.error));
}
return wrappedResult.result;
}