Skip to main content
Glama
flow-execution-context.ts7.24 kB
import { assertEqual, FailedStep, FlowActionType, FlowRunStatus, GenericStepOutput, isNil, LoopStepOutput, LoopStepResult, PauseMetadata, PauseType, RespondResponse, StepOutput, StepOutputStatus } from '@activepieces/shared' import dayjs from 'dayjs' import { nanoid } from 'nanoid' import { EngineGenericError } from '../../helper/execution-errors' import { loggingUtils } from '../../helper/logging-utils' import { StepExecutionPath } from './step-execution-path' export type FlowVerdict = { status: FlowRunStatus.PAUSED pauseMetadata: PauseMetadata } | { status: FlowRunStatus.SUCCEEDED stopResponse: RespondResponse | undefined } | { status: FlowRunStatus.FAILED failedStep: FailedStep } | { status: FlowRunStatus.RUNNING } export class FlowExecutorContext { tags: readonly string[] steps: Readonly<Record<string, StepOutput>> pauseRequestId: string verdict: FlowVerdict currentPath: StepExecutionPath stepNameToTest?: boolean /** * Execution time in milliseconds */ duration: number constructor(copyFrom?: FlowExecutorContext) { this.tags = copyFrom?.tags ?? [] this.steps = copyFrom?.steps ?? {} this.pauseRequestId = copyFrom?.pauseRequestId ?? nanoid() this.duration = copyFrom?.duration ?? -1 this.verdict = copyFrom?.verdict ?? { status: FlowRunStatus.RUNNING } this.currentPath = copyFrom?.currentPath ?? StepExecutionPath.empty() this.stepNameToTest = copyFrom?.stepNameToTest ?? false } static empty(): FlowExecutorContext { return new FlowExecutorContext() } public setPauseRequestId(pauseRequestId: string): FlowExecutorContext { return new FlowExecutorContext({ ...this, pauseRequestId, }) } public getDelayedInSeconds(): number | undefined { if (this.verdict.status === FlowRunStatus.PAUSED && this.verdict.pauseMetadata.type === PauseType.DELAY) { return dayjs(this.verdict.pauseMetadata.resumeDateTime).diff(Date.now(), 'seconds') } return undefined } public finishExecution(): FlowExecutorContext { if (this.verdict.status === FlowRunStatus.RUNNING) { return new FlowExecutorContext({ ...this, verdict: { status: FlowRunStatus.SUCCEEDED }, }) } return this } public trimmedSteps(): Promise<Record<string, StepOutput>> { return loggingUtils.trimExecution(this.steps) } public getLoopStepOutput({ stepName }: { stepName: string }): LoopStepOutput | undefined { const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps }) const stepOutput = stateAtPath[stepName] if (isNil(stepOutput)) { return undefined } assertEqual(stepOutput.type, FlowActionType.LOOP_ON_ITEMS, 'stepOutput.type', 'LOOP_ON_ITEMS') // The new LoopStepOutput is needed as casting directly to LoopClassOutput will just cast the data but the class methods will not be available return new LoopStepOutput(stepOutput as GenericStepOutput<FlowActionType.LOOP_ON_ITEMS, LoopStepResult>) } public isCompleted({ stepName }: { stepName: string }): boolean { const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps }) const stepOutput = stateAtPath[stepName] if (isNil(stepOutput)) { return false } return stepOutput.status !== StepOutputStatus.PAUSED } public isPaused({ stepName }: { stepName: string }): boolean { const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps }) const stepOutput = stateAtPath[stepName] if (isNil(stepOutput)) { return false } return stepOutput.status === StepOutputStatus.PAUSED } public setDuration(duration: number): FlowExecutorContext { return new FlowExecutorContext({ ...this, duration, }) } public addTags(tags: string[]): FlowExecutorContext { return new FlowExecutorContext({ ...this, tags: [...this.tags, ...tags].filter((value, index, self) => { return self.indexOf(value) === index }), }) } public upsertStep(stepName: string, stepOutput: StepOutput): FlowExecutorContext { const steps = { ...this.steps, } const targetMap = getStateAtPath({ currentPath: this.currentPath, steps }) targetMap[stepName] = stepOutput return new FlowExecutorContext({ ...this, steps, }) } public getStepOutput(stepName: string): StepOutput | undefined { const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps }) return stateAtPath[stepName] } public setCurrentPath(currentStatePath: StepExecutionPath): FlowExecutorContext { return new FlowExecutorContext({ ...this, currentPath: currentStatePath, }) } public setVerdict(verdict: FlowVerdict): FlowExecutorContext { return new FlowExecutorContext({ ...this, verdict, }) } public setRetryable(retryable: boolean): FlowExecutorContext { return new FlowExecutorContext({ ...this, retryable, }) } public currentState(): Record<string, unknown> { let flattenedSteps: Record<string, unknown> = extractOutput(this.steps) let targetMap = this.steps this.currentPath.path.forEach(([stepName, iteration]) => { const stepOutput = targetMap[stepName] if (!stepOutput.output || stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) { throw new EngineGenericError('NotInstanceOfLoopOnItemsStepOutputError', '[ExecutionState#getTargetMap] Not instance of Loop On Items step output') } targetMap = stepOutput.output.iterations[iteration] flattenedSteps = { ...flattenedSteps, ...extractOutput(targetMap), } }) return flattenedSteps } } function extractOutput(steps: Record<string, StepOutput>): Record<string, unknown> { return Object.entries(steps).reduce((acc: Record<string, unknown>, [stepName, step]) => { acc[stepName] = step.output return acc }, {} as Record<string, unknown>) } function getStateAtPath({ currentPath, steps }: { currentPath: StepExecutionPath, steps: Record<string, StepOutput> }): Record<string, StepOutput> { let targetMap = steps currentPath.path.forEach(([stepName, iteration]) => { const stepOutput = targetMap[stepName] if (!stepOutput.output || stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) { throw new EngineGenericError('NotInstanceOfLoopOnItemsStepOutputError', `[ExecutionState#getTargetMap] Not instance of Loop On Items step output: ${stepOutput.type}`) } targetMap = stepOutput.output.iterations[iteration] }) return targetMap }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server