code-executor.ts•2.9 kB
import path from 'path'
import importFresh from '@activepieces/import-fresh-webpack'
import { assertNotNullOrUndefined, CodeAction, FlowActionType, GenericStepOutput, StepOutputStatus } from '@activepieces/shared'
import { initCodeSandbox } from '../core/code/code-sandbox'
import { CodeModule } from '../core/code/code-sandbox-common'
import { continueIfFailureHandler, handleExecutionError, runWithExponentialBackoff } from '../helper/error-handling'
import { progressService } from '../services/progress.service'
import { ActionHandler, BaseExecutor } from './base-executor'
import { ExecutionVerdict } from './context/flow-execution-context'
export const codeExecutor: BaseExecutor<CodeAction> = {
async handle({
action,
executionState,
constants,
}) {
if (executionState.isCompleted({ stepName: action.name })) {
return executionState
}
const resultExecution = await runWithExponentialBackoff(executionState, action, constants, executeAction)
return continueIfFailureHandler(resultExecution, action, constants)
},
}
const executeAction: ActionHandler<CodeAction> = async ({ action, executionState, constants }) => {
const stepStartTime = performance.now()
const { censoredInput, resolvedInput } = await constants.propsResolver.resolve<Record<string, unknown>>({
unresolvedInput: action.settings.input,
executionState,
})
const stepOutput = GenericStepOutput.create({
input: censoredInput,
type: FlowActionType.CODE,
status: StepOutputStatus.RUNNING,
})
try {
await progressService.sendUpdate({
engineConstants: constants,
flowExecutorContext: executionState.upsertStep(action.name, stepOutput),
})
assertNotNullOrUndefined(constants.runEnvironment, 'Run environment is required')
const artifactPath = path.resolve(`${constants.baseCodeDirectory}/${constants.flowVersionId}/${action.name}/index.js`)
const codeModule: CodeModule = await importFresh(artifactPath)
const codeSandbox = await initCodeSandbox()
const output = await codeSandbox.runCodeModule({
codeModule,
inputs: resolvedInput,
})
return executionState.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.SUCCEEDED).setDuration(performance.now() - stepStartTime)).increaseTask()
}
catch (e) {
const handledError = handleExecutionError(e)
const failedStepOutput = stepOutput
.setStatus(StepOutputStatus.FAILED)
.setErrorMessage(handledError.message)
.setDuration(performance.now() - stepStartTime)
return executionState
.upsertStep(action.name, failedStepOutput)
.setVerdict(ExecutionVerdict.FAILED, handledError.verdictResponse)
}
}