Skip to main content
Glama
trigger-helper.ts12.7 kB
import { inspect } from 'node:util' import { PiecePropertyMap, StaticPropsValue, TriggerStrategy } from '@activepieces/pieces-framework' import { assertEqual, AUTHENTICATION_PROPERTY_NAME, EngineGenericError, EventPayload, ExecuteTriggerOperation, ExecuteTriggerResponse, FlowTrigger, InvalidCronExpressionError, isNil, PieceTrigger, PropertySettings, ScheduleOptions, TriggerHookType, TriggerSourceScheduleType } from '@activepieces/shared' import { isValidCron } from 'cron-validator' import { EngineConstants } from '../handler/context/engine-constants' import { FlowExecutorContext } from '../handler/context/flow-execution-context' import { createFlowsContext } from '../services/flows.service' import { createFilesService } from '../services/step-files.service' import { createContextStore } from '../services/storage.service' import { utils } from '../utils' import { propsProcessor } from '../variables/props-processor' import { createPropsResolver } from '../variables/props-resolver' import { pieceLoader } from './piece-loader' type Listener = { events: string[] identifierValue: string identifierKey: string } export const triggerHelper = { async executeOnStart(trigger: FlowTrigger, constants: EngineConstants, payload: unknown) { const { pieceName, pieceVersion, triggerName, input, propertySettings } = (trigger as PieceTrigger).settings if (isNil(triggerName)) { throw new EngineGenericError('TriggerNameNotSetError', 'Trigger name is not set') } const { pieceTrigger, processedInput, piece } = await prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, projectId: constants.projectId, apiUrl: constants.internalApiUrl, engineToken: constants.engineToken, devPieces: constants.devPieces, propertySettings, }) const isOldVersionOrNotSupported = isNil(pieceTrigger.onStart) if (isOldVersionOrNotSupported) { return } const context = { store: createContextStore({ apiUrl: constants.internalApiUrl, prefix: '', flowId: constants.flowId, engineToken: constants.engineToken, }), auth: processedInput[AUTHENTICATION_PROPERTY_NAME], propsValue: processedInput, payload, run: { id: constants.flowRunId, }, step: { name: triggerName, }, project: { id: constants.projectId, externalId: constants.externalProjectId, }, connections: utils.createConnectionManager({ apiUrl: constants.internalApiUrl, projectId: constants.projectId, engineToken: constants.engineToken, target: 'triggers', contextVersion: piece.getContextInfo?.().version, }), } await pieceTrigger.onStart(context) }, async executeTrigger({ params, constants }: ExecuteTriggerParams): Promise<ExecuteTriggerResponse<TriggerHookType>> { const { pieceName, pieceVersion, triggerName, input, propertySettings } = (params.flowVersion.trigger as PieceTrigger).settings if (isNil(triggerName)) { throw new EngineGenericError('TriggerNameNotSetError', 'Trigger name is not set') } const { piece, pieceTrigger, processedInput } = await prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, projectId: params.projectId, apiUrl: constants.internalApiUrl, engineToken: params.engineToken, devPieces: constants.devPieces, propertySettings, }) const appListeners: Listener[] = [] const prefix = params.test ? 'test' : '' let scheduleOptions: ScheduleOptions | undefined = undefined const context = { store: createContextStore({ apiUrl: constants.internalApiUrl, prefix, flowId: params.flowVersion.flowId, engineToken: params.engineToken, }), step: { name: triggerName, }, app: { createListeners({ events, identifierKey, identifierValue }: Listener): void { appListeners.push({ events, identifierValue, identifierKey }) }, }, setSchedule(request: ScheduleOptions) { if (!isValidCron(request.cronExpression)) { throw new InvalidCronExpressionError(request.cronExpression) } scheduleOptions = { type: TriggerSourceScheduleType.CRON_EXPRESSION, cronExpression: request.cronExpression, timezone: request.timezone ?? 'UTC', } }, flows: createFlowsContext({ engineToken: params.engineToken, internalApiUrl: constants.internalApiUrl, flowId: params.flowVersion.flowId, flowVersionId: params.flowVersion.id, }), webhookUrl: params.webhookUrl, auth: processedInput[AUTHENTICATION_PROPERTY_NAME], propsValue: processedInput, payload: params.triggerPayload ?? {}, project: { id: params.projectId, externalId: constants.externalProjectId, }, server: { token: params.engineToken, apiUrl: constants.internalApiUrl, publicUrl: params.publicApiUrl, }, connections: utils.createConnectionManager({ apiUrl: constants.internalApiUrl, projectId: constants.projectId, engineToken: constants.engineToken, target: 'triggers', contextVersion: piece.getContextInfo?.().version, }), } switch (params.hookType) { case TriggerHookType.ON_DISABLE: { await pieceTrigger.onDisable(context) return {} } case TriggerHookType.ON_ENABLE: { await pieceTrigger.onEnable(context) return { listeners: appListeners, scheduleOptions: pieceTrigger.type === TriggerStrategy.POLLING ? scheduleOptions : undefined, } } case TriggerHookType.RENEW: { assertEqual(pieceTrigger.type, TriggerStrategy.WEBHOOK, 'triggerType', 'WEBHOOK') await pieceTrigger.onRenew(context) return { success: true, } } case TriggerHookType.HANDSHAKE: { const { data: handshakeResponse, error: handshakeResponseError } = await utils.tryCatchAndThrowOnEngineError(() => pieceTrigger.onHandshake(context)) if (handshakeResponseError) { console.error(handshakeResponseError) return { success: false, message: `Error while testing trigger: ${inspect(handshakeResponseError)}`, } } return { success: true, response: handshakeResponse, } } case TriggerHookType.TEST: { const { data: testResponse, error: testResponseError } = await utils.tryCatchAndThrowOnEngineError(() => pieceTrigger.test({ ...context, files: createFilesService({ apiUrl: constants.internalApiUrl, engineToken: params.engineToken!, stepName: triggerName, flowId: params.flowVersion.flowId, }), })) if (testResponseError) { console.error(testResponseError) return { success: false, message: `Error while testing trigger: ${inspect(testResponseError)}`, output: [], } } return { success: true, output: testResponse, } } case TriggerHookType.RUN: { if (pieceTrigger.type === TriggerStrategy.APP_WEBHOOK) { const { data: verified, error: verifiedError } = await utils.tryCatchAndThrowOnEngineError(async () => { if (!params.appWebhookUrl) { throw new EngineGenericError('AppWebhookUrlNotAvailableError', `App webhook url is not available for piece name ${pieceName}`) } if (!params.webhookSecret) { throw new EngineGenericError('WebhookSecretNotAvailableError', `Webhook secret is not available for piece name ${pieceName}`) } return piece.events?.verify({ appWebhookUrl: params.appWebhookUrl, payload: params.triggerPayload as EventPayload, webhookSecret: params.webhookSecret, }) }) if (verifiedError) { return { success: false, message: `Error while verifying webhook: ${inspect(verifiedError)}`, output: [], } } if (isNil(verified)) { return { success: false, message: 'Webhook is not verified', output: [], } } } const { data: triggerRunResult, error: triggerRunError } = await utils.tryCatchAndThrowOnEngineError(async () => { const items = await pieceTrigger.run({ ...context, files: createFilesService({ apiUrl: constants.internalApiUrl, engineToken: params.engineToken!, flowId: params.flowVersion.flowId, stepName: triggerName, }), }) return { success: true, output: items, } }) if (triggerRunError) { return { success: false, message: triggerRunError.message, output: [], } } return triggerRunResult } } }, } type ExecuteTriggerParams = { params: ExecuteTriggerOperation<TriggerHookType> constants: EngineConstants } async function prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, propertySettings, projectId, apiUrl, engineToken, devPieces }: PrepareTriggerExecutionParams) { const { piece, pieceTrigger } = await pieceLoader.getPieceAndTriggerOrThrow({ pieceName, pieceVersion, triggerName, devPieces, }) const { resolvedInput } = await createPropsResolver({ apiUrl, projectId, engineToken, contextVersion: piece.getContextInfo?.().version, }).resolve<StaticPropsValue<PiecePropertyMap>>({ unresolvedInput: input, executionState: FlowExecutorContext.empty(), }) const { processedInput, errors } = await propsProcessor.applyProcessorsAndValidators(resolvedInput, pieceTrigger.props, piece.auth, pieceTrigger.requireAuth, propertySettings) if (Object.keys(errors).length > 0) { throw new Error(JSON.stringify(errors, null, 2)) } return { piece, pieceTrigger, processedInput } } type PrepareTriggerExecutionParams = { pieceName: string pieceVersion: string triggerName: string input: unknown propertySettings: Record<string, PropertySettings> projectId: string apiUrl: string engineToken: string devPieces: string[] }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server