Skip to main content
Glama

mcp-google-sheets

webhook.service.ts6.92 kB
import { pinoLogging } from '@activepieces/server-shared' import { apId, EngineHttpResponse, EventPayload, FlowRun, FlowStatus, isNil, RunEnvironment, TriggerPayload } from '@activepieces/shared' import { trace } from '@opentelemetry/api' import { FastifyBaseLogger } from 'fastify' import { StatusCodes } from 'http-status-codes' import { flowExecutionCache } from '../flows/flow/flow-execution-cache' import { engineResponseWatcher } from '../workers/engine-response-watcher' import { handshakeHandler } from './handshake-handler' import { WebhookFlowVersionToRun, webhookHandler } from './webhook-handler' const tracer = trace.getTracer('webhook-service') export const webhookService = { async handleWebhook({ logger, data, flowId, async, saveSampleData, flowVersionToRun, payload, execute, onRunCreated, parentRunId, failParentOnFailure, }: HandleWebhookParams): Promise<EngineHttpResponse> { return tracer.startActiveSpan('webhook.service.handle', { attributes: { 'webhook.flowId': flowId, 'webhook.async': async, 'webhook.saveSampleData': saveSampleData, 'webhook.execute': execute, }, }, async (span) => { try { const webhookHeader = 'x-webhook-id' const webhookRequestId = apId() span.setAttribute('webhook.requestId', webhookRequestId) const pinoLogger = pinoLogging.createWebhookContextLog({ log: logger, webhookId: webhookRequestId, flowId }) const flowExecutionResult = await flowExecutionCache(pinoLogger).get({ flowId, simulate: saveSampleData, }) if (!flowExecutionResult.exists) { pinoLogger.info('Flow not found, returning GONE') span.setAttribute('webhook.flowFound', false) return { status: StatusCodes.GONE, body: {}, headers: { [webhookHeader]: webhookRequestId, }, } } const { flow } = flowExecutionResult if (flow.status === FlowStatus.DISABLED && !saveSampleData) { pinoLogger.info('trigger source not found, returning NOT FOUND') span.setAttribute('webhook.triggerSourceFound', false) return { status: StatusCodes.NOT_FOUND, body: {}, headers: { [webhookHeader]: webhookRequestId, }, } } span.setAttribute('webhook.flowFound', true) span.setAttribute('webhook.projectId', flow.projectId) const flowVersionIdToRun = await webhookHandler.getFlowVersionIdToRun(flowVersionToRun, flow) span.setAttribute('webhook.flowVersionId', flowVersionIdToRun) const response = await handshakeHandler(pinoLogger).handleHandshakeRequest({ payload: (payload ?? await data(flow.projectId)) as TriggerPayload, handshakeConfiguration: flowExecutionResult.handshakeConfiguration ?? null, flowId: flow.id, flowVersionId: flowVersionIdToRun, projectId: flow.projectId, }) if (!isNil(response)) { logger.info({ message: 'Handshake request completed', flowId: flow.id, flowVersionId: flowVersionIdToRun, webhookRequestId, }, 'Handshake request completed') span.setAttribute('webhook.handshake', true) return { status: response.status, body: response.body, headers: response.headers ?? {}, } } pinoLogger.info('Adding webhook job to queue') if (async) { span.setAttribute('webhook.mode', 'async') return await webhookHandler.handleAsync({ flow, saveSampleData, platformId: flowExecutionResult.platformId, flowVersionIdToRun, payload: payload ?? await data(flow.projectId), logger: pinoLogger, webhookRequestId, runEnvironment: flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST ? RunEnvironment.PRODUCTION : RunEnvironment.TESTING, webhookHeader, execute: flow.status === FlowStatus.ENABLED && execute, parentRunId, failParentOnFailure, }) } span.setAttribute('webhook.mode', 'sync') const flowHttpResponse = await webhookHandler.handleSync({ payload: payload ?? await data(flow.projectId), projectId: flow.projectId, flow, platformId: flowExecutionResult.platformId, runEnvironment: flowVersionToRun === WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST ? RunEnvironment.PRODUCTION : RunEnvironment.TESTING, logger: pinoLogger, webhookRequestId, synchronousHandlerId: engineResponseWatcher(pinoLogger).getServerId(), flowVersionIdToRun, saveSampleData, flowVersionToRun, onRunCreated, parentRunId, failParentOnFailure, }) return { status: flowHttpResponse.status, body: flowHttpResponse.body, headers: { ...flowHttpResponse.headers, [webhookHeader]: webhookRequestId, }, } } finally { span.end() } }) }, } type HandleWebhookParams = { flowId: string async: boolean saveSampleData: boolean flowVersionToRun: WebhookFlowVersionToRun data: (projectId: string) => Promise<EventPayload> logger: FastifyBaseLogger payload?: Record<string, unknown> execute: boolean onRunCreated?: (run: FlowRun) => void parentRunId?: string failParentOnFailure: boolean }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server