Skip to main content
Glama

mcp-google-sheets

flow.service.ts (20.7 kB)
import { rejectedPromiseHandler } from '@activepieces/server-shared'
import {
    ActivepiecesError,
    apId,
    assertNotNullOrUndefined,
    CreateFlowRequest,
    Cursor,
    ErrorCode,
    Flow,
    FlowId,
    FlowOperationRequest,
    FlowOperationType,
    flowPieceUtil,
    FlowStatus,
    FlowTemplateWithoutProjectInformation,
    FlowVersion,
    FlowVersionId,
    FlowVersionState,
    isNil,
    Metadata,
    PlatformId,
    PopulatedFlow,
    ProjectId,
    SeekPage,
    TelemetryEventName,
    UserId,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { EntityManager, In, IsNull } from 'typeorm'
import { transaction } from '../../core/db/transaction'
import { AddAPArrayContainsToQueryBuilder } from '../../database/database-connection'
import { distributedLock } from '../../helper/lock'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import { telemetry } from '../../helper/telemetry.utils'
import { projectService } from '../../project/project-service'
import { triggerSourceService } from '../../trigger/trigger-source/trigger-source-service'
import { flowVersionService } from '../flow-version/flow-version.service'
import { flowFolderService } from '../folder/folder.service'
import { flowSideEffects } from './flow-service-side-effects'
import { FlowEntity } from './flow.entity'
import { flowRepo } from './flow.repo'

/**
 * Flow persistence and lifecycle operations (create, list, read, update,
 * publish, delete, count) built on top of `flowRepo`.
 *
 * @param log Logger passed through to the collaborating services, the
 * distributed lock and telemetry error reporting.
 */
export const flowService = (log: FastifyBaseLogger) => ({
    /**
     * Saves a new flow in `DISABLED` status with no published version, then
     * creates an empty first version carrying the requested display name.
     * A `CREATED_FLOW` telemetry event is sent without being awaited; a
     * telemetry failure is only logged.
     *
     * @returns The saved flow together with its newly created version.
     */
    async create({ projectId, request, externalId }: CreateParams): Promise<PopulatedFlow> {
        const folderId = await getFolderIdFromRequest({
            projectId,
            folderId: request.folderId,
            folderName: request.folderName,
            log,
        })
        const newFlow: NewFlow = {
            id: apId(),
            projectId,
            folderId,
            status: FlowStatus.DISABLED,
            publishedVersionId: null,
            // A fresh id is used as the external id when the caller supplies none.
            externalId: externalId ?? apId(),
            metadata: request.metadata,
        }
        const savedFlow = await flowRepo().save(newFlow)

        const savedFlowVersion = await flowVersionService(log).createEmptyVersion(
            savedFlow.id,
            {
                displayName: request.displayName,
            },
        )

        // Fire-and-forget: telemetry must not fail flow creation.
        telemetry(log).trackProject(savedFlow.projectId, {
            name: TelemetryEventName.CREATED_FLOW,
            payload: {
                flowId: savedFlow.id,
            },
        })
            .catch((e) => log.error(e, '[FlowService#create] telemetry.trackProject'))

        return {
            ...savedFlow,
            version: savedFlowVersion,
        }
    },

    /**
     * Returns one page of a project's flows ordered by `updated` descending,
     * each populated with a version.
     *
     * Filters (all optional): `folderId` (the literal string `'NULL'` matches
     * flows without a folder), `status`, case-insensitive substring match of
     * `name` against the newest version's display name, `externalIds`,
     * `connectionExternalIds` and `agentExternalIds`.
     *
     * With `versionState` equal to `DRAFT` (the default) each flow is populated
     * by calling `getFlowVersionOrThrow` without a version id; otherwise the
     * flow's `publishedVersionId` is passed (or `undefined` when it is null).
     */
    async list({
        projectId,
        cursorRequest,
        limit,
        folderId,
        status,
        name,
        connectionExternalIds,
        agentExternalIds,
        externalIds,
        versionState = FlowVersionState.DRAFT,
    }: ListParams): Promise<SeekPage<PopulatedFlow>> {
        const decodedCursor = paginationHelper.decodeCursor(cursorRequest)

        const paginator = buildPaginator({
            entity: FlowEntity,
            alias: 'ff',
            query: {
                limit,
                order: 'DESC',
                orderBy: 'updated',
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })

        const queryWhere: Record<string, unknown> = { projectId }

        if (folderId !== undefined) {
            queryWhere.folderId = folderId === 'NULL' ? IsNull() : folderId
        }

        if (status !== undefined) {
            queryWhere.status = In(status)
        }

        // Join only the most recently created version of every flow so that the
        // name / connection / agent filters apply to that version.
        const queryBuilder = flowRepo().createQueryBuilder('ff')
            .leftJoin(
                'flow_version',
                'latest_version',
                'latest_version."flowId" = ff.id AND latest_version.id = (SELECT id FROM flow_version WHERE "flowId" = ff.id ORDER BY created DESC LIMIT 1)',
            )
            .where(queryWhere)

        if (name !== undefined) {
            queryBuilder.andWhere('LOWER(latest_version."displayName") LIKE LOWER(:name)', { name: `%${name}%` })
        }

        if (externalIds !== undefined) {
            if (externalIds.length === 0) {
                // An empty list expands to `IN ()`, which is invalid SQL. Filtering
                // by "none of these ids" must simply match nothing.
                queryBuilder.andWhere('1 = 0')
            }
            else {
                queryBuilder.andWhere('ff."externalId" IN (:...externalIds)', { externalIds })
            }
        }

        if (connectionExternalIds !== undefined) {
            AddAPArrayContainsToQueryBuilder(queryBuilder, 'latest_version."connectionIds"', connectionExternalIds)
        }

        if (agentExternalIds !== undefined) {
            AddAPArrayContainsToQueryBuilder(queryBuilder, 'latest_version."agentIds"', agentExternalIds)
        }

        const paginationResult = await paginator.paginate(queryBuilder)

        const populatedFlowPromises = paginationResult.data.map(async (flow) => {
            const version = await flowVersionService(log).getFlowVersionOrThrow({
                flowId: flow.id,
                versionId: (versionState === FlowVersionState.DRAFT) ? undefined : (flow.publishedVersionId ?? undefined),
            })

            return {
                ...flow,
                version,
            }
        })

        const populatedFlows = (await Promise.all(populatedFlowPromises)).filter((flow) => flow !== null)

        return paginationHelper.createPage(populatedFlows, paginationResult.cursor)
    },

    /**
     * Looks a flow up by id alone.
     *
     * @returns The flow, or `null` when no flow has that id or when
     * `projectService.exists` reports that the flow's project does not exist.
     */
    async getOneById(id: string): Promise<Flow | null> {
        const flow = await flowRepo().findOneBy({
            id,
        })
        if (isNil(flow)) {
            return null
        }
        const projectExists = await projectService.exists({
            projectId: flow.projectId,
        })
        if (!projectExists) {
            return null
        }
        return flow
    },

    /**
     * Looks a flow up by id within a project.
     *
     * @returns The flow, or `null` when the project does not exist or no flow
     * matches both `id` and `projectId`.
     */
    async getOne({ id, projectId, entityManager }: GetOneParams): Promise<Flow | null> {
        const projectExists = await projectService.exists({
            projectId,
        })
        if (!projectExists) {
            return null
        }
        return flowRepo(entityManager).findOneBy({
            id,
            projectId,
        })
    },

    /**
     * Same as {@link getOne} but throws instead of returning `null`.
     *
     * @throws ActivepiecesError with code `ENTITY_NOT_FOUND` when nothing is found.
     */
    async getOneOrThrow(params: GetOneParams): Promise<Flow> {
        const flow = await this.getOne(params)
        assertFlowIsNotNull(flow)
        return flow
    },

    /**
     * Loads a flow together with one of its versions and its trigger source.
     *
     * @returns `null` when the flow or its project does not exist; otherwise the
     * flow with `version` (from `getFlowVersionOrThrow`) and `triggerSource`
     * (`undefined` when `getByFlowId` yields a nullish value).
     */
    async getOnePopulated({
        id,
        projectId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetOnePopulatedParams): Promise<PopulatedFlow | null> {
        const flow = await flowRepo(entityManager).findOne({
            where: {
                id,
                projectId,
            },
        })
        const projectExists = await projectService.exists({
            projectId,
        })
        if (isNil(flow) || !projectExists) {
            return null
        }

        const flowVersion = await flowVersionService(log).getFlowVersionOrThrow({
            flowId: id,
            versionId,
            removeConnectionsName,
            removeSampleData,
            entityManager,
        })

        const triggerSource = await triggerSourceService(log).getByFlowId({
            flowId: id,
            projectId,
            simulate: true,
        })

        return {
            ...flow,
            version: flowVersion,
            triggerSource: triggerSource ?? undefined,
        }
    },

    /**
     * Same as {@link getOnePopulated} but throws instead of returning `null`.
     *
     * @throws ActivepiecesError with code `ENTITY_NOT_FOUND` when nothing is found.
     */
    async getOnePopulatedOrThrow({
        id,
        projectId,
        versionId,
        removeConnectionsName = false,
        removeSampleData = false,
        entityManager,
    }: GetOnePopulatedParams): Promise<PopulatedFlow> {
        const flow = await this.getOnePopulated({
            id,
            projectId,
            versionId,
            removeConnectionsName,
            removeSampleData,
            entityManager,
        })
        assertFlowIsNotNull(flow)
        return flow
    },

    /**
     * Applies a single operation to a flow and returns the re-read, populated flow.
     *
     * Flow-level operations (`LOCK_AND_PUBLISH`, `CHANGE_STATUS`,
     * `CHANGE_FOLDER`, `UPDATE_METADATA`) are handled here; every other
     * operation type is forwarded to `flowVersionService.applyOperation` on the
     * flow's current version.
     *
     * @param lock When `true` (default) a distributed lock keyed by the flow id
     * is held for the duration of the operation and released in `finally`.
     */
    async update({
        id,
        userId,
        projectId,
        platformId,
        operation,
        lock = true,
    }: UpdateParams): Promise<PopulatedFlow> {
        const flowLock = lock
            ? await distributedLock.acquireLock({
                key: id,
                timeout: 240000, // presumably milliseconds — confirm against distributedLock
                log,
            })
            : null

        try {
            switch (operation.type) {
                case FlowOperationType.LOCK_AND_PUBLISH: {
                    await this.updatedPublishedVersionId({
                        id,
                        userId,
                        projectId,
                        platformId,
                    })
                    await this.updateStatus({
                        id,
                        projectId,
                        newStatus: operation.request.status ?? FlowStatus.ENABLED,
                    })
                    break
                }

                case FlowOperationType.CHANGE_STATUS: {
                    await this.updateStatus({
                        id,
                        projectId,
                        newStatus: operation.request.status,
                    })
                    break
                }

                case FlowOperationType.CHANGE_FOLDER: {
                    // Scope the write to the caller's project, like every other
                    // operation here; matching on id alone could move a flow that
                    // belongs to a different project.
                    await flowRepo().update({ id, projectId }, {
                        folderId: operation.request.folderId,
                    })
                    break
                }

                case FlowOperationType.UPDATE_METADATA: {
                    await this.updateMetadata({
                        id,
                        projectId,
                        metadata: operation.request.metadata,
                    })
                    break
                }

                default: {
                    let lastVersion = await flowVersionService(log).getFlowVersionOrThrow({
                        flowId: id,
                        versionId: undefined,
                    })

                    if (lastVersion.state === FlowVersionState.LOCKED) {
                        // A locked version is not edited in place: start a new empty
                        // version and import the locked one into it. The version
                        // fetched above is reused rather than queried a second time.
                        const lockedVersion = lastVersion

                        lastVersion = await flowVersionService(log).createEmptyVersion(id, {
                            displayName: lockedVersion.displayName,
                        })

                        // Duplicate the artifacts from the previous version, otherwise they will be deleted during update operation
                        lastVersion = await flowVersionService(log).applyOperation({
                            userId,
                            projectId,
                            platformId,
                            flowVersion: lastVersion,
                            userOperation: {
                                type: FlowOperationType.IMPORT_FLOW,
                                request: lockedVersion,
                            },
                        })
                    }

                    await flowVersionService(log).applyOperation({
                        userId,
                        projectId,
                        platformId,
                        flowVersion: lastVersion,
                        userOperation: operation,
                    })
                }
            }
        }
        finally {
            await flowLock?.release()
        }

        return this.getOnePopulatedOrThrow({
            id,
            projectId,
        })
    },

    /**
     * Changes a flow's status. When the status actually differs, the flow must
     * have a published version: that version is loaded, the `preUpdateStatus`
     * side effects run, and only then is the new status saved.
     *
     * @throws ActivepiecesError (`ENTITY_NOT_FOUND`) when the flow does not exist.
     * @throws When the status changes but `publishedVersionId` is null/undefined
     * (via `assertNotNullOrUndefined`).
     */
    async updateStatus({
        id,
        projectId,
        newStatus,
        entityManager,
    }: UpdateStatusParams): Promise<PopulatedFlow> {
        const flowToUpdate = await this.getOneOrThrow({
            id,
            projectId,
            entityManager,
        })

        const publishedFlowVersionId = flowToUpdate.publishedVersionId
        if (flowToUpdate.status !== newStatus) {
            assertNotNullOrUndefined(publishedFlowVersionId, 'publishedFlowVersionId is required')
            const publishedFlowVersion = await flowVersionService(log).getFlowVersionOrThrow({
                flowId: flowToUpdate.id,
                versionId: publishedFlowVersionId,
                entityManager,
            })
            // NOTE(review): flowToUpdate has not been modified at this point, so
            // this save looks redundant — confirm before removing.
            await flowRepo(entityManager).save(flowToUpdate)
            await flowSideEffects(log).preUpdateStatus({
                flowToUpdate,
                publishedFlowVersion,
                newStatus,
            })
            flowToUpdate.status = newStatus
            await flowRepo(entityManager).save(flowToUpdate)
        }

        return this.getOnePopulatedOrThrow({
            id,
            projectId,
            entityManager,
        })
    },

    /**
     * Publishes the flow's current version (the one `getFlowVersionOrThrow`
     * returns when no version id is given).
     *
     * If the flow is currently enabled and already has a published version, its
     * trigger source is disabled first. Then, inside a transaction, the version
     * is locked if it is not already, and the flow is saved with
     * `publishedVersionId` pointing at it and status `DISABLED`.
     *
     * @returns The saved flow with the locked version attached.
     */
    async updatedPublishedVersionId({
        id,
        userId,
        projectId,
        platformId,
    }: UpdatePublishedVersionIdParams): Promise<PopulatedFlow> {
        const flowToUpdate = await this.getOneOrThrow({ id, projectId })

        const flowVersionToPublish = await flowVersionService(log).getFlowVersionOrThrow({
            flowId: id,
            versionId: undefined,
        })

        if (flowToUpdate.status === FlowStatus.ENABLED && !isNil(flowToUpdate.publishedVersionId)) {
            await triggerSourceService(log).disable({
                flowId: flowToUpdate.id,
                projectId: flowToUpdate.projectId,
                simulate: false,
                ignoreError: false,
            })
        }

        return transaction(async (entityManager) => {
            const lockedFlowVersion = await lockFlowVersionIfNotLocked({
                flowVersion: flowVersionToPublish,
                userId,
                projectId,
                platformId,
                entityManager,
                log,
            })

            flowToUpdate.publishedVersionId = lockedFlowVersion.id
            flowToUpdate.status = FlowStatus.DISABLED
            const updatedFlow = await flowRepo(entityManager).save(flowToUpdate)

            return {
                ...updatedFlow,
                version: lockedFlowVersion,
            }
        })
    },

    /**
     * Deletes a flow while holding a distributed lock keyed by its id.
     * The `preDelete` side effects are started through `rejectedPromiseHandler`
     * and are not awaited before the row is deleted.
     *
     * @throws ActivepiecesError (`ENTITY_NOT_FOUND`) when the flow does not exist.
     */
    async delete({ id, projectId }: DeleteParams): Promise<void> {
        const lock = await distributedLock.acquireLock({
            key: id,
            timeout: 10000, // presumably milliseconds — confirm against distributedLock
            log,
        })

        try {
            const flowToDelete = await this.getOneOrThrow({
                id,
                projectId,
            })

            rejectedPromiseHandler(flowSideEffects(log).preDelete({
                flowToDelete,
            }), log)

            await flowRepo().delete({ id })
        }
        finally {
            await lock.release()
        }
    },

    /**
     * Returns every flow whose status is `ENABLED`, each populated with its
     * published version (or with no explicit version id when
     * `publishedVersionId` is null).
     */
    async getAllEnabled(): Promise<PopulatedFlow[]> {
        const flows = await flowRepo().findBy({
            status: FlowStatus.ENABLED,
        })
        return Promise.all(flows.map(async (flow) => this.getOnePopulatedOrThrow({
            id: flow.id,
            projectId: flow.projectId,
            versionId: flow.publishedVersionId ?? undefined,
        })))
    },

    /**
     * Builds a template from a flow version, requesting it with connection
     * names and sample data removed. `created` and `updated` are set to the
     * same current epoch-millisecond timestamp, as a string.
     */
    async getTemplate({
        flowId,
        versionId,
        projectId,
    }: GetTemplateParams): Promise<FlowTemplateWithoutProjectInformation> {
        const flow = await this.getOnePopulatedOrThrow({
            id: flowId,
            projectId,
            versionId,
            removeConnectionsName: true,
            removeSampleData: true,
        })

        // Read the clock once so both timestamps are guaranteed to be identical.
        const now = Date.now().toString()

        return {
            name: flow.version.displayName,
            description: '',
            pieces: Array.from(new Set(flowPieceUtil.getUsedPieces(flow.version.trigger))),
            template: flow.version,
            tags: [],
            created: now,
            updated: now,
            blogUrl: '',
        }
    },

    /**
     * Counts a project's flows, optionally restricted by status and folder.
     * When `folderId` is omitted the folder is not part of the filter; the
     * literal string `'NULL'` counts flows that have no folder.
     */
    async count({ projectId, folderId, status }: CountParams): Promise<number> {
        if (folderId === undefined) {
            return flowRepo().countBy({ projectId, status })
        }

        return flowRepo().countBy({
            folderId: folderId !== 'NULL' ? folderId : IsNull(),
            projectId,
            status,
        })
    },

    /** Reports whether the project has at least one flow with the given status. */
    async existsByProjectAndStatus(params: ExistsByProjectAndStatusParams): Promise<boolean> {
        const { projectId, status, entityManager } = params

        return flowRepo(entityManager).existsBy({
            projectId,
            status,
        })
    },

    /**
     * Replaces the flow's metadata and returns the re-read, populated flow.
     *
     * @throws ActivepiecesError (`ENTITY_NOT_FOUND`) when the flow does not exist.
     */
    async updateMetadata({
        id,
        projectId,
        metadata,
    }: UpdateMetadataParams): Promise<PopulatedFlow> {
        const flowToUpdate = await this.getOneOrThrow({
            id,
            projectId,
        })

        flowToUpdate.metadata = metadata
        await flowRepo().save(flowToUpdate)

        return this.getOnePopulatedOrThrow({
            id,
            projectId,
        })
    },

    /**
     * Sets the flow's `updated` field to the current time (ISO-8601 string) and saves it.
     *
     * @throws ActivepiecesError (`ENTITY_NOT_FOUND`) when the flow does not exist.
     */
    async updateLastModified(flowId: FlowId, projectId: ProjectId): Promise<void> {
        const flow = await this.getOneOrThrow({
            id: flowId,
            projectId,
        })

        flow.updated = dayjs().toISOString()
        await flowRepo().save(flow)
    },
})

/**
 * Returns the version unchanged when it is already `LOCKED`; otherwise applies
 * a `LOCK_FLOW` operation to it using the supplied entity manager and returns
 * the result.
 */
const lockFlowVersionIfNotLocked = async ({
    flowVersion,
    userId,
    projectId,
    platformId,
    entityManager,
    log,
}: LockFlowVersionIfNotLockedParams): Promise<FlowVersion> => {
    if (flowVersion.state === FlowVersionState.LOCKED) {
        return flowVersion
    }

    return flowVersionService(log).applyOperation({
        userId,
        projectId,
        platformId,
        flowVersion,
        userOperation: {
            type: FlowOperationType.LOCK_FLOW,
            request: {
                flowId: flowVersion.flowId,
            },
        },
        entityManager,
    })
}

/**
 * Resolves the folder a new flow goes into: a truthy `folderId` wins;
 * otherwise a truthy `folderName` is upserted in the project and its id is
 * returned; otherwise `null`.
 */
const getFolderIdFromRequest = async ({ projectId, folderId, folderName, log }: { projectId: string, folderId: string | undefined, folderName: string | undefined, log: FastifyBaseLogger }) => {
    if (folderId) {
        return folderId
    }
    if (folderName) {
        return (await flowFolderService(log).upsert({
            projectId,
            request: {
                projectId,
                displayName: folderName,
            },
        })).id
    }
    return null
}

/**
 * Assertion helper that narrows `flow` to non-null.
 *
 * @throws ActivepiecesError with code `ENTITY_NOT_FOUND` when `flow` is nil.
 */
const assertFlowIsNotNull: <T extends Flow>(
    flow: T | null
) => asserts flow is T = <T>(flow: T | null) => {
    if (isNil(flow)) {
        throw new ActivepiecesError({
            code: ErrorCode.ENTITY_NOT_FOUND,
            params: {},
        })
    }
}

type CreateParams = {
    projectId: ProjectId
    request: CreateFlowRequest
    externalId?: string
}

type ListParams = {
    projectId: ProjectId
    cursorRequest: Cursor | null
    limit: number
    folderId: string | undefined
    status: FlowStatus[] | undefined
    name: string | undefined
    versionState?: FlowVersionState
    externalIds?: string[]
    connectionExternalIds?: string[]
    agentExternalIds?: string[]
}

type GetOneParams = {
    id: FlowId
    projectId: ProjectId
    entityManager?: EntityManager
}

type GetOnePopulatedParams = GetOneParams & {
    versionId?: FlowVersionId
    removeConnectionsName?: boolean
    removeSampleData?: boolean
}

type GetTemplateParams = {
    flowId: FlowId
    projectId: ProjectId
    versionId: FlowVersionId | undefined
}

type CountParams = {
    projectId: ProjectId
    folderId?: string
    status?: FlowStatus
}

type UpdateParams = {
    id: FlowId
    userId: UserId | null
    projectId: ProjectId
    operation: FlowOperationRequest
    lock?: boolean
    platformId: PlatformId
}

type UpdateStatusParams = {
    id: FlowId
    projectId: ProjectId
    newStatus: FlowStatus
    entityManager?: EntityManager
}

type UpdatePublishedVersionIdParams = {
    id: FlowId
    userId: UserId | null
    platformId: PlatformId
    projectId: ProjectId
}

type DeleteParams = {
    id: FlowId
    projectId: ProjectId
}

type NewFlow = Omit<Flow, 'created' | 'updated'>

type LockFlowVersionIfNotLockedParams = {
    flowVersion: FlowVersion
    userId: UserId | null
    projectId: ProjectId
    platformId: PlatformId
    entityManager: EntityManager
    log: FastifyBaseLogger
}

type ExistsByProjectAndStatusParams = {
    projectId: ProjectId
    status: FlowStatus
    entityManager: EntityManager
}

type UpdateMetadataParams = {
    id: FlowId
    projectId: ProjectId
    metadata: Metadata | null | undefined
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.