lock.ts•1.3 kB
import { ApLock, exceptionHandler } from '@activepieces/server-shared'
import { isNil } from '@activepieces/shared'
import { Mutex } from 'async-mutex'
import { FastifyBaseLogger } from 'fastify'
import RedLock from 'redlock'
import { redisConnections } from '../database/redis'
// Mutex under which getOrCreateRedLock runs its check-and-create step,
// so concurrent callers cannot each construct their own RedLock client.
const lockMutex = new Mutex()
/**
 * Helpers for taking locks through the shared RedLock client.
 */
export const distributedLock = {
    /**
     * Acquires a lock on `key` using the module's RedLock client.
     *
     * `timeout` (default 3000) is forwarded as the second argument of
     * `redLock.acquire` and also determines how many retries are attempted:
     * two retries for every started 2000 ms of `timeout`, spaced 2000 ms apart.
     * NOTE(review): redlock documents that second argument as the lock
     * duration, so `timeout` presumably acts as the lock TTL — confirm.
     *
     * @param key - resource name to lock
     * @param timeout - value passed to `acquire`; also drives the retry count
     * @param log - logger handed to `exceptionHandler.handle` on failure
     * @returns the lock object returned by `redLock.acquire`
     * @throws rethrows whatever error occurred, after reporting it
     */
    acquireLock: async ({ key, timeout = 3000, log }: AcquireLockParams): Promise<ApLock> => {
        const retryDelay = 2000
        const retryCount = Math.ceil(timeout / retryDelay) * 2
        try {
            const client = await getOrCreateRedLock()
            const lock = await client.acquire([key], timeout, { retryCount, retryDelay })
            return lock
        }
        catch (error) {
            // Report first, then let the caller see the original failure.
            exceptionHandler.handle(error, log)
            throw error
        }
    },
}
// Module-level RedLock client; stays unassigned until the first call to getOrCreateRedLock.
let _redLock: RedLock

/**
 * Returns the module's RedLock client, creating it on first use.
 *
 * The check-and-create step runs inside `lockMutex.runExclusive`, so
 * overlapping calls share one client instead of each opening a new
 * Redis connection.
 *
 * @returns a promise resolving to the shared RedLock instance
 */
function getOrCreateRedLock(): Promise<RedLock> {
    const ensureClient = async (): Promise<RedLock> => {
        if (isNil(_redLock)) {
            const connection = await redisConnections.createNew()
            _redLock = new RedLock([connection], {
                driftFactor: 0.01,
                retryCount: 30,
            })
        }
        return _redLock
    }
    return lockMutex.runExclusive(ensureClient)
}
// Arguments accepted by distributedLock.acquireLock.
type AcquireLockParams = {
// Resource name; passed to redLock.acquire as the only entry of its resource list.
key: string
// Passed as the second argument of redLock.acquire and used to derive the retry count; acquireLock defaults it to 3000.
timeout?: number
// Logger forwarded to exceptionHandler.handle when acquiring the lock fails.
log: FastifyBaseLogger
}