Skip to main content
Glama

mcp-google-sheets

flow-version.service.ts (14.3 kB)
import {
    ActivepiecesError,
    apId,
    Cursor,
    ErrorCode,
    FlowAction,
    FlowActionType,
    FlowId,
    FlowOperationRequest,
    flowOperations,
    FlowOperationType,
    flowStructureUtil,
    FlowTrigger,
    FlowTriggerType,
    FlowVersion,
    FlowVersionId,
    FlowVersionState,
    isNil,
    LATEST_SCHEMA_VERSION,
    PlatformId,
    ProjectId,
    sanitizeObjectForPostgresql,
    SeekPage,
    UserId,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { EntityManager, FindOneOptions } from 'typeorm'
import { repoFactory } from '../../core/db/repo-factory'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import { pieceMetadataService } from '../../pieces/piece-metadata-service'
import { projectService } from '../../project/project-service'
import { userService } from '../../user/user-service'
import { sampleDataService } from '../step-run/sample-data.service'
import { FlowVersionEntity } from './flow-version-entity'
import { flowVersionMigrationService } from './flow-version-migration.service'
import { flowVersionSideEffects } from './flow-version-side-effects'
import { flowVersionValidationUtil } from './flow-version-validator-util'

/** Repository accessor for `FlowVersionEntity`; accepts an optional `EntityManager`. */
export const flowVersionRepo = repoFactory(FlowVersionEntity)

/**
 * Service for reading, creating and mutating flow versions.
 *
 * @param log Logger forwarded to the collaborating services (piece metadata,
 * sample data, side effects, validation).
 */
export const flowVersionService = (log: FastifyBaseLogger) => ({
    /**
     * Replaces the `pieceVersion` of every piece step with the version reported
     * by `pieceMetadataService(log).getOrThrow` for that step.
     *
     * A flow version whose state is already `LOCKED` is returned as-is.
     *
     * @returns The result of `flowStructureUtil.transferFlow`, where each step is
     * a JSON round-trip clone of the original step.
     */
    async lockPieceVersions({
        projectId,
        flowVersion,
        entityManager,
    }: LockPieceVersionsParams): Promise<FlowVersion> {
        if (flowVersion.state === FlowVersionState.LOCKED) {
            return flowVersion
        }

        // Maps step name -> version returned by the piece metadata lookup.
        const pieceVersion: Record<string, string> = {}
        const platformId = await projectService.getPlatformId(projectId)
        const steps = flowStructureUtil.getAllSteps(flowVersion.trigger)
        for (const step of steps) {
            const stepTypeIsPiece = [FlowActionType.PIECE, FlowTriggerType.PIECE].includes(
                step.type,
            )
            if (stepTypeIsPiece) {
                const pieceMetadata = await pieceMetadataService(log).getOrThrow({
                    projectId,
                    platformId,
                    name: step.settings.pieceName,
                    version: step.settings.pieceVersion,
                    entityManager,
                })
                pieceVersion[step.name] = pieceMetadata.version
            }
        }

        return flowStructureUtil.transferFlow(flowVersion, (step) => {
            const clonedStep = JSON.parse(JSON.stringify(step))
            if (pieceVersion[step.name]) {
                clonedStep.settings.pieceVersion = pieceVersion[step.name]
            }
            return clonedStep
        })
    },

    /**
     * Applies a user operation to a flow version and saves the result.
     *
     * The user operation is first expanded into the list of operations that are
     * actually applied:
     * - `USE_AS_DRAFT` becomes an `IMPORT_FLOW` built from the referenced version.
     * - `SAVE_SAMPLE_DATA` becomes an `UPDATE_ACTION` or `UPDATE_TRIGGER` for the
     *   step returned by the sample data service.
     * - `LOCK_FLOW` first runs `lockPieceVersions`, then applies the operation.
     * - Anything else is applied unchanged.
     *
     * After all operations are applied, the post-apply side effect runs with the
     * original user operation, `updated` / `updatedBy` are stamped, and
     * `connectionIds` / `agentIds` are recomputed from the resulting flow.
     *
     * @returns The value returned by the repository `save` call.
     */
    async applyOperation({
        flowVersion,
        projectId,
        userId,
        userOperation,
        entityManager,
        platformId,
    }: ApplyOperationParams): Promise<FlowVersion> {
        let operations: FlowOperationRequest[] = []
        let mutatedFlowVersion: FlowVersion = flowVersion

        switch (userOperation.type) {
            case FlowOperationType.USE_AS_DRAFT: {
                const previousVersion = await flowVersionService(log).getFlowVersionOrThrow({
                    flowId: flowVersion.flowId,
                    versionId: userOperation.request.versionId,
                    removeConnectionsName: false,
                })
                operations = [{
                    type: FlowOperationType.IMPORT_FLOW,
                    request: {
                        trigger: previousVersion.trigger,
                        displayName: previousVersion.displayName,
                        schemaVersion: previousVersion.schemaVersion,
                    },
                }]
                break
            }
            case FlowOperationType.SAVE_SAMPLE_DATA: {
                const modifiedStep = await sampleDataService(log).saveSampleDataFileIdsInStep({
                    projectId,
                    flowVersionId: mutatedFlowVersion.id,
                    stepName: userOperation.request.stepName,
                    payload: userOperation.request.payload,
                    type: userOperation.request.type,
                    dataType: userOperation.request.dataType,
                })
                if (flowStructureUtil.isAction(modifiedStep.type)) {
                    operations = [{
                        type: FlowOperationType.UPDATE_ACTION,
                        request: modifiedStep as FlowAction,
                    }]
                }
                else {
                    operations = [{
                        type: FlowOperationType.UPDATE_TRIGGER,
                        request: modifiedStep as FlowTrigger,
                    }]
                }
                break
            }
            case FlowOperationType.LOCK_FLOW: {
                mutatedFlowVersion = await this.lockPieceVersions({
                    projectId,
                    flowVersion: mutatedFlowVersion,
                    entityManager,
                })
                operations = [userOperation]
                break
            }
            default: {
                operations = [userOperation]
                break
            }
        }

        for (const operation of operations) {
            mutatedFlowVersion = await applySingleOperation(
                projectId,
                mutatedFlowVersion,
                operation,
                platformId,
                log,
            )
        }

        // NOTE(review): applySingleOperation already runs postApplyOperation for each
        // derived operation; this call runs it again with the original user operation.
        // Confirm that both invocations are intended.
        await flowVersionSideEffects(log).postApplyOperation({
            flowVersion: mutatedFlowVersion,
            operation: userOperation,
        })

        mutatedFlowVersion.updated = dayjs().toISOString()
        if (userId) {
            mutatedFlowVersion.updatedBy = userId
        }
        mutatedFlowVersion.connectionIds = flowStructureUtil.extractConnectionIds(mutatedFlowVersion)
        mutatedFlowVersion.agentIds = flowStructureUtil.extractAgentIds(mutatedFlowVersion)
        return flowVersionRepo(entityManager).save(sanitizeObjectForPostgresql(mutatedFlowVersion))
    },

    /**
     * Looks up a flow version by id.
     *
     * @returns The (migrated) flow version, or `null` when `id` is nil or no row matches.
     */
    async getOne(id: FlowVersionId): Promise<FlowVersion | null> {
        if (isNil(id)) {
            return null
        }
        return findOne({
            where: {
                id,
            },
        })
    },

    /** Reports whether a flow version with the given id exists. */
    async exists(id: FlowVersionId): Promise<boolean> {
        return flowVersionRepo().exists({
            where: {
                id,
            },
        })
    },

    /**
     * Returns the most recently created version of a flow in the given state,
     * or `null` when there is none.
     */
    async getLatestVersion(flowId: FlowId, state: FlowVersionState): Promise<FlowVersion | null> {
        return findOne({
            where: {
                flowId,
                state,
            },
            order: {
                created: 'DESC',
            },
        })
    },

    /**
     * Returns the most recently created `LOCKED` version of a flow.
     *
     * @throws ActivepiecesError with `ENTITY_NOT_FOUND` when the flow has no locked version.
     */
    async getLatestLockedVersionOrThrow(flowId: FlowId): Promise<FlowVersion> {
        const lockedVersion = await this.getLatestVersion(flowId, FlowVersionState.LOCKED)
        if (isNil(lockedVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: flowId,
                    entityType: 'FlowVersion',
                },
            })
        }
        return lockedVersion
    },

    /**
     * Looks up a flow version by id.
     *
     * @throws ActivepiecesError with `ENTITY_NOT_FOUND` when no version matches.
     */
    async getOneOrThrow(id: FlowVersionId): Promise<FlowVersion> {
        const flowVersion = await flowVersionService(log).getOne(id)
        if (isNil(flowVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: id,
                    entityType: 'FlowVersion',
                },
            })
        }
        return flowVersion
    },

    /**
     * Lists the versions of a flow as a cursor-paginated page, in descending order.
     *
     * Each item is extended with `updatedByUser`: `null` when `updatedBy` is nil,
     * otherwise the result of `userService.getMetaInformation` for that user.
     */
    async list({
        cursorRequest,
        limit,
        flowId,
    }: ListFlowVersionParams): Promise<SeekPage<FlowVersion>> {
        const decodedCursor = paginationHelper.decodeCursor(cursorRequest)
        const paginator = buildPaginator({
            entity: FlowVersionEntity,
            query: {
                limit,
                order: 'DESC',
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })
        const paginationResult = await paginator.paginate(
            flowVersionRepo().createQueryBuilder()
                .where({
                    flowId,
                }),
        )
        const promises = paginationResult.data.map(async (flowVersion) => {
            return {
                ...flowVersion,
                updatedByUser: isNil(flowVersion.updatedBy) ? null : await userService.getMetaInformation({
                    id: flowVersion.updatedBy,
                }),
            }
        })
        return paginationHelper.createPage<FlowVersion>(
            await Promise.all(promises),
            paginationResult.cursor,
        )
    },

    /**
     * Loads a version of a flow and optionally strips connection references
     * and sample data pointers from its steps.
     *
     * @param removeConnectionsName When true, connection placeholders are removed
     * from every step's `settings.input`. Defaults to `false`.
     * @param removeSampleData When true, sample data file ids and the last test
     * date are cleared on every step. Defaults to `false`.
     * @throws ActivepiecesError with `ENTITY_NOT_FOUND` when no version matches.
     */
    async getFlowVersionOrThrow({
        flowId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetFlowVersionOrThrowParams): Promise<FlowVersion> {
        const flowVersion: FlowVersion | null = await findOne({
            where: {
                flowId,
                id: versionId,
            },
            // This is needed to return draft by default because it is always the latest one.
            // NOTE(review): presumably an undefined `versionId` leaves the id filter
            // unconstrained so the ordering selects the newest version — confirm.
            order: {
                created: 'DESC',
            },
        }, entityManager)

        if (isNil(flowVersion)) {
            throw new ActivepiecesError({
                code: ErrorCode.ENTITY_NOT_FOUND,
                params: {
                    entityId: versionId,
                    entityType: 'FlowVersion',
                    message: `flowId=${flowId}`,
                },
            })
        }

        return this.removeConnectionsAndSampleDataFromFlowVersion(
            flowVersion,
            removeConnectionsName,
            removeSampleData,
        )
    },

    /**
     * Creates and saves a new invalid `DRAFT` version for a flow, containing only
     * an `EMPTY` trigger named `trigger`.
     */
    async createEmptyVersion(
        flowId: FlowId,
        request: {
            displayName: string
        },
    ): Promise<FlowVersion> {
        const flowVersion: NewFlowVersion = {
            id: apId(),
            displayName: request.displayName,
            flowId,
            trigger: {
                type: FlowTriggerType.EMPTY,
                name: 'trigger',
                settings: {},
                valid: false,
                displayName: 'Select Trigger',
            },
            schemaVersion: LATEST_SCHEMA_VERSION,
            connectionIds: [],
            agentIds: [],
            valid: false,
            state: FlowVersionState.DRAFT,
        }
        return flowVersionRepo().save(flowVersion)
    },

    /**
     * Returns the flow version produced by `flowStructureUtil.transferFlow`, with
     * each step JSON-cloned and, depending on the flags, stripped of connection
     * placeholders and/or sample data pointers.
     *
     * @param removeConnectionNames When true, `settings.input` is passed through
     * `removeConnectionsFromInput`.
     * @param removeSampleData When true and the step has `settings.sampleData`,
     * its `sampleDataFileId`, `sampleDataInputFileId` and `lastTestDate` are set
     * to `undefined`.
     */
    removeConnectionsAndSampleDataFromFlowVersion(
        flowVersion: FlowVersion,
        removeConnectionNames: boolean,
        removeSampleData: boolean,
    ): FlowVersion {
        return flowStructureUtil.transferFlow(flowVersion, (step) => {
            const clonedStep = JSON.parse(JSON.stringify(step))
            if (removeConnectionNames) {
                clonedStep.settings.input = removeConnectionsFromInput(clonedStep.settings.input)
            }
            if (removeSampleData && !isNil(clonedStep?.settings?.sampleData)) {
                clonedStep.settings.sampleData.sampleDataFileId = undefined
                clonedStep.settings.sampleData.sampleDataInputFileId = undefined
                clonedStep.settings.sampleData.lastTestDate = undefined
            }
            return clonedStep
        })
    },
})

