Skip to main content
Glama

mcp-google-sheets

flow-execution-context.ts (9.12 kB)
import { assertEqual, FlowActionType, FlowError, FlowRunResponse, FlowRunStatus, GenericStepOutput, isNil, LoopStepOutput, LoopStepResult, PauseMetadata, RespondResponse, spreadIfDefined, StepOutput, StepOutputStatus } from '@activepieces/shared'
import { nanoid } from 'nanoid'
import { loggingUtils } from '../../helper/logging-utils'
import { StepExecutionPath } from './step-execution-path'

/**
 * Coarse state of a flow execution as tracked by {@link FlowExecutorContext}.
 */
export enum ExecutionVerdict {
    RUNNING = 'RUNNING',
    PAUSED = 'PAUSED',
    SUCCEEDED = 'SUCCEEDED',
    FAILED = 'FAILED',
}

/**
 * Extra payload attached to a verdict, discriminated by `reason`.
 */
export type VerdictResponse = {
    reason: FlowRunStatus.PAUSED
    pauseMetadata: PauseMetadata
} | {
    reason: FlowRunStatus.SUCCEEDED
    stopResponse: RespondResponse
} | {
    reason: FlowRunStatus.INTERNAL_ERROR
}

/**
 * State of a flow run, updated in a copy-on-change style: every setter
 * returns a new context built by the copy constructor and leaves the
 * top-level fields of the receiver untouched.
 */
export class FlowExecutorContext {
    tasks: number
    tags: readonly string[]
    steps: Readonly<Record<string, StepOutput>>
    pauseRequestId: string
    verdict: ExecutionVerdict
    verdictResponse: VerdictResponse | undefined
    currentPath: StepExecutionPath
    error?: FlowError
    stepNameToTest?: boolean
    /**
     * Value recorded by {@link setRetryable}. It is not part of the output of
     * {@link toResponse}.
     * NOTE(review): presumably tells the caller whether the run may be
     * retried - confirm against the consumers of this context.
     */
    retryable?: boolean

    /**
     * Execution time in milliseconds
     */
    duration: number

    /**
     * @param copyFrom Optional source whose fields are copied; any field that
     * is missing falls back to its default (`duration` defaults to -1 and
     * `pauseRequestId` to a fresh nanoid).
     */
    constructor(copyFrom?: FlowExecutorContext) {
        this.tasks = copyFrom?.tasks ?? 0
        this.tags = copyFrom?.tags ?? []
        this.steps = copyFrom?.steps ?? {}
        this.pauseRequestId = copyFrom?.pauseRequestId ?? nanoid()
        this.duration = copyFrom?.duration ?? -1
        this.verdict = copyFrom?.verdict ?? ExecutionVerdict.RUNNING
        this.verdictResponse = copyFrom?.verdictResponse ?? undefined
        this.error = copyFrom?.error ?? undefined
        this.currentPath = copyFrom?.currentPath ?? StepExecutionPath.empty()
        this.stepNameToTest = copyFrom?.stepNameToTest ?? false
        // Must be copied here, otherwise the value passed to setRetryable is
        // dropped as soon as the next copy is made.
        this.retryable = copyFrom?.retryable
    }

    /** Creates a context with all defaults. */
    static empty(): FlowExecutorContext {
        return new FlowExecutorContext()
    }

