import { Queue, Worker, QueueEvents, Job } from "bullmq";
import IORedis from "ioredis";
// ============================================
// Configuration
// ============================================
// Shared Redis connection handed to every queue and worker defined below.
const connection = new IORedis({
  // `||` rather than `??` so an empty-string env var also falls back to the default.
  host: process.env.REDIS_HOST || "localhost",
  // Explicit radix: the port is always read as base 10 (a value such as "0x1F"
  // is no longer interpreted as hexadecimal).
  port: Number.parseInt(process.env.REDIS_PORT || "6379", 10),
  // BullMQ requires `maxRetriesPerRequest: null` on connections used by workers.
  maxRetriesPerRequest: null,
});
// ============================================
// Types
// ============================================
/**
 * Payload carried by jobs on the email queue.
 */
type EmailJobData = {
// Logged by the email worker as the job's destination; presumably the
// recipient address — confirm with callers.
to: string;
// Presumably the message subject line; not read by the worker code visible here.
subject: string;
// Presumably the message content; its format (plain text vs. HTML) is not
// visible here — TODO confirm.
body: string;
};
/**
 * Payload carried by jobs on the report queue.
 */
type ReportJobData = {
// Identifier of the user the report is generated for (logged by the report worker).
userId: string;
// Pair of strings; presumably [start, end] of the reporting period. The date
// format is not visible here — TODO confirm with callers.
dateRange: [string, string];
};
// ============================================
// Queue Definitions
// ============================================
/** Producer-side handle for "email-queue"; jobs carry an {@link EmailJobData} payload. */
export const emailQueue = new Queue<EmailJobData>(
  "email-queue",
  { connection },
);
/** Producer-side handle for "report-queue"; jobs carry a {@link ReportJobData} payload. */
export const reportQueue = new Queue<ReportJobData>(
  "report-queue",
  { connection },
);
// ============================================
// Workers
// ============================================
/**
 * Processor for jobs on "email-queue".
 *
 * Delivery is simulated: the function logs the job, waits one second, and
 * then fails about one time in ten so retry behaviour can be exercised.
 *
 * @param job - The email job being processed.
 * @returns A fixed success result.
 * @throws Error with message "Random failure" on the simulated failure path.
 */
const processEmailJob = async (job: Job<EmailJobData>) => {
  console.log(`Processing email job ${job.id} to ${job.data.to}`);

  // Stand-in for the real send: a one-second pause.
  await new Promise((done) => setTimeout(done, 1000));

  const simulatedFailure = Math.random() < 0.1;
  if (simulatedFailure) {
    throw new Error("Random failure");
  }

  return { sent: true, messageId: "123" };
};

/** Worker consuming "email-queue" with bounded concurrency and a rate limit. */
const emailWorker = new Worker<EmailJobData>("email-queue", processEmailJob, {
  connection,
  // Up to five jobs are processed concurrently.
  concurrency: 5,
  // Rate limit: at most 10 jobs per 1000 ms window.
  limiter: { max: 10, duration: 1000 },
});
/**
 * Worker consuming "report-queue".
 *
 * Report generation is simulated as five 500 ms steps; after each step the
 * job's progress is updated (20, 40, 60, 80, 100). Resolves with a fixed
 * report URL.
 */
const reportWorker = new Worker<ReportJobData>(
  "report-queue",
  async (job) => {
    console.log(`Generating report for user ${job.data.userId}`);

    // Stand-in for a long-running task.
    for (let percent = 20; percent <= 100; percent += 20) {
      await new Promise((wake) => setTimeout(wake, 500));
      await job.updateProgress(percent);
    }

    return { reportUrl: "https://example.com/report.pdf" };
  },
  { connection },
);
// ============================================
// Events
// ============================================
/**
 * Attaches console-logging listeners to a worker.
 *
 * Handlers are registered for `completed`, `failed`, `progress` and `error`.
 * The `error` listener matters beyond logging: an `error` event with no
 * listener is thrown by the emitter, and BullMQ warns that a worker without
 * one may stop processing jobs.
 *
 * @param worker - The worker whose events should be logged.
 */
const setupEvents = (worker: Worker) => {
  worker.on("completed", (job) => {
    console.log(
      `Job ${job.id} completed! Result: ${JSON.stringify(job.returnvalue)}`,
    );
  });

  worker.on("failed", (job, err) => {
    // The job argument can be absent, hence the optional chaining.
    console.error(`Job ${job?.id} failed: ${err.message}`);
  });

  worker.on("progress", (job, progress) => {
    // Progress is not necessarily numeric. Only numbers get the "%" suffix;
    // anything else is serialised so objects don't print as "[object Object]".
    const shown =
      typeof progress === "number" ? `${progress}%` : JSON.stringify(progress);
    console.log(`Job ${job.id} progress: ${shown}`);
  });

  worker.on("error", (err: Error) => {
    // Worker-level failures (as opposed to a single job failing).
    console.error(`Worker error: ${err.message}`);
  });
};
// Wire the logging listeners onto each worker, email first, then report.
for (const worker of [emailWorker, reportWorker]) {
  setupEvents(worker);
}
// ============================================
// Helper Functions
// ============================================
/**
 * Facade for enqueuing work on the email and report queues and for shutting
 * the whole queue infrastructure down.
 */
export const queueService = {
  /**
   * Enqueues a "send-email" job.
   *
   * The job is attempted up to 3 times with exponential backoff (1000 ms base
   * delay) and is removed from the queue once it completes.
   *
   * @param data - Email payload for the job.
   * @returns The job created on the email queue.
   */
  async sendEmail(data: EmailJobData) {
    return emailQueue.add("send-email", data, {
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000,
      },
      removeOnComplete: true,
    });
  },

  /**
   * Enqueues a "generate-report" job.
   *
   * @param data - Report payload for the job.
   * @returns The job created on the report queue.
   */
  async generateReport(data: ReportJobData) {
    return reportQueue.add("generate-report", data, {
      // 1 is the top of BullMQ's priority scale (lower number = served sooner
      // among prioritised jobs). Note that jobs added with no priority at all
      // are still served before any prioritised job.
      priority: 1,
    });
  },

  /**
   * Shuts down workers, then queues, then the shared Redis connection.
   *
   * Every step is attempted even if an earlier one rejects, so a failure
   * closing one resource does not leak the others.
   *
   * @throws The first error encountered, after all steps have been attempted.
   */
  async close(): Promise<void> {
    // Order matters: stop consumers before producers, and release the shared
    // connection last because everything above uses it.
    const steps: Array<() => Promise<unknown>> = [
      () => emailWorker.close(),
      () => reportWorker.close(),
      () => emailQueue.close(),
      () => reportQueue.close(),
      () => connection.quit(),
    ];

    let failed = false;
    let firstError: unknown;
    for (const step of steps) {
      try {
        await step();
      } catch (err) {
        if (!failed) {
          failed = true;
          firstError = err;
        }
      }
    }

    if (failed) {
      throw firstError;
    }
  },
};