import { Queue, Worker, QueueEvents, Job } from 'bullmq';
import IORedis from 'ioredis';
// ============================================
// Configuration
// ============================================
/**
 * Redis connection shared by every queue and worker created in this module.
 *
 * Host and port come from `REDIS_HOST` / `REDIS_PORT`, defaulting to
 * `localhost:6379` when a variable is unset or empty.
 */
const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  // Explicit radix, and a trailing fallback so a non-numeric REDIS_PORT
  // (which parses to NaN) yields the default port instead of NaN.
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10) || 6379,
  // BullMQ expects this to be null on connections handed to workers
  // (see the BullMQ connections documentation).
  maxRetriesPerRequest: null,
});
// ============================================
// Types
// ============================================
/** Payload carried by jobs on the email queue. */
type EmailJobData = {
// Recipient; logged by the email worker as the job's destination.
to: string;
// Presumably the message subject line — not read by the simulated worker in this file.
subject: string;
// Presumably the message content — not read by the simulated worker in this file.
body: string;
};
/** Payload carried by jobs on the report queue. */
type ReportJobData = {
// Identifier of the user the report is generated for (logged by the report worker).
userId: string;
// Two-element tuple; presumably [start, end] — the string format is not
// established in this file (TODO confirm with callers).
dateRange: [string, string];
};
// ============================================
// Queue Definitions
// ============================================
/** Producer-side handle for the 'email-queue' queue, using the module's shared Redis connection. */
export const emailQueue = new Queue<EmailJobData>('email-queue', { connection });
/** Producer-side handle for the 'report-queue' queue, using the module's shared Redis connection. */
export const reportQueue = new Queue<ReportJobData>('report-queue', { connection });
// ============================================
// Workers
// ============================================
/**
 * Worker consuming 'email-queue'.
 *
 * The processor only simulates delivery: it logs the job, waits one second,
 * fails roughly one time in ten, and otherwise resolves with a fixed result.
 */
const emailWorker = new Worker<EmailJobData>(
  'email-queue',
  async ({ id, data }) => {
    console.log(`Processing email job ${id} to ${data.to}`);

    // Stand-in for the real send call.
    await new Promise((done) => setTimeout(done, 1000));

    const simulatedFailure = Math.random() < 0.1;
    if (simulatedFailure) {
      throw new Error('Random failure');
    }

    return { sent: true, messageId: '123' };
  },
  {
    connection,
    // Up to five email jobs are processed concurrently.
    concurrency: 5,
    // Rate limit: at most 10 jobs per 1000 ms window.
    limiter: { max: 10, duration: 1000 },
  },
);
/**
 * Worker consuming 'report-queue'.
 *
 * The processor simulates a long-running task: five half-second steps, each
 * followed by a progress update (20, 40, 60, 80, 100), then a fixed report URL.
 */
const reportWorker = new Worker<ReportJobData>(
  'report-queue',
  async (job) => {
    console.log(`Generating report for user ${job.data.userId}`);

    for (let percent = 20; percent <= 100; percent += 20) {
      // Stand-in for one slice of real work.
      await new Promise((done) => setTimeout(done, 500));
      await job.updateProgress(percent);
    }

    return { reportUrl: 'https://example.com/report.pdf' };
  },
  { connection },
);
// ============================================
// Events
// ============================================
/**
 * Attaches console-logging listeners to a worker.
 *
 * Listens for `completed`, `failed`, `progress` and `error`. The `error`
 * listener matters beyond logging: a worker is an event emitter, and an
 * `error` event with no listener attached is thrown as an unhandled error.
 *
 * @param worker - The worker whose events should be logged.
 */
const setupEvents = (worker: Worker): void => {
  worker.on('completed', (job) => {
    console.log(`Job ${job.id} completed! Result: ${JSON.stringify(job.returnvalue)}`);
  });
  worker.on('failed', (job, err) => {
    // `job` may be undefined here, hence the optional chaining.
    console.error(`Job ${job?.id} failed: ${err.message}`);
  });
  worker.on('progress', (job, progress) => {
    // Progress may be reported as an object; serialize it instead of letting
    // string interpolation print "[object Object]%".
    const shown = typeof progress === 'object' ? JSON.stringify(progress) : `${progress}%`;
    console.log(`Job ${job.id} progress: ${shown}`);
  });
  worker.on('error', (err) => {
    console.error(`Worker error: ${err.message}`);
  });
};
// Wire the logging listeners onto each worker, email worker first.
for (const worker of [emailWorker, reportWorker]) {
  setupEvents(worker);
}
// ============================================
// Helper Functions
// ============================================
/**
 * Convenience API over the module's queues: enqueue helpers plus a shutdown
 * routine for the workers, queues and shared Redis connection.
 */
export const queueService = {
  /**
   * Enqueues a 'send-email' job on the email queue.
   *
   * The job is configured for up to 3 attempts with exponential backoff
   * (1000 ms base delay) and is removed once it completes.
   *
   * @param data - Email payload for the job.
   * @returns The promise returned by `emailQueue.add`.
   */
  async sendEmail(data: EmailJobData) {
    return emailQueue.add('send-email', data, {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
      removeOnComplete: true,
    });
  },

  /**
   * Enqueues a 'generate-report' job on the report queue.
   *
   * @param data - Report payload for the job.
   * @returns The promise returned by `reportQueue.add`.
   */
  async generateReport(data: ReportJobData) {
    return reportQueue.add('generate-report', data, {
      // 1 is the highest rank among *prioritized* jobs (lower number = higher
      // priority). Per the BullMQ docs, jobs added without any priority are
      // still processed before prioritized ones.
      priority: 1,
    });
  },

  /**
   * Shuts down both workers, both queues and the shared Redis connection,
   * in that order.
   *
   * Every step is attempted even if an earlier one fails, so one failing
   * close cannot leave the remaining resources open.
   *
   * @throws The first error raised by any shutdown step, after all steps ran.
   */
  async close(): Promise<void> {
    const failures: unknown[] = [];
    const attempt = async (step: () => Promise<unknown>): Promise<void> => {
      try {
        await step();
      } catch (err: unknown) {
        failures.push(err);
      }
    };

    // Workers go first, then the queues, then the connection they share.
    await attempt(() => emailWorker.close());
    await attempt(() => reportWorker.close());
    await attempt(() => emailQueue.close());
    await attempt(() => reportQueue.close());
    await attempt(() => connection.quit());

    if (failures.length > 0) {
      throw failures[0];
    }
  },
};