Skip to main content
Glama
flow-run-service.ts (26.1 kB)
import { AppSystemProp } from '@activepieces/server-shared'
import {
    ActivepiecesError,
    apId,
    assertNotNullOrUndefined,
    Cursor,
    EngineHttpResponse,
    ErrorCode,
    ExecutionType,
    FlowId,
    FlowRetryStrategy,
    FlowRun,
    FlowRunId,
    FlowRunStatus,
    FlowVersionId,
    isNil,
    LATEST_JOB_DATA_SCHEMA_VERSION,
    PauseMetadata,
    PauseType,
    PlatformId,
    ProgressUpdateType,
    ProjectId,
    RunEnvironment,
    SampleDataFileType,
    SeekPage,
    UploadLogsBehavior,
    WorkerJobType,
} from '@activepieces/shared'
import { context, propagation, trace } from '@opentelemetry/api'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import pLimit from 'p-limit'
import { In, IsNull, Not, Repository, SelectQueryBuilder } from 'typeorm'
import { repoFactory } from '../../core/db/repo-factory'
import { isPostgres } from '../../database/database-common'
import { APArrayContains } from '../../database/database-connection'
import { flowVersionService } from '../../flows/flow-version/flow-version.service'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
import { Order } from '../../helper/pagination/paginator'
import { system } from '../../helper/system/system'
import { projectService } from '../../project/project-service'
import { engineResponseWatcher } from '../../workers/engine-response-watcher'
import { jobQueue } from '../../workers/queue/job-queue'
import { JobType } from '../../workers/queue/queue-manager'
import { sampleDataService } from '../step-run/sample-data.service'
import { FlowRunEntity } from './flow-run-entity'
import { flowRunSideEffects } from './flow-run-side-effects'
import { runsMetadataQueue } from './flow-runs-queue'
import { flowRunLogsService } from './logs/flow-run-logs-service'

/** Statuses used as the default filter for `cancel` and for the descendant lookup in `getAllChildRuns`. */
const CANCELLABLE_STATUSES: FlowRunStatus[] = [FlowRunStatus.PAUSED, FlowRunStatus.QUEUED]

const tracer = trace.getTracer('flow-run-service')

/** Webhook timeout in milliseconds, derived from the `WEBHOOK_TIMEOUT_SECONDS` system property. */
export const WEBHOOK_TIMEOUT_MS = system.getNumberOrThrow(AppSystemProp.WEBHOOK_TIMEOUT_SECONDS) * 1000

export const flowRunRepo = repoFactory<FlowRun>(FlowRunEntity)

/** Selects the log upload behavior in `addToQueue`; defaults to `false` when the property is unset. */
const USE_SIGNED_URL = system.getBoolean(AppSystemProp.S3_USE_SIGNED_URLS) ?? false

/**
 * Service for listing, starting, resuming, retrying, cancelling and archiving flow runs.
 *
 * @param log Logger forwarded to every collaborating service and queue.
 */