    /** Returns a copy of this context with `pauseRequestId` replaced. */
    public setPauseRequestId(pauseRequestId: string): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            pauseRequestId,
        })
    }

    /**
     * Looks up `stepName` at the current path and wraps it as a loop output.
     *
     * @returns `undefined` when the step has no output at the current path.
     * @throws whatever `assertEqual` raises when the stored output is not of
     * type LOOP_ON_ITEMS, and an `Error` when the current path cannot be
     * resolved (see `getStateAtPath`).
     */
    public getLoopStepOutput({ stepName }: { stepName: string }): LoopStepOutput | undefined {
        const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
        const stepOutput = stateAtPath[stepName]
        if (isNil(stepOutput)) {
            return undefined
        }
        assertEqual(stepOutput.type, FlowActionType.LOOP_ON_ITEMS, 'stepOutput.type', 'LOOP_ON_ITEMS')
        // The new LoopStepOutput is needed as casting directly to LoopClassOutput will just cast the data but the class methods will not be available
        return new LoopStepOutput(stepOutput as GenericStepOutput<FlowActionType.LOOP_ON_ITEMS, LoopStepResult>)
    }

    /**
     * @returns `true` when the step has an output at the current path whose
     * status is anything other than PAUSED; `false` when it has no output.
     */
    public isCompleted({ stepName }: { stepName: string }): boolean {
        const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
        const stepOutput = stateAtPath[stepName]
        if (isNil(stepOutput)) {
            return false
        }
        return stepOutput.status !== StepOutputStatus.PAUSED
    }

    /**
     * @returns `true` only when the step has an output at the current path
     * and that output's status is PAUSED.
     */
    public isPaused({ stepName }: { stepName: string }): boolean {
        const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
        const stepOutput = stateAtPath[stepName]
        if (isNil(stepOutput)) {
            return false
        }
        return stepOutput.status === StepOutputStatus.PAUSED
    }

    /** Returns a copy of this context with `duration` (milliseconds) replaced. */
    public setDuration(duration: number): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            duration,
        })
    }

    /**
     * Returns a copy of this context whose tags are the existing tags followed
     * by `tags`, with duplicates removed (the first occurrence wins).
     */
    public addTags(tags: string[]): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            // A Set keeps insertion order, so this matches a
            // "keep first occurrence" filter without the quadratic scan.
            tags: [...new Set([...this.tags, ...tags])],
        })
    }

    /** Returns a copy of this context with the task counter raised by `tasks` (default 1). */
    public increaseTask(tasks = 1): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            tasks: this.tasks + tasks,
        })
    }

    /**
     * Stores `stepOutput` under `stepName` at the current path and returns the
     * resulting context. When the output's status is FAILED, the returned
     * context's `error` is set to that step's name and error message;
     * otherwise the existing error is carried over.
     *
     * @throws Error when the current path cannot be resolved (see `getStateAtPath`).
     */
    public upsertStep(stepName: string, stepOutput: StepOutput): FlowExecutorContext {
        // NOTE(review): only the top-level map is copied. When currentPath is
        // non-empty, targetMap is a nested iteration map that is still
        // reachable from this.steps, so the assignment below is also visible
        // through the previous context.
        const steps = {
            ...this.steps,
        }
        const targetMap = getStateAtPath({ currentPath: this.currentPath, steps })
        targetMap[stepName] = stepOutput

        const error = stepOutput.status === StepOutputStatus.FAILED
            ? { stepName, message: stepOutput.errorMessage }
            : this.error

        return new FlowExecutorContext({
            ...this,
            ...spreadIfDefined('error', error),
            steps,
        })
    }

    /**
     * @returns the output stored for `stepName` at the current path, or
     * `undefined` when there is none.
     */
    public getStepOutput(stepName: string): StepOutput | undefined {
        const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
        return stateAtPath[stepName]
    }

    /** Returns a copy of this context with `currentPath` replaced. */
    public setCurrentPath(currentStatePath: StepExecutionPath): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            currentPath: currentStatePath,
        })
    }

    /**
     * Returns a copy of this context with the verdict replaced. The verdict
     * response is always overwritten, so omitting `response` clears it.
     */
    public setVerdict(verdict: ExecutionVerdict, response?: VerdictResponse): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            verdict,
            verdictResponse: response,
        })
    }

    /** Returns a copy of this context with `retryable` replaced. */
    public setRetryable(retryable: boolean): FlowExecutorContext {
        return new FlowExecutorContext({
            ...this,
            retryable,
        })
    }

    /**
     * Builds the run response for the current verdict. `steps` are passed
     * through `loggingUtils.trimExecution` before being returned.
     *
     * @throws Error when the verdict is PAUSED but the verdict response does
     * not carry pause metadata.
     */
    public async toResponse(): Promise<FlowRunResponse> {
        const baseExecutionOutput = {
            duration: this.duration,
            tasks: this.tasks,
            tags: [...this.tags],
            steps: await loggingUtils.trimExecution(this.steps),
        }
        switch (this.verdict) {
            case ExecutionVerdict.FAILED: {
                const verdictResponse = this.verdictResponse
                if (verdictResponse?.reason === FlowRunStatus.INTERNAL_ERROR) {
                    return {
                        ...baseExecutionOutput,
                        error: this.error,
                        status: FlowRunStatus.INTERNAL_ERROR,
                    }
                }
                return {
                    ...baseExecutionOutput,
                    error: this.error,
                    status: FlowRunStatus.FAILED,
                }
            }
            case ExecutionVerdict.PAUSED: {
                const verdictResponse = this.verdictResponse
                if (verdictResponse?.reason !== FlowRunStatus.PAUSED) {
                    throw new Error('Verdict Response should have pause metadata response')
                }
                return {
                    ...baseExecutionOutput,
                    status: FlowRunStatus.PAUSED,
                    pauseMetadata: verdictResponse.pauseMetadata,
                }
            }
            case ExecutionVerdict.RUNNING: {
                return {
                    ...baseExecutionOutput,
                    status: FlowRunStatus.RUNNING,
                }
            }
            case ExecutionVerdict.SUCCEEDED: {
                const verdictResponse = this.verdictResponse
                return {
                    ...baseExecutionOutput,
                    status: FlowRunStatus.SUCCEEDED,
                    response: !isNil(verdictResponse) && 'stopResponse' in verdictResponse
                        ? verdictResponse.stopResponse
                        : undefined,
                }
            }
        }
    }

    /**
     * Flattens step outputs into a single name -> output map: the top-level
     * steps first, then the steps of each loop iteration along the current
     * path, with deeper levels overriding entries of the same name.
     *
     * @throws Error when a path segment does not refer to a LOOP_ON_ITEMS
     * step that has an output.
     */
    public currentState(): Record<string, unknown> {
        let flattenedSteps: Record<string, unknown> = extractOutput(this.steps)
        let targetMap = this.steps
        this.currentPath.path.forEach(([stepName, iteration]) => {
            const stepOutput = targetMap[stepName]
            if (!stepOutput.output || stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) {
                throw new Error('[ExecutionState#getTargetMap] Not instance of Loop On Items step output')
            }
            targetMap = stepOutput.output.iterations[iteration]
            flattenedSteps = {
                ...flattenedSteps,
                ...extractOutput(targetMap),
            }
        })
        return flattenedSteps
    }
}

/** Maps every step name to that step's `output` value. */
function extractOutput(steps: Record<string, StepOutput>): Record<string, unknown> {
    return Object.entries(steps).reduce((acc: Record<string, unknown>, [stepName, step]) => {
        acc[stepName] = step.output
        return acc
    }, {} as Record<string, unknown>)
}

/**
 * Walks `currentPath` from the top-level `steps` map, descending into the
 * given iteration of each loop step, and returns the map reached at the end.
 * The returned map is the stored object itself, not a copy.
 *
 * @throws Error when a path segment does not refer to a LOOP_ON_ITEMS step
 * that has an output.
 */
function getStateAtPath({ currentPath, steps }: { currentPath: StepExecutionPath, steps: Record<string, StepOutput> }): Record<string, StepOutput> {
    let targetMap = steps
    currentPath.path.forEach(([stepName, iteration]) => {
        const stepOutput = targetMap[stepName]
        if (!stepOutput.output || stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) {
            throw new Error('[ExecutionState#getTargetMap] Not instance of Loop On Items step output')
        }
        targetMap = stepOutput.output.iterations[iteration]
    })
    return targetMap
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.