Skip to main content
Glama

mcp-google-sheets

system-job.ts (5.4 kB)
import { apDayjs, apDayjsDuration } from '@activepieces/server-shared'
import { assertNotNullOrUndefined, isNil, spreadIfDefined } from '@activepieces/shared'
import { Job, JobsOptions, Queue, Worker } from 'bullmq'
import { FastifyBaseLogger } from 'fastify'
import { redisConnections } from '../../database/redis-connections'
import { JobSchedule, SystemJobData, SystemJobName, SystemJobSchedule } from './common'
import { systemJobHandlers } from './job-handlers'

// Base delay (in milliseconds) for the exponential retry backoff of failed jobs.
const FIFTEEN_MINUTES = apDayjsDuration(15, 'minute').asMilliseconds()
// Retention age (in seconds) passed to `removeOnFail` for failed jobs.
const ONE_MONTH = apDayjsDuration(1, 'month').asSeconds()

const SYSTEM_JOB_QUEUE = 'system-job-queue'

// Both handles are assigned by `init()`; they are undefined until it has run.
export let systemJobsQueue: Queue<SystemJobData, unknown, SystemJobName>
let systemJobWorker: Worker<SystemJobData, unknown, SystemJobName>

/**
 * Builds the system-job scheduler backed by a single BullMQ queue and worker.
 *
 * @param log - Logger used for job execution and upsert diagnostics.
 * @returns An object exposing `init`, `upsertJob` and `close`.
 */
export const systemJobsSchedule = (log: FastifyBaseLogger): SystemJobSchedule => ({
    /**
     * Creates the queue and its worker, waits until both are ready, and then
     * removes jobs that are listed as deprecated.
     */
    async init(): Promise<void> {
        systemJobsQueue = new Queue(
            SYSTEM_JOB_QUEUE,
            {
                connection: await redisConnections.create(),
                defaultJobOptions: {
                    attempts: 10,
                    backoff: {
                        type: 'exponential',
                        delay: FIFTEEN_MINUTES,
                    },
                    removeOnComplete: true,
                    removeOnFail: {
                        age: ONE_MONTH,
                    },
                },
            },
        )

        systemJobWorker = new Worker(
            SYSTEM_JOB_QUEUE,
            async (job) => {
                log.debug({ name: 'SystemJob#systemJobWorker' }, `Executing job (${job.name})`)

                // Dispatch to the handler registered for this job name.
                const jobHandler = systemJobHandlers.getJobHandler(job.name)
                await jobHandler(job.data)
            },
            {
                connection: await redisConnections.create(),
                concurrency: 1,
            },
        )

        await Promise.all([
            systemJobsQueue.waitUntilReady(),
            systemJobWorker.waitUntilReady(),
        ])

        await removeDeprecatedJobs()
    },

    /**
     * Adds the job to the queue when no matching job exists yet. For a
     * `repeated` schedule whose cron pattern differs from the queued job's
     * pattern, the old repeatable entry is removed and the job is re-added.
     * Otherwise the existing job is left untouched.
     */
    async upsertJob({ job, schedule }): Promise<void> {
        log.info({ name: 'SystemJob#upsertJob', jobName: job.name }, 'Upserting job')

        const existingJob = await getJobByNameAndJobId(job.name, job.jobId)

        // Only repeated schedules carry a cron pattern that can drift from the queued one.
        const patternChanged = !isNil(existingJob) && schedule.type === 'repeated'
            ? schedule.cron !== existingJob.opts.repeat?.pattern
            : false

        if (patternChanged && !isNil(existingJob) && !isNil(existingJob.opts.repeat) && !isNil(existingJob.name)) {
            log.info({ name: 'SystemJob#upsertJob', jobName: job.name }, 'Pattern changed, removing job from queue')
            await systemJobsQueue.removeRepeatable(existingJob.name as SystemJobName, existingJob.opts.repeat)
        }

        if (isNil(existingJob) || patternChanged) {
            log.info({ name: 'SystemJob#upsertJob', jobName: job.name }, 'Adding job to queue')
            const jobOptions = configureJobOptions({ schedule, jobId: job.jobId })
            await systemJobsQueue.add(job.name, job.data, jobOptions)
        }
    },

    /**
     * Closes the worker and the queue. Safe to call when `init()` never ran or
     * failed part-way: only the handles that were actually created are closed.
     */
    async close(): Promise<void> {
        const pendingCloses: Promise<unknown>[] = []

        // The worker is assigned after the queue in `init()`, so it may be
        // missing even when the queue exists; guard each handle on its own.
        if (!isNil(systemJobWorker)) {
            pendingCloses.push(systemJobWorker.close())
        }
        if (!isNil(systemJobsQueue)) {
            pendingCloses.push(systemJobsQueue.close())
        }

        await Promise.all(pendingCloses)
    },
})