export const flowRunService = (log: FastifyBaseLogger) => ({
    /**
     * Returns one page of PRODUCTION flow runs of a project, newest first, with the
     * optional filters in `params` combined with AND.
     */
    async list(params: ListParams): Promise<SeekPage<FlowRun>> {
        const decodedCursor = paginationHelper.decodeCursor(params.cursor)
        const paginator = buildPaginator<FlowRun>({
            entity: FlowRunEntity,
            query: {
                limit: params.limit,
                order: Order.DESC,
                orderBy: 'created',
                afterCursor: decodedCursor.nextCursor,
                beforeCursor: decodedCursor.previousCursor,
            },
        })

        let query = queryBuilderForFlowRun(flowRunRepo()).where({
            projectId: params.projectId,
            environment: RunEnvironment.PRODUCTION,
        })
        if (!isNil(params.archived)) {
            query = query.andWhere({
                archivedAt: params.archived ? Not(IsNull()) : IsNull(),
            })
        }
        if (params.flowId) {
            query = query.andWhere({
                flowId: In(params.flowId),
            })
        }
        if (params.status) {
            query = query.andWhere({
                status: In(params.status),
            })
        }
        if (params.createdAfter) {
            query = query.andWhere('flow_run.created >= :createdAfter', {
                createdAfter: params.createdAfter,
            })
        }
        if (params.createdBefore) {
            query = query.andWhere('flow_run.created <= :createdBefore', {
                createdBefore: params.createdBefore,
            })
        }
        if (params.tags) {
            query = query.andWhere(APArrayContains('tags', params.tags))
        }
        if (!isNil(params.failedStepName)) {
            query = query.andWhere('flow_run."failedStep"->>\'name\' = :failedStepName', {
                failedStepName: params.failedStepName,
            })
        }
        if (params.flowRunIds) {
            query = query.andWhere({
                id: In(params.flowRunIds),
            })
        }

        const { data, cursor: newCursor } = await paginator.paginate(query)
        return paginationHelper.createPage<FlowRun>(data, newCursor)
    },

    /**
     * Retries an existing run.
     *
     * - `FROM_FAILED_STEP`: marks the run as QUEUED and resumes it in place.
     * - `ON_LATEST_VERSION`: starts a new run on the latest locked flow version, reusing
     *   the trigger output recorded in the old run's steps as the payload.
     *
     * @throws ActivepiecesError with `FLOW_RUN_NOT_FOUND` when the run does not exist in the project.
     */
    async retry({ flowRunId, strategy, projectId }: RetryParams): Promise<FlowRun | null> {
        const oldFlowRun = await flowRunService(log).getOnePopulatedOrThrow({
            id: flowRunId,
            projectId,
        })

        switch (strategy) {
            case FlowRetryStrategy.FROM_FAILED_STEP:
                await flowRunRepo().update({
                    id: oldFlowRun.id,
                    projectId: oldFlowRun.projectId,
                }, {
                    status: FlowRunStatus.QUEUED,
                })
                return flowRunService(log).resume({
                    flowRunId: oldFlowRun.id,
                    executionType: ExecutionType.RESUME,
                    progressUpdateType: ProgressUpdateType.NONE,
                    checkRequestId: false,
                })
            case FlowRetryStrategy.ON_LATEST_VERSION: {
                const latestFlowVersion = await flowVersionService(log).getLatestLockedVersionOrThrow(
                    oldFlowRun.flowId,
                )
                // The trigger step may be missing from the old run's steps; the payload is then undefined.
                const payload = oldFlowRun.steps
                    ? oldFlowRun.steps[latestFlowVersion.trigger.name]?.output
                    : undefined
                return this.start({
                    flowId: oldFlowRun.flowId,
                    payload,
                    platformId: await projectService.getPlatformId(oldFlowRun.projectId),
                    executionType: ExecutionType.BEGIN,
                    progressUpdateType: ProgressUpdateType.NONE,
                    synchronousHandlerId: undefined,
                    httpRequestId: undefined,
                    executeTrigger: false,
                    environment: oldFlowRun.environment,
                    flowVersionId: latestFlowVersion.id,
                    projectId: oldFlowRun.projectId,
                    failParentOnFailure: oldFlowRun.failParentOnFailure,
                    parentRunId: oldFlowRun.parentRunId,
                })
            }
        }
    },

    /**
     * Cancels the runs matching the filters, then their paused/queued descendants.
     *
     * Every cancellation is attempted (`Promise.allSettled`) before any failure is
     * reported; the first parent rejection is rethrown, otherwise the first child rejection.
     * When `status` is omitted the filter defaults to `CANCELLABLE_STATUSES`.
     */
    async cancel({ projectId, platformId, flowRunIds, excludeFlowRunIds, status, flowId, createdAfter, createdBefore }: CancelParams): Promise<void> {
        const filteredStatus = status ?? CANCELLABLE_STATUSES
        const flowRuns = await filterFlowRunsAndApplyFilters({
            projectId,
            flowRunIds,
            status: filteredStatus,
            flowId,
            createdAfter,
            createdBefore,
            excludeFlowRunIds,
        })

        const parentResults = await Promise.allSettled(
            flowRuns.map(flowRun => cancelSingleRun(log, flowRun, platformId)),
        )

        const childFlows = await getAllChildRuns(flowRuns.map(flowRun => flowRun.id))
        log.info({
            flowRunsCount: flowRuns.length,
            childFlowCount: childFlows.length,
        }, '[cancelChildFlows] Found cancellable descendant flows (paused/queued)')

        const childResults = await Promise.allSettled(
            childFlows.map(flowRun => cancelSingleRun(log, flowRun, platformId)),
        )

        throwFirstRejection(parentResults)
        throwFirstRejection(childResults)
    },

    /** Tells whether a flow run with the given id exists. */
    async existsBy(runId: FlowRunId): Promise<boolean> {
        return flowRunRepo().existsBy({ id: runId })
    },

    /** Stamps `archivedAt` with the current time on every run matching the filters. */
    async bulkArchive(params: BulkArchiveActionParams): Promise<void> {
        const filteredFlowRuns = await filterFlowRunsAndApplyFilters(params)
        // Nothing matched: skip the UPDATE instead of sending an empty IN list to the database.
        if (filteredFlowRuns.length === 0) {
            return
        }
        await flowRunRepo().update({
            id: In(filteredFlowRuns.map(flowRun => flowRun.id)),
            projectId: params.projectId,
        }, {
            archivedAt: new Date().toISOString(),
        })
    },

    /**
     * Retries every run matching the filters, at most 10 at a time.
     *
     * Uses `Promise.all`, so the returned promise rejects as soon as any single retry rejects.
     */
    async bulkRetry(params: BulkRetryParams): Promise<(FlowRun | null)[]> {
        const filteredFlowRuns = await filterFlowRunsAndApplyFilters(params)
        const limit = pLimit(10)
        return Promise.all(
            filteredFlowRuns.map(flowRun =>
                limit(() => this.retry({
                    flowRunId: flowRun.id,
                    strategy: params.strategy,
                    projectId: params.projectId,
                })),
            ),
        )
    },

    /**
     * Re-enqueues an existing run.
     *
     * The run is enqueued when it has no pause metadata, when `requestId` matches the
     * webhook pause metadata, or when `checkRequestId` is false. Otherwise only the
     * `onResume` side effect runs and the run is returned without being enqueued.
     *
     * @throws ActivepiecesError with `FLOW_RUN_NOT_FOUND` when the run does not exist.
     */
    async resume({
        flowRunId,
        payload,
        requestId,
        progressUpdateType,
        executionType,
        checkRequestId,
    }: ResumeWebhookParams): Promise<FlowRun | null> {
        log.info({
            runId: flowRunId,
        }, '[FlowRunService#resume] adding flow run to queue')

        const flowRun = await queryBuilderForFlowRun(flowRunRepo()).where({ id: flowRunId }).getOne()
        if (isNil(flowRun)) {
            throw new ActivepiecesError({
                code: ErrorCode.FLOW_RUN_NOT_FOUND,
                params: {
                    id: flowRunId,
                },
            })
        }

        const pauseMetadata = flowRun.pauseMetadata
        const matchRequestId = isNil(pauseMetadata)
            || (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId)
        const platformId = await projectService.getPlatformId(flowRun.projectId)

        if (matchRequestId || !checkRequestId) {
            return addToQueue({
                payload,
                flowRun,
                platformId,
                synchronousHandlerId: returnHandlerId(pauseMetadata, requestId, log),
                httpRequestId: flowRun.pauseMetadata?.requestIdToReply ?? undefined,
                progressUpdateType,
                executeTrigger: false,
                executionType,
            }, log)
        }

        await flowRunSideEffects(log).onResume(flowRun)
        return flowRun
    },

    /**
     * Creates a new run, enqueues it for execution and fires the `onStart` side effect,
     * all inside a `flowRun.start` tracing span that is always ended.
     */
    async start({
        flowId,
        payload,
        executeTrigger,
        executionType,
        synchronousHandlerId,
        progressUpdateType,
        httpRequestId,
        projectId,
        flowVersionId,
        parentRunId,
        failParentOnFailure,
        platformId,
        stepNameToTest,
        environment,
    }: StartParams): Promise<FlowRun> {
        return tracer.startActiveSpan('flowRun.start', {
            attributes: {
                'flowRun.flowVersionId': flowVersionId,
                'flowRun.projectId': projectId,
                'flowRun.environment': environment,
                'flowRun.executionType': executionType,
                'flowRun.progressUpdateType': progressUpdateType,
                'flowRun.executeTrigger': executeTrigger,
                'flowRun.httpRequestId': httpRequestId ?? 'none',
            },
        }, async (span) => {
            try {
                span.setAttribute('flowRun.flowId', flowId)

                const newFlowRun = await queueOrCreateInstantly({
                    projectId,
                    flowVersionId,
                    parentRunId,
                    flowId,
                    failParentOnFailure,
                    stepNameToTest,
                    environment,
                }, log)
                span.setAttribute('flowRun.id', newFlowRun.id)

                await addToQueue({
                    flowRun: newFlowRun,
                    platformId,
                    payload,
                    executeTrigger,
                    executionType,
                    synchronousHandlerId,
                    httpRequestId,
                    progressUpdateType,
                }, log)
                span.setAttribute('flowRun.queued', true)

                await flowRunSideEffects(log).onStart(newFlowRun)
                return newFlowRun
            }
            finally {
                span.end()
            }
        })
    },

    /**
     * Starts a TESTING run of a flow version, using the trigger's stored OUTPUT sample
     * data as the payload. Flow-wide sample data is attached only when `stepNameToTest` is set.
     */
    async test({ projectId, flowVersionId, parentRunId, stepNameToTest }: TestParams): Promise<FlowRun> {
        const flowVersion = await flowVersionService(log).getOneOrThrow(flowVersionId)

        const triggerPayload = await sampleDataService(log).getOrReturnEmpty({
            projectId,
            flowVersion,
            stepName: flowVersion.trigger.name,
            type: SampleDataFileType.OUTPUT,
        })

        const flowRun = await queueOrCreateInstantly({
            projectId,
            flowId: flowVersion.flowId,
            flowVersionId: flowVersion.id,
            environment: RunEnvironment.TESTING,
            parentRunId,
            failParentOnFailure: undefined,
            stepNameToTest,
        }, log)

        return addToQueue({
            flowRun,
            payload: triggerPayload,
            executionType: ExecutionType.BEGIN,
            synchronousHandlerId: undefined,
            httpRequestId: undefined,
            platformId: await projectService.getPlatformId(projectId),
            executeTrigger: false,
            progressUpdateType: ProgressUpdateType.TEST_FLOW,
            sampleData: !isNil(stepNameToTest)
                ? await sampleDataService(log).getSampleDataForFlow(projectId, flowVersion, SampleDataFileType.OUTPUT)
                : undefined,
        }, log)
    },

    /**
     * Fetches a run by id, or `null` when none matches.
     * The lookup is scoped to `projectId` only when one is provided.
     */
    async getOne(params: GetOneParams): Promise<FlowRun | null> {
        const flowRun = await queryBuilderForFlowRun(flowRunRepo()).where({
            id: params.id,
            ...(params.projectId ? { projectId: params.projectId } : {}),
        }).getOne()
        return flowRun
    },

    /**
     * Same as `getOne`, but a missing run is an error.
     *
     * @throws ActivepiecesError with `FLOW_RUN_NOT_FOUND` when no run matches.
     */
    async getOneOrThrow(params: GetOneParams): Promise<FlowRun> {
        const flowRun = await this.getOne(params)
        if (isNil(flowRun)) {
            throw new ActivepiecesError({
                code: ErrorCode.FLOW_RUN_NOT_FOUND,
                params: {
                    id: params.id,
                },
            })
        }
        return flowRun
    },

    /**
     * Same as `getOneOrThrow`, with `steps` filled from the run's logs file.
     * `steps` is an empty object when the run has no logs file or the logs cannot be found.
     */
    async getOnePopulatedOrThrow(params: GetOneParams): Promise<FlowRun> {
        const flowRun = await this.getOneOrThrow(params)
        let steps = {}
        if (!isNil(flowRun.logsFileId)) {
            const stateFile = await flowRunLogsService(log).getLogs({
                logsFileId: flowRun.logsFileId,
                projectId: flowRun.projectId,
            })
            if (!isNil(stateFile)) {
                steps = stateFile.executionState.steps
            }
        }
        return {
            ...flowRun,
            steps,
        }
    },

    /**
     * Resumes a paused run and waits for the engine's HTTP response.
     *
     * Responds with NOT_FOUND when `requestId` does not match the run's webhook pause
     * metadata and with CONFLICT when the run is not PAUSED. Otherwise the run is enqueued
     * and the engine response for `requestId` is awaited for up to `WEBHOOK_TIMEOUT_MS`,
     * with a NO_CONTENT response passed to the listener as the default.
     */
    async handleSyncResumeFlow({ runId, payload, requestId }: { runId: string, payload: unknown, requestId: string }): Promise<EngineHttpResponse> {
        const flowRun = await flowRunService(log).getOnePopulatedOrThrow({
            id: runId,
            projectId: undefined,
        })
        const synchronousHandlerId = engineResponseWatcher(log).getServerId()
        const matchRequestId = isNil(flowRun.pauseMetadata)
            || (flowRun.pauseMetadata.type === PauseType.WEBHOOK && requestId === flowRun.pauseMetadata.requestId)
        assertNotNullOrUndefined(synchronousHandlerId, 'synchronousHandlerId is required for sync resume request is required')

        if (!matchRequestId) {
            return {
                status: StatusCodes.NOT_FOUND,
                body: {},
                headers: {},
            }
        }
        if (flowRun.status !== FlowRunStatus.PAUSED) {
            return {
                status: StatusCodes.CONFLICT,
                body: {
                    'message': 'Flow run is not paused',
                    'flowRunStatus': flowRun.status,
                },
                headers: {},
            }
        }

        await addToQueue({
            payload,
            flowRun,
            platformId: await projectService.getPlatformId(flowRun.projectId),
            synchronousHandlerId,
            httpRequestId: requestId,
            executeTrigger: false,
            progressUpdateType: ProgressUpdateType.TEST_FLOW,
            executionType: ExecutionType.RESUME,
        }, log)

        return engineResponseWatcher(log).oneTimeListener<EngineHttpResponse>(requestId, true, WEBHOOK_TIMEOUT_MS, {
            status: StatusCodes.NO_CONTENT,
            body: {},
            headers: {},
        })
    },
})

