Skip to main content
Glama

mcp-google-sheets

trigger-helper.ts (11.7 kB)
import { inspect } from 'node:util'
import { PiecePropertyMap, StaticPropsValue, TriggerStrategy } from '@activepieces/pieces-framework'
import {
    assertEqual,
    assertNotNullOrUndefined,
    AUTHENTICATION_PROPERTY_NAME,
    EventPayload,
    ExecuteTriggerOperation,
    ExecuteTriggerResponse,
    FlowTrigger,
    isNil,
    PieceTrigger,
    PropertySettings,
    ScheduleOptions,
    TriggerHookType,
    TriggerSourceScheduleType,
} from '@activepieces/shared'
import { isValidCron } from 'cron-validator'
import { EngineConstants } from '../handler/context/engine-constants'
import { FlowExecutorContext } from '../handler/context/flow-execution-context'
import { createFlowsContext } from '../services/flows.service'
import { createFilesService } from '../services/step-files.service'
import { createContextStore } from '../services/storage.service'
import { utils } from '../utils'
import { propsProcessor } from '../variables/props-processor'
import { createPropsResolver } from '../variables/props-resolver'
import { pieceLoader } from './piece-loader'

/**
 * A listener registration collected from a trigger through
 * `context.app.createListeners` while a hook is running.
 */
type Listener = {
    events: string[]
    identifierValue: string
    identifierKey: string
}

