Skip to main content
Glama

Activepieces MCP Server

by eldoonreval
flow.controller.tsβ€’10.5 kB
import { ApplicationEventName, GitPushOperationType } from '@activepieces/ee-shared' import { ActivepiecesError, ApId, CountFlowsRequest, CreateFlowRequest, ErrorCode, FlowOperationRequest, FlowOperationType, FlowStatus, flowStructureUtil, FlowTemplateWithoutProjectInformation, GetFlowQueryParamsRequest, GetFlowTemplateRequestQuery, isNil, ListFlowsRequest, Permission, PlatformUsageMetric, PopulatedFlow, PrincipalType, SeekPage, SERVICE_KEY_SECURITY_OPENAPI, Trigger, } from '@activepieces/shared' import { FastifyPluginAsyncTypebox, Type, } from '@fastify/type-provider-typebox' import dayjs from 'dayjs' import { StatusCodes } from 'http-status-codes' import { authenticationUtils } from '../../authentication/authentication-utils' import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization' import { assertUserHasPermissionToFlow } from '../../ee/authentication/project-role/rbac-middleware' import { checkQuotaOrThrow } from '../../ee/platform/platform-plan/platform-plan-helper' import { gitRepoService } from '../../ee/projects/project-release/git-sync/git-sync.service' import { eventsHooks } from '../../helper/application-events' import { flowService } from './flow.service' const DEFAULT_PAGE_SIZE = 10 export const flowController: FastifyPluginAsyncTypebox = async (app) => { app.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject) app.post('/', CreateFlowRequestOptions, async (request, reply) => { const newFlow = await flowService(request.log).create({ projectId: request.principal.projectId, request: request.body, }) eventsHooks.get(request.log).sendUserEventFromRequest(request, { action: ApplicationEventName.FLOW_CREATED, data: { flow: newFlow, }, }) return reply.status(StatusCodes.CREATED).send(newFlow) }) app.post('/:id', UpdateFlowRequestOptions, async (request) => { if (request.body.type === FlowOperationType.CHANGE_STATUS && request.body.request.status === FlowStatus.ENABLED) { await checkQuotaOrThrow({ platformId: request.principal.platform.id, metric: PlatformUsageMetric.ACTIVE_FLOWS, }) } const userId = await authenticationUtils.extractUserIdFromPrincipal(request.principal) await assertUserHasPermissionToFlow(request.principal, request.body.type, request.log) const flow = await flowService(request.log).getOnePopulatedOrThrow({ id: request.params.id, projectId: request.principal.projectId, }) await assertThatFlowIsNotBeingUsed(flow, userId) eventsHooks.get(request.log).sendUserEventFromRequest(request, { action: ApplicationEventName.FLOW_UPDATED, data: { request: request.body, flowVersion: flow.version, }, }) const updatedFlow = await flowService(request.log).update({ id: request.params.id, userId: request.principal.type === PrincipalType.SERVICE ? null : userId, platformId: request.principal.platform.id, projectId: request.principal.projectId, operation: cleanOperation(request.body), }) return updatedFlow }) app.get('/', ListFlowsRequestOptions, async (request) => { return flowService(request.log).list({ projectId: request.principal.projectId, folderId: request.query.folderId, cursorRequest: request.query.cursor ?? null, limit: request.query.limit ?? DEFAULT_PAGE_SIZE, status: request.query.status, name: request.query.name, versionState: request.query.versionState, connectionExternalIds: request.query.connectionExternalIds, }) }) app.get('/count', CountFlowsRequestOptions, async (request) => { return flowService(request.log).count({ folderId: request.query.folderId, projectId: request.principal.projectId, }) }) app.get('/:id/template', GetFlowTemplateRequestOptions, async (request) => { return flowService(request.log).getTemplate({ flowId: request.params.id, projectId: request.principal.projectId, versionId: undefined, }) }) app.get('/:id', GetFlowRequestOptions, async (request) => { return flowService(request.log).getOnePopulatedOrThrow({ id: request.params.id, projectId: request.principal.projectId, versionId: request.query.versionId, }) }) app.delete('/:id', DeleteFlowRequestOptions, async (request, reply) => { const flow = await flowService(request.log).getOnePopulatedOrThrow({ id: request.params.id, projectId: request.principal.projectId, }) eventsHooks.get(request.log).sendUserEventFromRequest(request, { action: ApplicationEventName.FLOW_DELETED, data: { flow, flowVersion: flow.version, }, }) await gitRepoService(request.log).onDeleted({ type: GitPushOperationType.DELETE_FLOW, idOrExternalId: request.params.id, userId: request.principal.id, projectId: request.principal.projectId, platformId: request.principal.platform.id, log: request.log, }) await flowService(request.log).delete({ id: request.params.id, projectId: request.principal.projectId, }) return reply.status(StatusCodes.NO_CONTENT).send() }) } function cleanOperation(operation: FlowOperationRequest): FlowOperationRequest { if (operation.type === FlowOperationType.IMPORT_FLOW) { const clearInputUiInfo = { currentSelectedData: undefined, sampleDataFileId: undefined, sampleDataInputFileId: undefined, lastTestDate: undefined, } const trigger = flowStructureUtil.transferStep(operation.request.trigger, (step) => { return { ...step, settings: { ...step.settings, inputUiInfo: { ...step.settings.inputUiInfo, ...clearInputUiInfo, }, }, } }) as Trigger return { ...operation, request: { ...operation.request, trigger: { ...trigger, settings: { ...trigger.settings, inputUiInfo: { ...trigger.settings.inputUiInfo, ...clearInputUiInfo, }, }, }, }, } } return operation } async function assertThatFlowIsNotBeingUsed( flow: PopulatedFlow, userId: string, ): Promise<void> { const currentTime = dayjs() if ( !isNil(flow.version.updatedBy) && flow.version.updatedBy !== userId && currentTime.diff(dayjs(flow.version.updated), 'minute') <= 1 ) { throw new ActivepiecesError({ code: ErrorCode.FLOW_IN_USE, params: { flowVersionId: flow.version.id, message: 'Flow is being used by another user in the last minute. Please try again later.', }, }) } } const CreateFlowRequestOptions = { config: { allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE], permission: Permission.WRITE_FLOW, }, schema: { tags: ['flows'], description: 'Create a flow', security: [SERVICE_KEY_SECURITY_OPENAPI], body: CreateFlowRequest, response: { [StatusCodes.CREATED]: PopulatedFlow, }, }, } const UpdateFlowRequestOptions = { config: { permission: Permission.UPDATE_FLOW_STATUS, }, schema: { tags: ['flows'], description: 'Apply an operation to a flow', security: [SERVICE_KEY_SECURITY_OPENAPI], body: FlowOperationRequest, params: Type.Object({ id: ApId, }), }, } const ListFlowsRequestOptions = { config: { allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE], permission: Permission.READ_FLOW, }, schema: { tags: ['flows'], description: 'List flows', security: [SERVICE_KEY_SECURITY_OPENAPI], querystring: ListFlowsRequest, response: { [StatusCodes.OK]: SeekPage(PopulatedFlow), }, }, } const CountFlowsRequestOptions = { schema: { querystring: CountFlowsRequest, }, } const GetFlowTemplateRequestOptions = { config: { allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE], permission: Permission.READ_FLOW, }, schema: { tags: ['flows'], security: [SERVICE_KEY_SECURITY_OPENAPI], description: 'Export flow as template', params: Type.Object({ id: ApId, }), querystring: GetFlowTemplateRequestQuery, response: { [StatusCodes.OK]: FlowTemplateWithoutProjectInformation, }, }, } const GetFlowRequestOptions = { config: { allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE], permission: Permission.READ_FLOW, }, schema: { tags: ['flows'], security: [SERVICE_KEY_SECURITY_OPENAPI], description: 'Get a flow by id', params: Type.Object({ id: ApId, }), querystring: GetFlowQueryParamsRequest, response: { [StatusCodes.OK]: PopulatedFlow, }, }, } const DeleteFlowRequestOptions = { config: { allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE], permission: Permission.WRITE_FLOW, }, schema: { tags: ['flows'], security: [SERVICE_KEY_SECURITY_OPENAPI], description: 'Delete a flow', params: Type.Object({ id: ApId, }), response: { [StatusCodes.NO_CONTENT]: Type.Never(), }, }, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eldoonreval/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server