/**
 * Removes job schedulers and queued jobs whose key/name starts with one of the
 * deprecated job names listed below.
 */
async function removeDeprecatedJobs(): Promise<void> {
    const deprecatedJobs = [
        'trigger-data-cleaner',
        'logs-cleanup-trigger',
        'usage-report',
        'archive-old-issues',
        'platform-usage-report',
    ]

    const allSystemJobs = await systemJobsQueue.getJobSchedulers()
    // A prefix match also covers exact equality, so one check is sufficient.
    const deprecatedJobsFromQueue = allSystemJobs.filter(
        f => !isNil(f) && deprecatedJobs.some(d => f.key.startsWith(d)),
    )
    for (const job of deprecatedJobsFromQueue) {
        await systemJobsQueue.removeJobScheduler(job.id ?? job.key)
    }

    const onetimeDeprecatedJobs = [
        'hard-delete-project',
    ]

    const oneTimeJobs = await systemJobsQueue.getJobs()
    const oneTimeJobsFromQueue = oneTimeJobs.filter(
        f => !isNil(f) && onetimeDeprecatedJobs.some(d => f.name.startsWith(d)),
    )
    for (const job of oneTimeJobsFromQueue) {
        assertNotNullOrUndefined(job.id, 'Job id is required')
        await systemJobsQueue.remove(job.id)
    }
}

/**
 * Translates a schedule into BullMQ job options.
 *
 * - `one-time`: sets `delay` to the milliseconds between now and `schedule.date`.
 * - `repeated`: sets a repeat rule using `schedule.cron` with the `UTC` timezone.
 *
 * @param schedule - The schedule to translate.
 * @param jobId - Optional job id; included in the options only when defined.
 * @returns The options to pass to `Queue.add`.
 */
const configureJobOptions = ({ schedule, jobId }: { schedule: JobSchedule, jobId?: string }): JobsOptions => {
    const config: JobsOptions = {}

    switch (schedule.type) {
        case 'one-time': {
            const now = apDayjs()
            config.delay = schedule.date.diff(now, 'milliseconds')
            break
        }
        case 'repeated': {
            config.repeat = {
                pattern: schedule.cron,
                tz: 'UTC',
            }
            break
        }
    }

    return {
        ...config,
        ...spreadIfDefined('jobId', jobId),
    }
}

/**
 * Finds the first job returned by the queue whose name matches `name` and,
 * when `jobId` is provided, whose id also matches `jobId`.
 *
 * @param name - Job name to look for.
 * @param jobId - Optional job id that must also match.
 * @returns The matching job, or `undefined` when none is found.
 */
const getJobByNameAndJobId = async (name: string, jobId?: string): Promise<Job | undefined> => {
    const allSystemJobs = await systemJobsQueue.getJobs()
    return allSystemJobs.find(job => {
        if (isNil(job)) {
            return false
        }
        if (!isNil(jobId)) {
            return job.name === name && job.id === jobId
        }
        return job.name === name
    })
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.