HazelJS Queue Package
@hazeljs/queue provides Redis-backed job queues for HazelJS, built on BullMQ. Use it to enqueue background jobs, process them with workers, and handle retries, delays, and priorities.
Quick Reference
- Purpose: @hazeljs/queue provides background job processing with Redis-backed queues (BullMQ), decorator-based workers, retries, delays, priorities, and job lifecycle management.
- When to use: Use @hazeljs/queue for background job processing (email sending, file processing, AI tasks). Use @hazeljs/kafka for event streaming between microservices instead. Use @hazeljs/cron for scheduled recurring tasks.
- Key concepts: QueueModule.forRoot(), @Processor() worker decorator, @Process() job handler decorator, job enqueueing, retries with backoff, delayed jobs, job priorities, job events.
- Dependencies: @hazeljs/core, bullmq, ioredis.
- Common patterns: Register QueueModule.forRoot({ connection: { host, port } }) → enqueue jobs with queue.add(name, data) → process with @Processor('queue-name') class and @Process() method.
- Common mistakes: Not running a separate worker process in production; not configuring retry/backoff for flaky jobs; not monitoring failed jobs; Redis connection not configured.
Purpose
Building scalable applications requires offloading long-running tasks to background workers. The @hazeljs/queue package simplifies this by providing:
- Redis-Backed Queues – Uses BullMQ for reliable, distributed job processing
- QueueService – Injectable service for adding jobs from controllers and services
- @Queue Decorator – Mark methods as job processors for worker setup
- Job Options – Delay, priority, retry attempts, backoff strategies, timeouts
- Cron Integration – Works with @hazeljs/cron for distributed scheduled jobs
- Worker Management – Separate worker processes or in-process workers
- Job Monitoring – Track job status, failures, and completions
Architecture
graph TD
A["Controller/Service"] --> B["QueueService.add()"]
B --> C["Redis Queue<br/>(BullMQ)"]
C --> D["Worker Process 1"]
C --> E["Worker Process 2"]
C --> F["Worker Process N"]
D --> G["@Queue Processor"]
E --> G
F --> G
G --> H{Job Result}
H -->|Success| I["Completed"]
H -->|Failure| J["Retry Logic"]
J -->|Max Attempts| K["Failed"]
J -->|Retry| C
style A fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
style C fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
style D fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
style E fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
style F fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
Key Components
- QueueModule – Registers QueueService and manages queue connections
- QueueService – Injectable service for adding jobs to queues
- @Queue Decorator – Marks methods as job processors
- BullMQ Worker – Processes jobs from Redis queues
- Job Options – Configure delay, retry, priority, backoff
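The retry branch of the diagram can be read as a small decision function. The sketch below is illustrative only: `afterAttempt` and `Outcome` are hypothetical names, and it assumes that `attempts` counts the total number of tries (as in BullMQ), so a job with `attempts: 3` is retried at most twice.

```typescript
// Possible outcomes after one processing attempt, mirroring the diagram.
type Outcome = "completed" | "retry" | "failed";

/**
 * Decide what happens to a job after an attempt.
 * attemptsMade counts the attempt that just finished (1 for the first run);
 * maxAttempts is the total number of tries allowed (the `attempts` option).
 */
function afterAttempt(
  succeeded: boolean,
  attemptsMade: number,
  maxAttempts: number,
): Outcome {
  if (succeeded) return "completed";
  // A failed job goes back to the queue until its tries are used up.
  return attemptsMade < maxAttempts ? "retry" : "failed";
}

console.log(afterAttempt(false, 1, 3)); // retry
console.log(afterAttempt(false, 3, 3)); // failed
console.log(afterAttempt(true, 2, 3)); // completed
```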
Installation
npm install @hazeljs/queue ioredis
Quick Start
1. Import QueueModule
import { HazelModule } from '@hazeljs/core';
import { QueueModule } from '@hazeljs/queue';
@HazelModule({
imports: [
QueueModule.forRoot({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
},
}),
],
})
export class AppModule {}
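A misconfigured Redis connection is listed above as a common mistake, so it can help to validate the environment once at startup. The helper below is a sketch, not part of @hazeljs/queue: `redisConnectionFromEnv` and `RedisConnection` are hypothetical names, and the defaults match the ones used in the module registration.

```typescript
interface RedisConnection {
  host: string;
  port: number;
}

// Reads REDIS_HOST / REDIS_PORT and fails fast on an unusable port value.
function redisConnectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnection {
  const host = env.REDIS_HOST || "localhost";
  const rawPort = env.REDIS_PORT || "6379";
  const port = Number.parseInt(rawPort, 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return { host, port };
}

console.log(redisConnectionFromEnv({ REDIS_HOST: "redis.internal" }));
```

The result could then be passed as `connection` to `QueueModule.forRoot({ connection: redisConnectionFromEnv(process.env) })`.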
2. Add Jobs from Controllers
import { Injectable } from '@hazeljs/core';
import { QueueService } from '@hazeljs/queue';
@Injectable()
export class EmailService {
constructor(private queue: QueueService) {}
async sendWelcomeEmail(userId: string, email: string) {
await this.queue.add('emails', 'welcome', { userId, email });
}
async sendDelayedReminder(userId: string, delayMs: number) {
await this.queue.addDelayed('emails', 'reminder', { userId }, delayMs);
}
async processWithRetry(data: { orderId: string }) {
await this.queue.addWithRetry('orders', 'process', data, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
}
}
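Services that enqueue jobs can be unit-tested without Redis by depending on the shape of `add` rather than on the concrete class. The sketch below assumes only the `add(queueName, jobName, data?, options?)` signature from the API reference; `JobAdder`, `RecordedJob`, and `InMemoryQueue` are hypothetical names introduced for illustration.

```typescript
// Minimal shape of the `add` method documented in the API reference.
interface JobAdder {
  add(
    queueName: string,
    jobName: string,
    data?: unknown,
    options?: unknown,
  ): Promise<unknown>;
}

interface RecordedJob {
  queueName: string;
  jobName: string;
  data: unknown;
  options: unknown;
}

// Hypothetical test double: records jobs in memory instead of using Redis.
class InMemoryQueue implements JobAdder {
  readonly jobs: RecordedJob[] = [];

  async add(
    queueName: string,
    jobName: string,
    data?: unknown,
    options?: unknown,
  ): Promise<void> {
    this.jobs.push({ queueName, jobName, data, options });
  }
}

class EmailService {
  private readonly queue: JobAdder;

  constructor(queue: JobAdder) {
    this.queue = queue;
  }

  async sendWelcomeEmail(userId: string, email: string): Promise<void> {
    await this.queue.add("emails", "welcome", { userId, email });
  }
}

const fake = new InMemoryQueue();
const service = new EmailService(fake);
void service.sendWelcomeEmail("u1", "a@example.com");
console.log(fake.jobs.map((j) => j.jobName).join(",")); // welcome
```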
3. Process Jobs with BullMQ Worker
Create a worker process (or run alongside your app):
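import { Worker } from 'bullmq';
// sendWelcomeEmail and sendReminder are your application's own functions,
// imported from wherever your email logic lives.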
const worker = new Worker(
'emails',
async (job) => {
if (job.name === 'welcome') {
await sendWelcomeEmail(job.data.userId, job.data.email);
} else if (job.name === 'reminder') {
await sendReminder(job.data.userId);
}
},
{
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
},
}
);
worker.on('completed', (job) => console.log(`Job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`Job ${job?.id} failed:`, err));
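As the number of job names grows, the if/else chain in the processor can be swapped for a lookup table. One detail worth noting: the chain above lets a job with an unrecognised name complete silently, whereas the sketch below throws so the job is marked as failed. `JobLike`, `handlers`, `resolveHandler`, and `processJob` are hypothetical names, and the handler bodies are placeholders.

```typescript
// Structural subset of a BullMQ job that this sketch relies on.
interface JobLike {
  name: string;
  data: Record<string, unknown>;
}

type Handler = (data: Record<string, unknown>) => Promise<string>;

// Placeholder handlers; real ones would call your email provider.
const handlers: Record<string, Handler> = {
  welcome: async (data) => `welcome sent to ${String(data.email)}`,
  reminder: async (data) => `reminder sent to user ${String(data.userId)}`,
};

// Looks up the handler for a job name and rejects unknown names.
function resolveHandler(jobName: string): Handler {
  const handler = Object.prototype.hasOwnProperty.call(handlers, jobName)
    ? handlers[jobName]
    : undefined;
  if (!handler) {
    throw new Error(`No handler registered for job "${jobName}"`);
  }
  return handler;
}

// Processor function: dispatches on job.name and returns the handler result.
async function processJob(job: JobLike): Promise<string> {
  return resolveHandler(job.name)(job.data);
}

processJob({ name: "welcome", data: { email: "a@example.com" } }).then(
  (result) => console.log(result),
);
```

`processJob` has the shape of a processor callback, so it could be passed to the worker in place of the inline function: `new Worker('emails', processJob, { connection })`.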
4. Using @Queue Decorator
Mark methods as job processors for metadata-driven worker setup:
import { Injectable } from '@hazeljs/core';
import { Queue } from '@hazeljs/queue';
@Injectable()
export class EmailProcessor {
@Queue('emails')
async handleWelcome(job: { data: { userId: string; email: string } }) {
await this.sendWelcome(job.data.userId, job.data.email);
}
@Queue('emails')
async handleReminder(job: { data: { userId: string } }) {
await this.sendReminder(job.data.userId);
}
private async sendWelcome(userId: string, email: string) {
// Send welcome email
}
private async sendReminder(userId: string) {
// Send reminder
}
}
Job Options
Delay
Schedule jobs to run in the future:
// Run in 5 minutes
await queueService.addDelayed('notifications', 'reminder', { userId: '123' }, 5 * 60 * 1000);
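`addDelayed` takes a relative delay in milliseconds, so scheduling for a specific wall-clock time means converting it first. A minimal sketch, where `delayUntil` is a hypothetical helper and not part of the package:

```typescript
// Milliseconds from `now` until `target`, clamped to 0 for times in the past.
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const now = new Date("2024-01-01T09:00:00Z");
const target = new Date("2024-01-01T09:05:00Z");
console.log(delayUntil(target, now)); // 300000
```

The result can be passed as the last argument, for example `addDelayed('notifications', 'reminder', { userId: '123' }, delayUntil(target))`.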
Priority
Higher-priority jobs are processed first. In BullMQ, a lower number means a higher priority (1 is the highest):
await queueService.add('orders', 'process', { orderId: '123' }, {
priority: 1, // Lower number = processed first
});
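BullMQ documents its priority ordering as follows: jobs added without a priority are picked up before prioritized ones, and among prioritized jobs a lower number wins. The simulation below illustrates that rule, assuming @hazeljs/queue forwards `priority` to BullMQ unchanged; `QueuedJob` and `processingOrder` are hypothetical names and this is not how BullMQ is implemented internally.

```typescript
interface QueuedJob {
  id: string;
  priority?: number; // undefined means "no priority set"
  seq: number; // insertion order, used to break ties
}

// Jobs without a priority first, then ascending priority number;
// jobs with equal priority keep their insertion order.
function processingOrder(jobs: QueuedJob[]): string[] {
  return [...jobs]
    .sort((a, b) => (a.priority ?? 0) - (b.priority ?? 0) || a.seq - b.seq)
    .map((job) => job.id);
}

const order = processingOrder([
  { id: "report", priority: 10, seq: 1 },
  { id: "payment", priority: 1, seq: 2 },
  { id: "plain", seq: 3 },
]);
console.log(order.join(" -> ")); // plain -> payment -> report
```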
Retry with Backoff
Configure automatic retries with exponential or fixed backoff:
await queueService.addWithRetry('payments', 'charge', { orderId: '123' }, {
attempts: 5,
backoff: {
type: 'exponential', // or 'fixed'
delay: 1000, // Initial delay in ms
},
});
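To see what a backoff setting means in wall-clock terms, the retry delays can be computed up front. The sketch below assumes the options are forwarded to BullMQ, whose built-in exponential strategy waits `2^(attemptsMade - 1) * delay` milliseconds; `backoffDelay` and `retrySchedule` are hypothetical helper names.

```typescript
type BackoffType = "fixed" | "exponential";

// Delay before the retry that follows failed attempt `attemptsMade` (1-based).
function backoffDelay(
  type: BackoffType,
  delay: number,
  attemptsMade: number,
): number {
  return type === "fixed"
    ? delay
    : Math.round(Math.pow(2, attemptsMade - 1) * delay);
}

// `attempts` is the total number of tries, so there are attempts - 1 retries.
function retrySchedule(
  type: BackoffType,
  delay: number,
  attempts: number,
): number[] {
  const schedule: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    schedule.push(backoffDelay(type, delay, attemptsMade));
  }
  return schedule;
}

console.log(retrySchedule("exponential", 1000, 5)); // [ 1000, 2000, 4000, 8000 ]
console.log(retrySchedule("fixed", 1000, 5)); // [ 1000, 1000, 1000, 1000 ]
```

Under these assumptions, the `payments` example gives up roughly 15 seconds of waiting after the first failure, plus the time spent in the attempts themselves.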
Timeout
Set a maximum execution time for jobs:
await queueService.add('reports', 'generate', { reportId: '123' }, {
timeout: 30000, // 30 seconds
});
Integration with Cron
For distributed cron jobs, enqueue work from cron handlers instead of executing inline:
import { Injectable } from '@hazeljs/core';
import { Cron, CronExpression } from '@hazeljs/cron';
import { QueueService } from '@hazeljs/queue';
@Injectable()
export class TaskService {
constructor(private queue: QueueService) {}
@Cron({
name: 'daily-cleanup',
cronTime: CronExpression.EVERY_DAY_AT_MIDNIGHT,
})
async triggerCleanup() {
// Enqueue for distributed processing
await this.queue.add('maintenance', 'daily-cleanup', {});
}
@Cron({
name: 'hourly-reports',
cronTime: CronExpression.EVERY_HOUR,
})
async triggerReports() {
await this.queue.add('reports', 'hourly-summary', {
timestamp: new Date().toISOString(),
});
}
}
Benefits:
- Cron triggers are lightweight (just enqueue)
- Workers can scale independently
- Failed jobs are retried automatically
- No blocking on the cron scheduler
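When several application instances run the same cron handler, each one enqueues its own copy of the job. BullMQ skips an `add` whose `jobId` already exists in the queue, so a deterministic id per period is one way to deduplicate. The sketch below assumes the `options` argument is forwarded to BullMQ (including `jobId`); `dailyJobId` is a hypothetical helper. It uses `_` as the separator because BullMQ rejects custom ids containing `:`.

```typescript
// Builds an id that is identical for every instance firing on the same UTC day.
function dailyJobId(name: string, firedAt: Date): string {
  const day = firedAt.toISOString().slice(0, 10); // YYYY-MM-DD in UTC
  return `${name}_${day}`;
}

console.log(dailyJobId("daily-cleanup", new Date("2024-01-15T00:00:03Z")));
// daily-cleanup_2024-01-15
```

Inside the cron handler this could be used as `this.queue.add('maintenance', 'daily-cleanup', {}, { jobId: dailyJobId('daily-cleanup', new Date()) })`. Deduplication only holds while the earlier job still exists in Redis, so it interacts with `removeOnComplete`.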
API Reference
QueueService
| Method | Parameters | Description |
|---|---|---|
| add(queueName, jobName, data?, options?) | Queue name, job name, data, options | Add a job to the queue |
| addDelayed(queueName, jobName, data, delayMs) | Queue name, job name, data, delay | Add a delayed job |
| addWithRetry(queueName, jobName, data, options) | Queue name, job name, data, retry options | Add with retry configuration |
| getQueue(name) | Queue name | Get BullMQ Queue instance |
| close() | None | Close all queue connections |
Job Options (JobsOptions)
| Option | Type | Description |
|---|---|---|
| delay | number | Delay before processing (milliseconds) |
| priority | number | Job priority (in BullMQ, a lower number is processed first) |
| attempts | number | Maximum number of attempts, including the first run |
| backoff | { type: 'fixed' \| 'exponential', delay: number } | Retry backoff strategy |
| timeout | number | Job timeout (milliseconds) |
| removeOnComplete | boolean | Remove job after completion |
| removeOnFail | boolean | Remove job after failure |
@Queue Decorator
@Queue(queueName: string)
Marks a method as a job processor for the specified queue.
Use Cases
Email Notifications
@Injectable()
export class NotificationService {
constructor(private queue: QueueService) {}
async sendOrderConfirmation(orderId: string, email: string) {
await this.queue.add('emails', 'order-confirmation', { orderId, email });
}
async sendShippingUpdate(orderId: string, trackingNumber: string) {
await this.queue.add('emails', 'shipping-update', { orderId, trackingNumber });
}
}
Report Generation
@Injectable()
export class ReportService {
constructor(private queue: QueueService) {}
async generateMonthlyReport(userId: string, month: string) {
await this.queue.add('reports', 'monthly', { userId, month }, {
timeout: 60000, // 1 minute timeout
priority: 2,
});
}
}
Data Processing
@Injectable()
export class DataService {
constructor(private queue: QueueService) {}
async processUpload(fileId: string) {
await this.queue.addWithRetry('data-processing', 'upload', { fileId }, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
}
Agent Tasks
@Injectable()
export class AgentService {
constructor(private queue: QueueService) {}
async runAgentWorkflow(agentId: string, input: string) {
await this.queue.add('agents', 'execute', { agentId, input }, {
timeout: 120000, // 2 minutes
attempts: 2,
});
}
}
Advantages
1. Reliable Processing
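Jobs are stored in Redis rather than in worker memory, so a job is not lost when a worker crashes: BullMQ detects the stalled job and returns it to the queue. Durability across a Redis restart depends on how Redis persistence is configured.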
2. Horizontal Scalability
Add more worker processes to handle increased load.
3. Automatic Retries
Configurable retry logic with exponential or fixed backoff.
4. Job Prioritization
Process high-priority jobs first.
5. Delayed Execution
Schedule jobs to run in the future.
6. Distributed Cron
Combine with @hazeljs/cron for scalable scheduled tasks.
Best Practices
- Separate Worker Processes – Run workers in separate processes for better isolation
- Set Timeouts – Always set timeouts for long-running jobs
- Use Retries Wisely – Configure appropriate retry attempts and backoff
- Monitor Failures – Set up alerts for failed jobs
- Clean Up Completed Jobs – Use removeOnComplete to avoid Redis bloat
- Use Priorities – Prioritize critical jobs (payments, notifications)
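Several of these practices come down to passing the same options on every `add` call, which is easy to forget. One way to keep them consistent is a small helper that merges per-job overrides over shared defaults. This is a sketch under assumptions: `JobOptions` mirrors the options table in the API reference, and `DEFAULT_JOB_OPTIONS` and `withDefaults` are hypothetical names with illustrative values.

```typescript
interface BackoffOptions {
  type: "fixed" | "exponential";
  delay: number;
}

// Mirrors the job options listed in the API reference table.
interface JobOptions {
  delay?: number;
  priority?: number;
  attempts?: number;
  backoff?: BackoffOptions;
  timeout?: number;
  removeOnComplete?: boolean;
  removeOnFail?: boolean;
}

// Illustrative defaults; tune these for your workload.
const DEFAULT_JOB_OPTIONS: JobOptions = {
  attempts: 3,
  backoff: { type: "exponential", delay: 1000 },
  removeOnComplete: true, // avoid Redis bloat
  removeOnFail: false, // keep failed jobs so they can be inspected
};

// Per-job overrides win over the shared defaults (shallow merge).
function withDefaults(overrides: JobOptions = {}): JobOptions {
  return { ...DEFAULT_JOB_OPTIONS, ...overrides };
}

console.log(withDefaults({ attempts: 5, timeout: 30000 }));
```

A call site would then look like `queueService.add('reports', 'generate', data, withDefaults({ timeout: 30000 }))`.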
Recipes
Recipe: Email Queue with Processor
// File: src/email/email.queue.ts
import { Service } from '@hazeljs/core';
import { QueueService, Process } from '@hazeljs/queue';
@Service()
export class EmailQueue {
constructor(private readonly queue: QueueService) {}
async sendWelcomeEmail(userId: string, email: string) {
// add() takes the queue name, then the job name, then the payload
await this.queue.add('email', 'welcome', {
type: 'welcome',
userId,
email,
});
}
@Process('email')
async processEmail(job: { data: { type: string; userId: string; email: string } }) {
const { type, email } = job.data;
// Send email via your email provider
console.log(`Sending ${type} email to ${email}`);
}
}
Recipe: Register Queue Module with Redis
// File: src/app.module.ts
import { HazelModule } from '@hazeljs/core';
import { QueueModule } from '@hazeljs/queue';
import { EmailQueue } from './email/email.queue';
@HazelModule({
imports: [
QueueModule.forRoot({
connection: { host: 'localhost', port: 6379 },
defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
}),
],
providers: [EmailQueue],
})
export class AppModule {}
Related Resources
- Cron Package – Schedule jobs that enqueue to Queue
- Worker Package – CPU-intensive task offloading
- Kafka Package – Event streaming and processing
- BullMQ Documentation – Underlying queue library