Skip to main content
Glama
flow-worker.ts3.02 kB
import { rejectedPromiseHandler, RunsMetadataQueueConfig, runsMetadataQueueFactory } from '@activepieces/server-shared' import { WebsocketServerEvent, WorkerSettingsResponse } from '@activepieces/shared' import { FastifyBaseLogger } from 'fastify' import { appSocket } from './app-socket' import { registryPieceManager } from './cache/pieces/production/registry-piece-manager' import { workerCache } from './cache/worker-cache' import { engineRunner } from './compute' import { engineRunnerSocket } from './compute/engine-runner-socket' import { jobQueueWorker } from './consume/job-queue-worker' import { workerMachine } from './utils/machine' import { workerDistributedLock, workerDistributedStore, workerRedisConnections } from './utils/worker-redis' export const runsMetadataQueue = runsMetadataQueueFactory({ createRedisConnection: workerRedisConnections.create, distributedStore: workerDistributedStore, }) export const flowWorker = (log: FastifyBaseLogger) => ({ async init({ workerToken: token, markAsHealthy }: FlowWorkerInitParams): Promise<void> { rejectedPromiseHandler(workerCache(log).deleteStaleCache(), log) await engineRunnerSocket(log).init() await appSocket(log).init({ workerToken: token, onConnect: async () => { const request = await workerMachine.getSystemInfo() const response = await appSocket(log).emitWithAck<WorkerSettingsResponse>(WebsocketServerEvent.FETCH_WORKER_SETTINGS, request) await workerMachine.init(response, token, log) await registryPieceManager(log).warmup() await jobQueueWorker(log).start(token) await initRunsMetadataQueue(log) await markAsHealthy() await registryPieceManager(log).distributedWarmup() }, }) }, async close(): Promise<void> { await engineRunnerSocket(log).disconnect() appSocket(log).disconnect() if (runsMetadataQueue.isInitialized()) { await runsMetadataQueue.get().close() } await workerRedisConnections.destroy() await workerDistributedLock(log).destroy() if (workerMachine.hasSettings()) { await engineRunner(log).shutdownAllWorkers() } await jobQueueWorker(log).close() }, }) async function initRunsMetadataQueue(log: FastifyBaseLogger): Promise<void> { const settings = workerMachine.getSettings() const config: RunsMetadataQueueConfig = { isOtelEnabled: settings.OTEL_ENABLED ?? false, redisFailedJobRetentionDays: settings.REDIS_FAILED_JOB_RETENTION_DAYS, redisFailedJobRetentionMaxCount: settings.REDIS_FAILED_JOB_RETENTION_MAX_COUNT, } await runsMetadataQueue.init(config) log.info({ message: 'Initialized runs metadata queue for worker', }, '[flowWorker#init]') } type FlowWorkerInitParams = { workerToken: string markAsHealthy: () => Promise<void> }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server