export const triggerHelper = {
    /**
     * Invokes the piece trigger's `onStart` hook, if the trigger defines one.
     *
     * @param trigger - Flow trigger; it is treated as a `PieceTrigger` and its settings are read.
     * @param constants - Engine constants supplying API URL, tokens, flow/run/project ids.
     * @param payload - Value passed through unchanged as `context.payload`.
     * @throws If `triggerName` is null/undefined, or if preparing the trigger
     * (loading the piece, resolving and validating the input) fails.
     */
    async executeOnStart(trigger: FlowTrigger, constants: EngineConstants, payload: unknown) {
        const { pieceName, pieceVersion, triggerName, input, propertySettings } = (trigger as PieceTrigger).settings
        assertNotNullOrUndefined(triggerName, 'triggerName is required')

        const { pieceTrigger, processedInput } = await prepareTriggerExecution({
            pieceName,
            pieceVersion,
            triggerName,
            input,
            projectId: constants.projectId,
            apiUrl: constants.internalApiUrl,
            engineToken: constants.engineToken,
            pieceSource: constants.piecesSource,
            propertySettings,
        })

        // A trigger without an `onStart` hook has nothing to run here.
        const isOldVersionOrNotSupported = isNil(pieceTrigger.onStart)
        if (isOldVersionOrNotSupported) {
            return
        }

        const context = {
            store: createContextStore({
                apiUrl: constants.internalApiUrl,
                prefix: '',
                flowId: constants.flowId,
                engineToken: constants.engineToken,
            }),
            auth: processedInput[AUTHENTICATION_PROPERTY_NAME],
            propsValue: processedInput,
            payload,
            run: {
                id: constants.flowRunId,
            },
            project: {
                id: constants.projectId,
                externalId: constants.externalProjectId,
            },
            connections: utils.createConnectionManager({
                apiUrl: constants.internalApiUrl,
                projectId: constants.projectId,
                engineToken: constants.engineToken,
                target: 'triggers',
            }),
        }
        await pieceTrigger.onStart(context)
    },

    /**
     * Runs the trigger hook selected by `params.hookType` and returns the
     * response for that hook.
     *
     * Failures inside the HANDSHAKE, TEST and RUN hooks (including webhook
     * verification) are caught and returned as `{ success: false, message }`.
     * Errors from ON_ENABLE, ON_DISABLE and RENEW are not caught here.
     *
     * @param params - The trigger operation (flow version, hook type, payload, tokens, URLs).
     * @param constants - Engine constants supplying the internal API URL and project data.
     * @throws If `triggerName` is missing, trigger preparation fails, a RENEW
     * is requested for a non-WEBHOOK trigger, or an APP_WEBHOOK RUN is missing
     * `appWebhookUrl` / `webhookSecret`.
     */
    async executeTrigger({ params, constants }: ExecuteTriggerParams): Promise<ExecuteTriggerResponse<TriggerHookType>> {
        const { pieceName, pieceVersion, triggerName, input, propertySettings } = (params.flowVersion.trigger as PieceTrigger).settings
        assertNotNullOrUndefined(triggerName, 'triggerName is required')

        const { piece, pieceTrigger, processedInput } = await prepareTriggerExecution({
            pieceName,
            pieceVersion,
            triggerName,
            input,
            projectId: params.projectId,
            apiUrl: constants.internalApiUrl,
            engineToken: params.engineToken,
            pieceSource: constants.piecesSource,
            propertySettings,
        })

        // Filled in by the hook through the context callbacks below.
        const appListeners: Listener[] = []
        const prefix = params.test ? 'test' : ''
        let scheduleOptions: ScheduleOptions | undefined = undefined

        const context = {
            store: createContextStore({
                apiUrl: constants.internalApiUrl,
                prefix,
                flowId: params.flowVersion.flowId,
                engineToken: params.engineToken,
            }),
            app: {
                createListeners({ events, identifierKey, identifierValue }: Listener): void {
                    appListeners.push({ events, identifierValue, identifierKey })
                },
            },
            setSchedule(request: ScheduleOptions) {
                if (!isValidCron(request.cronExpression)) {
                    throw new Error(`Invalid cron expression: ${request.cronExpression}`)
                }
                scheduleOptions = {
                    type: TriggerSourceScheduleType.CRON_EXPRESSION,
                    cronExpression: request.cronExpression,
                    timezone: request.timezone ?? 'UTC',
                }
            },
            flows: createFlowsContext({
                engineToken: params.engineToken,
                internalApiUrl: constants.internalApiUrl,
                flowId: params.flowVersion.flowId,
                flowVersionId: params.flowVersion.id,
            }),
            webhookUrl: params.webhookUrl,
            auth: processedInput[AUTHENTICATION_PROPERTY_NAME],
            propsValue: processedInput,
            payload: params.triggerPayload ?? {},
            project: {
                id: params.projectId,
                externalId: constants.externalProjectId,
            },
            server: {
                token: params.engineToken,
                apiUrl: constants.internalApiUrl,
                publicUrl: params.publicApiUrl,
            },
            connections: utils.createConnectionManager({
                apiUrl: constants.internalApiUrl,
                projectId: constants.projectId,
                engineToken: constants.engineToken,
                target: 'triggers',
            }),
        }

        switch (params.hookType) {
            case TriggerHookType.ON_DISABLE:
                await pieceTrigger.onDisable(context)
                return {}
            case TriggerHookType.ON_ENABLE:
                await pieceTrigger.onEnable(context)
                return {
                    listeners: appListeners,
                    // A schedule set by the hook is only reported for polling triggers.
                    scheduleOptions: pieceTrigger.type === TriggerStrategy.POLLING ? scheduleOptions : undefined,
                }
            case TriggerHookType.RENEW:
                assertEqual(pieceTrigger.type, TriggerStrategy.WEBHOOK, 'triggerType', 'WEBHOOK')
                await pieceTrigger.onRenew(context)
                return {
                    success: true,
                }
            case TriggerHookType.HANDSHAKE: {
                try {
                    const response = await pieceTrigger.onHandshake(context)
                    return {
                        success: true,
                        response,
                    }
                }
                catch (e) {
                    console.error(e)
                    return {
                        success: false,
                        // Names the handshake so this failure is not confused with a TEST failure.
                        message: `Error while handling trigger handshake: ${inspect(e)}`,
                    }
                }
            }
            case TriggerHookType.TEST:
                try {
                    return {
                        success: true,
                        output: await pieceTrigger.test({
                            ...context,
                            files: createFilesService({
                                apiUrl: constants.internalApiUrl,
                                engineToken: params.engineToken,
                                stepName: triggerName,
                                flowId: params.flowVersion.flowId,
                            }),
                        }),
                    }
                }
                catch (e) {
                    return {
                        success: false,
                        message: `Error while testing trigger: ${inspect(e)}`,
                        output: [],
                    }
                }
            case TriggerHookType.RUN: {
                if (pieceTrigger.type === TriggerStrategy.APP_WEBHOOK) {
                    if (!params.appWebhookUrl) {
                        throw new Error(`App webhook url is not available for piece name ${pieceName}`)
                    }
                    if (!params.webhookSecret) {
                        throw new Error(`Webhook secret is not available for piece name ${pieceName}`)
                    }

                    try {
                        // `verified` is undefined when the piece defines no `events`;
                        // only an explicit `false` rejects the request.
                        const verified = piece.events?.verify({
                            appWebhookUrl: params.appWebhookUrl,
                            payload: params.triggerPayload as EventPayload,
                            webhookSecret: params.webhookSecret,
                        })

                        if (verified === false) {
                            console.info('Webhook is not verified')
                            return {
                                success: false,
                                message: 'Webhook is not verified',
                                output: [],
                            }
                        }
                    }
                    catch (e) {
                        return {
                            success: false,
                            message: `Error while verifying webhook: ${inspect(e)}`,
                            output: [],
                        }
                    }
                }

                try {
                    const items = await pieceTrigger.run({
                        ...context,
                        files: createFilesService({
                            apiUrl: constants.internalApiUrl,
                            engineToken: params.engineToken,
                            flowId: params.flowVersion.flowId,
                            stepName: triggerName,
                        }),
                    })
                    return {
                        success: true,
                        output: items,
                    }
                }
                catch (e) {
                    console.error(e)
                    return {
                        success: false,
                        message: inspect(e),
                        output: [],
                    }
                }
            }
        }
    },
}

