MCP Terminal Server
by dillip285
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { GenkitError, stripUndefinedProps } from '@genkit-ai/core';
import { logger } from '@genkit-ai/core/logging';
import { Registry } from '@genkit-ai/core/registry';
import {
GenerateActionOptions,
GenerateResponseData,
MessageData,
Part,
ToolRequestPart,
ToolResponsePart,
} from '../model.js';
import { isPromptAction } from '../prompt.js';
import {
ToolAction,
ToolInterruptError,
ToolRunOptions,
isToolRequest,
resolveTools,
} from '../tool.js';
export function toToolMap(tools: ToolAction[]): Record<string, ToolAction> {
assertValidToolNames(tools);
const out: Record<string, ToolAction> = {};
for (const tool of tools) {
const name = tool.__action.name;
const shortName = name.substring(name.lastIndexOf('/') + 1);
out[shortName] = tool;
}
return out;
}
/** Ensures that each tool has a unique name. */
export function assertValidToolNames(tools: ToolAction[]) {
const nameMap: Record<string, string> = {};
for (const tool of tools) {
const name = tool.__action.name;
const shortName = name.substring(name.lastIndexOf('/') + 1);
if (nameMap[shortName]) {
throw new GenkitError({
status: 'INVALID_ARGUMENT',
message: `Cannot provide two tools with the same name: '${name}' and '${nameMap[shortName]}'`,
});
}
nameMap[shortName] = name;
}
}
function toRunOptions(part: ToolRequestPart): ToolRunOptions {
const out: ToolRunOptions = { metadata: part.metadata };
if (part.metadata?.resumed) out.resumed = part.metadata.resumed;
return out;
}
export function toPendingOutput(
part: ToolRequestPart,
response: ToolResponsePart
): ToolRequestPart {
return {
...part,
metadata: {
...part.metadata,
pendingOutput: response.toolResponse.output,
},
};
}
export async function resolveToolRequest(
rawRequest: GenerateActionOptions,
part: ToolRequestPart,
toolMap: Record<string, ToolAction>,
runOptions?: ToolRunOptions
): Promise<{
response?: ToolResponsePart;
interrupt?: ToolRequestPart;
preamble?: GenerateActionOptions;
}> {
const tool = toolMap[part.toolRequest.name];
if (!tool) {
throw new GenkitError({
status: 'NOT_FOUND',
message: `Tool ${part.toolRequest.name} not found`,
detail: { request: rawRequest },
});
}
// if it's a prompt action, go ahead and render the preamble
if (isPromptAction(tool)) {
const preamble = await tool(part.toolRequest.input);
const response = {
toolResponse: {
name: part.toolRequest.name,
ref: part.toolRequest.ref,
output: `transferred to ${part.toolRequest.name}`,
},
};
return { preamble, response };
}
// otherwise, execute the tool and catch interrupts
try {
const output = await tool(part.toolRequest.input, toRunOptions(part));
const response = stripUndefinedProps({
toolResponse: {
name: part.toolRequest.name,
ref: part.toolRequest.ref,
output,
},
});
return { response };
} catch (e) {
if (e instanceof ToolInterruptError) {
logger.debug(
`tool '${toolMap[part.toolRequest?.name].__action.name}' triggered an interrupt${e.metadata ? `: ${JSON.stringify(e.metadata)}` : ''}`
);
const interrupt = {
toolRequest: part.toolRequest,
metadata: { ...part.metadata, interrupt: e.metadata || true },
};
return { interrupt };
}
throw e;
}
}
/**
* resolveToolRequests is responsible for executing the tools requested by the model for a single turn. it
* returns either a toolMessage to append or a revisedModelMessage when an interrupt occurs, and a transferPreamble
* if a prompt tool is called
*/
export async function resolveToolRequests(
registry: Registry,
rawRequest: GenerateActionOptions,
generatedMessage: MessageData
): Promise<{
revisedModelMessage?: MessageData;
toolMessage?: MessageData;
transferPreamble?: GenerateActionOptions;
}> {
const toolMap = toToolMap(await resolveTools(registry, rawRequest.tools));
const responseParts: ToolResponsePart[] = [];
let hasInterrupts: boolean = false;
let transferPreamble: GenerateActionOptions | undefined;
const revisedModelMessage = {
...generatedMessage,
content: [...generatedMessage.content],
};
await Promise.all(
revisedModelMessage.content.map(async (part, i) => {
if (!part.toolRequest) return; // skip non-tool-request parts
const { preamble, response, interrupt } = await resolveToolRequest(
rawRequest,
part as ToolRequestPart,
toolMap
);
if (preamble) {
if (transferPreamble) {
throw new GenkitError({
status: 'INVALID_ARGUMENT',
message: `Model attempted to transfer to multiple prompt tools.`,
});
}
transferPreamble = preamble;
}
// this happens for preamble or normal tools
if (response) {
responseParts.push(response!);
revisedModelMessage.content.splice(
i,
1,
toPendingOutput(part, response)
);
}
if (interrupt) {
revisedModelMessage.content.splice(i, 1, interrupt);
hasInterrupts = true;
}
})
);
if (hasInterrupts) {
return { revisedModelMessage };
}
return {
toolMessage: { role: 'tool', content: responseParts },
transferPreamble,
};
}
function findCorrespondingToolRequest(
parts: Part[],
part: ToolRequestPart | ToolResponsePart
): ToolRequestPart | undefined {
const name = part.toolRequest?.name || part.toolResponse?.name;
const ref = part.toolRequest?.ref || part.toolResponse?.ref;
return parts.find(
(p) => p.toolRequest?.name === name && p.toolRequest?.ref === ref
) as ToolRequestPart | undefined;
}
function findCorrespondingToolResponse(
parts: Part[],
part: ToolRequestPart | ToolResponsePart
): ToolResponsePart | undefined {
const name = part.toolRequest?.name || part.toolResponse?.name;
const ref = part.toolRequest?.ref || part.toolResponse?.ref;
return parts.find(
(p) => p.toolResponse?.name === name && p.toolResponse?.ref === ref
) as ToolResponsePart | undefined;
}
async function resolveResumedToolRequest(
rawRequest: GenerateActionOptions,
part: ToolRequestPart,
toolMap: Record<string, ToolAction>
): Promise<{
toolRequest?: ToolRequestPart;
toolResponse?: ToolResponsePart;
interrupt?: ToolRequestPart;
}> {
if (part.metadata?.pendingOutput) {
const { pendingOutput, ...metadata } = part.metadata;
const toolResponse = {
toolResponse: {
name: part.toolRequest.name,
ref: part.toolRequest.ref,
output: pendingOutput,
},
metadata: { ...metadata, source: 'pending' },
};
// strip pendingOutput from metadata when returning
return stripUndefinedProps({
toolResponse,
toolRequest: { ...part, metadata },
});
}
// if there's a corresponding reply, append it to toolResponses
const providedResponse = findCorrespondingToolResponse(
rawRequest.resume?.respond || [],
part
);
if (providedResponse) {
const toolResponse = providedResponse;
// remove the 'interrupt' but leave a 'resolvedInterrupt'
const { interrupt, ...metadata } = part.metadata || {};
return stripUndefinedProps({
toolResponse,
toolRequest: {
...part,
metadata: { ...metadata, resolvedInterrupt: interrupt },
},
});
}
// if there's a corresponding restart, execute then add to toolResponses
const restartRequest = findCorrespondingToolRequest(
rawRequest.resume?.restart || [],
part
);
if (restartRequest) {
const { response, interrupt, preamble } = await resolveToolRequest(
rawRequest,
restartRequest,
toolMap
);
if (preamble) {
throw new GenkitError({
status: 'INTERNAL',
message: `Prompt tool '${restartRequest.toolRequest.name}' executed inside 'restart' resolution. This should never happen.`,
});
}
// if there's a new interrupt, return it
if (interrupt) return { interrupt };
if (response) {
const toolResponse = response;
// remove the 'interrupt' but leave a 'resolvedInterrupt'
const { interrupt, ...metadata } = part.metadata || {};
return stripUndefinedProps({
toolResponse,
toolRequest: {
...part,
metadata: { ...metadata, resolvedInterrupt: interrupt },
},
});
}
}
throw new GenkitError({
status: 'INVALID_ARGUMENT',
message: `Unresolved tool request '${part.toolRequest.name}${part.toolRequest.ref ? `#${part.toolRequest.ref}` : ''}' was not handled by the 'resume' argument. You must supply replies or restarts for all interrupted tool requests.'`,
});
}
/** Amends message history to handle `resume` arguments. Returns the amended history. */
export async function resolveResumeOption(
registry: Registry,
rawRequest: GenerateActionOptions
): Promise<{
revisedRequest?: GenerateActionOptions;
interruptedResponse?: GenerateResponseData;
toolMessage?: MessageData;
}> {
if (!rawRequest.resume) return { revisedRequest: rawRequest }; // no-op if no resume option
const toolMap = toToolMap(await resolveTools(registry, rawRequest.tools));
const messages = rawRequest.messages;
const lastMessage = messages.at(-1);
if (
!lastMessage ||
lastMessage.role !== 'model' ||
!lastMessage.content.find((p) => p.toolRequest)
) {
throw new GenkitError({
status: 'FAILED_PRECONDITION',
message: `Cannot 'resume' generation unless the previous message is a model message with at least one tool request.`,
});
}
const toolResponses: ToolResponsePart[] = [];
let interrupted = false;
lastMessage.content = await Promise.all(
lastMessage.content.map(async (part) => {
if (!isToolRequest(part)) return part;
const resolved = await resolveResumedToolRequest(
rawRequest,
part,
toolMap
);
if (resolved.interrupt) {
interrupted = true;
return resolved.interrupt;
}
toolResponses.push(resolved.toolResponse!);
return resolved.toolRequest!;
})
);
if (interrupted) {
// TODO: figure out how to make this trigger an interrupt response.
return {
interruptedResponse: {
finishReason: 'interrupted',
finishMessage:
'One or more tools triggered interrupts while resuming generation. The model was not called.',
message: lastMessage,
},
};
}
const numToolRequests = lastMessage.content.filter(
(p) => !!p.toolRequest
).length;
if (toolResponses.length !== numToolRequests) {
throw new GenkitError({
status: 'FAILED_PRECONDITION',
message: `Expected ${numToolRequests} tool responses but resolved to ${toolResponses.length}.`,
detail: { toolResponses, message: lastMessage },
});
}
const toolMessage: MessageData = {
role: 'tool',
content: toolResponses,
metadata: {
resumed: rawRequest.resume.metadata || true,
},
};
return stripUndefinedProps({
revisedRequest: {
...rawRequest,
resume: undefined,
messages: [...messages, toolMessage],
},
toolMessage,
});
}
export async function resolveRestartedTools(
registry: Registry,
rawRequest: GenerateActionOptions
): Promise<ToolRequestPart[]> {
const toolMap = toToolMap(await resolveTools(registry, rawRequest.tools));
const lastMessage = rawRequest.messages.at(-1);
if (!lastMessage || lastMessage.role !== 'model') return [];
const restarts = lastMessage.content.filter(
(p) => p.toolRequest && p.metadata?.resumed
) as ToolRequestPart[];
return await Promise.all(
restarts.map(async (p) => {
const { response, interrupt } = await resolveToolRequest(
rawRequest,
p,
toolMap
);
// this means that it interrupted *again* after the restart
if (interrupt) return interrupt;
return toPendingOutput(p, response!);
})
);
}