Skip to main content
Glama

mcp-google-sheets

webhook.service.ts5.6 kB
import { pinoLogging } from '@activepieces/server-shared' import { ActivepiecesError, apId, EngineHttpResponse, ErrorCode, EventPayload, FlowRun, FlowStatus, isNil, PlatformUsageMetric, RunEnvironment, TriggerPayload } from '@activepieces/shared' import { FastifyBaseLogger } from 'fastify' import { StatusCodes } from 'http-status-codes' import { projectLimitsService } from '../ee/projects/project-plan/project-plan.service' import { flowService } from '../flows/flow/flow.service' import { triggerSourceService } from '../trigger/trigger-source/trigger-source-service' import { engineResponseWatcher } from '../workers/engine-response-watcher' import { handshakeHandler } from './handshake-handler' import { WebhookFlowVersionToRun, webhookHandler } from './webhook-handler' export const webhookService = { async handleWebhook({ logger, data, flowId, async, saveSampleData, flowVersionToRun, payload, execute, onRunCreated, parentRunId, failParentOnFailure, }: HandleWebhookParams): Promise<EngineHttpResponse> { const webhookHeader = 'x-webhook-id' const webhookRequestId = apId() const pinoLogger = pinoLogging.createWebhookContextLog({ log: logger, webhookId: webhookRequestId, flowId }) const flow = await flowService(pinoLogger).getOneById(flowId) if (isNil(flow)) { pinoLogger.info('Flow not found, returning GONE') return { status: StatusCodes.GONE, body: {}, headers: {}, } } const flowVersionIdToRun = await webhookHandler.getFlowVersionIdToRun(flowVersionToRun, flow) const exceededLimit = await projectLimitsService(pinoLogger).checkTasksExceededLimit(flow.projectId) if (exceededLimit) { throw new ActivepiecesError({ code: ErrorCode.QUOTA_EXCEEDED, params: { metric: PlatformUsageMetric.TASKS, }, }) } const triggerSource = await triggerSourceService(pinoLogger).getByFlowId({ flowId: flow.id, projectId: flow.projectId, simulate: saveSampleData, }) const response = await handshakeHandler(pinoLogger).handleHandshakeRequest({ payload: (payload ?? await data(flow.projectId)) as TriggerPayload, handshakeConfiguration: await handshakeHandler(pinoLogger).getWebhookHandshakeConfiguration(triggerSource), flowId: flow.id, flowVersionId: flowVersionIdToRun, projectId: flow.projectId, }) if (!isNil(response)) { logger.info({ message: 'Handshake request completed', flowId: flow.id, flowVersionId: flowVersionIdToRun, webhookRequestId, }, 'Handshake request completed') return { status: response.status, body: response.body, headers: response.headers ?? {}, } } const flowDisabledAndNoSaveSampleData = flow.status !== FlowStatus.ENABLED && !saveSampleData && flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST if (flowDisabledAndNoSaveSampleData) { return { status: StatusCodes.NOT_FOUND, body: {}, headers: { [webhookHeader]: webhookRequestId, }, } } pinoLogger.info('Adding webhook job to queue') if (async) { return webhookHandler.handleAsync({ flow, saveSampleData, flowVersionIdToRun, payload: payload ?? await data(flow.projectId), logger: pinoLogger, webhookRequestId, runEnvironment: flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST ? RunEnvironment.PRODUCTION : RunEnvironment.TESTING, webhookHeader, execute: flow.status === FlowStatus.ENABLED && execute, parentRunId, failParentOnFailure, }) } const flowHttpResponse = await webhookHandler.handleSync({ payload: payload ?? await data(flow.projectId), projectId: flow.projectId, flow, runEnvironment: flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST ? RunEnvironment.PRODUCTION : RunEnvironment.TESTING, logger: pinoLogger, webhookRequestId, synchronousHandlerId: engineResponseWatcher(pinoLogger).getServerId(), flowVersionIdToRun, saveSampleData, flowVersionToRun, onRunCreated, parentRunId, failParentOnFailure, }) return { status: flowHttpResponse.status, body: flowHttpResponse.body, headers: { ...flowHttpResponse.headers, [webhookHeader]: webhookRequestId, }, } }, } type HandleWebhookParams = { flowId: string async: boolean saveSampleData: boolean flowVersionToRun: WebhookFlowVersionToRun data: (projectId: string) => Promise<EventPayload> logger: FastifyBaseLogger payload?: Record<string, unknown> execute: boolean onRunCreated?: (run: FlowRun) => void parentRunId?: string failParentOnFailure: boolean }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server