Skip to main content
Glama
executor.ts13.6 kB
/** * Graph execution engine */ import type { McpGraphConfig, ToolDefinition, ServerConfig, NodeDefinition } from "../types/config.js"; import type { ExecutionOptions, ExecutionResult, ExecutionHooks, ExecutionTelemetry, ExecutionController as IExecutionController, } from "../types/execution.js"; import { Graph } from "../graph/graph.js"; import { ExecutionContext } from "./context.js"; import { ExecutionController } from "./controller.js"; import { executeEntryNode } from "./nodes/entry-executor.js"; import { executeExitNode } from "./nodes/exit-executor.js"; import { executeTransformNode } from "./nodes/transform-executor.js"; import { executeMcpToolNode } from "./nodes/mcp-tool-executor.js"; import { executeSwitchNode } from "./nodes/switch-executor.js"; import type { McpClientManager } from "../mcp/client-manager.js"; import { logger } from "../logger.js"; export class GraphExecutor { private config: McpGraphConfig; private graph: Graph; private clientManager: McpClientManager; private controller: ExecutionController | null = null; constructor(config: McpGraphConfig, clientManager: McpClientManager) { this.config = config; this.graph = new Graph(config.nodes); this.clientManager = clientManager; } getController(): IExecutionController | null { return this.controller; } getGraph(): Graph { return this.graph; } getConfig(): McpGraphConfig { return this.config; } private getServerConfig(serverName: string): ServerConfig { if (!this.config.servers || !this.config.servers[serverName]) { throw new Error(`Server configuration not found: ${serverName}`); } return this.config.servers[serverName]; } async executeTool( toolName: string, toolInput: Record<string, unknown>, options?: ExecutionOptions ): Promise<ExecutionResult> { const tool = this.config.tools.find((t) => t.name === toolName); if (!tool) { throw new Error(`Tool not found: ${toolName}`); } logger.info(`Executing tool: ${toolName}`); // Initialize execution options const hooks = options?.hooks; const breakpoints = options?.breakpoints || []; const enableTelemetry = options?.enableTelemetry ?? false; const startPaused = options?.startPaused ?? false; // Initialize controller if hooks, breakpoints, or startPaused are provided if (hooks || breakpoints.length > 0 || startPaused) { this.controller = new ExecutionController(); this.controller.setBreakpoints(breakpoints); this.controller.setStatus("not_started"); // If startPaused is true, set pauseRequested so execution pauses at entry node if (startPaused && this.controller) { // Set pauseRequested directly - it will be checked after status is set to "running" // and the first node is processed (this.controller as any).pauseRequested = true; } } // Find entry node for this tool const entryNode = this.config.nodes.find( (n) => n.type === "entry" && (n as { tool: string }).tool === toolName ); if (!entryNode) { throw new Error(`Entry node not found for tool: ${toolName}`); } // Find exit node for this tool const exitNode = this.config.nodes.find( (n) => n.type === "exit" && (n as { tool: string }).tool === toolName ); if (!exitNode) { throw new Error(`Exit node not found for tool: ${toolName}`); } const context = new ExecutionContext(toolInput); if (this.controller) { this.controller.setContext(context); } const startTime = Date.now(); let currentNodeId = entryNode.id; try { if (this.controller) { this.controller.setStatus("running"); } // Execute nodes until we reach the exit node while (true) { // Check for stop request before processing next node if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } const node = this.graph.getNode(currentNodeId); if (!node) { throw new Error(`Node not found: ${currentNodeId}`); } // Update controller with current node if (this.controller) { this.controller.setCurrentNode(currentNodeId); } // Check for breakpoint or pause request if (this.controller && this.controller.shouldPause(currentNodeId)) { // If breakpoint hit, set pauseRequested this.controller.checkAndSetBreakpointPause(currentNodeId); // Set status to paused this.controller.setStatus("paused"); // Wait for resume - this creates the promise and blocks until resume() is called const pausePromise = this.controller.waitIfPaused(); // Call onPause hook to notify that we've paused (status is "paused", waiting for resume) if (hooks?.onPause) { await hooks.onPause(currentNodeId, context); } // Wait for resume to be called await pausePromise; // Call onResume hook after resuming if (hooks?.onResume) { await hooks.onResume(); } // Check again for stop after resuming from pause if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } } // Call onNodeStart hook if (hooks?.onNodeStart) { const shouldContinue = await hooks.onNodeStart(currentNodeId, node, context); // Check for stop after hook (hook may have called stop()) if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } if (shouldContinue === false) { // Hook requested pause if (this.controller) { // Call onPause hook before waiting if (hooks?.onPause) { await hooks.onPause(currentNodeId, context); } await this.controller.waitIfPaused(); // Call onResume hook after resuming if (hooks?.onResume) { await hooks.onResume(); } // Check for stop after resuming from pause if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } } } } logger.debug(`Executing node: ${currentNodeId} (type: ${node.type})`); const nodeStartTime = Date.now(); let result: { output: unknown; nextNode?: string }; let nodeError: Error | undefined; try { switch (node.type) { case "entry": result = executeEntryNode(node, toolInput, context, nodeStartTime); break; case "exit": result = executeExitNode(node, context, nodeStartTime); // Call onNodeComplete hook for exit node if (hooks?.onNodeComplete) { await hooks.onNodeComplete( currentNodeId, node, context.getData(), result.output, Date.now() - nodeStartTime ); } if (this.controller) { this.controller.setStatus("finished"); this.controller.setCurrentNode(null); } const endTime = Date.now(); const telemetry = enableTelemetry ? this.buildTelemetry(context, startTime, endTime) : undefined; return { result: result.output, executionHistory: context.getHistory(), telemetry, }; case "transform": result = await executeTransformNode(node, context, nodeStartTime); break; case "mcp": // Check for stop before starting MCP call (which may take time) if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } const serverConfig = this.getServerConfig(node.server); result = await executeMcpToolNode( node, context, this.clientManager, serverConfig, nodeStartTime ); // Check for stop after MCP call completes if (this.controller && this.controller.shouldStop()) { if (this.controller) { this.controller.setStatus("stopped"); this.controller.setCurrentNode(null); } throw new Error("Execution was stopped"); } break; case "switch": result = await executeSwitchNode(node, context, nodeStartTime); break; default: throw new Error(`Unknown node type: ${(node as { type: string }).type}`); } // Call onNodeComplete hook if (hooks?.onNodeComplete) { await hooks.onNodeComplete( currentNodeId, node, context.getData(), result.output, Date.now() - nodeStartTime ); } // Mark step complete if stepping if (this.controller) { this.controller.markStepComplete(); } } catch (error) { nodeError = error instanceof Error ? error : new Error(String(error)); const nodeEndTime = Date.now(); // Record error in history (if not already recorded by node executor) const history = context.getHistory(); const lastRecord = history[history.length - 1]; if (!lastRecord || lastRecord.nodeId !== currentNodeId) { context.addHistory( currentNodeId, node.type, context.getData(), null, nodeStartTime, nodeEndTime, nodeError ); } // Call onNodeError hook if (hooks?.onNodeError) { await hooks.onNodeError(currentNodeId, node, nodeError, context); } if (this.controller) { this.controller.setStatus("error"); this.controller.setCurrentNode(null); } throw nodeError; } if (result.nextNode) { currentNodeId = result.nextNode; // If next node is the exit node, continue to process it in next iteration if (currentNodeId === exitNode.id) { continue; } } else { throw new Error(`Node ${currentNodeId} has no next node and is not the exit node`); } } // Should not reach here - exit node should have been processed and returned throw new Error(`Exit node was not reached`); } catch (error) { if (this.controller) { // Don't override "stopped" status with "error" if execution was stopped const state = this.controller.getState(); if (this.controller.shouldStop() && state.status === "stopped") { // Status already set to stopped, just clean up this.controller.setCurrentNode(null); } else if (!(error instanceof Error && error.message === "Execution was stopped")) { // Only set error status if it wasn't a stop request this.controller.setStatus("error"); this.controller.setCurrentNode(null); } } throw error; } finally { // Clean up controller after execution if (this.controller) { this.controller = null; } } } private buildTelemetry( context: ExecutionContext, startTime: number, endTime: number ): ExecutionTelemetry { const history = context.getHistory(); const nodeDurations = new Map<string, number>(); const nodeCounts = new Map<string, number>(); let errorCount = 0; for (const record of history) { // Aggregate durations by node type const currentDuration = nodeDurations.get(record.nodeType) || 0; nodeDurations.set(record.nodeType, currentDuration + record.duration); // Count nodes by type const currentCount = nodeCounts.get(record.nodeType) || 0; nodeCounts.set(record.nodeType, currentCount + 1); // Count errors if (record.error) { errorCount++; } } return { totalDuration: endTime - startTime, nodeDurations, nodeCounts, errorCount, }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TeamSparkAI/mcpGraph'

If you have feedback or need assistance with the MCP directory API, please join our Discord server