/** Arguments accepted by `triggerHelper.executeTrigger`. */
type ExecuteTriggerParams = {
    params: ExecuteTriggerOperation<TriggerHookType>
    constants: EngineConstants
}

/**
 * Loads the piece and trigger, resolves the raw input against an empty
 * execution state, then applies the props processors and validators.
 *
 * @returns The loaded `piece`, its `pieceTrigger`, and the `processedInput`.
 * @throws An `Error` whose message is the JSON-serialised validation errors
 * when the processors report any; errors from loading or resolving propagate.
 */
async function prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, propertySettings, projectId, apiUrl, engineToken, pieceSource }: PrepareTriggerExecutionParams) {
    const { piece, pieceTrigger } = await pieceLoader.getPieceAndTriggerOrThrow({
        pieceName,
        pieceVersion,
        triggerName,
        pieceSource,
    })

    const { resolvedInput } = await createPropsResolver({
        apiUrl,
        projectId,
        engineToken,
    }).resolve<StaticPropsValue<PiecePropertyMap>>({
        unresolvedInput: input,
        executionState: FlowExecutorContext.empty(),
    })

    const { processedInput, errors } = await propsProcessor.applyProcessorsAndValidators(resolvedInput, pieceTrigger.props, piece.auth, pieceTrigger.requireAuth, propertySettings)

    if (Object.keys(errors).length > 0) {
        throw new Error(JSON.stringify(errors, null, 2))
    }

    return { piece, pieceTrigger, processedInput }
}

/** Inputs required by `prepareTriggerExecution`. */
type PrepareTriggerExecutionParams = {
    pieceName: string
    pieceVersion: string
    triggerName: string
    input: unknown
    propertySettings: Record<string, PropertySettings>
    projectId: string
    apiUrl: string
    engineToken: string
    pieceSource: string
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.