Skip to main content
Glama

mcp-google-sheets

webhook-controller.ts8.26 kB
import { ALL_PRINCIPAL_TYPES, EventPayload, FAIL_PARENT_ON_FAILURE_HEADER, FlowRun, isMultipartFile, PARENT_RUN_ID_HEADER, WebhookUrlParams, WebsocketClientEvent, } from '@activepieces/shared' import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox' import { trace } from '@opentelemetry/api' import { FastifyRequest } from 'fastify' import { stepFileService } from '../file/step-file/step-file.service' import { projectService } from '../project/project-service' import { triggerSourceService } from '../trigger/trigger-source/trigger-source-service' import { WebhookFlowVersionToRun } from './webhook-handler' import { webhookService } from './webhook.service' const tracer = trace.getTracer('webhook-controller') export const webhookController: FastifyPluginAsyncTypebox = async (app) => { app.all( '/:flowId/sync', WEBHOOK_PARAMS, async (request: FastifyRequest<{ Params: WebhookUrlParams }>, reply) => { return tracer.startActiveSpan('webhook.receive.sync', { attributes: { 'webhook.flowId': request.params.flowId, 'webhook.method': request.method, 'webhook.type': 'sync', }, }, async (span) => { try { const response = await webhookService.handleWebhook({ data: (projectId: string) => convertRequest(request, projectId, request.params.flowId), logger: request.log, flowId: request.params.flowId, async: false, flowVersionToRun: WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST, saveSampleData: await triggerSourceService(request.log).existsByFlowId({ flowId: request.params.flowId, simulate: true, }, ), execute: true, ...extractHeaderFromRequest(request), }) span.setAttribute('webhook.response.status', response.status) await reply .status(response.status) .headers(response.headers) .send(response.body) } finally { span.end() } }) }, ) app.all( '/:flowId', WEBHOOK_PARAMS, async (request: FastifyRequest<{ Params: WebhookUrlParams }>, reply) => { return tracer.startActiveSpan('webhook.receive.async', { attributes: { 'webhook.flowId': request.params.flowId, 'webhook.method': request.method, 'webhook.type': 'async', }, }, async (span) => { try { const response = await webhookService.handleWebhook({ data: (projectId: string) => convertRequest(request, projectId, request.params.flowId), logger: request.log, flowId: request.params.flowId, async: true, saveSampleData: await triggerSourceService(request.log).existsByFlowId({ flowId: request.params.flowId, simulate: true, }, ), flowVersionToRun: WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST, execute: true, ...extractHeaderFromRequest(request), }) span.setAttribute('webhook.response.status', response.status) await reply .status(response.status) .headers(response.headers) .send(response.body) } finally { span.end() } }) }, ) app.all('/:flowId/draft/sync', WEBHOOK_PARAMS, async (request, reply) => { const response = await webhookService.handleWebhook({ data: (projectId: string) => convertRequest(request, projectId, request.params.flowId), logger: request.log, flowId: request.params.flowId, async: false, saveSampleData: true, flowVersionToRun: WebhookFlowVersionToRun.LATEST, execute: true, onRunCreated: (run) => { app.io.to(run.projectId).emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, run) }, ...extractHeaderFromRequest(request), }) await reply .status(response.status) .headers(response.headers) .send(response.body) }) app.all('/:flowId/draft', WEBHOOK_PARAMS, async (request, reply) => { const response = await webhookService.handleWebhook({ data: (projectId: string) => convertRequest(request, projectId, request.params.flowId), logger: request.log, flowId: request.params.flowId, async: true, saveSampleData: true, flowVersionToRun: WebhookFlowVersionToRun.LATEST, execute: true, ...extractHeaderFromRequest(request), }) await reply .status(response.status) .headers(response.headers) .send(response.body) }) app.all('/:flowId/test', WEBHOOK_PARAMS, async (request, reply) => { const response = await webhookService.handleWebhook({ data: (projectId: string) => convertRequest(request, projectId, request.params.flowId), logger: request.log, flowId: request.params.flowId, async: true, saveSampleData: true, flowVersionToRun: WebhookFlowVersionToRun.LATEST, execute: false, ...extractHeaderFromRequest(request), }) await reply .status(response.status) .headers(response.headers) .send(response.body) }) } const WEBHOOK_PARAMS = { config: { allowedPrincipals: ALL_PRINCIPAL_TYPES, skipAuth: true, rawBody: true, }, schema: { params: WebhookUrlParams, }, } async function convertRequest( request: FastifyRequest, projectId: string, flowId: string, ): Promise<EventPayload> { return { method: request.method, headers: request.headers as Record<string, string>, body: await convertBody(request, projectId, flowId), queryParams: request.query as Record<string, string>, rawBody: request.rawBody, } } async function convertBody( request: FastifyRequest, projectId: string, flowId: string, ): Promise<unknown> { if (request.isMultipart()) { const jsonResult: Record<string, unknown> = {} const requestBodyEntries = Object.entries( request.body as Record<string, unknown>, ) const platformId = await projectService.getPlatformId(projectId) for (const [key, value] of requestBodyEntries) { if (isMultipartFile(value)) { const file = await stepFileService(request.log).saveAndEnrich({ data: value.data as Buffer, fileName: value.filename, stepName: 'trigger', flowId, contentLength: value.data.length, platformId, projectId, }) jsonResult[key] = file.url } else { jsonResult[key] = value } } return jsonResult } return request.body } function extractHeaderFromRequest(request: FastifyRequest): Pick<FlowRun, 'parentRunId' | 'failParentOnFailure'> { return { parentRunId: request.headers[PARENT_RUN_ID_HEADER] as string, failParentOnFailure: request.headers[FAIL_PARENT_ON_FAILURE_HEADER] === 'true', } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server