Skip to main content
Glama
flow.service.ts (24.7 kB)
import { apDayjs, apDayjsDuration } from '@activepieces/server-shared'
import {
    ActivepiecesError,
    apId,
    CreateFlowRequest,
    Cursor,
    ErrorCode,
    Flow,
    FlowId,
    FlowOperationRequest,
    FlowOperationStatus,
    FlowOperationType,
    flowPieceUtil,
    FlowStatus,
    FlowVersion,
    FlowVersionId,
    FlowVersionState,
    isNil,
    Metadata,
    PlatformId,
    PopulatedFlow,
    ProjectId,
    SeekPage,
    SharedTemplate,
    TelemetryEventName,
    TemplateType,
    TriggerSource,
    UncategorizedFolderId,
    UserId,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { EntityManager, In, IsNull, Not } from 'typeorm'
import { transaction } from '../../core/db/transaction'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import Paginator, { Order } from '../../helper/pagination/paginator'
import { SystemJobData, SystemJobName } from '../../helper/system-jobs/common'
import { systemJobsSchedule } from '../../helper/system-jobs/system-job'
import { telemetry } from '../../helper/telemetry.utils'
import { projectService } from '../../project/project-service'
import { triggerSourceService } from '../../trigger/trigger-source/trigger-source-service'
import { flowVersionMigrationService } from '../flow-version/flow-version-migration.service'
import { flowVersionRepo, flowVersionService } from '../flow-version/flow-version.service'
import { flowFolderService } from '../folder/folder.service'
import { flowExecutionCache } from './flow-execution-cache'
import { FlowEntity } from './flow.entity'
import { flowRepo } from './flow.repo'

/**
 * Factory for the flow service: CRUD, listing, publishing and status changes
 * for flows, bound to the given logger.
 *
 * @param log logger handed to every collaborating service created here.
 */
export const flowService = (log: FastifyBaseLogger) => ({
    /**
     * Creates a new flow in `DISABLED` status together with an empty first version.
     *
     * The folder is resolved from `request.folderId`, or upserted from
     * `request.folderName`; when neither is given the flow has no folder.
     * A `CREATED_FLOW` telemetry event is sent without being awaited; its
     * failure is only logged.
     *
     * @returns the saved flow populated with its newly created version.
     */
    async create({ projectId, request, externalId }: CreateParams): Promise<PopulatedFlow> {
        const folderId = await getFolderIdFromRequest({ projectId, folderId: request.folderId, folderName: request.folderName, log })
        const newFlow: NewFlow = {
            id: apId(),
            projectId,
            folderId,
            status: FlowStatus.DISABLED,
            publishedVersionId: null,
            externalId: externalId ?? apId(),
            metadata: request.metadata,
            operationStatus: FlowOperationStatus.NONE,
        }
        const savedFlow = await flowRepo().save(newFlow)
        const savedFlowVersion = await flowVersionService(log).createEmptyVersion(
            savedFlow.id,
            {
                displayName: request.displayName,
            },
        )
        // Fire-and-forget: telemetry must not block or fail flow creation.
        telemetry(log).trackProject(savedFlow.projectId, {
            name: TelemetryEventName.CREATED_FLOW,
            payload: {
                flowId: savedFlow.id,
            },
        })
            .catch((e) => log.error(e, '[FlowService#create] telemetry.trackProject'),
            )
        return {
            ...savedFlow,
            version: savedFlowVersion,
        }
    },

    /**
     * Returns one page of populated flows, ordered by `status` then `updated`
     * (both descending). Flows whose operation status is `DELETING` are excluded.
     *
     * Scope is either an explicit list of `projectIds` or, when that is absent,
     * every project joined on `platformId`. With `versionState` `DRAFT` the
     * attached version is the most recently created one; otherwise it is the
     * published version. The `name`, `connectionExternalIds` and
     * `agentExternalIds` filters always apply to the latest version.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when a returned flow has no
     * version attached (e.g. a non-draft listing of a flow without a published version).
     */
    async list({
        projectIds,
        platformId,
        cursorRequest,
        limit = Paginator.NO_LIMIT,
        folderId,
        status,
        name,
        connectionExternalIds,
        agentExternalIds,
        externalIds,
        versionState = FlowVersionState.DRAFT,
        includeTriggerSource = true,
    }: ListParams): Promise<SeekPage<PopulatedFlow>> {
        const decodedCursor = paginationHelper.decodeCursor(cursorRequest)

        const paginator = buildPaginator({
            entity: FlowEntity,
            alias: 'ff',
            query: {
                limit,
                orderBy: [
                    { field: 'status', order: Order.DESC },
                    { field: 'updated', order: Order.DESC },
                ],
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })

        const queryBuilder = flowRepo().createQueryBuilder('ff').where({ operationStatus: Not(FlowOperationStatus.DELETING) })

        if (projectIds) {
            queryBuilder.andWhere({ projectId: In(projectIds) })
        }
        else {
            queryBuilder
                .innerJoin('project', 'project', 'project.id = ff."projectId"')
                .andWhere('project."platformId" = :platformId', { platformId })
        }

        if (folderId !== undefined) {
            // The "uncategorized" pseudo-folder maps to flows with a NULL folderId.
            queryBuilder.andWhere({ folderId: folderId === UncategorizedFolderId ? IsNull() : folderId })
        }

        if (status !== undefined) {
            queryBuilder.andWhere({ status: In(status) })
        }

        // Correlated subquery selecting the id of the most recently created version of each flow.
        const latestVersionSubquery = flowVersionRepo()
            .createQueryBuilder('fv_sub')
            .select('fv_sub.id')
            .where('fv_sub."flowId" = ff.id')
            .orderBy('fv_sub.created', 'DESC')
            .limit(1)

        if (versionState === FlowVersionState.DRAFT) {
            queryBuilder.leftJoinAndMapOne(
                'ff.version',
                'flow_version',
                'latest_version',
                `latest_version."flowId" = ff.id AND latest_version.id = (${latestVersionSubquery.getQuery()})`,
            )
        }
        else {
            // latest_version is still joined so the filters below can reference it,
            // but the version mapped onto the flow is the published one.
            queryBuilder.leftJoin(
                'flow_version',
                'latest_version',
                `latest_version."flowId" = ff.id AND latest_version.id = (${latestVersionSubquery.getQuery()})`,
            )
            queryBuilder.leftJoinAndMapOne(
                'ff.version',
                'flow_version',
                'published_version',
                'published_version.id = ff.publishedVersionId',
            )
        }

        if (includeTriggerSource) {
            queryBuilder.leftJoinAndMapOne(
                'ff.triggerSource',
                'trigger_source',
                'ts',
                'ts."flowId" = ff.id AND ts.deleted IS NULL',
            )
        }

        if (name !== undefined) {
            queryBuilder.andWhere('LOWER(latest_version."displayName") LIKE LOWER(:name)', { name: `%${name}%` })
        }

        if (externalIds !== undefined) {
            queryBuilder.andWhere('ff."externalId" IN (:...externalIds)', { externalIds })
        }

        if (connectionExternalIds !== undefined) {
            queryBuilder.andWhere('latest_version."connectionIds" && :connectionExternalIds', { connectionExternalIds })
        }

        if (agentExternalIds !== undefined) {
            queryBuilder.andWhere('latest_version."agentIds" && :agentExternalIds', { agentExternalIds })
        }

        const paginationResult = await paginator.paginate<Flow & { version: FlowVersion | null, triggerSource?: TriggerSource }>(queryBuilder)

        const populatedFlows = await Promise.all(paginationResult.data.map(async (flow) => {
            if (isNil(flow.version)) {
                throw new ActivepiecesError({
                    code: ErrorCode.ENTITY_NOT_FOUND,
                    params: {
                        entityType: 'FlowVersion',
                        message: `flowId=${flow.id}`,
                    },
                })
            }
            const migratedVersion = await flowVersionMigrationService.migrate(flow.version)
            return {
                ...flow,
                version: migratedVersion,
                // Only the schedule of the trigger source is exposed to callers.
                triggerSource: includeTriggerSource && flow.triggerSource ? {
                    schedule: flow.triggerSource.schedule,
                } : undefined,
            }
        }))
        return paginationHelper.createPage(populatedFlows, paginationResult.cursor)
    },

    /** Returns whether a flow with the given id exists. */
    async exists(id: FlowId): Promise<boolean> {
        return flowRepo().existsBy({
            id,
        })
    },

    /**
     * Looks a flow up by id alone.
     *
     * @returns the flow, or `null` when the flow is missing or
     * `projectService.exists` reports that its project does not exist.
     */
    async getOneById(id: string): Promise<Flow | null> {
        const flow = await flowRepo().findOneBy({
            id,
        })
        if (isNil(flow)) {
            return null
        }
        const projectExists = await projectService.exists({
            projectId: flow.projectId,
        })
        if (!projectExists) {
            return null
        }
        return flow
    },

    /**
     * Looks a flow up by id within a project.
     *
     * @returns the flow, or `null` when the project does not exist or no flow matches.
     */
    async getOne({ id, projectId, entityManager }: GetOneParams): Promise<Flow | null> {
        const projectExists = await projectService.exists({
            projectId,
        })
        if (!projectExists) {
            return null
        }
        return flowRepo(entityManager).findOneBy({
            id,
            projectId,
        })
    },

    /**
     * Same as {@link getOne} but throws instead of returning `null`.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when no flow is found.
     */
    async getOneOrThrow(params: GetOneParams): Promise<Flow> {
        const flow = await this.getOne(params)
        assertFlowIsNotNull(flow)
        return flow
    },

    /**
     * Loads a flow together with one of its versions and its trigger-source schedule.
     *
     * `versionId`, `removeConnectionsName` and `removeSampleData` are forwarded
     * to `flowVersionService.getFlowVersionOrThrow`.
     *
     * @returns the populated flow, or `null` when the flow or its project does not exist.
     */
    async getOnePopulated({
        id,
        projectId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetOnePopulatedParams): Promise<PopulatedFlow | null> {
        const flow = await flowRepo(entityManager).findOne({
            where: {
                id,
                projectId,
            },
        })
        const projectExists = await projectService.exists({
            projectId,
        })
        if (isNil(flow) || !projectExists) {
            return null
        }
        const flowVersion = await flowVersionService(log).getFlowVersionOrThrow({
            flowId: id,
            versionId,
            removeConnectionsName,
            removeSampleData,
            entityManager,
        })
        const triggerSource = await triggerSourceService(log).getByFlowId({
            flowId: id,
            projectId,
            simulate: undefined,
        })
        return {
            ...flow,
            version: flowVersion,
            triggerSource: triggerSource ? {
                schedule: triggerSource.schedule,
            } : undefined,
        }
    },

    /**
     * Same as {@link getOnePopulated} but throws instead of returning `null`.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when no flow is found.
     */
    async getOnePopulatedOrThrow({
        id,
        projectId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetOnePopulatedParams): Promise<PopulatedFlow> {
        const flow = await this.getOnePopulated({
            id,
            projectId,
            versionId,
            removeConnectionsName,
            removeSampleData,
            entityManager,
        })
        assertFlowIsNotNull(flow)
        return flow
    },

    /**
     * Applies a flow operation and returns the flow re-read afterwards.
     *
     * `LOCK_AND_PUBLISH` and `CHANGE_STATUS` are rejected while another
     * operation is in progress on the flow; both mark the flow as
     * `ENABLING`/`DISABLING` and schedule a status-update job. Operation types
     * without a dedicated case are applied to the latest flow version; if that
     * version is locked, a new version is created from it first.
     *
     * @throws ActivepiecesError `FLOW_OPERATION_IN_PROGRESS` when a publish or
     * status change is requested while `operationStatus` is not `NONE`.
     */
    async update({
        id,
        userId,
        projectId,
        platformId,
        operation,
    }: UpdateParams): Promise<PopulatedFlow> {
        if (operation.type === FlowOperationType.LOCK_AND_PUBLISH || operation.type === FlowOperationType.CHANGE_STATUS) {
            const flow = await this.getOneOrThrow({
                id,
                projectId,
            })
            if (flow.operationStatus !== FlowOperationStatus.NONE) {
                throw new ActivepiecesError({
                    code: ErrorCode.FLOW_OPERATION_IN_PROGRESS,
                    params: {
                        message: `Flow is busy with ${flow.operationStatus.toLocaleLowerCase()} operation. Please try again in a moment.`,
                    },
                })
            }
        }

        switch (operation.type) {
            case FlowOperationType.LOCK_AND_PUBLISH: {
                // Resolve the target status once so the transient operationStatus and the
                // scheduled job agree: an omitted status means "enable", which previously
                // marked the flow as DISABLING while the job enabled it.
                const newStatus = operation.request.status ?? FlowStatus.ENABLED
                await this.updatedPublishedVersionId({
                    id,
                    userId,
                    projectId,
                    platformId,
                })
                await flowRepo().update(id, {
                    operationStatus: newStatus === FlowStatus.ENABLED ? FlowOperationStatus.ENABLING : FlowOperationStatus.DISABLING,
                })
                await this.addUpdateStatusJob({
                    id,
                    projectId,
                    newStatus,
                })
                break
            }

            case FlowOperationType.CHANGE_STATUS: {
                await flowRepo().update(id, {
                    operationStatus: operation.request.status === FlowStatus.ENABLED ? FlowOperationStatus.ENABLING : FlowOperationStatus.DISABLING,
                })
                await this.addUpdateStatusJob({
                    id,
                    projectId,
                    newStatus: operation.request.status,
                })
                break
            }

            case FlowOperationType.CHANGE_FOLDER: {
                await flowRepo().update(id, {
                    folderId: operation.request.folderId,
                })
                break
            }

            case FlowOperationType.UPDATE_METADATA: {
                await this.updateMetadata({
                    id,
                    projectId,
                    metadata: operation.request.metadata,
                })
                break
            }

            case FlowOperationType.UPDATE_MINUTES_SAVED: {
                await flowRepo().update(id, {
                    timeSavedPerRun: operation.request.timeSavedPerRun,
                })
                break
            }

            default: {
                let lastVersion = await flowVersionService(
                    log,
                ).getFlowVersionOrThrow({
                    flowId: id,
                    versionId: undefined,
                })

                if (lastVersion.state === FlowVersionState.LOCKED) {
                    // NOTE(review): this fetch uses the same arguments as the one above and
                    // looks redundant — confirm before removing.
                    const lastVersionWithArtifacts = await flowVersionService(
                        log,
                    ).getFlowVersionOrThrow({
                        flowId: id,
                        versionId: undefined,
                    })

                    lastVersion = await flowVersionService(
                        log,
                    ).createEmptyVersion(id, {
                        displayName: lastVersionWithArtifacts.displayName,
                    })

                    // Duplicate the artifacts from the previous version, otherwise they will be deleted during update operation
                    lastVersion = await flowVersionService(log).applyOperation({
                        userId,
                        projectId,
                        platformId,
                        flowVersion: lastVersion,
                        userOperation: {
                            type: FlowOperationType.IMPORT_FLOW,
                            request: lastVersionWithArtifacts,
                        },
                    })
                }

                await flowVersionService(log).applyOperation({
                    userId,
                    projectId,
                    platformId,
                    flowVersion: lastVersion,
                    userOperation: operation,
                })
            }
        }

        return this.getOnePopulatedOrThrow({
            id,
            projectId,
        })
    },

    /**
     * Publishes the latest version of a flow.
     *
     * If the flow is currently enabled and already has a published version,
     * its trigger source is disabled first. Then, in one transaction, the
     * latest version is locked (if not already), stored as
     * `publishedVersionId`, the flow status is set to `DISABLED`, and the
     * flow's execution cache entry is invalidated.
     *
     * @returns the saved flow populated with the locked version.
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when the flow does not exist.
     */
    async updatedPublishedVersionId({
        id,
        userId,
        projectId,
        platformId,
    }: UpdatePublishedVersionIdParams): Promise<PopulatedFlow> {
        const flowToUpdate = await this.getOneOrThrow({ id, projectId })

        const flowVersionToPublish = await flowVersionService(log).getFlowVersionOrThrow({
            flowId: id,
            versionId: undefined,
        })

        if (flowToUpdate.status === FlowStatus.ENABLED && !isNil(flowToUpdate.publishedVersionId)) {
            await triggerSourceService(log).disable({
                flowId: flowToUpdate.id,
                projectId: flowToUpdate.projectId,
                simulate: false,
                ignoreError: false,
            })
        }

        return transaction(async (entityManager) => {
            const lockedFlowVersion = await lockFlowVersionIfNotLocked({
                flowVersion: flowVersionToPublish,
                userId,
                projectId,
                platformId,
                entityManager,
                log,
            })

            flowToUpdate.publishedVersionId = lockedFlowVersion.id
            flowToUpdate.status = FlowStatus.DISABLED
            const updatedFlow = await flowRepo(entityManager).save(flowToUpdate)

            await flowExecutionCache(log).invalidate(updatedFlow.id)
            return {
                ...updatedFlow,
                version: lockedFlowVersion,
            }
        })
    },

    /**
     * Starts deleting a flow: schedules the delete job, then marks the flow
     * as `DELETING`.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when the flow does not exist,
     * `FLOW_OPERATION_IN_PROGRESS` when its `operationStatus` is not `NONE`.
     */
    async delete({ id, projectId }: DeleteParams): Promise<void> {
        const flow = await this.getOneOrThrow({
            id,
            projectId,
        })
        if (flow.operationStatus !== FlowOperationStatus.NONE) {
            throw new ActivepiecesError({
                code: ErrorCode.FLOW_OPERATION_IN_PROGRESS,
                params: {
                    message: `Flow ${id} is already being ${flow.operationStatus}`,
                },
            })
        }
        await this.addDeleteJob(flow)
        await flowRepo().update(id, {
            operationStatus: FlowOperationStatus.DELETING,
        })
    },

    /**
     * Returns every flow whose status is `ENABLED`, each populated with its
     * published version (or the default version when none is published).
     */
    async getAllEnabled(): Promise<PopulatedFlow[]> {
        const flows = await flowRepo().findBy({
            status: FlowStatus.ENABLED,
        })
        return Promise.all(flows.map(async (flow) => this.getOnePopulatedOrThrow({
            id: flow.id,
            projectId: flow.projectId,
            versionId: flow.publishedVersionId ?? undefined,
        })))
    },

    /**
     * Calls {@link delete} for every flow in every project of the platform.
     * Rejects if any individual delete rejects.
     */
    async deleteAllByPlatformId(platformId: PlatformId): Promise<void> {
        const projectIds = await projectService.getProjectIdsByPlatform(platformId)
        const flows = await flowRepo().findBy({
            projectId: In(projectIds),
        })
        await Promise.all(flows.map((flow) => this.delete({ id: flow.id, projectId: flow.projectId })))
    },

    /**
     * Builds a shared template from one version of a flow, with connection
     * names and sample data removed from that version.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when the flow does not exist.
     */
    async getTemplate({
        flowId,
        versionId,
        projectId,
    }: GetTemplateParams): Promise<SharedTemplate> {
        const flow = await this.getOnePopulatedOrThrow({
            id: flowId,
            projectId,
            versionId,
            removeConnectionsName: true,
            removeSampleData: true,
        })
        const template: SharedTemplate = {
            name: flow.version.displayName,
            summary: '',
            description: '',
            // De-duplicate the pieces referenced by the flow.
            pieces: Array.from(new Set(flowPieceUtil.getUsedPieces(flow.version.trigger))),
            flows: [flow.version],
            tags: [],
            blogUrl: '',
            metadata: null,
            author: '',
            categories: [],
            type: TemplateType.SHARED,
        }
        return template
    },

    /**
     * Counts flows in a project, optionally narrowed by folder and status.
     * The "uncategorized" folder id matches flows whose `folderId` is NULL.
     */
    async count({ projectId, folderId, status }: CountParams): Promise<number> {
        if (folderId === undefined) {
            return flowRepo().countBy({ projectId, status })
        }

        return flowRepo().countBy({
            folderId: folderId !== UncategorizedFolderId ? folderId : IsNull(),
            projectId,
            status,
        })
    },

    /** Returns whether the project has at least one flow with the given status. */
    async existsByProjectAndStatus(params: ExistsByProjectAndStatusParams): Promise<boolean> {
        const { projectId, status, entityManager } = params

        return flowRepo(entityManager).existsBy({
            projectId,
            status,
        })
    },

    /**
     * Replaces the flow's metadata and returns the flow re-read afterwards.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when the flow does not exist.
     */
    async updateMetadata({
        id,
        projectId,
        metadata,
    }: UpdateMetadataParams): Promise<PopulatedFlow> {
        const flowToUpdate = await this.getOneOrThrow({
            id,
            projectId,
        })

        flowToUpdate.metadata = metadata

        await flowRepo().save(flowToUpdate)

        return this.getOnePopulatedOrThrow({
            id,
            projectId,
        })
    },

    /**
     * Sets the flow's `updated` timestamp to the current time.
     *
     * @throws ActivepiecesError `ENTITY_NOT_FOUND` when the flow does not exist.
     */
    async updateLastModified(flowId: FlowId, projectId: ProjectId): Promise<void> {
        const flow = await this.getOneOrThrow({
            id: flowId,
            projectId,
        })

        flow.updated = dayjs().toISOString()
        await flowRepo().save(flow)
    },

    /**
     * Upserts a one-time `DELETE_FLOW` system job dated now, keyed by flow id,
     * with exponential backoff starting at 5 seconds.
     */
    addDeleteJob: async (flow: Flow): Promise<void> => {
        await systemJobsSchedule(log).upsertJob({
            job: {
                name: SystemJobName.DELETE_FLOW,
                data: {
                    flow,
                    preDeleteDone: false,
                    dbDeleteDone: false,
                },
                jobId: `delete-flow-${flow.id}`,
            },
            schedule: {
                type: 'one-time',
                date: apDayjs(),
            },
            customConfig: {
                backoff: {
                    type: 'exponential',
                    delay: apDayjsDuration(5, 'second').asMilliseconds(),
                },
            },
        })
    },

    /**
     * Upserts a one-time `UPDATE_FLOW_STATUS` system job dated now, keyed by
     * flow id, with exponential backoff starting at 5 seconds.
     */
    addUpdateStatusJob: async (data: Omit<SystemJobData<SystemJobName.UPDATE_FLOW_STATUS>, 'preUpdateDone'>): Promise<void> => {
        await systemJobsSchedule(log).upsertJob({
            job: {
                name: SystemJobName.UPDATE_FLOW_STATUS,
                data: {
                    ...data,
                    preUpdateDone: false,
                },
                jobId: `update-flow-status-${data.id}`,
            },
            schedule: {
                type: 'one-time',
                date: apDayjs(),
            },
            customConfig: {
                backoff: {
                    type: 'exponential',
                    delay: apDayjsDuration(5, 'second').asMilliseconds(),
                },
            },
        })
    },
})

/**
 * Returns the version unchanged when it is already locked; otherwise applies
 * a `LOCK_FLOW` operation to it using the given entity manager.
 */
const lockFlowVersionIfNotLocked = async ({
    flowVersion,
    userId,
    projectId,
    platformId,
    entityManager,
    log,
}: LockFlowVersionIfNotLockedParams): Promise<FlowVersion> => {
    if (flowVersion.state === FlowVersionState.LOCKED) {
        return flowVersion
    }

    return flowVersionService(log).applyOperation({
        userId,
        projectId,
        platformId,
        flowVersion,
        userOperation: {
            type: FlowOperationType.LOCK_FLOW,
            request: {
                flowId: flowVersion.flowId,
            },
        },
        entityManager,
    })
}

/**
 * Resolves the folder for a new flow: an explicit `folderId` wins, otherwise a
 * folder is upserted by `folderName`, otherwise `null`.
 */
const getFolderIdFromRequest = async ({ projectId, folderId, folderName, log }: { projectId: string, folderId: string | undefined, folderName: string | undefined, log: FastifyBaseLogger }) => {
    if (folderId) {
        return folderId
    }
    if (folderName) {
        return (await flowFolderService(log).upsert({
            projectId,
            request: {
                projectId,
                displayName: folderName,
            },
        })).id
    }
    return null
}

/**
 * Assertion function narrowing `flow` to non-null.
 *
 * @throws ActivepiecesError `ENTITY_NOT_FOUND` when `flow` is nil.
 */
const assertFlowIsNotNull: <T extends Flow>(
    flow: T | null
) => asserts flow is T = <T>(flow: T | null) => {
    if (isNil(flow)) {
        throw new ActivepiecesError({
            code: ErrorCode.ENTITY_NOT_FOUND,
            params: {},
        })
    }
}

type CreateParams = {
    projectId: ProjectId
    request: CreateFlowRequest
    externalId?: string
}

type ListParamsBase = {
    cursorRequest?: Cursor
    limit?: number
    folderId?: string
    status?: FlowStatus[]
    name?: string
    versionState?: FlowVersionState
    externalIds?: string[]
    connectionExternalIds?: string[]
    agentExternalIds?: string[]
    includeTriggerSource?: boolean
}

// Exactly one of `projectIds` or `platformId` must be supplied.
type ListParams = ListParamsBase & (
    | { projectIds: ProjectId[], platformId?: never }
    | { projectIds?: never, platformId: PlatformId }
)

type GetOneParams = {
    id: FlowId
    projectId: ProjectId
    entityManager?: EntityManager
}

type GetOnePopulatedParams = GetOneParams & {
    versionId?: FlowVersionId
    removeConnectionsName?: boolean
    removeSampleData?: boolean
}

type GetTemplateParams = {
    flowId: FlowId
    projectId: ProjectId
    versionId: FlowVersionId | undefined
}

type CountParams = {
    projectId: ProjectId
    folderId?: string
    status?: FlowStatus
}

type UpdateParams = {
    id: FlowId
    userId: UserId | null
    projectId: ProjectId
    operation: FlowOperationRequest
    platformId: PlatformId
}

type UpdatePublishedVersionIdParams = {
    id: FlowId
    userId: UserId | null
    platformId: PlatformId
    projectId: ProjectId
}

type DeleteParams = {
    id: FlowId
    projectId: ProjectId
}

type NewFlow = Omit<Flow, 'created' | 'updated'>

type LockFlowVersionIfNotLockedParams = {
    flowVersion: FlowVersion
    userId: UserId | null
    projectId: ProjectId
    platformId: PlatformId
    entityManager: EntityManager
    log: FastifyBaseLogger
}

type ExistsByProjectAndStatusParams = {
    projectId: ProjectId
    status: FlowStatus
    entityManager: EntityManager
}

type UpdateMetadataParams = {
    id: FlowId
    projectId: ProjectId
    metadata: Metadata | null | undefined
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.