Skip to main content
Glama

Activepieces MCP Server

by eldoonreval
index.tsβ€’11.1 kB
import { webhookSecretsUtils } from '@activepieces/server-shared' import { ActionType, EngineOperation, EngineOperationType, ExecuteFlowOperation, ExecutePropsOptions, ExecuteStepOperation, ExecuteToolOperation, ExecuteTriggerOperation, ExecuteValidateAuthOperation, flowStructureUtil, FlowVersion, isNil, RunEnvironment, TriggerHookType } from '@activepieces/shared' import { FastifyBaseLogger } from 'fastify' import { executionFiles } from '../cache/execution-files' import { pieceEngineUtil } from '../utils/flow-engine-util' import { workerMachine } from '../utils/machine' import { webhookUtils } from '../utils/webhook-utils' import { EngineHelperResponse, EngineHelperResult, EngineRunner, engineRunnerUtils } from './engine-runner-types' import { EngineProcessManager } from './process/engine-process-manager' let processManager: EngineProcessManager export const engineRunner = (log: FastifyBaseLogger): EngineRunner => ({ async executeFlow(engineToken, operation) { log.debug({ flowVersion: operation.flowVersion.id, projectId: operation.projectId, }, '[threadEngineRunner#executeFlow]') await prepareFlowSandbox(log, engineToken, operation.flowVersion, operation.runEnvironment, operation.projectId) const input: ExecuteFlowOperation = { ...operation, engineToken, publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), } return execute(log, input, EngineOperationType.EXECUTE_FLOW) }, async executeTrigger(engineToken, operation) { log.debug({ hookType: operation.hookType, projectId: operation.projectId, }, '[threadEngineRunner#executeTrigger]') const triggerPiece = await pieceEngineUtil(log).getTriggerPiece(engineToken, operation.flowVersion) const lockedVersion = await pieceEngineUtil(log).lockSingleStepPieceVersion({ engineToken, stepName: operation.flowVersion.trigger.name, flowVersion: operation.flowVersion, }) const input: ExecuteTriggerOperation<TriggerHookType> = { projectId: operation.projectId, hookType: operation.hookType, webhookUrl: operation.webhookUrl, triggerPayload: operation.triggerPayload, test: operation.test, flowVersion: lockedVersion, appWebhookUrl: await webhookUtils(log).getAppWebhookUrl({ appName: triggerPiece.pieceName, publicApiUrl: workerMachine.getPublicApiUrl(), }), publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), webhookSecret: await webhookSecretsUtils.getWebhookSecret(lockedVersion), engineToken, } await executionFiles(log).provision({ pieces: [triggerPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) return execute(log, input, EngineOperationType.EXECUTE_TRIGGER_HOOK) }, async extractPieceMetadata(engineToken, operation) { log.debug({ operation }, '[threadEngineRunner#extractPieceMetadata]') const lockedPiece = await pieceEngineUtil(log).resolveExactVersion(engineToken, operation) await executionFiles(log).provision({ pieces: [lockedPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) return execute(log, operation, EngineOperationType.EXTRACT_PIECE_METADATA) }, async executeValidateAuth(engineToken, operation) { log.debug({ operation }, '[threadEngineRunner#executeValidateAuth]') const { piece } = operation const lockedPiece = await pieceEngineUtil(log).resolveExactVersion(engineToken, piece) await executionFiles(log).provision({ pieces: [lockedPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) const input: ExecuteValidateAuthOperation = { ...operation, publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), engineToken, } return execute(log, input, EngineOperationType.EXECUTE_VALIDATE_AUTH) }, async executeAction(engineToken, operation) { log.debug({ stepName: operation.stepName, flowVersionId: operation.flowVersion.id, }, '[threadEngineRunner#executeAction]') const step = flowStructureUtil.getActionOrThrow(operation.stepName, operation.flowVersion.trigger) switch (step.type) { case ActionType.PIECE: { const lockedPiece = await pieceEngineUtil(log).getExactPieceForStep(engineToken, step) await executionFiles(log).provision({ pieces: [lockedPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) break } case ActionType.CODE: { const codes = pieceEngineUtil(log).getCodeSteps(operation.flowVersion).filter((code) => code.name === operation.stepName) await executionFiles(log).provision({ pieces: [], codeSteps: codes, customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), runEnvironment: operation.runEnvironment, }) break } case ActionType.ROUTER: case ActionType.LOOP_ON_ITEMS: break } const lockedFlowVersion = await pieceEngineUtil(log).lockSingleStepPieceVersion({ engineToken, flowVersion: operation.flowVersion, stepName: operation.stepName, }) const input: ExecuteStepOperation = { flowVersion: lockedFlowVersion, stepName: operation.stepName, projectId: operation.projectId, sampleData: operation.sampleData, publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), engineToken, runEnvironment: operation.runEnvironment, } return execute(log, input, EngineOperationType.EXECUTE_STEP) }, async executeProp(engineToken, operation) { log.debug({ piece: operation.piece, propertyName: operation.propertyName, stepName: operation.actionOrTriggerName, }, '[threadEngineRunner#executeProp]') const { piece } = operation const lockedPiece = await pieceEngineUtil(log).resolveExactVersion(engineToken, piece) await executionFiles(log).provision({ pieces: [lockedPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) const input: ExecutePropsOptions = { ...operation, publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), engineToken, } return execute(log, input, EngineOperationType.EXECUTE_PROPERTY) }, async excuteTool(engineToken, operation) { log.debug({ operation }, '[threadEngineRunner#excuteTool]') const lockedPiece = await pieceEngineUtil(log).resolveExactVersion(engineToken, operation) await executionFiles(log).provision({ pieces: [lockedPiece], codeSteps: [], customPiecesPath: executionFiles(log).getCustomPiecesPath(operation), }) const input: ExecuteToolOperation = { ...operation, publicApiUrl: workerMachine.getPublicApiUrl(), internalApiUrl: workerMachine.getInternalApiUrl(), engineToken, } return execute(log, input, EngineOperationType.EXECUTE_TOOL) }, async shutdownAllWorkers() { if (!isNil(processManager)) { await processManager.shutdown() } }, }) async function prepareFlowSandbox(log: FastifyBaseLogger, engineToken: string, flowVersion: FlowVersion, runEnvironment: RunEnvironment, projectId: string): Promise<void> { const pieces = await pieceEngineUtil(log).extractFlowPieces({ flowVersion, engineToken, }) const codeSteps = pieceEngineUtil(log).getCodeSteps(flowVersion) await executionFiles(log).provision({ pieces, codeSteps, customPiecesPath: executionFiles(log).getCustomPiecesPath({ projectId }), runEnvironment, }) } async function execute<Result extends EngineHelperResult>(log: FastifyBaseLogger, operation: EngineOperation, operationType: EngineOperationType): Promise<EngineHelperResponse<Result>> { const memoryLimit = Math.floor(Number(workerMachine.getSettings().SANDBOX_MEMORY_LIMIT) / 1024) const startTime = Date.now() if (isNil(processManager)) { processManager = new EngineProcessManager(log, workerMachine.getSettings().FLOW_WORKER_CONCURRENCY + workerMachine.getSettings().SCHEDULED_WORKER_CONCURRENCY, { env: getEnvironmentVariables(), resourceLimits: { maxOldGenerationSizeMb: memoryLimit, maxYoungGenerationSizeMb: memoryLimit, stackSizeMb: memoryLimit, }, execArgv: [ `--max-old-space-size=${memoryLimit}`, `--max-semi-space-size=${memoryLimit}`, `--stack-size=${memoryLimit * 1024}`, // stack size is in KB ], }) } const { engine, stdError, stdOut } = await processManager.executeTask(operationType, operation) return engineRunnerUtils(log).readResults({ timeInSeconds: (Date.now() - startTime) / 1000, verdict: engine.status, output: engine.response, standardOutput: stdOut, standardError: stdError, }) } function getEnvironmentVariables(): Record<string, string | undefined> { const allowedEnvVariables = workerMachine.getSettings().SANDBOX_PROPAGATED_ENV_VARS const propagatedEnvVars = Object.fromEntries(allowedEnvVariables.map((envVar) => [envVar, process.env[envVar]])) return { ...propagatedEnvVars, NODE_OPTIONS: '--enable-source-maps', AP_PAUSED_FLOW_TIMEOUT_DAYS: workerMachine.getSettings().PAUSED_FLOW_TIMEOUT_DAYS.toString(), AP_EXECUTION_MODE: workerMachine.getSettings().EXECUTION_MODE, AP_PIECES_SOURCE: workerMachine.getSettings().PIECES_SOURCE, AP_MAX_FILE_SIZE_MB: workerMachine.getSettings().MAX_FILE_SIZE_MB.toString(), AP_FILE_STORAGE_LOCATION: workerMachine.getSettings().FILE_STORAGE_LOCATION, AP_S3_USE_SIGNED_URLS: workerMachine.getSettings().S3_USE_SIGNED_URLS, } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eldoonreval/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server