Skip to main content
Glama

mcp-google-sheets

app-event-routing.module.ts6.14 kB
import { facebookLeads } from '@activepieces/piece-facebook-leads' import { intercom } from '@activepieces/piece-intercom' import { slack } from '@activepieces/piece-slack' import { square } from '@activepieces/piece-square' import { Piece } from '@activepieces/pieces-framework' import { rejectedPromiseHandler, } from '@activepieces/server-shared' import { ActivepiecesError, ALL_PRINCIPAL_TYPES, apId, assertNotNullOrUndefined, ErrorCode, FlowStatus, isNil, LATEST_JOB_DATA_SCHEMA_VERSION, RunEnvironment, WorkerJobType, } from '@activepieces/shared' import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox' import { FastifyRequest } from 'fastify' import { StatusCodes } from 'http-status-codes' import { domainHelper } from '../../ee/custom-domains/domain-helper' import { flowService } from '../../flows/flow/flow.service' import { projectService } from '../../project/project-service' import { WebhookFlowVersionToRun, webhookHandler } from '../../webhooks/webhook-handler' import { jobQueue } from '../../workers/queue/job-queue' import { JobType } from '../../workers/queue/queue-manager' import { triggerSourceService } from '../trigger-source/trigger-source-service' import { appEventRoutingService } from './app-event-routing.service' const appWebhooks: Record<string, Piece> = { slack, square, 'facebook-leads': facebookLeads, intercom, } const pieceNames: Record<string, string> = { slack: '@activepieces/piece-slack', square: '@activepieces/piece-square', 'facebook-leads': '@activepieces/piece-facebook-leads', intercom: '@activepieces/piece-intercom', } export const appEventRoutingModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(appEventRoutingController, { prefix: '/v1/app-events' }) } export const appEventRoutingController: FastifyPluginAsyncTypebox = async ( fastify, ) => { fastify.all( '/:pieceUrl', { config: { rawBody: true, allowedPrincipals: ALL_PRINCIPAL_TYPES, }, }, async ( request: FastifyRequest<{ Body: unknown Params: { pieceUrl: string } }>, requestReply, ) => { const pieceUrl = request.params.pieceUrl const payload = { headers: request.headers as Record<string, string>, body: request.body, rawBody: request.rawBody, method: request.method, queryParams: request.query as Record<string, string>, } const piece = appWebhooks[pieceUrl] if (isNil(piece)) { throw new ActivepiecesError({ code: ErrorCode.PIECE_NOT_FOUND, params: { pieceName: pieceUrl, pieceVersion: 'latest', message: 'Pieces is not found in app event routing', }, }) } const appName = pieceNames[pieceUrl] assertNotNullOrUndefined(piece.events, 'Event is possible in this piece') const { reply, event, identifierValue } = piece.events.parseAndReply({ payload, server: { publicUrl: await domainHelper.getPublicUrl({ path: '' }), }, }) if (!isNil(reply)) { request.log.info( { reply, piece: pieceUrl, }, '[AppEventRoutingController#event] reply', ) return requestReply .status(StatusCodes.OK) .headers(reply?.headers ?? {}) .send(reply?.body ?? {}) } request.log.info( { event, identifierValue, }, '[AppEventRoutingController#event] event', ) if (isNil(event) || isNil(identifierValue)) { return requestReply.status(StatusCodes.BAD_REQUEST).send({}) } const listeners = await appEventRoutingService.listListeners({ appName, event, identifierValue, }) const eventsQueue = listeners.map(async (listener) => { const requestId = apId() const flow = await flowService(request.log).getOne({ id: listener.flowId, projectId: listener.projectId }) if (isNil(flow)) { return } const flowVersionIdToRun = await webhookHandler.getFlowVersionIdToRun(WebhookFlowVersionToRun.LOCKED_FALL_BACK_TO_LATEST, flow) const platformId = await projectService.getPlatformId(listener.projectId) return jobQueue(request.log).add({ id: requestId, type: JobType.ONE_TIME, data: { platformId, projectId: listener.projectId, schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION, requestId, payload, flowId: listener.flowId, jobType: WorkerJobType.EXECUTE_WEBHOOK, runEnvironment: RunEnvironment.PRODUCTION, saveSampleData: await triggerSourceService(request.log).existsByFlowId({ flowId: listener.flowId, simulate: true, }), flowVersionIdToRun, execute: flow.status === FlowStatus.ENABLED, }, }) }) rejectedPromiseHandler(Promise.all(eventsQueue), request.log) return requestReply.status(StatusCodes.OK).send({}) }, ) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server