MCP Terminal Server
by dillip285
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { type JSONSchema7 } from 'json-schema';
import * as z from 'zod';
import { lazy } from './async.js';
import { ActionContext, getContext, runWithContext } from './context.js';
import { ActionType, Registry } from './registry.js';
import { parseSchema } from './schema.js';
import {
SPAN_TYPE_ATTR,
newTrace,
setCustomMetadataAttributes,
} from './tracing.js';
export { StatusCodes, StatusSchema, type Status } from './statusTypes.js';
export { JSONSchema7 };
/**
* Action metadata.
*/
export interface ActionMetadata<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny,
> {
actionType?: ActionType;
name: string;
description?: string;
inputSchema?: I;
inputJsonSchema?: JSONSchema7;
outputSchema?: O;
outputJsonSchema?: JSONSchema7;
streamSchema?: S;
metadata?: Record<string, any>;
}
/**
* Results of an action run. Includes telemetry.
*/
export interface ActionResult<O> {
result: O;
telemetry: {
traceId: string;
spanId: string;
};
}
/**
* Options (side channel) data to pass to the model.
*/
export interface ActionRunOptions<S> {
/**
* Streaming callback (optional).
*/
onChunk?: StreamingCallback<S>;
/**
* Additional runtime context data (ex. auth context data).
*/
context?: ActionContext;
/**
* Additional span attributes to apply to OT spans.
*/
telemetryLabels?: Record<string, string>;
}
/**
* Options (side channel) data to pass to the model.
*/
export interface ActionFnArg<S> {
/**
* Streaming callback (optional).
*/
sendChunk: StreamingCallback<S>;
/**
* Additional runtime context data (ex. auth context data).
*/
context?: ActionContext;
}
/**
* Streaming response from an action.
*/
export interface StreamingResponse<
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> {
/** Iterator over the streaming chunks. */
stream: AsyncGenerator<z.infer<S>>;
/** Final output of the action. */
output: Promise<z.infer<O>>;
}
/**
* Self-describing, validating, observable, locally and remotely callable function.
*/
export type Action<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
RunOptions extends ActionRunOptions<S> = ActionRunOptions<S>,
> = ((input?: z.infer<I>, options?: RunOptions) => Promise<z.infer<O>>) & {
__action: ActionMetadata<I, O, S>;
__registry: Registry;
run(
input?: z.infer<I>,
options?: ActionRunOptions<z.infer<S>>
): Promise<ActionResult<z.infer<O>>>;
stream(
input?: z.infer<I>,
opts?: ActionRunOptions<z.infer<S>>
): StreamingResponse<O, S>;
};
/**
* Action factory params.
*/
export type ActionParams<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> = {
name:
| string
| {
pluginId: string;
actionId: string;
};
description?: string;
inputSchema?: I;
inputJsonSchema?: JSONSchema7;
outputSchema?: O;
outputJsonSchema?: JSONSchema7;
metadata?: Record<string, any>;
use?: Middleware<z.infer<I>, z.infer<O>, z.infer<S>>[];
streamSchema?: S;
actionType: ActionType;
};
export type ActionAsyncParams<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> = ActionParams<I, O, S> & {
fn: (
input: z.infer<I>,
options: ActionFnArg<z.infer<S>>
) => Promise<z.infer<O>>;
};
export type SimpleMiddleware<I = any, O = any> = (
req: I,
next: (req?: I) => Promise<O>
) => Promise<O>;
export type MiddlewareWithOptions<I = any, O = any, S = any> = (
req: I,
options: ActionRunOptions<S> | undefined,
next: (req?: I, options?: ActionRunOptions<S>) => Promise<O>
) => Promise<O>;
/**
* Middleware function for actions.
*/
export type Middleware<I = any, O = any, S = any> =
| SimpleMiddleware<I, O>
| MiddlewareWithOptions<I, O, S>;
/**
* Creates an action with provided middleware.
*/
export function actionWithMiddleware<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
action: Action<I, O, S>,
middleware: Middleware<z.infer<I>, z.infer<O>, z.infer<S>>[]
): Action<I, O, S> {
const wrapped = (async (req: z.infer<I>) => {
return (await wrapped.run(req)).result;
}) as Action<I, O, S>;
wrapped.__action = action.__action;
wrapped.__registry = action.__registry;
wrapped.run = async (
req: z.infer<I>,
options?: ActionRunOptions<z.infer<S>>
): Promise<ActionResult<z.infer<O>>> => {
let telemetry;
const dispatch = async (
index: number,
req: z.infer<I>,
opts?: ActionRunOptions<z.infer<S>>
) => {
if (index === middleware.length) {
// end of the chain, call the original model action
const result = await action.run(req, opts);
telemetry = result.telemetry;
return result.result;
}
const currentMiddleware = middleware[index];
if (currentMiddleware.length === 3) {
return (currentMiddleware as MiddlewareWithOptions<I, O, z.infer<S>>)(
req,
opts,
async (modifiedReq, modifiedOptions) =>
dispatch(index + 1, modifiedReq || req, modifiedOptions || opts)
);
} else if (currentMiddleware.length === 2) {
return (currentMiddleware as SimpleMiddleware<I, O>)(
req,
async (modifiedReq) => dispatch(index + 1, modifiedReq || req, opts)
);
} else {
throw new Error('unspported middleware function shape');
}
};
wrapped.stream = action.stream;
return { result: await dispatch(0, req, options), telemetry };
};
return wrapped;
}
/**
* Creates an action with the provided config.
*/
export function action<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
registry: Registry,
config: ActionParams<I, O, S>,
fn: (
input: z.infer<I>,
options: ActionFnArg<z.infer<S>>
) => Promise<z.infer<O>>
): Action<I, O, z.infer<S>> {
const actionName =
typeof config.name === 'string'
? config.name
: `${config.name.pluginId}/${config.name.actionId}`;
const actionFn = async (
input?: I,
options?: ActionRunOptions<z.infer<S>>
) => {
return (await actionFn.run(input, options)).result;
};
actionFn.__registry = registry;
actionFn.__action = {
name: actionName,
description: config.description,
inputSchema: config.inputSchema,
inputJsonSchema: config.inputJsonSchema,
outputSchema: config.outputSchema,
outputJsonSchema: config.outputJsonSchema,
metadata: config.metadata,
} as ActionMetadata<I, O, S>;
actionFn.run = async (
input: z.infer<I>,
options?: ActionRunOptions<z.infer<S>>
): Promise<ActionResult<z.infer<O>>> => {
input = parseSchema(input, {
schema: config.inputSchema,
jsonSchema: config.inputJsonSchema,
});
let traceId;
let spanId;
let output = await newTrace(
registry,
{
name: actionName,
labels: {
[SPAN_TYPE_ATTR]: 'action',
'genkit:metadata:subtype': config.actionType,
...options?.telemetryLabels,
},
},
async (metadata, span) => {
setCustomMetadataAttributes(registry, { subtype: config.actionType });
if (options?.context) {
setCustomMetadataAttributes(registry, {
context: JSON.stringify(options.context),
});
}
traceId = span.spanContext().traceId;
spanId = span.spanContext().spanId;
metadata.name = actionName;
metadata.input = input;
try {
const actionFn = () =>
fn(input, {
...options,
// Context can either be explicitly set, or inherited from the parent action.
context: options?.context ?? getContext(registry),
sendChunk: options?.onChunk ?? sentinelNoopStreamingCallback,
});
// if context is explicitly passed in, we run action with the provided context,
// otherwise we let upstream context carry through.
const output = await runWithContext(
registry,
options?.context,
actionFn
);
metadata.output = JSON.stringify(output);
return output;
} catch (err) {
if (typeof err === 'object') {
(err as any).traceId = traceId;
}
throw err;
}
}
);
output = parseSchema(output, {
schema: config.outputSchema,
jsonSchema: config.outputJsonSchema,
});
return {
result: output,
telemetry: {
traceId,
spanId,
},
};
};
actionFn.stream = (
input?: z.infer<I>,
opts?: ActionRunOptions<z.infer<S>>
): StreamingResponse<O, S> => {
let chunkStreamController: ReadableStreamController<z.infer<S>>;
const chunkStream = new ReadableStream<z.infer<S>>({
start(controller) {
chunkStreamController = controller;
},
pull() {},
cancel() {},
});
const invocationPromise = actionFn
.run(config.inputSchema ? config.inputSchema.parse(input) : input, {
onChunk: ((chunk: z.infer<S>) => {
chunkStreamController.enqueue(chunk);
}) as S extends z.ZodVoid ? undefined : StreamingCallback<z.infer<S>>,
context: opts?.context,
})
.then((s) => s.result)
.finally(() => {
chunkStreamController.close();
});
return {
output: invocationPromise,
stream: (async function* () {
const reader = chunkStream.getReader();
while (true) {
const chunk = await reader.read();
if (chunk.value) {
yield chunk.value;
}
if (chunk.done) {
break;
}
}
return await invocationPromise;
})(),
};
};
if (config.use) {
return actionWithMiddleware(actionFn, config.use);
}
return actionFn;
}
/**
* Defines an action with the given config and registers it in the registry.
*/
export function defineAction<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
registry: Registry,
config: ActionParams<I, O, S>,
fn: (
input: z.infer<I>,
options: ActionFnArg<z.infer<S>>
) => Promise<z.infer<O>>
): Action<I, O, S> {
if (isInRuntimeContext(registry)) {
throw new Error(
'Cannot define new actions at runtime.\n' +
'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md'
);
}
const act = action(
registry,
config,
async (i: I, options): Promise<z.infer<O>> => {
await registry.initializeAllPlugins();
return await runInActionRuntimeContext(registry, () => fn(i, options));
}
);
act.__action.actionType = config.actionType;
registry.registerAction(config.actionType, act);
return act;
}
/**
* Defines an action with the given config promise and registers it in the registry.
*/
export function defineActionAsync<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
registry: Registry,
actionType: ActionType,
name:
| string
| {
pluginId: string;
actionId: string;
},
config: PromiseLike<ActionAsyncParams<I, O, S>>,
onInit?: (action: Action<I, O, S>) => void
): PromiseLike<Action<I, O, S>> {
const actionName =
typeof name === 'string' ? name : `${name.pluginId}/${name.actionId}`;
const actionPromise = lazy(() =>
config.then((resolvedConfig) => {
const act = action(
registry,
resolvedConfig,
async (i: I, options): Promise<z.infer<O>> => {
await registry.initializeAllPlugins();
return await runInActionRuntimeContext(registry, () =>
resolvedConfig.fn(i, options)
);
}
);
act.__action.actionType = actionType;
onInit?.(act);
return act;
})
);
registry.registerActionAsync(actionType, actionName, actionPromise);
return actionPromise;
}
// Streaming callback function.
export type StreamingCallback<T> = (chunk: T) => void;
const streamingAlsKey = 'core.action.streamingCallback';
export const sentinelNoopStreamingCallback = () => null;
/**
* Executes provided function with streaming callback in async local storage which can be retrieved
* using {@link getStreamingCallback}.
*/
export function runWithStreamingCallback<S, O>(
registry: Registry,
streamingCallback: StreamingCallback<S> | undefined,
fn: () => O
): O {
return registry.asyncStore.run(
streamingAlsKey,
streamingCallback || sentinelNoopStreamingCallback,
fn
);
}
/**
* Retrieves the {@link StreamingCallback} previously set by {@link runWithStreamingCallback}
*
* @hidden
*/
export function getStreamingCallback<S>(
registry: Registry
): StreamingCallback<S> | undefined {
const cb =
registry.asyncStore.getStore<StreamingCallback<S>>(streamingAlsKey);
if (cb === sentinelNoopStreamingCallback) {
return undefined;
}
return cb;
}
const runtimeContextAslKey = 'core.action.runtimeContext';
/**
* Checks whether the caller is currently in the runtime context of an action.
*/
export function isInRuntimeContext(registry: Registry) {
return registry.asyncStore.getStore(runtimeContextAslKey) === 'runtime';
}
/**
* Execute the provided function in the action runtime context.
*/
export function runInActionRuntimeContext<R>(registry: Registry, fn: () => R) {
return registry.asyncStore.run(runtimeContextAslKey, 'runtime', fn);
}
/**
* Execute the provided function outside the action runtime context.
*/
export function runOutsideActionRuntimeContext<R>(
registry: Registry,
fn: () => R
) {
return registry.asyncStore.run(runtimeContextAslKey, 'outside', fn);
}