flow-run-service.ts (26.7 kB)
import { AppSystemProp, exceptionHandler, rejectedPromiseHandler } from '@activepieces/server-shared'
import {
    ActivepiecesError,
    apId,
    assertNotNullOrUndefined,
    Cursor,
    EngineHttpResponse,
    ErrorCode,
    ExecutionType,
    ExecutioOutputFile,
    File,
    FileCompression,
    FileType,
    FlowId,
    FlowRetryStrategy,
    FlowRun,
    FlowRunId,
    FlowRunStatus,
    FlowVersionId,
    isNil,
    LATEST_JOB_DATA_SCHEMA_VERSION,
    PauseMetadata,
    PauseType,
    ProgressUpdateType,
    ProjectId,
    RunEnvironment,
    SampleDataFileType,
    SeekPage,
    spreadIfDefined,
    WorkerJobType,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import { In, Not } from 'typeorm'
import { repoFactory } from '../../core/db/repo-factory'
import { APArrayContains } from '../../database/database-connection'
import { fileService } from '../../file/file.service'
import { s3Helper } from '../../file/s3-helper'
import { flowVersionService } from '../../flows/flow-version/flow-version.service'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import { Order } from '../../helper/pagination/paginator'
import { system } from '../../helper/system/system'
import { projectService } from '../../project/project-service'
import { engineResponseWatcher } from '../../workers/engine-response-watcher'
import { jobQueue } from '../../workers/queue/job-queue'
import { JobType } from '../../workers/queue/queue-manager'
import { flowService } from '../flow/flow.service'
import { sampleDataService } from '../step-run/sample-data.service'
import { FlowRunEntity } from './flow-run-entity'
import { flowRunSideEffects } from './flow-run-side-effects'

export const WEBHOOK_TIMEOUT_MS = system.getNumberOrThrow(AppSystemProp.WEBHOOK_TIMEOUT_SECONDS) * 1000
export const flowRunRepo = repoFactory<FlowRun>(FlowRunEntity)

const maxFileSizeInBytes = system.getNumberOrThrow(AppSystemProp.MAX_FILE_SIZE_MB) * 1024 * 1024
const USE_SIGNED_URL = system.getBoolean(AppSystemProp.S3_USE_SIGNED_URLS) ?? false

export const flowRunService = (log: FastifyBaseLogger) => ({
    async list(params: ListParams): Promise<SeekPage<FlowRun>> {
        const decodedCursor = paginationHelper.decodeCursor(params.cursor)
        const paginator = buildPaginator<FlowRun>({
            entity: FlowRunEntity,
            query: {
                limit: params.limit,
                order: Order.DESC,
                orderBy: 'created',
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })
        let query = flowRunRepo().createQueryBuilder('flow_run').where({
            projectId: params.projectId,
            environment: RunEnvironment.PRODUCTION,
        })
        if (params.flowId) {
            query = query.andWhere({
                flowId: In(params.flowId),
            })
        }
        if (params.status) {
            query = query.andWhere({
                status: In(params.status),
            })
        }
        if (params.createdAfter) {
            query = query.andWhere('flow_run.created >= :createdAfter', {
                createdAfter: params.createdAfter,
            })
        }
        if (params.createdBefore) {
            query = query.andWhere('flow_run.created <= :createdBefore', {
                createdBefore: params.createdBefore,
            })
        }
        if (params.tags) {
            query = query.andWhere(APArrayContains('tags', params.tags))
        }
        if (!isNil(params.failedStepName)) {
            query = query.andWhere({
                failedStepName: params.failedStepName,
            })
        }
        const { data, cursor: newCursor } = await paginator.paginate(query)
        return paginationHelper.createPage<FlowRun>(data, newCursor)
    },
    async retry({ flowRunId, strategy, projectId }: RetryParams): Promise<FlowRun | null> {
        const oldFlowRun = await flowRunService(log).getOnePopulatedOrThrow({
            id: flowRunId,
            projectId,
        })
        switch (strategy) {
            case FlowRetryStrategy.FROM_FAILED_STEP:
                await flowRunRepo().update({
                    id: oldFlowRun.id,
                    projectId: oldFlowRun.projectId,
                }, {
                    status: FlowRunStatus.QUEUED,
                })
                return flowRunService(log).resume({
                    flowRunId: oldFlowRun.id,
                    executionType: ExecutionType.RESUME,
                    progressUpdateType: ProgressUpdateType.NONE,
                    checkRequestId: false,
                })
            case FlowRetryStrategy.ON_LATEST_VERSION: {
                const latestFlowVersion = await flowVersionService(log).getLatestLockedVersionOrThrow(
                    oldFlowRun.flowId,
                )
                const payload = oldFlowRun.steps ? oldFlowRun.steps[latestFlowVersion.trigger.name]?.output : undefined
                await flowRunRepo().update({
                    id: oldFlowRun.id,
                    projectId: oldFlowRun.projectId,
                }, {
                    flowVersionId: latestFlowVersion.id,
                    status: FlowRunStatus.QUEUED,
                })
                const updatedFlowRun = await flowRunRepo().findOneByOrFail({ id: oldFlowRun.id })
                return addToQueue({
                    payload,
                    flowRun: updatedFlowRun,
                    synchronousHandlerId: undefined,
                    httpRequestId: undefined,
                    progressUpdateType: ProgressUpdateType.NONE,
                    executionType: ExecutionType.BEGIN,
                    executeTrigger: false,
                }, log)
            }
        }
    },
    async existsBy(runId: FlowRunId): Promise<boolean> {
        return flowRunRepo().existsBy({ id: runId })
    },
    async bulkRetry({ projectId, flowRunIds, strategy, status, flowId, createdAfter, createdBefore, excludeFlowRunIds, failedStepName }: BulkRetryParams): Promise<(FlowRun | null)[]> {
        const filteredFlowRunIds = await filterFlowRunsAndApplyFilters(projectId, flowRunIds, status, flowId, createdAfter, createdBefore, excludeFlowRunIds, failedStepName)
        return Promise.all(filteredFlowRunIds.map(flowRunId => this.retry({ flowRunId, strategy, projectId })))
    },
    async resume({ flowRunId, payload, requestId, progressUpdateType, executionType, checkRequestId }: ResumeWebhookParams): Promise<FlowRun | null> {
        log.info({
            flowRunId,
        }, '[FlowRunService#resume] adding flow run to queue')
        const flowRunToResume = await flowRunRepo().findOneBy({
            id: flowRunId,
        })
        if (isNil(flowRunToResume)) {
            throw new ActivepiecesError({
                code: ErrorCode.FLOW_RUN_NOT_FOUND,
                params: {
                    id: flowRunId,
                },
            })
        }
        const pauseMetadata = flowRunToResume.pauseMetadata
        const matchRequestId = isNil(pauseMetadata) || (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId)
        if (matchRequestId || !checkRequestId) {
            return addToQueue({
                payload,
                flowRun: flowRunToResume,
                synchronousHandlerId: returnHandlerId(pauseMetadata, requestId, log),
                httpRequestId: flowRunToResume.pauseMetadata?.requestIdToReply ?? undefined,
                progressUpdateType,
                executeTrigger: false,
                executionType,
            }, log)
        }
        await flowRunSideEffects(log).onResume(flowRunToResume)
        return flowRunToResume
    },
    updateRunStatusAsync({ flowRunId, status }: UpdateRunStatusParams): void {
        rejectedPromiseHandler(flowRunRepo().update(flowRunId, { status }), log)
    },
    async updateRun({ flowRunId, status, tasks, projectId, tags, duration, failedStepName }: FinishParams): Promise<FlowRun> {
        log.info({
            flowRunId,
            status,
            tasks,
            duration,
            failedStepName,
        }, '[FlowRunService#updateRun]')
        await flowRunRepo().update({
            id: flowRunId,
            projectId,
        }, {
            status,
            ...spreadIfDefined('tasks', tasks),
            ...spreadIfDefined('duration', duration ? Math.floor(Number(duration)) : undefined),
            tags,
            finishTime: new Date().toISOString(),
            failedStepName: failedStepName ?? undefined,
        })
        const flowRun = await flowRunRepo().findOneByOrFail({ id: flowRunId })
        await flowRunSideEffects(log).onFinish(flowRun)
        return flowRun
    },
    async updateLogs({ flowRunId, logsFileId, projectId, executionStateString, executionStateContentLength }: UpdateLogs): Promise<void> {
        const executionState = executionStateString ? Buffer.from(executionStateString) : undefined
        if (executionStateContentLength > maxFileSizeInBytes || (!isNil(executionState) && executionState.byteLength > maxFileSizeInBytes)) {
            const errors = new Error(
                'Execution Output is too large, maximum size is ' + maxFileSizeInBytes,
            )
            exceptionHandler.handle(errors, log)
            throw errors
        }
        const newLogsFileId = logsFileId ?? apId()
        await fileService(log).save({
            fileId: newLogsFileId,
            projectId,
            data: executionState ?? null,
            size: executionStateContentLength,
            type: FileType.FLOW_RUN_LOG,
            compression: FileCompression.NONE,
            metadata: {
                flowRunId,
                projectId,
            },
        })
        await flowRunRepo().update(flowRunId, {
            logsFileId: newLogsFileId,
        })
    },
    async updateLogsSizeAndAttachLogsFile({ flowRunId, logsFileId, executionStateContentLength }: UpdateLogsSizeAndAttachLogsFileParams): Promise<void> {
        await flowRunRepo().update(flowRunId, {
            logsFileId,
        })
        await fileService(log).updateSize({
            fileId: logsFileId,
            size: executionStateContentLength,
        })
    },
    async start({ payload, executeTrigger, executionType, synchronousHandlerId, progressUpdateType, httpRequestId, projectId, flowVersionId, parentRunId, failParentOnFailure, stepNameToTest, environment }: StartParams): Promise<FlowRun> {
        const flowVersion = await flowVersionService(log).getOneOrThrow(flowVersionId)
        const flow = await flowService(log).getOneOrThrow({
            id: flowVersion.flowId,
            projectId,
        })
        const newFlowRun = await create({
            projectId,
            flowVersionId,
            parentRunId,
            flowId: flow.id,
            failParentOnFailure,
            stepNameToTest,
            flowDisplayName: flowVersion.displayName,
            environment,
        })
        await addToQueue({
            flowRun: newFlowRun,
            payload,
            executeTrigger,
            executionType,
            synchronousHandlerId,
            httpRequestId,
            progressUpdateType,
        }, log)
        await flowRunSideEffects(log).onStart(newFlowRun)
        return newFlowRun
    },
    async test({ projectId, flowVersionId, parentRunId, stepNameToTest }: TestParams): Promise<FlowRun> {
        const flowVersion = await flowVersionService(log).getOneOrThrow(flowVersionId)
        const triggerPayload = await sampleDataService(log).getOrReturnEmpty({
            projectId,
            flowVersion,
            stepName: flowVersion.trigger.name,
            type: SampleDataFileType.OUTPUT,
        })
        const flowRun = await create({
            projectId,
            flowId: flowVersion.flowId,
            flowVersionId: flowVersion.id,
            flowDisplayName: flowVersion.displayName,
            environment: RunEnvironment.TESTING,
            parentRunId,
            failParentOnFailure: undefined,
            stepNameToTest,
        })
        return addToQueue({
            flowRun,
            payload: triggerPayload,
            executionType: ExecutionType.BEGIN,
            synchronousHandlerId: undefined,
            httpRequestId: undefined,
            executeTrigger: false,
            progressUpdateType: ProgressUpdateType.TEST_FLOW,
            sampleData: !isNil(stepNameToTest) ? await sampleDataService(log).getSampleDataForFlow(projectId, flowVersion, SampleDataFileType.OUTPUT) : undefined,
        }, log)
    },
    async pause(params: PauseParams): Promise<void> {
        log.info({
            flowRunId: params.flowRunId,
            pauseType: params.pauseMetadata.type,
        }, '[FlowRunService] pausing flow run')
        const { flowRunId, pauseMetadata } = params
        log.info(
            `[FlowRunSideEffects#pause] flowRunId=${flowRunId} pauseType=${pauseMetadata?.type}`,
        )
        if (isNil(pauseMetadata)) {
            throw new ActivepiecesError({
                code: ErrorCode.VALIDATION,
                params: {
                    message: `pauseMetadata is undefined flowRunId=${flowRunId}`,
                },
            })
        }
        await flowRunRepo().update(flowRunId, {
            status: FlowRunStatus.PAUSED,
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            pauseMetadata: pauseMetadata as any,
        })
        const flowRun = await flowRunRepo().findOneByOrFail({ id: flowRunId })
        switch (pauseMetadata.type) {
            case PauseType.DELAY: {
                const platformId = await projectService.getPlatformId(flowRun.projectId)
                // Todo(@amrdb): please make this not hacky
                const MINIMUM_DELAY_IN_MILLISECONDS_UNTIL_FIRST_JOB_IS_MARKED_AS_COMPLETED = dayjs.duration(60, 'seconds').asMilliseconds()
                await jobQueue(log).add({
                    id: 'delayed_' + flowRun.id,
                    type: JobType.ONE_TIME,
                    data: {
                        jobType: WorkerJobType.DELAYED_FLOW,
                        platformId,
                        schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
                        runId: flowRun.id,
                        flowId: flowRun.flowId,
                        synchronousHandlerId: flowRun.pauseMetadata?.handlerId ?? null,
                        progressUpdateType: flowRun.pauseMetadata?.progressUpdateType ?? ProgressUpdateType.NONE,
                        projectId: flowRun.projectId,
                        environment: flowRun.environment,
                        flowVersionId: flowRun.flowVersionId,
                    },
                    delay: Math.max(MINIMUM_DELAY_IN_MILLISECONDS_UNTIL_FIRST_JOB_IS_MARKED_AS_COMPLETED, dayjs(pauseMetadata.resumeDateTime).diff(dayjs(), 'ms')),
                })
                break
            }
            case PauseType.WEBHOOK:
                break
        }
    },
    async getOne(params: GetOneParams): Promise<FlowRun | null> {
        return flowRunRepo().findOneBy({
            projectId: params.projectId,
            id: params.id,
        })
    },
    async getOneOrThrow(params: GetOneParams): Promise<FlowRun> {
        const flowRun = await flowRunRepo().findOneBy({
            projectId: params.projectId,
            id: params.id,
        })
        if (isNil(flowRun)) {
            throw new ActivepiecesError({
                code: ErrorCode.FLOW_RUN_NOT_FOUND,
                params: {
                    id: params.id,
                },
            })
        }
        return flowRun
    },
    async getOnePopulatedOrThrow(params: GetOneParams): Promise<FlowRun> {
        const flowRun = await this.getOneOrThrow(params)
        let steps = {}
        if (!isNil(flowRun.logsFileId)) {
            const file = await fileService(log).getDataOrThrow({
                fileId: flowRun.logsFileId,
                projectId: flowRun.projectId,
            })
            const serializedExecutionOutput = file.data.toString('utf-8')
            const executionOutput: ExecutioOutputFile = JSON.parse(
                serializedExecutionOutput,
            )
            steps = executionOutput.executionState.steps
        }
        return {
            ...flowRun,
            steps,
        }
    },
    async handleSyncResumeFlow({ runId, payload, requestId }: { runId: string, payload: unknown, requestId: string }) {
        const flowRun = await flowRunService(log).getOnePopulatedOrThrow({
            id: runId,
            projectId: undefined,
        })
        const synchronousHandlerId = engineResponseWatcher(log).getServerId()
        const matchRequestId = isNil(flowRun.pauseMetadata) || (flowRun.pauseMetadata.type === PauseType.WEBHOOK && requestId === flowRun.pauseMetadata.requestId)
        assertNotNullOrUndefined(synchronousHandlerId, 'synchronousHandlerId is required for sync resume request is required')
        if (!matchRequestId) {
            return {
                status: StatusCodes.NOT_FOUND,
                body: {},
                headers: {},
            }
        }
        if (flowRun.status !== FlowRunStatus.PAUSED) {
            return {
                status: StatusCodes.CONFLICT,
                body: {
                    'message': 'Flow run is not paused',
                    'flowRunStatus': flowRun.status,
                },
                headers: {},
            }
        }
        await addToQueue({
            payload,
            flowRun,
            synchronousHandlerId,
            httpRequestId: requestId,
            executeTrigger: false,
            progressUpdateType: ProgressUpdateType.TEST_FLOW,
            executionType: ExecutionType.RESUME,
        }, log)
        return engineResponseWatcher(log).oneTimeListener<EngineHttpResponse>(requestId, true, WEBHOOK_TIMEOUT_MS, {
            status: StatusCodes.NO_CONTENT,
            body: {},
            headers: {},
        })
    },
})

async function filterFlowRunsAndApplyFilters(
    projectId: ProjectId,
    flowRunIds?: FlowRunId[],
    status?: FlowRunStatus[],
    flowId?: FlowId[],
    createdAfter?: string,
    createdBefore?: string,
    excludeFlowRunIds?: FlowRunId[],
    failedStepName?: string,
): Promise<FlowRunId[]> {
    let query = flowRunRepo().createQueryBuilder('flow_run').where({
        projectId,
        environment: RunEnvironment.PRODUCTION,
    })
    if (!isNil(flowRunIds) && flowRunIds.length > 0) {
        query = query.andWhere({
            id: In(flowRunIds),
        })
    }
    if (flowId && flowId.length > 0) {
        query = query.andWhere({
            flowId: In(flowId),
        })
    }
    if (status && status.length > 0) {
        query = query.andWhere({
            status: In(status),
        })
    }
    if (createdAfter) {
        query = query.andWhere('flow_run.created >= :createdAfter', {
            createdAfter,
        })
    }
    if (createdBefore) {
        query = query.andWhere('flow_run.created <= :createdBefore', {
            createdBefore,
        })
    }
    if (excludeFlowRunIds && excludeFlowRunIds.length > 0) {
        query = query.andWhere({
            id: Not(In(excludeFlowRunIds)),
        })
    }
    if (failedStepName) {
        query = query.andWhere('flow_run.failedStepName = :failedStepName', {
            failedStepName,
        })
    }
    const flowRuns = await query.getMany()
    return flowRuns.map(flowRun => flowRun.id)
}

function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: string | undefined, log: FastifyBaseLogger): string {
    const handlerId = engineResponseWatcher(log).getServerId()
    if (isNil(pauseMetadata)) {
        return handlerId
    }
    if (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId && pauseMetadata.handlerId) {
        return pauseMetadata.handlerId
    }
    else {
        return handlerId
    }
}

const createLogsUploadUrl = async (params: CreateLogsUploadUrlParams, log: FastifyBaseLogger): Promise<{ uploadUrl: string, fileId: string }> => {
    const file = await getOrCreateLogsFile(params, log)
    assertNotNullOrUndefined(file.s3Key, 's3Key')
    const uploadUrl = await s3Helper(log).putS3SignedUrl(file.s3Key)
    return { uploadUrl, fileId: file.id }
}

async function getOrCreateLogsFile(params: GetOrCreateLogsFileParams, log: FastifyBaseLogger): Promise<File> {
    if (isNil(params.flowRun.logsFileId)) {
        return fileService(log).save({
            projectId: params.projectId,
            data: null,
            size: 0,
            type: FileType.FLOW_RUN_LOG,
            compression: FileCompression.NONE,
            metadata: {
                flowRunId: params.flowRun.id,
                projectId: params.projectId,
            },
        })
    }
    return fileService(log).getFileOrThrow({
        projectId: params.projectId,
        fileId: params.flowRun.logsFileId,
        type: FileType.FLOW_RUN_LOG,
    })
}

type GetOrCreateLogsFileParams = {
    flowRun: FlowRun
    projectId: ProjectId
}

async function addToQueue(params: AddToQueueParams, log: FastifyBaseLogger): Promise<FlowRun> {
    let logsUploadUrl: string | undefined
    let logsFileId: string | undefined = params.flowRun.logsFileId ?? undefined
    if (USE_SIGNED_URL) {
        const logsUploadResult = await createLogsUploadUrl({
            flowRun: params.flowRun,
            projectId: params.flowRun.projectId,
        }, log)
        logsUploadUrl = logsUploadResult.uploadUrl
        logsFileId = logsUploadResult.fileId
    }
    const platformId = await projectService.getPlatformId(params.flowRun.projectId)
    await jobQueue(log).add({
        id: params.flowRun.id,
        type: JobType.ONE_TIME,
        priority: params.flowRun.environment === RunEnvironment.TESTING ? 'high' : isNil(params.synchronousHandlerId) ? 'low' : 'medium',
        data: {
            synchronousHandlerId: params.synchronousHandlerId ?? null,
            projectId: params.flowRun.projectId,
            platformId,
            environment: params.flowRun.environment,
            runId: params.flowRun.id,
            jobType: WorkerJobType.EXECUTE_FLOW,
            flowVersionId: params.flowRun.flowVersionId,
            payload: params.payload,
            executeTrigger: params.executeTrigger,
            httpRequestId: params.httpRequestId,
            executionType: params.executionType,
            progressUpdateType: params.progressUpdateType,
            stepNameToTest: params.flowRun.stepNameToTest,
            sampleData: params.sampleData,
            logsUploadUrl,
            logsFileId,
        },
    })
    return params.flowRun
}

async function create(params: CreateParams): Promise<FlowRun> {
    return flowRunRepo().save({
        id: apId(),
        projectId: params.projectId,
        flowId: params.flowId,
        flowVersionId: params.flowVersionId,
        environment: params.environment,
        flowDisplayName: params.flowDisplayName,
        startTime: new Date().toISOString(),
        parentRunId: params.parentRunId,
        failParentOnFailure: params.failParentOnFailure ?? true,
        status: FlowRunStatus.QUEUED,
        stepNameToTest: params.stepNameToTest,
    })
}

type CreateLogsUploadUrlParams = {
    flowRun: FlowRun
    projectId: string
}

type UpdateLogs = {
    flowRunId: FlowRunId
    logsFileId: string | undefined
    projectId: ProjectId
    executionStateString: string | undefined
    executionStateContentLength: number
}

type UpdateRunStatusParams = {
    flowRunId: FlowRunId
    status: FlowRunStatus
}

type FinishParams = {
    flowRunId: FlowRunId
    projectId: string
    status: FlowRunStatus
    tasks: number | undefined
    duration: number | undefined
    tags: string[]
    failedStepName?: string | undefined
}

type UpdateLogsSizeAndAttachLogsFileParams = {
    flowRunId: FlowRunId
    logsFileId: string
    executionStateContentLength: number
}

type CreateParams = {
    projectId: ProjectId
    flowVersionId: FlowVersionId
    parentRunId?: FlowRunId
    failParentOnFailure: boolean | undefined
    stepNameToTest?: string
    flowId: FlowId
    flowDisplayName: string
    environment: RunEnvironment
}

type ListParams = {
    projectId: ProjectId
    flowId: FlowId[] | undefined
    status: FlowRunStatus[] | undefined
    cursor: Cursor | null
    tags?: string[]
    limit: number
    createdAfter?: string
    createdBefore?: string
    failedStepName?: string
}

type GetOneParams = {
    id: FlowRunId
    projectId: ProjectId | undefined
}

type AddToQueueParams = {
    flowRun: FlowRun
    payload: unknown
    executeTrigger: boolean
    executionType: ExecutionType
    synchronousHandlerId: string | undefined
    httpRequestId: string | undefined
    progressUpdateType: ProgressUpdateType
    sampleData?: Record<string, unknown>
}

type StartParams = {
    payload: unknown
    environment: RunEnvironment
    flowVersionId: FlowVersionId
    projectId: ProjectId
    parentRunId?: FlowRunId
    failParentOnFailure: boolean | undefined
    stepNameToTest?: string
    executeTrigger: boolean
    executionType: ExecutionType
    synchronousHandlerId: string | undefined
    httpRequestId: string | undefined
    progressUpdateType: ProgressUpdateType
    sampleData?: Record<string, unknown>
}

type TestParams = {
    projectId: ProjectId
    flowVersionId: FlowVersionId
    parentRunId?: FlowRunId
    stepNameToTest?: string
}

type PauseParams = {
    flowRunId: FlowRunId
    pauseMetadata: PauseMetadata
}

type RetryParams = {
    flowRunId: FlowRunId
    strategy: FlowRetryStrategy
    projectId: ProjectId
}

type BulkRetryParams = {
    projectId: ProjectId
    flowRunIds?: FlowRunId[]
    strategy: FlowRetryStrategy
    status?: FlowRunStatus[]
    flowId?: FlowId[]
    createdAfter?: string
    createdBefore?: string
    excludeFlowRunIds?: FlowRunId[]
    failedStepName?: string
}

type ResumeWebhookParams = {
    flowRunId: FlowRunId
    requestId?: string
    progressUpdateType: ProgressUpdateType
    payload?: unknown
    executionType: ExecutionType
    checkRequestId: boolean
}
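
A minimal usage sketch, for orientation only: the `triggerRun` helper and its parameter values below are illustrative assumptions, not part of the file above. It shows how a caller (for example, a webhook handler) might start a production run through `flowRunService(log).start()`, which saves a QUEUED run, enqueues an EXECUTE_FLOW job via `addToQueue`, and fires the `onStart` side effects.

// Hypothetical usage sketch, not part of flow-run-service.ts.
import { ExecutionType, FlowVersionId, ProgressUpdateType, ProjectId, RunEnvironment } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { flowRunService } from './flow-run-service'

async function triggerRun(log: FastifyBaseLogger, projectId: ProjectId, flowVersionId: FlowVersionId, payload: unknown) {
    // start() creates the FlowRun row, enqueues the worker job, and runs onStart side effects.
    return flowRunService(log).start({
        payload,
        projectId,
        flowVersionId,
        environment: RunEnvironment.PRODUCTION,
        executeTrigger: false,                       // assumption: payload is supplied directly, so the trigger is not re-executed
        executionType: ExecutionType.BEGIN,
        synchronousHandlerId: undefined,             // fire-and-forget: no engine response is awaited
        httpRequestId: undefined,
        progressUpdateType: ProgressUpdateType.NONE,
        failParentOnFailure: undefined,              // create() defaults this to true
    })
}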
