alerts-handler.ts•4.31 kB
import { NotificationStatus } from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { redisConnections } from '../../database/redis-connections'
import { SystemJobName } from '../../helper/system-jobs/common'
import { systemJobsSchedule } from '../../helper/system-jobs/system-job'
import { platformService } from '../../platform/platform.service'
import { projectService } from '../../project/project-service'
import { domainHelper } from '../custom-domains/domain-helper'
import { emailService } from '../helper/email/email-service'
const HOUR_IN_SECONDS = 3600
const DAY_IN_SECONDS = 86400
const HOURLY_LIMIT = 5
const DAILY_LIMIT = 15
export const alertsHandler = (log: FastifyBaseLogger) => ({
[NotificationStatus.NEVER]: async (_: IssueParams): Promise<void> => Promise.resolve(),
[NotificationStatus.ALWAYS]: async (params: IssueParams): Promise<void> => sendAlertOnFlowRun(params, log),
[NotificationStatus.NEW_ISSUE]: async (params: IssueParams): Promise<void> => sendAlertOnNewIssue(params, log),
})
async function scheduleSendingReminder(params: IssueRemindersParams, log: FastifyBaseLogger): Promise<void> {
const { projectId } = params
if (params.issueCount === 1) {
const project = await projectService.getOneOrThrow(projectId)
const platform = await platformService.getOneOrThrow(project.platformId)
const reminderKey = `reminder:${projectId}`
const redisConnection = await redisConnections.useExisting()
const isEmailScheduled = await redisConnection.get(reminderKey)
if (isEmailScheduled) {
return
}
const endOfDay = dayjs().endOf('day')
await redisConnection.set(reminderKey, 0, 'EXAT', endOfDay.unix())
await systemJobsSchedule(log).upsertJob({
job: {
name: SystemJobName.ISSUES_REMINDER,
data: {
projectId,
platformId: platform.id,
projectName: project.displayName,
},
jobId: `issues-reminder-${projectId}`,
},
schedule: {
type: 'one-time',
date: endOfDay,
},
})
}
}
async function sendAlertOnNewIssue(params: IssueParams, log: FastifyBaseLogger): Promise<void> {
const { platformId, issueCount } = params
const isOldIssue = issueCount > 1
if (isOldIssue) {
return
}
const issueUrl = await domainHelper.getPublicUrl({
platformId,
path: 'runs?limit=10#Issues',
})
await scheduleSendingReminder({ projectId: params.projectId, issueCount: params.issueCount }, log)
await emailService(log).sendIssueCreatedNotification({
...params,
issueOrRunsPath: issueUrl,
isIssue: true,
})
}
async function sendAlertOnFlowRun(params: IssueParams, log: FastifyBaseLogger): Promise<void> {
const { flowId, platformId, flowRunId } = params
const hourlyFlowIdKey = `alerts:hourly:${flowId}`
const dailyFlowIdKey = `alerts:daily:${flowId}`
const [hourlyCount, dailyCount] = await Promise.all([
incrementAndExpire(hourlyFlowIdKey, HOUR_IN_SECONDS),
incrementAndExpire(dailyFlowIdKey, DAY_IN_SECONDS),
])
if (hourlyCount > HOURLY_LIMIT || dailyCount > DAILY_LIMIT) {
return
}
const flowRunsUrl = await domainHelper.getInternalUrl({
platformId,
path: `runs/${flowRunId}`,
})
await scheduleSendingReminder({ projectId: params.projectId, issueCount: params.issueCount }, log)
await emailService(log).sendIssueCreatedNotification({
...params,
issueOrRunsPath: flowRunsUrl,
isIssue: false,
})
}
async function incrementAndExpire(key: string, expiryTime: number): Promise<number> {
const redis = await redisConnections.useExisting()
const count = await redis.incr(key)
if (count === 1) {
await redis.expire(key, expiryTime)
}
return count
}
type IssueParams = {
projectId: string
platformId: string
flowId: string
flowRunId: string
flowName: string
issueCount: number
createdAt: string
}
type IssueRemindersParams = Pick<IssueParams, 'projectId' | 'issueCount'>