flow-version.service.ts•14.3 kB
import {
    ActivepiecesError,
    apId,
    Cursor,
    ErrorCode,
    FlowAction,
    FlowActionType,
    FlowId,
    FlowOperationRequest,
    flowOperations,
    FlowOperationType,
    flowStructureUtil,
    FlowTrigger,
    FlowTriggerType,
    FlowVersion,
    FlowVersionId,
    FlowVersionState,
    isNil,
    LATEST_SCHEMA_VERSION,
    PlatformId,
    ProjectId,
    sanitizeObjectForPostgresql,
    SeekPage,
    UserId,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { EntityManager, FindOneOptions } from 'typeorm'
import { repoFactory } from '../../core/db/repo-factory'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import { pieceMetadataService } from '../../pieces/piece-metadata-service'
import { projectService } from '../../project/project-service'
import { userService } from '../../user/user-service'
import { sampleDataService } from '../step-run/sample-data.service'
import { FlowVersionEntity } from './flow-version-entity'
import { flowVersionMigrationService } from './flow-version-migration.service'
import { flowVersionSideEffects } from './flow-version-side-effects'
import { flowVersionValidationUtil } from './flow-version-validator-util'
export const flowVersionRepo = repoFactory(FlowVersionEntity)
export const flowVersionService = (log: FastifyBaseLogger) => ({
    async lockPieceVersions({
        projectId,
        flowVersion,
        entityManager,
    }: LockPieceVersionsParams): Promise<FlowVersion> {
        if (flowVersion.state === FlowVersionState.LOCKED) {
            return flowVersion
        }
        const pieceVersion: Record<string, string> = {}
        const platformId = await projectService.getPlatformId(projectId)
        const steps = flowStructureUtil.getAllSteps(flowVersion.trigger)
        for (const step of steps) {
            const stepTypeIsPiece = [FlowActionType.PIECE, FlowTriggerType.PIECE].includes(
                step.type,
            )
            if (stepTypeIsPiece) {
                const pieceMetadata = await pieceMetadataService(log).getOrThrow({
                    projectId,
                    platformId,
                    name: step.settings.pieceName,
                    version: step.settings.pieceVersion,
                    entityManager,
                })
                pieceVersion[step.name] = pieceMetadata.version
            }
        }
        return flowStructureUtil.transferFlow(flowVersion, (step) => {
            const clonedStep = JSON.parse(JSON.stringify(step))
            if (pieceVersion[step.name]) {
                clonedStep.settings.pieceVersion = pieceVersion[step.name]
            }
            return clonedStep
        })
    },
    async applyOperation({
        flowVersion,
        projectId,
        userId,
        userOperation,
        entityManager,
        platformId,
    }: ApplyOperationParams): Promise<FlowVersion> {
        let operations: FlowOperationRequest[] = []
        let mutatedFlowVersion: FlowVersion = flowVersion
        switch (userOperation.type) {
            case FlowOperationType.USE_AS_DRAFT: {
                const previousVersion = await flowVersionService(log).getFlowVersionOrThrow({
                    flowId: flowVersion.flowId,
                    versionId: userOperation.request.versionId,
                    removeConnectionsName: false,
                })
                operations = [{
                    type: FlowOperationType.IMPORT_FLOW,
                    request: {
                        trigger: previousVersion.trigger,
                        displayName: previousVersion.displayName,
                        schemaVersion: previousVersion.schemaVersion,
                    },
                }]
                break
            }
            case FlowOperationType.SAVE_SAMPLE_DATA: {
                const modifiedStep = await sampleDataService(log).saveSampleDataFileIdsInStep({
                    projectId,
                    flowVersionId: mutatedFlowVersion.id,
                    stepName: userOperation.request.stepName,
                    payload: userOperation.request.payload,
                    type: userOperation.request.type,
                    dataType: userOperation.request.dataType,
                })
                if (flowStructureUtil.isAction(modifiedStep.type)) {
                    operations = [{
                        type: FlowOperationType.UPDATE_ACTION,
                        request: modifiedStep as FlowAction,
                    }]
                }
                else {
                    operations = [{
                        type: FlowOperationType.UPDATE_TRIGGER,
                        request: modifiedStep as FlowTrigger,
                    }]
                }
                break
            }
            case FlowOperationType.LOCK_FLOW: {
                mutatedFlowVersion = await this.lockPieceVersions({
                    projectId,
                    flowVersion: mutatedFlowVersion,
                    entityManager,
                })
                operations = [userOperation]
                break
            }
            default: {
                operations = [userOperation]
                break
            }
        }
        for (const operation of operations) {
            mutatedFlowVersion = await applySingleOperation(
                projectId,
                mutatedFlowVersion,
                operation,
                platformId,
                log,
            )
        }
        await flowVersionSideEffects(log).postApplyOperation({
            flowVersion: mutatedFlowVersion,
            operation: userOperation,
        })
        mutatedFlowVersion.updated = dayjs().toISOString()
        if (userId) {
            mutatedFlowVersion.updatedBy = userId
        }
        mutatedFlowVersion.connectionIds = flowStructureUtil.extractConnectionIds(mutatedFlowVersion)
        mutatedFlowVersion.agentIds = flowStructureUtil.extractAgentIds(mutatedFlowVersion)
        return flowVersionRepo(entityManager).save(sanitizeObjectForPostgresql(mutatedFlowVersion))
    },
    async getOne(id: FlowVersionId): Promise<FlowVersion | null> {
        if (isNil(id)) {
            return null
        }
        return findOne({
            where: {
                id,
            },
        })
    },
    async exists(id: FlowVersionId): Promise<boolean> {
        return flowVersionRepo().exists({
            where: {
                id,
            },
        })
    },
    async getLatestVersion(flowId: FlowId, state: FlowVersionState): Promise<FlowVersion | null> {
        return findOne({
            where: {
                flowId,
                state,
            },
            order: {
                created: 'DESC',
            },
        })
    },
    async getLatestLockedVersionOrThrow(flowId: FlowId): Promise<FlowVersion> {
        const lockedVersion = await this.getLatestVersion(flowId, FlowVersionState.LOCKED)
        if (isNil(lockedVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: flowId,
                    entityType: 'FlowVersion',
                },
            })
        }
        return lockedVersion
    },
    async getOneOrThrow(id: FlowVersionId): Promise<FlowVersion> {
        const flowVersion = await flowVersionService(log).getOne(id)
        if (isNil(flowVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: id,
                    entityType: 'FlowVersion',
                },
            })
        }
        return flowVersion
    },
    async list({
        cursorRequest,
        limit,
        flowId,
    }: ListFlowVersionParams): Promise<SeekPage<FlowVersion>> {
        const decodedCursor = paginationHelper.decodeCursor(cursorRequest)
        const paginator = buildPaginator({
            entity: FlowVersionEntity,
            query: {
                limit,
                order: 'DESC',
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })
        const paginationResult = await paginator.paginate(
            flowVersionRepo().createQueryBuilder()
                .where({
                    flowId,
                }),
        )
        const promises = paginationResult.data.map(async (flowVersion) => {
            return {
                ...flowVersion,
                updatedByUser: isNil(flowVersion.updatedBy) ? null : await userService.getMetaInformation({
                    id: flowVersion.updatedBy,
                }),
            }
        })
        return paginationHelper.createPage<FlowVersion>(
            await Promise.all(promises),
            paginationResult.cursor,
        )
    },
    async getFlowVersionOrThrow({
        flowId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetFlowVersionOrThrowParams): Promise<FlowVersion> {
        const flowVersion: FlowVersion | null = await findOne({
            where: {
                flowId,
                id: versionId,
            },
            //This is needed to return draft by default because it is always the latest one
            order: {
                created: 'DESC',
            },
        }, entityManager)
        if (isNil(flowVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: versionId,
                    entityType: 'FlowVersion',
                    message: `flowId=${flowId}`,
                },
            })
        }
        return this.removeConnectionsAndSampleDataFromFlowVersion(
            flowVersion,
            removeConnectionsName,
            removeSampleData,
        )
    },
    async createEmptyVersion(
        flowId: FlowId,
        request: {
            displayName: string
        },
    ): Promise<FlowVersion> {
        const flowVersion: NewFlowVersion = {
            id: apId(),
            displayName: request.displayName,
            flowId,
            trigger: {
                type: FlowTriggerType.EMPTY,
                name: 'trigger',
                settings: {},
                valid: false,
                displayName: 'Select Trigger',
            },
            schemaVersion: LATEST_SCHEMA_VERSION,
            connectionIds: [],
            agentIds: [],
            valid: false,
            state: FlowVersionState.DRAFT,
        }
        return flowVersionRepo().save(flowVersion)
    },
    removeConnectionsAndSampleDataFromFlowVersion(
        flowVersion: FlowVersion,
        removeConnectionNames: boolean,
        removeSampleData: boolean,
    ): FlowVersion {
        return flowStructureUtil.transferFlow(flowVersion, (step) => {
            const clonedStep = JSON.parse(JSON.stringify(step))
            if (removeConnectionNames) {
                clonedStep.settings.input = removeConnectionsFromInput(clonedStep.settings.input)
            }
            if (removeSampleData && !isNil(clonedStep?.settings?.sampleData)) {
                clonedStep.settings.sampleData.sampleDataFileId = undefined
                clonedStep.settings.sampleData.sampleDataInputFileId = undefined
                clonedStep.settings.sampleData.lastTestDate = undefined
            }
            return clonedStep
        })
    },
})
async function findOne(options: FindOneOptions, entityManager?: EntityManager): Promise<FlowVersion | null> {
    const flowVersion = await flowVersionRepo(entityManager).findOne(options)
    if (isNil(flowVersion)) {
        return null
    }
    return flowVersionMigrationService.migrate(flowVersion)
}
async function applySingleOperation(
    projectId: ProjectId,
    flowVersion: FlowVersion,
    operation: FlowOperationRequest,
    platformId: PlatformId,
    log: FastifyBaseLogger,
): Promise<FlowVersion> {
    await flowVersionSideEffects(log).preApplyOperation({
        projectId,
        flowVersion,
        operation,
    })
    const preparedOperation = await flowVersionValidationUtil(log).prepareRequest(projectId, platformId, operation)
    const updatedFlowVersion = flowOperations.apply(flowVersion, preparedOperation)
    await flowVersionSideEffects(log).postApplyOperation({
        flowVersion: updatedFlowVersion,
        operation: preparedOperation,
    })
    return updatedFlowVersion
}
function removeConnectionsFromInput(
    obj: Record<string, unknown>,
): Record<string, unknown> {
    if (isNil(obj)) {
        return obj
    }
    const replacedObj: Record<string, unknown> = {}
    for (const [key, value] of Object.entries(obj)) {
        if (Array.isArray(value)) {
            replacedObj[key] = value
        }
        else if (typeof value === 'object' && value !== null) {
            replacedObj[key] = removeConnectionsFromInput(value as Record<string, unknown>)
        }
        else if (typeof value === 'string') {
            const replacedValue = value.replace(/\{{connections\.[^}]*}}/g, '')
            replacedObj[key] = replacedValue === '' ? undefined : replacedValue
        }
        else {
            replacedObj[key] = value
        }
    }
    return replacedObj
}
type GetFlowVersionOrThrowParams = {
    flowId: FlowId
    versionId: FlowVersionId | undefined
    removeConnectionsName?: boolean
    removeSampleData?: boolean
    entityManager?: EntityManager
}
type NewFlowVersion = Omit<FlowVersion, 'created' | 'updated'>
type ListFlowVersionParams = {
    flowId: FlowId
    cursorRequest: Cursor | null
    limit: number
}
type ApplyOperationParams = {
    userId: UserId | null
    projectId: ProjectId
    platformId: PlatformId
    flowVersion: FlowVersion
    userOperation: FlowOperationRequest
    entityManager?: EntityManager
}
type LockPieceVersionsParams = {
    projectId: ProjectId
    flowVersion: FlowVersion
    entityManager?: EntityManager
}