Skip to main content
Glama

mcp-google-sheets

engine-controller.ts13.1 kB
import { apAxios, GetRunForWorkerRequest } from '@activepieces/server-shared' import { assertNotNullOrUndefined, CreateTriggerRunRequestBody, EngineHttpResponse, FileType, FlowRunResponse, FlowRunStatus, GetFlowVersionForWorkerRequest, isNil, ListFlowsRequest, PauseType, PopulatedFlow, PrincipalType, ProgressUpdateType, SendFlowResponseRequest, UpdateLogsBehavior, UpdateRunProgressRequest, WebsocketClientEvent } from '@activepieces/shared' import { FastifyPluginAsyncTypebox, Type } from '@fastify/type-provider-typebox' import { FastifyBaseLogger } from 'fastify' import { StatusCodes } from 'http-status-codes' import { entitiesMustBeOwnedByCurrentProject } from '../authentication/authorization' import { domainHelper } from '../ee/custom-domains/domain-helper' import { fileService } from '../file/file.service' import { flowService } from '../flows/flow/flow.service' import { flowRunService } from '../flows/flow-run/flow-run-service' import { stepRunProgressHandler } from '../flows/flow-run/step-run-progress.handler' import { flowVersionService } from '../flows/flow-version/flow-version.service' import { triggerRunService } from '../trigger/trigger-run/trigger-run.service' import { triggerSourceService } from '../trigger/trigger-source/trigger-source-service' import { engineResponseWatcher } from './engine-response-watcher' export const flowEngineWorker: FastifyPluginAsyncTypebox = async (app) => { app.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject) app.get('/runs/:runId', { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { params: GetRunForWorkerRequest, }, }, async (request) => { const { runId } = request.params return flowRunService(request.log).getOnePopulatedOrThrow({ id: runId, projectId: request.principal.projectId, }) }) app.get('/populated-flows', GetAllFlowsByProjectParams, async (request) => { return flowService(request.log).list({ projectId: request.principal.projectId, limit: request.query.limit ?? 1000000, cursorRequest: request.query.cursor ?? null, folderId: request.query.folderId, status: request.query.status, name: request.query.name, versionState: request.query.versionState, connectionExternalIds: request.query.connectionExternalIds, agentExternalIds: request.query.agentExternalIds, externalIds: request.query.externalIds, }) }) app.post('/update-run', UpdateRunProgress, async (request, reply) => { const { runId, workerHandlerId, runDetails, httpRequestId, failedStepName: failedStepName, testSingleStepMode } = request.body const progressUpdateType = request.body.progressUpdateType ?? ProgressUpdateType.NONE const nonSupportedStatuses = [FlowRunStatus.RUNNING, FlowRunStatus.SUCCEEDED, FlowRunStatus.PAUSED] if (!nonSupportedStatuses.includes(runDetails.status) && !isNil(workerHandlerId) && !isNil(httpRequestId)) { await engineResponseWatcher(request.log).publish( httpRequestId, workerHandlerId, await getFlowResponse(runDetails), ) } const runWithoutSteps = await flowRunService(request.log).updateRun({ flowRunId: runId, status: runDetails.status, tasks: runDetails.tasks, duration: runDetails.duration, projectId: request.principal.projectId, tags: runDetails.tags ?? [], failedStepName, }) await handleUpdateLogsBehavior({ log: request.log, updateLogsBehavior: request.body.updateLogsBehavior, executionStateContentLength: request.body.executionStateContentLength, logsFileId: request.body.logsFileId, executionStateBuffer: request.body.executionStateBuffer, projectId: request.principal.projectId, runId, }) if (runDetails.status === FlowRunStatus.PAUSED) { await flowRunService(request.log).pause({ flowRunId: runId, pauseMetadata: { progressUpdateType, handlerId: workerHandlerId ?? undefined, ...(runDetails.pauseMetadata!), }, }) } const shouldMarkParentAsFailed = runWithoutSteps.failParentOnFailure && !isNil(runWithoutSteps.parentRunId) && ![FlowRunStatus.SUCCEEDED, FlowRunStatus.RUNNING, FlowRunStatus.PAUSED, FlowRunStatus.QUEUED].includes(runWithoutSteps.status) if (shouldMarkParentAsFailed) { await markParentRunAsFailed({ parentRunId: runWithoutSteps.parentRunId!, childRunId: runWithoutSteps.id, projectId: request.principal.projectId, platformId: request.principal.platform.id, log: request.log, }) } app.io.to(request.principal.projectId).emit(WebsocketClientEvent.FLOW_RUN_PROGRESS, { runId, }) if (testSingleStepMode) { const response = await stepRunProgressHandler(request.log).extractStepResponse({ runId, }) if (!isNil(response)) { app.io.to(request.principal.projectId).emit(WebsocketClientEvent.TEST_STEP_FINISHED, response) } } return reply.status(StatusCodes.NO_CONTENT).send() }) app.post('/update-flow-response', UpdateFlowResponseParams, async (request) => { const { workerHandlerId, httpRequestId, runResponse } = request.body await engineResponseWatcher(request.log).publish( httpRequestId, workerHandlerId, runResponse, ) return {} }) app.post('/create-trigger-run', CreateTriggerRunParams, async (request) => { const { status, payload, flowId, simulate, jobId } = request.body const { projectId } = request.principal const trigger = await triggerSourceService(request.log).getByFlowId({ flowId, projectId, simulate, }) if (!isNil(trigger)) { await triggerRunService(request.log).create({ status, payload, triggerSourceId: trigger.id, projectId, pieceName: trigger.pieceName, pieceVersion: trigger.pieceVersion, jobId, }) } }) app.get('/flows', GetLockedVersionRequest, async (request) => { const populatedFlow = await getFlow(request.principal.projectId, request.query, request.log) return { ...populatedFlow, version: await flowVersionService(request.log).lockPieceVersions({ flowVersion: populatedFlow.version, projectId: request.principal.projectId, }), } }) app.get('/files/:fileId', GetFileRequestParams, async (request, reply) => { const { fileId } = request.params const { data } = await fileService(request.log).getDataOrThrow({ fileId, type: FileType.PACKAGE_ARCHIVE, }) return reply .type('application/zip') .status(StatusCodes.OK) .send(data) }) } async function handleUpdateLogsBehavior(request: HandleUpdateLogsBehaviorParams): Promise<void> { const { updateLogsBehavior, executionStateContentLength, logsFileId, executionStateBuffer, projectId } = request switch (updateLogsBehavior) { case UpdateLogsBehavior.UPDATE_LOGS: { assertNotNullOrUndefined(executionStateContentLength, 'executionStateContentLength is required') await flowRunService(request.log).updateLogs({ flowRunId: request.runId, logsFileId, projectId, executionStateString: executionStateBuffer, executionStateContentLength, }) break } case UpdateLogsBehavior.UPDATE_LOGS_SIZE: { assertNotNullOrUndefined(executionStateContentLength, 'executionStateContentLength is required') assertNotNullOrUndefined(logsFileId, 'logsFileId is required') await flowRunService(request.log).updateLogsSizeAndAttachLogsFile({ flowRunId: request.runId, logsFileId, executionStateContentLength, }) break } case UpdateLogsBehavior.NONE: { break } } } type HandleUpdateLogsBehaviorParams = { log: FastifyBaseLogger updateLogsBehavior: UpdateLogsBehavior executionStateContentLength: number | null logsFileId: string | undefined executionStateBuffer: string | undefined projectId: string runId: string } async function getFlowResponse( result: FlowRunResponse, ): Promise<EngineHttpResponse> { switch (result.status) { case FlowRunStatus.INTERNAL_ERROR: return { status: StatusCodes.INTERNAL_SERVER_ERROR, body: { message: 'An internal error has occurred', }, headers: {}, } case FlowRunStatus.FAILED: case FlowRunStatus.MEMORY_LIMIT_EXCEEDED: return { status: StatusCodes.INTERNAL_SERVER_ERROR, body: { message: 'The flow has failed and there is no response returned', }, headers: {}, } case FlowRunStatus.TIMEOUT: return { status: StatusCodes.GATEWAY_TIMEOUT, body: { message: 'The request took too long to reply', }, headers: {}, } case FlowRunStatus.QUOTA_EXCEEDED: return { status: StatusCodes.NO_CONTENT, body: {}, headers: {}, } // Case that should be handled before default: throw new Error(`Unexpected flow run status: ${result.status}`) } } async function getFlow(projectId: string, request: GetFlowVersionForWorkerRequest, log: FastifyBaseLogger): Promise<PopulatedFlow> { // TODO this can be optimized by getting the flow version directly const flowVersion = await flowVersionService(log).getOneOrThrow(request.versionId) return flowService(log).getOnePopulatedOrThrow({ id: flowVersion.flowId, projectId, versionId: request.versionId, }) } async function markParentRunAsFailed({ parentRunId, childRunId, projectId, platformId, log, }: MarkParentRunAsFailedParams): Promise<void> { const flowRun = await flowRunService(log).getOneOrThrow({ id: parentRunId, projectId, }) const requestId = flowRun.pauseMetadata?.type === PauseType.WEBHOOK ? flowRun.pauseMetadata?.requestId : undefined assertNotNullOrUndefined(requestId, 'Parent run has no request id') const callbackUrl = await domainHelper.getPublicApiUrl({ path: `/v1/flow-runs/${parentRunId}/requests/${requestId}`, platformId }) const childRunUrl = await domainHelper.getPublicUrl({ path: `/projects/${projectId}/runs/${childRunId}`, platformId }) await apAxios.post(callbackUrl, { status: 'error', data: { message: 'Subflow execution failed', link: childRunUrl, }, }) } type MarkParentRunAsFailedParams = { parentRunId: string childRunId: string projectId: string platformId: string log: FastifyBaseLogger } const GetAllFlowsByProjectParams = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { querystring: Type.Omit(ListFlowsRequest, ['projectId']), }, } const GetFileRequestParams = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { params: Type.Object({ fileId: Type.String(), }), }, } const UpdateRunProgress = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { body: UpdateRunProgressRequest, }, } const GetLockedVersionRequest = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { querystring: GetFlowVersionForWorkerRequest, response: { [StatusCodes.OK]: PopulatedFlow, }, }, } const CreateTriggerRunParams = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { body: CreateTriggerRunRequestBody, }, } const UpdateFlowResponseParams = { config: { allowedPrincipals: [PrincipalType.ENGINE], }, schema: { body: SendFlowResponseRequest, }, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server