MCP Terminal Server

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { RequestData } from '@genkit-ai/core'; import * as assert from 'assert'; import express from 'express'; import { GenerateResponseData, Genkit, UserFacingError, genkit, z, } from 'genkit'; import { runFlow, streamFlow } from 'genkit/beta/client'; import { ContextProvider } from 'genkit/context'; import { GenerateResponseChunkData, ModelAction } from 'genkit/model'; import getPort from 'get-port'; import * as http from 'http'; import { afterEach, beforeEach, describe, it } from 'node:test'; import { FlowServer, expressHandler, startFlowServer, withContextProvider, } from '../src/index.js'; interface Context { auth: { user: string; }; } const contextProvider: ContextProvider<Context> = (req: RequestData) => { assert.ok(req.method, 'method must be set'); assert.ok(req.headers, 'headers must be set'); assert.ok(req.input, 'input must be set'); if (req.headers['authorization'] !== 'open sesame') { throw new UserFacingError('PERMISSION_DENIED', 'not authorized'); } return { auth: { user: 'Ali Baba', }, }; }; describe('expressHandler', async () => { let server: http.Server; let port; beforeEach(async () => { const ai = genkit({}); const echoModel = defineEchoModel(ai); const voidInput = ai.defineFlow('voidInput', async () => { return 'banana'; }); const stringInput = ai.defineFlow('stringInput', async (input) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input, }); return text; }); const objectInput = ai.defineFlow( { name: 'objectInput', inputSchema: z.object({ question: z.string() }) }, async (input) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input.question, }); return text; } ); const streamingFlow = ai.defineFlow( { name: 'streamingFlow', inputSchema: z.object({ question: z.string() }), }, async (input, sendChunk) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input.question, onChunk: sendChunk, }); return text; } ); const flowWithAuth = ai.defineFlow( { name: 'flowWithAuth', inputSchema: z.object({ question: z.string() }), }, async (input, { context }) => { return `${input.question} - ${JSON.stringify(context!.auth)}`; } ); const app = express(); app.use(express.json()); port = await getPort(); app.post('/voidInput', expressHandler(voidInput)); app.post('/stringInput', expressHandler(stringInput)); app.post('/objectInput', expressHandler(objectInput)); app.post('/streamingFlow', expressHandler(streamingFlow)); app.post( '/flowWithAuth', expressHandler(flowWithAuth, { contextProvider }) ); // Can also expose any action. app.post('/echoModel', expressHandler(echoModel)); app.post( '/echoModelWithAuth', expressHandler(echoModel, { contextProvider }) ); server = app.listen(port, () => { console.log(`Example app listening on port ${port}`); }); }); afterEach(() => { server.close(); }); describe('runFlow', () => { it('should call a void input flow', async () => { const result = await runFlow({ url: `http://localhost:${port}/voidInput`, }); assert.strictEqual(result, 'banana'); }); it('should run a flow with string input', async () => { const result = await runFlow({ url: `http://localhost:${port}/stringInput`, input: 'hello', }); assert.strictEqual(result, 'Echo: hello'); }); it('should run a flow with object input', async () => { const result = await runFlow({ url: `http://localhost:${port}/objectInput`, input: { question: 'olleh', }, }); assert.strictEqual(result, 'Echo: olleh'); }); it('should fail a bad input', async () => { const result = runFlow({ url: `http://localhost:${port}/objectInput`, input: { badField: 'hello', }, }); await assert.rejects(result, (err: Error) => { return err.message.includes('INVALID_ARGUMENT'); }); }); it('should call a flow with auth', async () => { const result = await runFlow<string>({ url: `http://localhost:${port}/flowWithAuth`, input: { question: 'hello', }, headers: { Authorization: 'open sesame', }, }); assert.strictEqual(result, 'hello - {"user":"Ali Baba"}'); }); it('should fail a flow with auth', async () => { const result = runFlow({ url: `http://localhost:${port}/flowWithAuth`, input: { question: 'hello', }, headers: { Authorization: 'thief #24', }, }); await assert.rejects(result, (err) => { return (err as Error).message.includes('not authorized'); }); }); it('should call a model', async () => { const result = await runFlow({ url: `http://localhost:${port}/echoModel`, input: { messages: [{ role: 'user', content: [{ text: 'hello' }] }], }, }); assert.strictEqual(result.finishReason, 'stop'); assert.deepStrictEqual(result.message, { role: 'model', content: [{ text: 'Echo: hello' }], }); }); it('should call a model with auth', async () => { const result = await runFlow<GenerateResponseData>({ url: `http://localhost:${port}/echoModelWithAuth`, input: { messages: [{ role: 'user', content: [{ text: 'hello' }] }], }, headers: { Authorization: 'open sesame', }, }); assert.strictEqual(result.finishReason, 'stop'); assert.deepStrictEqual(result.message, { role: 'model', content: [{ text: 'Echo: hello' }], }); }); it('should fail a flow with auth', async () => { const result = runFlow({ url: `http://localhost:${port}/echoModelWithAuth`, input: { messages: [ { role: 'user', content: [{ text: 'hello' }], }, ], }, headers: { Authorization: 'thief #24', }, }); await assert.rejects(result, (err) => { return (err as Error).message.includes('not authorized'); }); }); }); describe('streamFlow', () => { it('stream a flow', async () => { const result = streamFlow<string, GenerateResponseChunkData>({ url: `http://localhost:${port}/streamingFlow`, input: { question: 'olleh', }, }); const gotChunks: GenerateResponseChunkData[] = []; for await (const chunk of result.stream) { gotChunks.push(chunk); } assert.deepStrictEqual(gotChunks, [ { index: 0, role: 'model', content: [{ text: '3' }] }, { index: 0, role: 'model', content: [{ text: '2' }] }, { index: 0, role: 'model', content: [{ text: '1' }] }, ]); assert.strictEqual(await result.output, 'Echo: olleh'); }); it('stream a model', async () => { const result = streamFlow({ url: `http://localhost:${port}/echoModel`, input: { messages: [ { role: 'user', content: [{ text: 'olleh' }], }, ], }, }); const gotChunks: any[] = []; for await (const chunk of result.stream) { gotChunks.push(chunk); } const output = await result.output; assert.strictEqual(output.finishReason, 'stop'); assert.deepStrictEqual(output.message, { role: 'model', content: [{ text: 'Echo: olleh' }], }); assert.deepStrictEqual(gotChunks, [ { content: [{ text: '3' }] }, { content: [{ text: '2' }] }, { content: [{ text: '1' }] }, ]); }); }); }); describe('startFlowServer', async () => { let server: FlowServer; let port; beforeEach(async () => { const ai = genkit({}); defineEchoModel(ai); const voidInput = ai.defineFlow('voidInput', async () => { return 'banana'; }); const stringInput = ai.defineFlow('stringInput', async (input) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input, }); return text; }); const objectInput = ai.defineFlow( { name: 'objectInput', inputSchema: z.object({ question: z.string() }) }, async (input) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input.question, }); return text; } ); const streamingFlow = ai.defineFlow( { name: 'streamingFlow', inputSchema: z.object({ question: z.string() }), }, async (input, sendChunk) => { const { text } = await ai.generate({ model: 'echoModel', prompt: input.question, onChunk: sendChunk, }); return text; } ); const flowWithAuth = ai.defineFlow( { name: 'flowWithAuth', inputSchema: z.object({ question: z.string() }), }, async (input, { context }) => { return `${input.question} - ${JSON.stringify(context!.auth)}`; } ); port = await getPort(); server = startFlowServer({ flows: [ voidInput, stringInput, objectInput, streamingFlow, withContextProvider(flowWithAuth, contextProvider), ], port, }); }); afterEach(() => { server.stop(); }); describe('runFlow', () => { it('should call a void input flow', async () => { const result = await runFlow({ url: `http://localhost:${port}/voidInput`, }); assert.strictEqual(result, 'banana'); }); it('should run a flow with string input', async () => { const result = await runFlow({ url: `http://localhost:${port}/stringInput`, input: 'hello', }); assert.strictEqual(result, 'Echo: hello'); }); it('should run a flow with object input', async () => { const result = await runFlow({ url: `http://localhost:${port}/objectInput`, input: { question: 'olleh', }, }); assert.strictEqual(result, 'Echo: olleh'); }); it('should fail a bad input', async () => { const result = runFlow({ url: `http://localhost:${port}/objectInput`, input: { badField: 'hello', }, }); await assert.rejects(result, (err: Error) => { return err.message.includes('INVALID_ARGUMENT'); }); }); it('should call a flow with auth', async () => { const result = await runFlow<string>({ url: `http://localhost:${port}/flowWithAuth`, input: { question: 'hello', }, headers: { Authorization: 'open sesame', }, }); assert.strictEqual(result, 'hello - {"user":"Ali Baba"}'); }); it('should fail a flow with auth', async () => { const result = runFlow({ url: `http://localhost:${port}/flowWithAuth`, input: { question: 'hello', }, headers: { Authorization: 'thief #24', }, }); await assert.rejects(result, (err) => { return (err as Error).message.includes('not authorized'); }); }); }); describe('streamFlow', () => { it('stream a flow', async () => { const result = streamFlow<string, GenerateResponseChunkData>({ url: `http://localhost:${port}/streamingFlow`, input: { question: 'olleh', }, }); const gotChunks: GenerateResponseChunkData[] = []; for await (const chunk of result.stream) { gotChunks.push(chunk); } assert.deepStrictEqual(gotChunks, [ { index: 0, role: 'model', content: [{ text: '3' }] }, { index: 0, role: 'model', content: [{ text: '2' }] }, { index: 0, role: 'model', content: [{ text: '1' }] }, ]); assert.strictEqual(await result.output, 'Echo: olleh'); }); }); }); export function defineEchoModel(ai: Genkit): ModelAction { return ai.defineModel( { name: 'echoModel', }, async (request, streamingCallback) => { streamingCallback?.({ content: [ { text: '3', }, ], }); streamingCallback?.({ content: [ { text: '2', }, ], }); streamingCallback?.({ content: [ { text: '1', }, ], }); return { message: { role: 'model', content: [ { text: 'Echo: ' + request.messages .map( (m) => (m.role === 'user' || m.role === 'model' ? '' : `${m.role}: `) + m.content.map((c) => c.text).join() ) .join(), }, ], }, finishReason: 'stop', }; } ); }