Skip to main content
Glama
flow.operation.ts (5.3 kB)
import {
    BeginExecuteFlowOperation,
    EngineResponse,
    EngineResponseStatus,
    ExecuteFlowOperation,
    ExecuteTriggerResponse,
    ExecutionType,
    FlowActionType,
    flowStructureUtil,
    GenericStepOutput,
    isNil,
    LoopStepOutput,
    StepOutput,
    StepOutputStatus,
    TriggerHookType,
    TriggerPayload,
} from '@activepieces/shared'
import { EngineConstants } from '../handler/context/engine-constants'
import { FlowExecutorContext } from '../handler/context/flow-execution-context'
import { testExecutionContext } from '../handler/context/test-execution-context'
import { flowExecutor } from '../handler/flow-executor'
import { triggerHelper } from '../helper/trigger-helper'
import { progressService } from '../services/progress.service'

/**
 * Entry point for the "execute flow" engine operation.
 */
export const flowOperation = {
    /**
     * Runs the requested flow (or a single step of it), finishes the resulting
     * execution context, sends one progress update with `updateImmediate: true`,
     * and reports the outcome.
     *
     * @param operation - The execute-flow request.
     * @returns An OK response with no payload; `delayInSeconds` is taken from
     * the finished context's `getDelayedInSeconds()`.
     */
    execute: async (operation: ExecuteFlowOperation): Promise<EngineResponse<undefined>> => {
        // Constants are built before execution starts, as in the original ordering.
        const engineConstants = EngineConstants.fromExecuteFlowInput(operation)

        const executed = await executeSingleStepOrFlow(operation)
        const finished: FlowExecutorContext = executed.finishExecution()

        await progressService.sendUpdate({
            engineConstants,
            flowExecutorContext: finished,
            updateImmediate: true,
        })

        return {
            status: EngineResponseStatus.OK,
            response: undefined,
            delayInSeconds: finished.getDelayedInSeconds(),
        }
    },
}

/**
 * Executes either one step in isolation or the whole flow from its trigger.
 *
 * Single-step mode is selected when the engine constants built from `input`
 * carry a non-nil `stepNameToTest`. In that mode the starting context comes
 * from `testExecutionContext.stateFromFlowVersion`, with the step under test
 * passed as `excludedStepName`.
 *
 * @param input - The execute-flow request.
 * @returns The execution context produced by the flow executor.
 */
async function executeSingleStepOrFlow(input: ExecuteFlowOperation): Promise<FlowExecutorContext> {
    const constants = EngineConstants.fromExecuteFlowInput(input)

    if (isNil(constants.stepNameToTest)) {
        const initialState = await getFlowExecutionState(input, FlowExecutorContext.empty())
        return flowExecutor.executeFromTrigger({
            executionState: initialState,
            constants,
            input,
        })
    }

    // NOTE(review): relies on input.stepNameToTest being set whenever
    // constants.stepNameToTest is - confirm against EngineConstants.
    const stepName = input.stepNameToTest!
    const testContext = await testExecutionContext.stateFromFlowVersion({
        apiUrl: input.internalApiUrl,
        flowVersion: input.flowVersion,
        excludedStepName: stepName,
        projectId: input.projectId,
        engineToken: input.engineToken,
        sampleData: input.sampleData,
    })
    const action = flowStructureUtil.getActionOrThrow(stepName, input.flowVersion.trigger)

    // The execution state is resolved first, then a fresh constants object is
    // built for the executor (same order as the original object literal).
    const executionState = await getFlowExecutionState(input, testContext)
    return flowExecutor.execute({
        action,
        executionState,
        constants: EngineConstants.fromExecuteFlowInput(input),
    })
}

/**
 * Builds the context the executor starts from.
 *
 * For `ExecutionType.BEGIN` the trigger step is upserted as SUCCEEDED with the
 * payload from `runOrReturnPayload` as its output. For every other execution
 * type nothing is added at this stage. Afterwards, every step in
 * `input.executionState.steps` whose status is SUCCEEDED or PAUSED is copied
 * into the context (loop steps are filtered recursively).
 *
 * @param input - The execute-flow request.
 * @param flowContext - The context to extend.
 * @returns The extended context.
 */
async function getFlowExecutionState(input: ExecuteFlowOperation, flowContext: FlowExecutorContext): Promise<FlowExecutorContext> {
    let context = flowContext

    if (input.executionType === ExecutionType.BEGIN) {
        const payload = await runOrReturnPayload(input)
        const triggerOutput = GenericStepOutput.create({
            type: input.flowVersion.trigger.type,
            status: StepOutputStatus.SUCCEEDED,
            input: {},
        }).setOutput(payload)
        context = context.upsertStep(input.flowVersion.trigger.name, triggerOutput)
    }

    for (const [stepName, previousOutput] of Object.entries(input.executionState.steps)) {
        if (!isSucceededOrPaused(previousOutput.status)) {
            continue
        }
        const restored = await insertSuccessStepsOrPausedRecursively(previousOutput)
        if (isNil(restored)) {
            continue
        }
        context = context.upsertStep(stepName, restored)
    }

    return context
}

/**
 * Resolves the payload used as the trigger step's output.
 *
 * When `input.executeTrigger` is falsy, `input.triggerPayload` is returned
 * unchanged. Otherwise the trigger is executed through `triggerHelper` with
 * the RUN hook and the first element of its `output` is returned.
 *
 * @param input - A BEGIN execute-flow request.
 * @returns The trigger payload.
 */
async function runOrReturnPayload(input: BeginExecuteFlowOperation): Promise<TriggerPayload> {
    const providedPayload = input.triggerPayload as TriggerPayload
    if (!input.executeTrigger) {
        return providedPayload
    }

    const triggerResult = await triggerHelper.executeTrigger({
        params: {
            ...input,
            hookType: TriggerHookType.RUN,
            test: false,
            webhookUrl: '',
            triggerPayload: providedPayload,
        },
        constants: EngineConstants.fromExecuteFlowInput(input),
    }) as ExecuteTriggerResponse<TriggerHookType.RUN>

    return triggerResult.output[0] as TriggerPayload
}

/**
 * Returns `stepOutput` if its status is SUCCEEDED or PAUSED, otherwise `null`.
 *
 * For `LOOP_ON_ITEMS` steps, each iteration is rebuilt so that it contains
 * only the nested steps that themselves pass this filter; iterations are kept
 * even when they end up empty.
 *
 * @param stepOutput - The step output to filter.
 * @returns The (possibly rebuilt) step output, or `null` when it is dropped.
 */
async function insertSuccessStepsOrPausedRecursively(stepOutput: StepOutput): Promise<StepOutput | null> {
    if (!isSucceededOrPaused(stepOutput.status)) {
        return null
    }
    if (stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) {
        return stepOutput
    }

    const loopOutput = new LoopStepOutput(stepOutput)
    const filteredIterations: Record<string, StepOutput>[] = []

    for (const iteration of loopOutput.output?.iterations ?? []) {
        const keptSteps: Record<string, StepOutput> = {}
        for (const [nestedName, nestedOutput] of Object.entries(iteration)) {
            const kept = await insertSuccessStepsOrPausedRecursively(nestedOutput)
            if (!isNil(kept)) {
                keptSteps[nestedName] = kept
            }
        }
        filteredIterations.push(keptSteps)
    }

    return loopOutput.setIterations(filteredIterations)
}

/** True when `status` is SUCCEEDED or PAUSED. */
function isSucceededOrPaused(status: StepOutputStatus): boolean {
    return status === StepOutputStatus.SUCCEEDED || status === StepOutputStatus.PAUSED
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.