Skip to main content
Glama
tool-executor.ts17.4 kB
import { ApiConfig, ExecutionStep, Integration, RequestOptions, Workflow as Tool, WorkflowResult as ToolResult, WorkflowStepResult as ToolStepResult } from "@superglue/client"; import { Metadata } from "@superglue/shared"; import { flattenAndNamespaceCredentials } from "@superglue/shared/utils"; import { JSONSchema } from "openai/lib/jsonschema.mjs"; import { z } from "zod"; import { zodToJsonSchema } from "zod-to-json-schema"; import { getEvaluateStepResponseContext, getGenerateStepConfigContext } from "../context/context-builders.js"; import { EVALUATE_STEP_RESPONSE_SYSTEM_PROMPT, GENERATE_STEP_CONFIG_SYSTEM_PROMPT } from "../context/context-prompts.js"; import { server_defaults } from "../default.js"; import { IntegrationManager } from "../integrations/integration-manager.js"; import { LanguageModel, LLMMessage } from "../llm/llm-base-model.js"; import { isSelfHealingEnabled, maskCredentials, transformData } from "../utils/helpers.js"; import { logMessage } from "../utils/logs.js"; import { telemetryClient } from "../utils/telemetry.js"; import { FTPStepExecutionStrategy } from "./strategies/ftp/ftp.js"; import { AbortError, ApiCallError, HttpStepExecutionStrategy } from "./strategies/http/http.js"; import { PostgresStepExecutionStrategy } from "./strategies/postgres/postgres.js"; import { StepExecutionStrategyRegistry } from "./strategies/strategy.js"; import { generateStepConfig } from "./tool-step-builder.js"; import { executeAndEvaluateFinalTransform, generateWorkingTransform } from "./tool-transform.js"; export interface ToolExecutorOptions { tool: Tool; metadata: Metadata; integrations: IntegrationManager[]; } export class ToolExecutor implements Tool { public id: string; public steps: ExecutionStep[]; public finalTransform?: string; public result: ToolResult; public responseSchema?: JSONSchema; public metadata: Metadata; public instruction?: string; public inputSchema?: JSONSchema; public integrationIds: string[]; private integrations: Record<string, IntegrationManager>; private strategyRegistry: StepExecutionStrategyRegistry; constructor({ tool, metadata, integrations }: ToolExecutorOptions) { this.id = tool.id; this.steps = tool.steps; this.finalTransform = tool.finalTransform; this.responseSchema = tool.responseSchema; this.instruction = tool.instruction; this.metadata = metadata; this.inputSchema = tool.inputSchema; this.integrations = integrations.reduce((acc, int) => { acc[int.id] = int; return acc; }, {} as Record<string, IntegrationManager>); this.integrationIds = tool.integrationIds; this.result = { id: crypto.randomUUID(), success: false, data: {}, stepResults: [], startedAt: new Date(), completedAt: undefined, config: tool, } as ToolResult; this.strategyRegistry = new StepExecutionStrategyRegistry(); this.strategyRegistry.register(new HttpStepExecutionStrategy()); this.strategyRegistry.register(new PostgresStepExecutionStrategy()); this.strategyRegistry.register(new FTPStepExecutionStrategy()); } public async execute({ payload = {}, credentials = {}, options = {} }: { payload?: Record<string, any>, credentials?: Record<string, string>, options?: RequestOptions }): Promise<ToolResult & { data?: any }> { try { this.validate({ payload, credentials }); logMessage("debug", `Executing tool ${this.id}`, this.metadata); for (const step of this.steps) { const aggregatedStepData = this.buildAggregatedStepData(payload); const stepResult = await this.executeStep({ step, stepInput: aggregatedStepData, credentials, options }); this.result.stepResults.push(stepResult.result); step.apiConfig = stepResult.updatedStep.apiConfig; step.loopSelector = stepResult.updatedStep.loopSelector; if (!stepResult.result.success) { return this.completeWithFailure(stepResult.result.error || 'Step execution failed'); } } if (this.finalTransform || this.responseSchema) { const finalAggregatedStepData = this.buildAggregatedStepData(payload); const finalTransformResult = await executeAndEvaluateFinalTransform({ aggregatedStepData: finalAggregatedStepData, finalTransform: this.finalTransform, responseSchema: this.responseSchema, instruction: this.instruction, options: options, metadata: this.metadata }); if (!finalTransformResult.success) { return this.completeWithFailure(finalTransformResult.error); } this.result.data = finalTransformResult.transformedData || {}; this.result.config = { id: this.id, integrationIds: this.integrationIds, steps: this.steps, finalTransform: finalTransformResult.finalTransform, inputSchema: this.inputSchema, responseSchema: this.responseSchema, instruction: this.instruction } as Tool; } else { // Always set config to propagate wrapped loopSelectors back to frontend this.result.config = { id: this.id, steps: this.steps, finalTransform: this.finalTransform, inputSchema: this.inputSchema, responseSchema: this.responseSchema, instruction: this.instruction } as Tool; } return this.completeWithSuccess(); } catch (error) { return this.completeWithFailure(error?.message || error); } } private async executeStep({ step, stepInput, credentials, options }: { step: ExecutionStep, stepInput: Record<string, any>, credentials: Record<string, string>, options: RequestOptions }): Promise<{result: ToolStepResult, updatedStep: ExecutionStep}> { try { const isSelfHealing = isSelfHealingEnabled(options, "api"); let retryCount = 0; let lastError: string | null = null; let messages: LLMMessage[] = []; let currentConfig = step.apiConfig; let currentDataSelector = step.loopSelector; let stepCredentials = credentials; let isLoopStep = false; let stepResults: any[] = []; const integrationManager = this.integrations[step.integrationId]; let currentIntegration: Integration | null = null; let loopPayload: any = null; if (!integrationManager) { throw new Error(`Integration '${step.integrationId}' not found. Available integrations: ${Object.keys(this.integrations).join(', ')}`); } // Get integration early so it's available for self-healing even if data selector fails await integrationManager?.refreshTokenIfNeeded(); currentIntegration = await integrationManager?.getIntegration(); if (currentIntegration) { stepCredentials = { ...credentials, ...flattenAndNamespaceCredentials([currentIntegration]) } as Record<string, string>; } const maxRetries = isSelfHealing ? (options?.retries !== undefined ? options.retries : server_defaults.MAX_CALL_RETRIES) : 1; while (retryCount < maxRetries) { try { const dataSelectorTransformResult = await transformData(stepInput, currentDataSelector); if (!dataSelectorTransformResult.success) { throw new Error(`Loop selector for '${step.id}' failed. ${dataSelectorTransformResult.error}\nCode: ${currentDataSelector}\nPayload: ${JSON.stringify(stepInput).slice(0, 1000)}...`); } const dataSelectorOutput = dataSelectorTransformResult.data || {}; isLoopStep = Array.isArray(dataSelectorOutput); let itemsToExecuteStepOn = isLoopStep ? dataSelectorOutput : [dataSelectorOutput || {}]; itemsToExecuteStepOn = itemsToExecuteStepOn.slice(0, step.loopMaxIters || server_defaults.DEFAULT_LOOP_MAX_ITERS); stepResults = []; for (let i = 0; i < itemsToExecuteStepOn.length; i++) { const currentItem = itemsToExecuteStepOn[i]; if (itemsToExecuteStepOn.length > 1) { logMessage("debug", `Executing loop iteration ${i + 1}/${itemsToExecuteStepOn.length} with item: ${JSON.stringify(currentItem).slice(0, 100)}...`, this.metadata); } loopPayload = { currentItem, ...stepInput }; // Refresh integration token if needed (important for long-running loops) await integrationManager?.refreshTokenIfNeeded(); currentIntegration = await integrationManager?.getIntegration(); if (currentIntegration) { stepCredentials = { ...credentials, ...flattenAndNamespaceCredentials([currentIntegration]) } as Record<string, string>; } const itemExecutionResult = await this.strategyRegistry.routeAndExecute({ stepConfig: currentConfig, stepInputData: loopPayload, credentials: stepCredentials, requestOptions: { ...options, testMode: false } }); if (!itemExecutionResult.success || itemExecutionResult.strategyExecutionData === undefined) { throw new Error(itemExecutionResult.error || `No data returned from iteration: ${i + 1} in step: ${step.id}`); } const stepResponseData = { currentItem, data: itemExecutionResult.strategyExecutionData }; stepResults.push({ stepId: step.id, success: true, stepResponseData, }); } // llm as a judge validation on the output data, this function throws an error if the data does not align with the instruction if (options.testMode || isSelfHealing) { await this.validateStepResponse( isLoopStep ? stepResults.map(r => r.stepResponseData) : stepResults[0]?.stepResponseData || null, currentConfig, integrationManager ); } // we went through the whole step without errors, so we can break out of the retry loop break; } catch (error) { lastError = maskCredentials(error?.message || String(error), stepCredentials).slice(0, 10000); if (retryCount > 0) { messages.push({ role: "user", content: `Error: ${lastError}` }); logMessage('info', `Step execution failed: ${lastError}`, this.metadata); } if (error instanceof AbortError) throw error; retryCount++; if (retryCount >= maxRetries) { this.handleMaxRetriesExceeded(step.id,currentConfig, retryCount, lastError); } if (isSelfHealing) { logMessage('info', `Self healing step config (retry ${retryCount})`, this.metadata); if (messages.length === 0) { messages = await this.initializeSelfHealingContext( integrationManager, currentConfig, currentDataSelector, loopPayload, stepCredentials, currentIntegration ); } const generateStepConfigResult = await generateStepConfig({ retryCount, messages, integration: currentIntegration }); if (!generateStepConfigResult.success) { throw new Error(generateStepConfigResult.error); } currentConfig = { ...currentConfig, ...generateStepConfigResult.config } as ApiConfig; currentDataSelector = generateStepConfigResult.dataSelector; } } } logMessage("info", `'${step.id}' Complete`, this.metadata); const toolStepResult = { stepId: step.id, success: true, data: isLoopStep ? stepResults.map(r => r.stepResponseData) : stepResults[0]?.stepResponseData || null, error: undefined } as ToolStepResult; const updatedStep = { ...step, apiConfig: currentConfig, loopSelector: currentDataSelector } as ExecutionStep; return { result: toolStepResult, updatedStep: updatedStep }; } catch (error) { logMessage("info", `'${step.id}' Failed`, this.metadata); return { result: { stepId: step.id, success: false, data: null, error: error.message || error }, updatedStep: step } as {result: ToolStepResult, updatedStep: ExecutionStep}; } } private async initializeSelfHealingContext( integrationManager: IntegrationManager, config: ApiConfig, loopSelector: string, payload: any, credentials: Record<string, string>, integration: Integration ): Promise<LLMMessage[]> { const docs = await integrationManager.getDocumentation(); const userPrompt = getGenerateStepConfigContext({ instruction: config.instruction, previousStepConfig: config, previousStepDataSelector: loopSelector, stepInput: payload, credentials, integrationDocumentation: docs?.content || '', integrationSpecificInstructions: integration.specificInstructions || '' }, { characterBudget: 50000, mode: 'self-healing' }); return [ { role: "system", content: GENERATE_STEP_CONFIG_SYSTEM_PROMPT }, { role: "user", content: userPrompt } ]; } private async validateStepResponse( data: any, config: ApiConfig, integrationManager: IntegrationManager ): Promise<void> { const evaluation = await this.evaluateStepResponse({ data, config, docSearchResultsForStepInstruction: await integrationManager?.searchDocumentation(config.instruction) }); if (!evaluation.success) { throw new Error(evaluation.shortReason + " " + JSON.stringify(data).slice(0, 10000)); } } private handleMaxRetriesExceeded( stepId: string, config: ApiConfig, retryCount: number, lastError: string, ): never { telemetryClient?.captureException( new Error(`API call failed after ${retryCount} attempts: ${lastError}`), this.metadata?.orgId, { config, retryCount } ); throw new ApiCallError( `Error executing step ${stepId}: ${lastError}`, 500 ); } private validate(payload: Record<string, unknown>): void { if (!this.id) { throw new Error("Tool must have a valid ID"); } if (!this.steps || !Array.isArray(this.steps)) { throw new Error("Execution steps must be an array"); } if (!this.strategyRegistry) { throw new Error("Step execution strategy registry is required"); } for (const step of this.steps) { if (!step.id) { throw new Error("Each step must have an ID"); } if (!step.apiConfig) { throw new Error("Each step must have an API config"); } } } private buildAggregatedStepData(originalPayload: Record<string, unknown>): Record<string, unknown> { const stepResults: Record<string, unknown> = {}; for (const result of this.result.stepResults) { if (result?.data) { stepResults[result.stepId] = result.data; } } return { ...originalPayload, ...stepResults, }; } private completeWithSuccess(): ToolResult { this.result.success = true; this.result.error = undefined; this.result.completedAt = new Date(); return this.result; } private completeWithFailure(error: any): ToolResult { this.result.success = false; this.result.error = error; this.result.completedAt = new Date(); return this.result; } private async evaluateStepResponse({ data, config, docSearchResultsForStepInstruction }: { data: any, config: ApiConfig, docSearchResultsForStepInstruction?: string }): Promise<{ success?: boolean; refactorNeeded?: boolean; shortReason?: string }> { const evaluateStepResponsePrompt = getEvaluateStepResponseContext({ data, config, docSearchResultsForStepInstruction }, { characterBudget: 20000 }); const messages = [ { role: "system", content: EVALUATE_STEP_RESPONSE_SYSTEM_PROMPT }, { role: "user", content: evaluateStepResponsePrompt } ] as LLMMessage[]; const evaluationSchema = z.object({ success: z.boolean().describe("Whether the step execution was successful"), refactorNeeded: z.boolean().describe("Whether the configuration needs to be refactored"), shortReason: z.string().describe("Brief reason for the evaluation result") }); const evaluationResult = await LanguageModel.generateObject<z.infer<typeof evaluationSchema>>( { messages: messages, schema: zodToJsonSchema(evaluationSchema), temperature: 0 }); if (!evaluationResult.success) { throw new Error(`Error evaluating config response: ${evaluationResult.response}`); } return evaluationResult.response; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/superglue-ai/superglue'

If you have feedback or need assistance with the MCP directory API, please join our Discord server