/** Rethrows the reason of the first rejected result, if any; does nothing otherwise. */
function throwFirstRejection(results: PromiseSettledResult<unknown>[]): void {
    const firstRejected = results.find(
        (result): result is PromiseRejectedResult => result.status === 'rejected',
    )
    if (firstRejected) {
        throw firstRejected.reason
    }
}

/** Removes the run's one-time job from the job queue and queues a CANCELED status update for it. */
async function cancelSingleRun(log: FastifyBaseLogger, flowRun: FlowRun, platformId: string): Promise<void> {
    await jobQueue(log).removeOneTimeJob({
        jobId: flowRun.id,
        platformId,
    })
    await runsMetadataQueue(log).add({
        id: flowRun.id,
        projectId: flowRun.projectId,
        status: FlowRunStatus.CANCELED,
    })
    log.info({
        flowRunId: flowRun.id,
    }, '[cancelFlowRuns] Canceled flow run')
}

/**
 * Collects every descendant (children, grandchildren, ...) of the given runs whose status
 * is in `CANCELLABLE_STATUSES`, using a recursive CTE. A descendant is only reached
 * through ancestors that are themselves in a cancellable status.
 *
 * Rows come from a raw query and are returned as-is, cast to `FlowRun`.
 */
async function getAllChildRuns(parentRunIds: string[]): Promise<FlowRun[]> {
    if (parentRunIds.length === 0) {
        return []
    }

    const parentRunIdPlaceholders = createSqlPlaceholders(parentRunIds.length, 0)
    const statusStartIndex = parentRunIds.length
    const statusPlaceholders = createSqlPlaceholders(CANCELLABLE_STATUSES.length, statusStartIndex)

    const query = `
        WITH RECURSIVE descendants AS (
            SELECT * FROM flow_run
            WHERE "parentRunId" IN (${parentRunIdPlaceholders})
            AND status IN (${statusPlaceholders})
            UNION ALL
            SELECT f.* FROM flow_run f
            INNER JOIN descendants d ON f."parentRunId" = d.id
            WHERE f.status IN (${statusPlaceholders})
        )
        SELECT * FROM descendants
    `

    // The status list appears twice in the SQL. Numbered Postgres placeholders ($n) are
    // reused by both occurrences, while positional '?' placeholders each consume their own
    // value, so the statuses are bound a second time when not on Postgres.
    const queryParams = [
        ...parentRunIds,
        ...CANCELLABLE_STATUSES,
        ...(!isPostgres() ? CANCELLABLE_STATUSES : []),
    ]

    const results = await flowRunRepo().query(query, queryParams)
    return results as FlowRun[]
}

