Skip to main content
Glama
runs-metadata-queue-factory.ts3.68 kB
import { apId, ApId, FlowRun as FlowRunSchema } from '@activepieces/shared' import { Static, Type } from '@sinclair/typebox' import { Value } from '@sinclair/typebox/value' import { Queue } from 'bullmq' import { BullMQOtel } from 'bullmq-otel' import Redis from 'ioredis' import { apDayjsDuration } from '../dayjs-helper' import { DistributedStore } from '../redis/distributed-store-factory' import { QueueName } from './index' export const redisMetadataKey = (runId: ApId): string => `runs_metadata:${runId}` export const runsMetadataQueueFactory = ({ createRedisConnection, distributedStore, }: RunsMetadataQueueFactoryParams) => { let queueInstance: Queue<RunsMetadataJobData> | undefined = undefined return { async init(config: RunsMetadataQueueConfig): Promise<void> { queueInstance = new Queue<RunsMetadataJobData>(QueueName.RUNS_METADATA, { connection: await createRedisConnection(), telemetry: config.isOtelEnabled ? new BullMQOtel(QueueName.RUNS_METADATA) : undefined, defaultJobOptions: { attempts: 5, backoff: { type: 'exponential', delay: apDayjsDuration(8, 'minute').asMilliseconds(), }, removeOnComplete: true, removeOnFail: { age: apDayjsDuration(config.redisFailedJobRetentionDays, 'day').asSeconds(), count: config.redisFailedJobRetentionMaxCount, }, }, }) await queueInstance.waitUntilReady() }, async add(params: RunsMetadataUpsertData): Promise<void> { if (!queueInstance) { throw new Error('Runs metadata queue not initialized') } const cleanedParams = Value.Clean(RunsMetadataUpsertData, params) as RunsMetadataUpsertData await distributedStore.merge(redisMetadataKey(cleanedParams.id), { ...cleanedParams, requestId: apId(), }) await queueInstance.add( 'update-run-metadata', { runId: cleanedParams.id, projectId: cleanedParams.projectId }, { deduplication: { id: cleanedParams.id } }, ) }, get(): Queue<RunsMetadataJobData> { if (!queueInstance) { throw new Error('Runs metadata queue not initialized') } return queueInstance }, isInitialized(): boolean { return queueInstance !== undefined }, } } type RunsMetadataQueueFactoryParams = { createRedisConnection: () => Promise<Redis> distributedStore: DistributedStore } export type RunsMetadataJobData = { runId: string projectId: string } export type RunsMetadataQueueConfig = { isOtelEnabled: boolean redisFailedJobRetentionDays: number redisFailedJobRetentionMaxCount: number } export const RunsMetadataUpsertData = Type.Composite([ Type.Required(Type.Pick(FlowRunSchema, ['id', 'projectId'])), Type.Partial(Type.Pick(FlowRunSchema, [ 'flowId', 'flowVersionId', 'environment', 'startTime', 'finishTime', 'status', 'tags', 'pauseMetadata', 'failedStep', 'stepNameToTest', 'parentRunId', 'failParentOnFailure', 'logsFileId', 'updated', ])), Type.Object({ requestId: Type.Optional(Type.String()), }), ]) export type RunsMetadataUpsertData = Static<typeof RunsMetadataUpsertData>

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server