/**
 * Finds a single flow version and passes it through
 * `flowVersionMigrationService.migrate` before returning it.
 *
 * @returns The migrated flow version, or `null` when nothing matches.
 */
async function findOne(options: FindOneOptions, entityManager?: EntityManager): Promise<FlowVersion | null> {
    const flowVersion = await flowVersionRepo(entityManager).findOne(options)
    if (isNil(flowVersion)) {
        return null
    }
    return flowVersionMigrationService.migrate(flowVersion)
}

/**
 * Applies one operation to a flow version: runs the pre-apply side effect,
 * prepares the request through the validation util, applies it with
 * `flowOperations.apply`, then runs the post-apply side effect with the
 * prepared operation.
 *
 * @returns The flow version returned by `flowOperations.apply`.
 */
async function applySingleOperation(
    projectId: ProjectId,
    flowVersion: FlowVersion,
    operation: FlowOperationRequest,
    platformId: PlatformId,
    log: FastifyBaseLogger,
): Promise<FlowVersion> {
    await flowVersionSideEffects(log).preApplyOperation({
        projectId,
        flowVersion,
        operation,
    })
    const preparedOperation = await flowVersionValidationUtil(log).prepareRequest(projectId, platformId, operation)
    const updatedFlowVersion = flowOperations.apply(flowVersion, preparedOperation)
    await flowVersionSideEffects(log).postApplyOperation({
        flowVersion: updatedFlowVersion,
        operation: preparedOperation,
    })
    return updatedFlowVersion
}