/**
 * Loads the PRODUCTION runs of a project matching the bulk-action filters.
 * Array filters that are missing or empty are ignored rather than matching nothing.
 */
async function filterFlowRunsAndApplyFilters(
    params: FilterFlowRunsAndApplyFiltersParams,
): Promise<FlowRun[]> {
    let query = flowRunRepo().createQueryBuilder('flow_run').where({
        projectId: params.projectId,
        environment: RunEnvironment.PRODUCTION,
    })

    if (!isNil(params.flowRunIds) && params.flowRunIds.length > 0) {
        query = query.andWhere({
            id: In(params.flowRunIds),
        })
    }
    if (!isNil(params.archived)) {
        query = query.andWhere({
            archivedAt: params.archived ? Not(IsNull()) : IsNull(),
        })
    }
    if (params.flowId && params.flowId.length > 0) {
        query = query.andWhere({
            flowId: In(params.flowId),
        })
    }
    if (params.status && params.status.length > 0) {
        query = query.andWhere({
            status: In(params.status),
        })
    }
    if (params.createdAfter) {
        query = query.andWhere('flow_run.created >= :createdAfter', {
            createdAfter: params.createdAfter,
        })
    }
    if (params.createdBefore) {
        query = query.andWhere('flow_run.created <= :createdBefore', {
            createdBefore: params.createdBefore,
        })
    }
    if (params.excludeFlowRunIds && params.excludeFlowRunIds.length > 0) {
        query = query.andWhere({
            id: Not(In(params.excludeFlowRunIds)),
        })
    }
    if (params.failedStepName) {
        // Same expression as the failedStepName filter in list(): the name is read from
        // the "failedStep" JSON value rather than from a dedicated column.
        query = query.andWhere('flow_run."failedStep"->>\'name\' = :failedStepName', {
            failedStepName: params.failedStepName,
        })
    }

    const flowRuns = await query.getMany()
    return flowRuns
}

