app-event-routing.module.ts•6.14 kB
import { facebookLeads } from '@activepieces/piece-facebook-leads'
import { intercom } from '@activepieces/piece-intercom'
import { slack } from '@activepieces/piece-slack'
import { square } from '@activepieces/piece-square'
import { Piece } from '@activepieces/pieces-framework'
import {
rejectedPromiseHandler,
} from '@activepieces/server-shared'
import {
ActivepiecesError,
ALL_PRINCIPAL_TYPES,
apId,
assertNotNullOrUndefined,
ErrorCode,
FlowStatus,
isNil,
LATEST_JOB_DATA_SCHEMA_VERSION,
RunEnvironment,
WorkerJobType,
} from '@activepieces/shared'
import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox'
import { FastifyRequest } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import { domainHelper } from '../../ee/custom-domains/domain-helper'
import { flowService } from '../../flows/flow/flow.service'
import { projectService } from '../../project/project-service'
import { WebhookFlowVersionToRun, webhookHandler } from '../../webhooks/webhook-handler'
import { jobQueue } from '../../workers/queue/job-queue'
import { JobType } from '../../workers/queue/queue-manager'
import { triggerSourceService } from '../trigger-source/trigger-source-service'
import { appEventRoutingService } from './app-event-routing.service'
const appWebhooks: Record<string, Piece> = {
slack,
square,
'facebook-leads': facebookLeads,
intercom,
}
const pieceNames: Record<string, string> = {
slack: '@activepieces/piece-slack',
square: '@activepieces/piece-square',
'facebook-leads': '@activepieces/piece-facebook-leads',
intercom: '@activepieces/piece-intercom',
}
export const appEventRoutingModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(appEventRoutingController, { prefix: '/v1/app-events' })
}
export const appEventRoutingController: FastifyPluginAsyncTypebox = async (
fastify,
) => {
fastify.all(
'/:pieceUrl',
{
config: {
rawBody: true,
allowedPrincipals: ALL_PRINCIPAL_TYPES,
},
},
async (
request: FastifyRequest<{
Body: unknown
Params: {
pieceUrl: string
}
}>,
requestReply,
) => {
const pieceUrl = request.params.pieceUrl
const payload = {
headers: request.headers as Record<string, string>,
body: request.body,
rawBody: request.rawBody,
method: request.method,
queryParams: request.query as Record<string, string>,
}
const piece = appWebhooks[pieceUrl]
if (isNil(piece)) {
throw new ActivepiecesError({
code: ErrorCode.PIECE_NOT_FOUND,
params: {
pieceName: pieceUrl,
pieceVersion: 'latest',
message: 'Pieces is not found in app event routing',
},
})
}
const appName = pieceNames[pieceUrl]
assertNotNullOrUndefined(piece.events, 'Event is possible in this piece')
const { reply, event, identifierValue } = piece.events.parseAndReply({
payload,
server: {
publicUrl: await domainHelper.getPublicUrl({ path: '' }),
},
})
if (!isNil(reply)) {
request.log.info(
{
reply,
piece: pieceUrl,
},
'[AppEventRoutingController#event] reply',
)
return requestReply
.status(StatusCodes.OK)
.headers(reply?.headers ?? {})
.send(reply?.body ?? {})
}
request.log.info(
{
event,
identifierValue,
},
'[AppEventRoutingController#event] event',
)
if (isNil(event) || isNil(identifierValue)) {
return requestReply.status(StatusCodes.BAD_REQUEST).send({})
}
const listeners = await appEventRoutingService.listListeners({
appName,
event,
identifierValue,
})
const eventsQueue = listeners.map(async (listener) => {
const requestId = apId()
const flow = await flowService(request.log).getOne({ id: listener.flowId, projectId: listener.projectId })
if (isNil(flow)) {
return
}
const flowVersionIdToRun = await webhookHandler.getFlowVersionIdToRun(WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST, flow)
const platformId = await projectService.getPlatformId(listener.projectId)
return jobQueue(request.log).add({
id: requestId,
type: JobType.ONE_TIME,
data: {
platformId,
projectId: listener.projectId,
schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
requestId,
payload,
flowId: listener.flowId,
jobType: WorkerJobType.EXECUTE_WEBHOOK,
runEnvironment: RunEnvironment.PRODUCTION,
saveSampleData: await triggerSourceService(request.log).existsByFlowId({
flowId: listener.flowId,
simulate: true,
}),
flowVersionIdToRun,
execute: flow.status === FlowStatus.ENABLED,
},
})
})
rejectedPromiseHandler(Promise.all(eventsQueue), request.log)
return requestReply.status(StatusCodes.OK).send({})
},
)
}