webhook-handler.ts•8.55 kB
import { AppSystemProp, rejectedPromiseHandler } from '@activepieces/server-shared'
import { assertNotNullOrUndefined, EngineHttpResponse, ExecutionType, Flow, FlowRun, FlowStatus, FlowVersionId, isNil, LATEST_JOB_DATA_SCHEMA_VERSION, PlatformId, ProgressUpdateType, ProjectId, RunEnvironment, WorkerJobType } from '@activepieces/shared'
import { context, propagation, trace } from '@opentelemetry/api'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import { flowRunService } from '../flows/flow-run/flow-run-service'
import { flowVersionRepo } from '../flows/flow-version/flow-version.service'
import { system } from '../helper/system/system'
import { triggerSourceService } from '../trigger/trigger-source/trigger-source-service'
import { engineResponseWatcher } from '../workers/engine-response-watcher'
import { jobQueue } from '../workers/queue/job-queue'
import { JobType } from '../workers/queue/queue-manager'
const tracer = trace.getTracer('webhook-handler')
const WEBHOOK_TIMEOUT_MS = system.getNumberOrThrow(AppSystemProp.WEBHOOK_TIMEOUT_SECONDS) * 1000
export enum WebhookFlowVersionToRun {
LOCKED_FALL_BACK_TO_LATEST = 'locked_fall_back_to_latest',
LATEST = 'latest',
}
export const webhookHandler = {
async getFlowVersionIdToRun(type: WebhookFlowVersionToRun, flow: Flow): Promise<FlowVersionId> {
if (type === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST && !isNil(flow.publishedVersionId)) {
return flow.publishedVersionId
}
const flowVersionSchema = await flowVersionRepo().createQueryBuilder()
.select('id')
.where({
flowId: flow.id,
})
.orderBy('created', 'DESC')
.getRawOne()
assertNotNullOrUndefined(flowVersionSchema, 'Flow version not found')
return flowVersionSchema.id
},
async handleAsync(params: AsyncWebhookParams): Promise<EngineHttpResponse> {
return tracer.startActiveSpan('webhook.handler.async', {
attributes: {
'webhook.flowId': params.flow.id,
'webhook.requestId': params.webhookRequestId,
'webhook.saveSampleData': params.saveSampleData,
'webhook.execute': params.execute,
'webhook.environment': params.runEnvironment,
},
}, async (span) => {
try {
const { flow, logger, webhookRequestId, payload, flowVersionIdToRun, webhookHeader, saveSampleData, execute, runEnvironment, parentRunId, failParentOnFailure, platformId } = params
span.setAttribute('webhook.platformId', platformId)
// Inject trace context for propagation across queue boundary
const traceContext: Record<string, string> = {}
propagation.inject(context.active(), traceContext)
await jobQueue(logger).add({
id: webhookRequestId,
type: JobType.ONE_TIME,
data: {
platformId,
projectId: flow.projectId,
schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
requestId: webhookRequestId,
payload,
jobType: WorkerJobType.EXECUTE_WEBHOOK,
flowId: flow.id,
saveSampleData,
flowVersionIdToRun,
runEnvironment,
execute,
parentRunId,
failParentOnFailure,
traceContext,
},
})
logger.info('Async webhook request completed')
span.setAttribute('webhook.queuedSuccessfully', true)
return {
status: StatusCodes.OK,
body: {},
headers: {
[webhookHeader]: webhookRequestId,
},
}
}
finally {
span.end()
}
})
},
async handleSync(params: SyncWebhookParams): Promise<EngineHttpResponse> {
return tracer.startActiveSpan('webhook.handler.sync', {
attributes: {
'webhook.flowId': params.flow.id,
'webhook.requestId': params.webhookRequestId,
'webhook.saveSampleData': params.saveSampleData,
'webhook.environment': params.runEnvironment,
},
}, async (span) => {
try {
const { payload, projectId, flow, logger, webhookRequestId, synchronousHandlerId, flowVersionIdToRun, runEnvironment, saveSampleData, flowVersionToRun, parentRunId, failParentOnFailure, platformId } = params
if (saveSampleData) {
rejectedPromiseHandler(savePayload({
flow,
logger,
webhookRequestId,
payload,
platformId,
flowVersionIdToRun,
runEnvironment,
parentRunId,
failParentOnFailure,
}), logger)
}
const disabledFlow = flow.status !== FlowStatus.ENABLED && flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST
if (disabledFlow) {
span.setAttribute('webhook.flowDisabled', true)
return {
status: StatusCodes.NOT_FOUND,
body: {},
headers: {},
}
}
const createdRun = await flowRunService(logger).start({
platformId,
environment: runEnvironment,
flowId: flow.id,
flowVersionId: flowVersionIdToRun,
payload,
synchronousHandlerId,
projectId,
executeTrigger: true,
httpRequestId: webhookRequestId,
executionType: ExecutionType.BEGIN,
progressUpdateType: ProgressUpdateType.WEBHOOK_RESPONSE,
parentRunId,
failParentOnFailure,
})
span.setAttribute('webhook.runId', createdRun.id)
params.onRunCreated?.(createdRun)
return await engineResponseWatcher(logger).oneTimeListener<EngineHttpResponse>(webhookRequestId, true, WEBHOOK_TIMEOUT_MS, {
status: StatusCodes.NO_CONTENT,
body: {},
headers: {},
})
}
finally {
span.end()
}
})
},
}
async function savePayload(params: Omit<AsyncWebhookParams, 'saveSampleData' | 'webhookHeader' | 'execute'>): Promise<void> {
const { flow, logger, webhookRequestId, payload, flowVersionIdToRun, runEnvironment, parentRunId, failParentOnFailure, platformId } = params
await webhookHandler.handleAsync({
flow,
logger,
webhookRequestId,
payload,
flowVersionIdToRun,
saveSampleData: true,
runEnvironment,
execute: false,
webhookHeader: '',
platformId,
parentRunId,
failParentOnFailure,
})
await triggerSourceService(logger).disable({ flowId: flow.id, projectId: flow.projectId, simulate: true, ignoreError: true })
}
type AsyncWebhookParams = {
flow: Flow
logger: FastifyBaseLogger
webhookRequestId: string
platformId: PlatformId
payload: unknown
flowVersionIdToRun: FlowVersionId
webhookHeader: string
saveSampleData: boolean
runEnvironment: RunEnvironment
execute: boolean
parentRunId?: string
failParentOnFailure: boolean
}
type SyncWebhookParams = {
payload: unknown
saveSampleData: boolean
projectId: ProjectId
runEnvironment: RunEnvironment
platformId: PlatformId
flowVersionToRun: WebhookFlowVersionToRun
flow: Flow
logger: FastifyBaseLogger
webhookRequestId: string
synchronousHandlerId: string
flowVersionIdToRun: FlowVersionId
onRunCreated?: (run: FlowRun) => void
parentRunId?: string
failParentOnFailure: boolean
}