/**
 * Picks the handler id for a resumed run: the one stored in the webhook pause metadata
 * when `requestId` matches it and it is set, otherwise this server's own id.
 */
function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: string | undefined, log: FastifyBaseLogger): string {
    const handlerId = engineResponseWatcher(log).getServerId()
    if (isNil(pauseMetadata)) {
        return handlerId
    }

    if (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId && pauseMetadata.handlerId) {
        return pauseMetadata.handlerId
    }
    else {
        return handlerId
    }
}

/**
 * Enqueues a one-time EXECUTE_FLOW job for the run and returns the run unchanged.
 *
 * A new logs file id is generated when the run has none, and the active tracing context
 * is injected into the job data.
 */
async function addToQueue(params: AddToQueueParams, log: FastifyBaseLogger): Promise<FlowRun> {
    const logsFileId = params.flowRun.logsFileId ?? apId()

    const logsUploadUrl = await flowRunLogsService(log).constructUploadUrl({
        logsFileId,
        projectId: params.flowRun.projectId,
        flowRunId: params.flowRun.id,
        behavior: USE_SIGNED_URL ? UploadLogsBehavior.REDIRECT_TO_S3 : UploadLogsBehavior.UPLOAD_DIRECTLY,
    })

    const traceContext: Record<string, string> = {}
    propagation.inject(context.active(), traceContext)

    await jobQueue(log).add({
        id: params.flowRun.id,
        type: JobType.ONE_TIME,
        data: {
            schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
            synchronousHandlerId: params.synchronousHandlerId ?? null,
            projectId: params.flowRun.projectId,
            platformId: params.platformId,
            environment: params.flowRun.environment,
            flowId: params.flowRun.flowId,
            runId: params.flowRun.id,
            jobType: WorkerJobType.EXECUTE_FLOW,
            flowVersionId: params.flowRun.flowVersionId,
            payload: params.payload,
            executeTrigger: params.executeTrigger,
            httpRequestId: params.httpRequestId,
            executionType: params.executionType,
            progressUpdateType: params.progressUpdateType,
            stepNameToTest: params.flowRun.stepNameToTest,
            sampleData: params.sampleData,
            logsUploadUrl,
            logsFileId,
            traceContext,
        },
    })

    return params.flowRun
}

