workflow_tool
Orchestrate serial or parallel workflows by configuring steps, tools, and parameters to automate complex processes efficiently within ToolBox MCP Server.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| outputFile | No | Path to output file (optional) | |
| steps | Yes | List of workflow steps | |
| version | No | Workflow definition version (e.g., '1.0.1') | 1.0 |
Implementation Reference
- src/tools/workflow_tool.ts:7-63 (schema)Input schema definition for the workflow_tool, specifying parameters like steps, parallel execution, version, and output file.export const schema = { name: "workflow_tool", description: "Orchestrate tools in serial/parallel workflows", type: "object", properties: { version: { type: "string", default: "1.0", description: "Workflow definition version (e.g., '1.0.1')" }, parallel: { type: "boolean", default: false, description: "If true, executes all steps in parallel." }, steps: { type: "array", description: "List of workflow steps", items: { type: "object", description: "Step configuration (tool, args, retry)", properties: { tool: { type: "string", description: "Tool name (e.g., 'sftp_tool')" }, args: { type: "object", description: "Tool parameters (e.g., {action:'upload'})" }, retry: { type: "number", default: 0, description: "Number of retries" }, timeout: { type: "number", description: "Timeout (ms)" }, compensation: { type: "object", description: "Compensation config (tool, args)", properties: { tool: { type: "string" }, args: { type: "object" } } } } } }, outputFile: { type: "string", description: "Path to output file (optional)" } }, required: ["steps"] };
- src/tools/workflow_tool.ts:86-164 (handler)Main handler function for workflow_tool. Orchestrates serial or parallel execution of tool steps, handles errors, compensation, logging to file, and returns execution report.export default async function (request: any) { const { steps, outputFile, parallel = false } = request.params.arguments; let stepDetails: any[] = []; let workflowStatus = "success"; const startTime = new Date().toISOString(); let executionTime = 0; const outputPath = getOutputPath(outputFile); // Check for restricted tools const restrictedTools = ["buildReload_tool", "workflow_tool"]; for (const step of steps) { if (restrictedTools.includes(step.tool)) { return { content: [ { type: "text", text: `Workflow cannot include ${step.tool} tool.` } ], isError: true }; } } try { if (parallel) { // Parallel execution const promises = steps.map((step, index) => executeStep(step, index)); const results = await Promise.all(promises); stepDetails = results; if (results.some(r => r.status === "failed")) { workflowStatus = "failed"; } } else { // Serial execution for (const [index, step] of steps.entries()) { const stepResult = await executeStep(step, index); stepDetails.push(stepResult); if (stepResult.status === "failed") { workflowStatus = "failed"; // In serial execution, we could choose to break here. // For now, we continue to allow compensation on later steps if needed. } } } executionTime = new Date().getTime() - new Date(startTime).getTime(); // Sort by index for consistent reports, especially after parallel execution const sortedStepDetails = stepDetails.sort((a, b) => a.index - b.index); const report = { workflowStatus, executionTime, steps: sortedStepDetails }; await saveReport(report, outputPath); return { content: [ { type: "text", text: JSON.stringify(report, null, 2) } ] }; } catch (error) { console.error("Workflow failed:", error); workflowStatus = "failed"; executionTime = new Date().getTime() - new Date(startTime).getTime(); const finalReport = { workflowStatus, executionTime, error: error instanceof Error ? error.message : String(error), steps: stepDetails }; await saveReport(finalReport, outputPath); return { content: [ { type: "text", text: JSON.stringify(finalReport, null, 2) } ], isError: true }; } }
- src/tools/workflow_tool.ts:166-202 (helper)Helper function to execute a single workflow step, including calling the tool, handling retries implicitly via compensation, and capturing results/errors.async function executeStep(step: any, index: number) { const startTime = new Date().toISOString(); let duration = 0; let result = null; let status = "success"; let error = null; try { const start = Date.now(); result = await callToolHandler({ params: { name: step.tool, arguments: step.args } }, `workflow_tool_step_${index}`); duration = Date.now() - start; } catch (e) { status = "failed"; error = e instanceof Error ? e.message : String(e); if (step.compensation) { try { await callToolHandler({ params: { name: step.compensation.tool, arguments: step.compensation.args } }, `workflow_tool_compensation_${index}`); } catch (compensationError) { console.error(`Compensation failed for step ${index}:`, compensationError); } } } return { index: index, tool: step.tool, status: status, startTime: startTime, duration: duration, result: result, error: error }; }
- src/tools/workflow_tool.ts:205-208 (helper)Destroy function for cleanup of workflow_tool resources.export async function destroy() { // Release resources, stop timers, disconnect, etc. console.log("Destroy workflow_tool"); }
- src/handler/ToolHandler.ts:91-139 (registration)Dynamic registration of all tools including workflow_tool by scanning src/tools directory, importing modules, and adding to global handlers map.export async function loadTools(reload: boolean = false): Promise<{ [key: string]: (request: ToolRequest) => Promise<ToolResponse> }> { // 如果是初始加载且已加载,则直接返回 if (!reload && isLoaded) return; // 如果是重新加载,则重置状态 if (reload) { for (const tool of tools) { await tool?.destroy?.(); delete handlers[tool.name]; } tools.length = 0; isLoaded = false; } // 获取所有工具文件 const toolFiles = fs.readdirSync(toolsDir).filter(file => file.endsWith('.js') || file.endsWith('.ts')); // 加载每个工具 for (const file of toolFiles) { const toolPath = path.join(toolsDir, file); try { // 如果是重新加载,清除模块缓存 if (reload) clearModuleCache(toolPath); // 导入模块,重新加载时添加时间戳防止缓存 const importPath = 'file://' + toolPath + (reload ? `?update=${Date.now()}` : ''); const { default: tool, schema, destroy } = await import(importPath); const toolName = path.parse(toolPath).name; // 注册工具 tools.push({ name: toolName, description: tool.description, inputSchema: schema, destroy: destroy }); // 注册处理函数 handlers[toolName] = async (request: ToolRequest) => { return await tool(request); }; } catch (error) { console.error(`Failed to ${reload ? 'reload' : 'load'} tool ${file}:`, error); } } isLoaded = true; if (reload) console.log(`Successfully reloaded ${tools.length} tools`); return handlers; }