/**
 * Returns a copy of `obj` in which every `{{connections.…}}` placeholder has
 * been removed from string values, recursing into nested plain objects.
 *
 * - A nil `obj` is returned unchanged.
 * - Arrays are copied by reference and are not inspected.
 * - A string that becomes empty after removal is replaced with `undefined`.
 */
function removeConnectionsFromInput(
    obj: Record<string, unknown>,
): Record<string, unknown> {
    if (isNil(obj)) {
        return obj
    }
    const replacedObj: Record<string, unknown> = {}
    for (const [key, value] of Object.entries(obj)) {
        if (Array.isArray(value)) {
            replacedObj[key] = value
        }
        else if (typeof value === 'object' && value !== null) {
            replacedObj[key] = removeConnectionsFromInput(value as Record<string, unknown>)
        }
        else if (typeof value === 'string') {
            const replacedValue = value.replace(/\{{connections\.[^}]*}}/g, '')
            replacedObj[key] = replacedValue === '' ? undefined : replacedValue
        }
        else {
            replacedObj[key] = value
        }
    }
    return replacedObj
}

/** Parameters for `flowVersionService(...).getFlowVersionOrThrow`. */
type GetFlowVersionOrThrowParams = {
    flowId: FlowId
    versionId: FlowVersionId | undefined
    removeConnectionsName?: boolean
    removeSampleData?: boolean
    entityManager?: EntityManager
}

/** A flow version before it is saved, i.e. without `created` / `updated`. */
type NewFlowVersion = Omit<FlowVersion, 'created' | 'updated'>

/** Parameters for `flowVersionService(...).list`. */
type ListFlowVersionParams = {
    flowId: FlowId
    cursorRequest: Cursor | null
    limit: number
}

/** Parameters for `flowVersionService(...).applyOperation`. */
type ApplyOperationParams = {
    userId: UserId | null
    projectId: ProjectId
    platformId: PlatformId
    flowVersion: FlowVersion
    userOperation: FlowOperationRequest
    entityManager?: EntityManager
}

/** Parameters for `flowVersionService(...).lockPieceVersions`. */
type LockPieceVersionsParams = {
    projectId: ProjectId
    flowVersion: FlowVersion
    entityManager?: EntityManager
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.