/** Base query for flow runs, left-joined with their flow version. */
function queryBuilderForFlowRun(repo: Repository<FlowRun>): SelectQueryBuilder<FlowRun> {
    return repo.createQueryBuilder('flow_run')
        .leftJoinAndSelect('flow_run.flowVersion', 'flowVersion')
        .addSelect(['"flowVersion"."displayName"'])
}

/**
 * Builds a new QUEUED run. TESTING runs are saved through the repository right away;
 * PRODUCTION runs are handed to the runs-metadata queue and returned without waiting
 * for a database write here.
 */
async function queueOrCreateInstantly(params: CreateParams, log: FastifyBaseLogger): Promise<FlowRun> {
    const now = new Date().toISOString()
    const flowRun: FlowRun = {
        id: apId(),
        projectId: params.projectId,
        flowId: params.flowId,
        flowVersionId: params.flowVersionId,
        environment: params.environment,
        parentRunId: params.parentRunId,
        failParentOnFailure: params.failParentOnFailure ?? true,
        status: FlowRunStatus.QUEUED,
        stepNameToTest: params.stepNameToTest,
        created: now,
        updated: now,
        steps: {},
    }

    switch (params.environment) {
        case RunEnvironment.TESTING:
            return flowRunRepo().save(flowRun)
        case RunEnvironment.PRODUCTION:
            await runsMetadataQueue(log).add(flowRun)
            return flowRun
    }
}

