Skip to main content
Glama

mcp-google-sheets

flow-worker.ts6.41 kB
import { inspect } from 'util' import { rejectedPromiseHandler } from '@activepieces/server-shared' import { ConsumeJobRequest, ConsumeJobResponse, ConsumeJobResponseStatus, WebsocketClientEvent, WebsocketServerEvent, WorkerJobType, WorkerMachineHealthcheckRequest } from '@activepieces/shared' import { FastifyBaseLogger } from 'fastify' import { io, Socket } from 'socket.io-client' import { workerCache } from './cache/worker-cache' import { agentJobExecutor } from './executors/agent-job-executor' import { executeTriggerExecutor } from './executors/execute-trigger-executor' import { flowJobExecutor } from './executors/flow-job-executor' import { renewWebhookExecutor } from './executors/renew-webhook-executor' import { userInteractionJobExecutor } from './executors/user-interaction-job-executor' import { webhookExecutor } from './executors/webhook-job-executor' import { engineRunner } from './runner' import { engineRunnerSocket } from './runner/engine-runner-socket' import { workerMachine } from './utils/machine' let workerToken: string let heartbeatInterval: NodeJS.Timeout let socket: Socket export const flowWorker = (log: FastifyBaseLogger) => ({ async init({ workerToken: token }: { workerToken: string }): Promise<void> { rejectedPromiseHandler(workerCache(log).deleteStaleCache(), log) await engineRunnerSocket(log).init() workerToken = token const { url, path } = workerMachine.getSocketUrlAndPath() socket = io(url, { transports: ['websocket'], path, autoConnect: false, reconnection: true, }) socket.auth = { token: workerToken, workerId: workerMachine.getWorkerId() } socket.on('connect', () => { log.info({ message: 'Connected to server', workerId: workerMachine.getWorkerId(), socketId: socket.id, }) }) socket.on('disconnect', () => { log.info({ message: 'Disconnected from server', }) }) socket.on('connect_error', (error) => { log.error({ message: 'Socket connection error', error: error.message, }) }) socket.on('error', (error) => { log.error({ message: 'Socket error', error: error.message, }) }) socket.on(WebsocketClientEvent.CONSUME_JOB_REQUEST, async (request: ConsumeJobRequest, callback: (data: unknown) => void) => { log.info({ message: 'Received consume job request', jobId: request.jobId, attempsStarted: request.attempsStarted, }) try { const response: ConsumeJobResponse = await consumeJob(request, log) callback(response) } catch (error) { log.info({ message: 'Failed to consume job', error, }) const response: ConsumeJobResponse = { status: ConsumeJobResponseStatus.INTERNAL_ERROR, errorMessage: inspect(error), } callback(response) } }) socket.connect() heartbeatInterval = setInterval(async () => { if (!socket.connected) { log.error({ message: 'Not connected to server, retrying...', }) return } try { const request: WorkerMachineHealthcheckRequest = await workerMachine.getSystemInfo() const response = await socket.timeout(10000).emitWithAck(WebsocketServerEvent.MACHINE_HEARTBEAT, request) await workerMachine.init(response, log) } catch (error) { log.error({ message: 'Failed to send heartbeat, retrying...', error, }) } }, 15000) }, async close(): Promise<void> { await engineRunnerSocket(log).disconnect() if (socket) { socket.disconnect() } clearTimeout(heartbeatInterval) if (workerMachine.hasSettings()) { await engineRunner(log).shutdownAllWorkers() } }, }) async function consumeJob(request: ConsumeJobRequest, log: FastifyBaseLogger): Promise<ConsumeJobResponse> { const { jobId, jobData, attempsStarted, engineToken, timeoutInSeconds } = request switch (jobData.jobType) { case WorkerJobType.EXECUTE_EXTRACT_PIECE_INFORMATION: case WorkerJobType.EXECUTE_PROPERTY: case WorkerJobType.EXECUTE_TOOL: case WorkerJobType.EXECUTE_VALIDATION: case WorkerJobType.EXECUTE_TRIGGER_HOOK: await userInteractionJobExecutor(log).execute(jobData, engineToken, workerToken, timeoutInSeconds) return { status: ConsumeJobResponseStatus.OK, } case WorkerJobType.EXECUTE_FLOW: await flowJobExecutor(log).executeFlow({ jobData, attempsStarted, engineToken, timeoutInSeconds }) return { status: ConsumeJobResponseStatus.OK, } case WorkerJobType.EXECUTE_POLLING: return executeTriggerExecutor(log).executeTrigger({ jobId, data: jobData, engineToken, workerToken, timeoutInSeconds, }) case WorkerJobType.RENEW_WEBHOOK: return renewWebhookExecutor(log).renewWebhook({ data: jobData, engineToken, timeoutInSeconds, }) case WorkerJobType.DELAYED_FLOW: { throw new Error('Delayed flow is handled by the app') } case WorkerJobType.EXECUTE_WEBHOOK: { return webhookExecutor(log).consumeWebhook(jobId, jobData, engineToken, workerToken, timeoutInSeconds) } case WorkerJobType.EXECUTE_AGENT: { await agentJobExecutor(log).executeAgent({ jobData, engineToken, workerToken, }) return { status: ConsumeJobResponseStatus.OK, } } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server