Skip to main content
Glama

mcp-google-sheets

piece-executor.ts12.2 kB
import { URL } from 'url' import { ActionContext, PauseHook, PauseHookParams, PiecePropertyMap, RespondHook, RespondHookParams, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework' import { assertNotNullOrUndefined, AUTHENTICATION_PROPERTY_NAME, ExecutionType, FlowActionType, FlowRunStatus, GenericStepOutput, isNil, PauseType, PieceAction, RespondResponse, StepOutputStatus } from '@activepieces/shared' import dayjs from 'dayjs' import { continueIfFailureHandler, handleExecutionError, runWithExponentialBackoff } from '../helper/error-handling' import { PausedFlowTimeoutError } from '../helper/execution-errors' import { pieceLoader } from '../helper/piece-loader' import { createFlowsContext } from '../services/flows.service' import { progressService } from '../services/progress.service' import { createFilesService } from '../services/step-files.service' import { createContextStore } from '../services/storage.service' import { HookResponse, utils } from '../utils' import { propsProcessor } from '../variables/props-processor' import { ActionHandler, BaseExecutor } from './base-executor' import { ExecutionVerdict } from './context/flow-execution-context' const AP_PAUSED_FLOW_TIMEOUT_DAYS = Number(process.env.AP_PAUSED_FLOW_TIMEOUT_DAYS) export const pieceExecutor: BaseExecutor<PieceAction> = { async handle({ action, executionState, constants, }) { if (executionState.isCompleted({ stepName: action.name })) { return executionState } const resultExecution = await runWithExponentialBackoff(executionState, action, constants, executeAction) return continueIfFailureHandler(resultExecution, action, constants) }, } const executeAction: ActionHandler<PieceAction> = async ({ action, executionState, constants }) => { const stepStartTime = performance.now() const stepOutput = GenericStepOutput.create({ input: {}, type: FlowActionType.PIECE, status: StepOutputStatus.RUNNING, }) try { assertNotNullOrUndefined(action.settings.actionName, 'actionName') const { pieceAction, piece } = await pieceLoader.getPieceAndActionOrThrow({ pieceName: action.settings.pieceName, pieceVersion: action.settings.pieceVersion, actionName: action.settings.actionName, pieceSource: constants.piecesSource, }) const { resolvedInput, censoredInput } = await constants.propsResolver.resolve<StaticPropsValue<PiecePropertyMap>>({ unresolvedInput: action.settings.input, executionState, }) stepOutput.input = censoredInput const { processedInput, errors } = await propsProcessor.applyProcessorsAndValidators(resolvedInput, pieceAction.props, piece.auth, pieceAction.requireAuth, action.settings.propertySettings) if (Object.keys(errors).length > 0) { throw new Error(JSON.stringify(errors, null, 2)) } const params: { hookResponse: HookResponse } = { hookResponse: { type: 'none', tags: [], }, } const outputContext = progressService.createOutputContext({ engineConstants: constants, flowExecutorContext: executionState, stepName: action.name, stepOutput, }) const isPaused = executionState.isPaused({ stepName: action.name }) if (!isPaused) { await progressService.sendUpdate({ engineConstants: constants, flowExecutorContext: executionState.upsertStep(action.name, stepOutput), }) } const context: ActionContext = { executionType: isPaused ? ExecutionType.RESUME : ExecutionType.BEGIN, resumePayload: constants.resumePayload!, store: createContextStore({ apiUrl: constants.internalApiUrl, prefix: '', flowId: constants.flowId, engineToken: constants.engineToken, }), output: outputContext, flows: createFlowsContext({ engineToken: constants.engineToken, internalApiUrl: constants.internalApiUrl, flowId: constants.flowId, flowVersionId: constants.flowVersionId, }), auth: processedInput[AUTHENTICATION_PROPERTY_NAME], files: createFilesService({ apiUrl: constants.internalApiUrl, engineToken: constants.engineToken, stepName: action.name, flowId: constants.flowId, }), server: { token: constants.engineToken, apiUrl: constants.internalApiUrl, publicUrl: constants.publicApiUrl, }, propsValue: processedInput, tags: createTagsManager(params), connections: utils.createConnectionManager({ apiUrl: constants.internalApiUrl, projectId: constants.projectId, engineToken: constants.engineToken, target: 'actions', hookResponse: params.hookResponse, }), /* @deprecated Use server.publicApiUrl instead. */ serverUrl: constants.publicApiUrl, run: { id: constants.flowRunId, stop: createStopHook(params), pause: createPauseHook(params, executionState.pauseRequestId, constants.httpRequestId), respond: createRespondHook(params), }, project: { id: constants.projectId, externalId: constants.externalProjectId, }, generateResumeUrl: (params) => { const url = new URL(`${constants.publicApiUrl}v1/flow-runs/${constants.flowRunId}/requests/${executionState.pauseRequestId}${params.sync ? '/sync' : ''}`) url.search = new URLSearchParams(params.queryParams).toString() return url.toString() }, } const testSingleStepMode = !isNil(constants.stepNameToTest) const runMethodToExecute = (testSingleStepMode && !isNil(pieceAction.test)) ? pieceAction.test : pieceAction.run const output = await runMethodToExecute(context) const newExecutionContext = executionState.addTags(params.hookResponse.tags) const webhookResponse = getResponse(params.hookResponse) const isSamePiece = constants.triggerPieceName === action.settings.pieceName if (!isNil(webhookResponse) && !isNil(constants.serverHandlerId) && !isNil(constants.httpRequestId) && isSamePiece) { await progressService.sendFlowResponse(constants, { workerHandlerId: constants.serverHandlerId, httpRequestId: constants.httpRequestId, runResponse: { status: webhookResponse.status ?? 200, body: webhookResponse.body ?? {}, headers: webhookResponse.headers ?? {}, }, }) } const stepEndTime = performance.now() if (params.hookResponse.type === 'stopped') { assertNotNullOrUndefined(params.hookResponse.response, 'stopResponse') return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.SUCCEEDED).setDuration(stepEndTime - stepStartTime)).setVerdict(ExecutionVerdict.SUCCEEDED, { reason: FlowRunStatus.SUCCEEDED, stopResponse: (params.hookResponse.response as StopHookParams).response, }).increaseTask() } if (params.hookResponse.type === 'paused') { assertNotNullOrUndefined(params.hookResponse.response, 'pauseResponse') return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.PAUSED).setDuration(stepEndTime - stepStartTime)) .setVerdict(ExecutionVerdict.PAUSED, { reason: FlowRunStatus.PAUSED, pauseMetadata: (params.hookResponse.response as PauseHookParams).pauseMetadata, }) } return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.SUCCEEDED).setDuration(stepEndTime - stepStartTime)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined) } catch (e) { const handledError = handleExecutionError(e) const failedStepOutput = stepOutput .setStatus(StepOutputStatus.FAILED) .setErrorMessage(handledError.message) .setDuration(performance.now() - stepStartTime) return executionState .upsertStep(action.name, failedStepOutput) .setVerdict(ExecutionVerdict.FAILED, handledError.verdictResponse) .increaseTask() } } function getResponse(hookResponse: HookResponse): RespondResponse | undefined { switch (hookResponse.type) { case 'stopped': case 'respond': return hookResponse.response.response case 'paused': if (hookResponse.response.pauseMetadata.type === PauseType.WEBHOOK) { return hookResponse.response.pauseMetadata.response } else { return undefined } case 'none': return undefined } } const createTagsManager = (hkParams: createTagsManagerParams): TagsManager => { return { add: async (params: addTagsParams): Promise<void> => { hkParams.hookResponse.tags.push(params.name) }, } } type addTagsParams = { name: string } type createTagsManagerParams = { hookResponse: HookResponse } function createStopHook(params: CreateStopHookParams): StopHook { return (req?: StopHookParams) => { params.hookResponse = { ...params.hookResponse, type: 'stopped', response: req ?? { response: {} }, } } } type CreateStopHookParams = { hookResponse: HookResponse } function createRespondHook(params: CreateRespondHookParams): RespondHook { return (req?: RespondHookParams) => { params.hookResponse = { ...params.hookResponse, type: 'respond', response: req ?? { response: {} }, } } } type CreateRespondHookParams = { hookResponse: HookResponse } function createPauseHook(params: CreatePauseHookParams, pauseId: string, requestIdToReply: string | null): PauseHook { return (req) => { switch (req.pauseMetadata.type) { case PauseType.DELAY: { const diffInDays = dayjs(req.pauseMetadata.resumeDateTime).diff(dayjs(), 'days') if (diffInDays > AP_PAUSED_FLOW_TIMEOUT_DAYS) { throw new PausedFlowTimeoutError(undefined, AP_PAUSED_FLOW_TIMEOUT_DAYS) } params.hookResponse = { ...params.hookResponse, type: 'paused', response: { pauseMetadata: { ...req.pauseMetadata, requestIdToReply: requestIdToReply ?? undefined, }, }, } break } case PauseType.WEBHOOK: params.hookResponse = { ...params.hookResponse, type: 'paused', response: { pauseMetadata: { ...req.pauseMetadata, requestId: pauseId, requestIdToReply: requestIdToReply ?? undefined, response: req.pauseMetadata.response ?? {}, }, }, } break } } } type CreatePauseHookParams = { hookResponse: HookResponse }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server