/**
 * Builds a comma-separated SQL placeholder list: `$<startIndex + 1>`, `$<startIndex + 2>`, ...
 * on Postgres, or `count` copies of `?` otherwise.
 */
function createSqlPlaceholders(count: number, startIndex = 0): string {
    return Array.from({ length: count }, (_, index) => {
        return isPostgres() ? `$${startIndex + index + 1}` : '?'
    }).join(',')
}

type CreateParams = {
    projectId: ProjectId
    flowVersionId: FlowVersionId
    parentRunId?: FlowRunId
    failParentOnFailure: boolean | undefined
    stepNameToTest?: string
    flowId: FlowId
    environment: RunEnvironment
}

type ListParams = {
    projectId: ProjectId
    flowId: FlowId[] | undefined
    status: FlowRunStatus[] | undefined
    cursor: Cursor | null
    tags?: string[]
    limit: number
    createdAfter?: string
    createdBefore?: string
    failedStepName?: string
    flowRunIds?: FlowRunId[]
    archived?: boolean
}

type GetOneParams = {
    id: FlowRunId
    projectId: ProjectId | undefined
}

type AddToQueueParams = {
    flowRun: FlowRun
    platformId: PlatformId
    payload: unknown
    executeTrigger: boolean
    executionType: ExecutionType
    synchronousHandlerId: string | undefined
    httpRequestId: string | undefined
    progressUpdateType: ProgressUpdateType
    sampleData?: Record<string, unknown>
}

type StartParams = {
    flowId: FlowId
    payload: unknown
    platformId: PlatformId
    environment: RunEnvironment
    flowVersionId: FlowVersionId
    projectId: ProjectId
    parentRunId?: FlowRunId
    failParentOnFailure: boolean | undefined
    stepNameToTest?: string
    executeTrigger: boolean
    executionType: ExecutionType
    synchronousHandlerId: string | undefined
    httpRequestId: string | undefined
    progressUpdateType: ProgressUpdateType
    sampleData?: Record<string, unknown>
}

type TestParams = {
    projectId: ProjectId
    flowVersionId: FlowVersionId
    parentRunId?: FlowRunId
    stepNameToTest?: string
}

type RetryParams = {
    flowRunId: FlowRunId
    strategy: FlowRetryStrategy
    projectId: ProjectId
}

type CancelParams = {
    projectId: ProjectId
    platformId: PlatformId
    flowRunIds?: FlowRunId[]
    excludeFlowRunIds?: FlowRunId[]
    status?: FlowRunStatus[]
    flowId?: FlowId[]
    createdAfter?: string
    createdBefore?: string
}

type BulkRetryParams = {
    projectId: ProjectId
    flowRunIds?: FlowRunId[]
    strategy: FlowRetryStrategy
    status?: FlowRunStatus[]
    flowId?: FlowId[]
    createdAfter?: string
    archived?: boolean
    createdBefore?: string
    excludeFlowRunIds?: FlowRunId[]
    failedStepName?: string
}

type BulkArchiveActionParams = {
    projectId: ProjectId
    flowRunIds?: FlowRunId[]
    status?: FlowRunStatus[]
    flowId?: FlowId[]
    createdAfter?: string
    archived?: boolean
    createdBefore?: string
    excludeFlowRunIds?: FlowRunId[]
    failedStepName?: string
}

type ResumeWebhookParams = {
    flowRunId: FlowRunId
    requestId?: string
    progressUpdateType: ProgressUpdateType
    payload?: unknown
    executionType: ExecutionType
    checkRequestId: boolean
}

type FilterFlowRunsAndApplyFiltersParams = {
    projectId: ProjectId
    flowRunIds?: FlowRunId[]
    status?: FlowRunStatus[]
    archived?: boolean
    flowId?: FlowId[]
    createdAfter?: string
    createdBefore?: string
    excludeFlowRunIds?: FlowRunId[]
    failedStepName?: string
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.