HazelJS Queue Package

npm downloads

@hazeljs/queue provides Redis-backed job queues for HazelJS using BullMQ — enqueue background jobs, process with workers, and handle retries, delays, and priorities.

Quick Reference

  • Purpose: @hazeljs/queue provides background job processing with Redis-backed queues (BullMQ), decorator-based workers, retries, delays, priorities, and job lifecycle management.
  • When to use: Use @hazeljs/queue for background job processing (email sending, file processing, AI tasks). Use @hazeljs/kafka for event streaming between microservices instead. Use @hazeljs/cron for scheduled recurring tasks.
  • Key concepts: QueueModule.forRoot(), @Processor() worker decorator, @Process() job handler decorator, job enqueueing, retries with backoff, delayed jobs, job priorities, job events.
  • Dependencies: @hazeljs/core, bullmq, ioredis.
  • Common patterns: Register QueueModule.forRoot({ connection: { host, port } }) → enqueue jobs with queue.add(name, data) → process with @Processor('queue-name') class and @Process() method.
  • Common mistakes: Not running a separate worker process in production; not configuring retry/backoff for flaky jobs; not monitoring failed jobs; Redis connection not configured.

Purpose

Building scalable applications requires offloading long-running tasks to background workers. The @hazeljs/queue package simplifies this by providing:

  • Redis-Backed Queues – Uses BullMQ for reliable, distributed job processing
  • QueueService – Injectable service for adding jobs from controllers and services
  • @Queue Decorator – Mark methods as job processors for worker setup
  • Job Options – Delay, priority, retry attempts, backoff strategies, timeouts
  • Cron Integration – Works with @hazeljs/cron for distributed scheduled jobs
  • Worker Management – Separate worker processes or in-process workers
  • Job Monitoring – Track job status, failures, and completions

Architecture

graph TD
  A["Controller/Service"] --> B["QueueService.add()"]
  B --> C["Redis Queue<br/>(BullMQ)"]
  
  C --> D["Worker Process 1"]
  C --> E["Worker Process 2"]
  C --> F["Worker Process N"]
  
  D --> G["@Queue Processor"]
  E --> G
  F --> G
  
  G --> H{Job Result}
  H -->|Success| I["Completed"]
  H -->|Failure| J["Retry Logic"]
  J -->|Max Attempts| K["Failed"]
  J -->|Retry| C
  
  style A fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style C fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
  style D fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
  style E fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
  style F fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff

Key Components

  1. QueueModule – Registers QueueService and manages queue connections
  2. QueueService – Injectable service for adding jobs to queues
  3. @Queue Decorator – Marks methods as job processors
  4. BullMQ Worker – Processes jobs from Redis queues
  5. Job Options – Configure delay, retry, priority, backoff

Installation

npm install @hazeljs/queue ioredis

Quick Start

1. Import QueueModule

import { HazelModule } from '@hazeljs/core';
import { QueueModule } from '@hazeljs/queue';

@HazelModule({
  imports: [
    QueueModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379', 10),
      },
    }),
  ],
})
export class AppModule {}

2. Add Jobs from Controllers

import { Injectable } from '@hazeljs/core';
import { QueueService } from '@hazeljs/queue';

@Injectable()
export class EmailService {
  constructor(private queue: QueueService) {}

  async sendWelcomeEmail(userId: string, email: string) {
    await this.queue.add('emails', 'welcome', { userId, email });
  }

  async sendDelayedReminder(userId: string, delayMs: number) {
    await this.queue.addDelayed('emails', 'reminder', { userId }, delayMs);
  }

  async processWithRetry(data: { orderId: string }) {
    await this.queue.addWithRetry('orders', 'process', data, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    });
  }
}

3. Process Jobs with BullMQ Worker

Create a worker process (or run alongside your app):

import { Worker } from 'bullmq';

const worker = new Worker(
  'emails',
  async (job) => {
    if (job.name === 'welcome') {
      await sendWelcomeEmail(job.data.userId, job.data.email);
    } else if (job.name === 'reminder') {
      await sendReminder(job.data.userId);
    }
  },
  {
    connection: {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
    },
  }
);

worker.on('completed', (job) => console.log(`Job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`Job ${job?.id} failed:`, err));

4. Using @Queue Decorator

Mark methods as job processors for metadata-driven worker setup:

import { Injectable } from '@hazeljs/core';
import { Queue } from '@hazeljs/queue';

@Injectable()
export class EmailProcessor {
  @Queue('emails')
  async handleWelcome(job: { data: { userId: string; email: string } }) {
    await this.sendWelcome(job.data.userId, job.data.email);
  }

  @Queue('emails')
  async handleReminder(job: { data: { userId: string } }) {
    await this.sendReminder(job.data.userId);
  }

  private async sendWelcome(userId: string, email: string) {
    // Send welcome email
  }

  private async sendReminder(userId: string) {
    // Send reminder
  }
}

Job Options

Delay

Schedule jobs to run in the future:

// Run in 5 minutes
await queueService.addDelayed('notifications', 'reminder', { userId: '123' }, 5 * 60 * 1000);

Priority

Higher priority jobs are processed first:

await queueService.add('orders', 'process', { orderId: '123' }, {
  priority: 1, // Higher = processed first
});

Retry with Backoff

Configure automatic retries with exponential or fixed backoff:

await queueService.addWithRetry('payments', 'charge', { orderId: '123' }, {
  attempts: 5,
  backoff: {
    type: 'exponential', // or 'fixed'
    delay: 1000, // Initial delay in ms
  },
});

Timeout

Set a maximum execution time for jobs:

await queueService.add('reports', 'generate', { reportId: '123' }, {
  timeout: 30000, // 30 seconds
});

Integration with Cron

For distributed cron jobs, enqueue work from cron handlers instead of executing inline:

import { Injectable } from '@hazeljs/core';
import { Cron, CronExpression } from '@hazeljs/cron';
import { QueueService } from '@hazeljs/queue';

@Injectable()
export class TaskService {
  constructor(private queue: QueueService) {}

  @Cron({
    name: 'daily-cleanup',
    cronTime: CronExpression.EVERY_DAY_AT_MIDNIGHT,
  })
  async triggerCleanup() {
    // Enqueue for distributed processing
    await this.queue.add('maintenance', 'daily-cleanup', {});
  }

  @Cron({
    name: 'hourly-reports',
    cronTime: CronExpression.EVERY_HOUR,
  })
  async triggerReports() {
    await this.queue.add('reports', 'hourly-summary', {
      timestamp: new Date().toISOString(),
    });
  }
}

Benefits:

  • Cron triggers are lightweight (just enqueue)
  • Workers can scale independently
  • Failed jobs are retried automatically
  • No blocking on the cron scheduler

API Reference

QueueService

MethodParametersDescription
add(queueName, jobName, data?, options?)Queue name, job name, data, optionsAdd a job to the queue
addDelayed(queueName, jobName, data, delayMs)Queue name, job name, data, delayAdd a delayed job
addWithRetry(queueName, jobName, data, options)Queue name, job name, data, retry optionsAdd with retry configuration
getQueue(name)Queue nameGet BullMQ Queue instance
close()NoneClose all queue connections

Job Options (JobsOptions)

OptionTypeDescription
delaynumberDelay before processing (milliseconds)
prioritynumberJob priority (higher = processed first)
attemptsnumberMaximum retry attempts
backoff{ type: 'fixed' | 'exponential', delay: number }Retry backoff strategy
timeoutnumberJob timeout (milliseconds)
removeOnCompletebooleanRemove job after completion
removeOnFailbooleanRemove job after failure

@Queue Decorator

@Queue(queueName: string)

Marks a method as a job processor for the specified queue.

Use Cases

Email Notifications

@Injectable()
export class NotificationService {
  constructor(private queue: QueueService) {}

  async sendOrderConfirmation(orderId: string, email: string) {
    await this.queue.add('emails', 'order-confirmation', { orderId, email });
  }

  async sendShippingUpdate(orderId: string, trackingNumber: string) {
    await this.queue.add('emails', 'shipping-update', { orderId, trackingNumber });
  }
}

Report Generation

@Injectable()
export class ReportService {
  constructor(private queue: QueueService) {}

  async generateMonthlyReport(userId: string, month: string) {
    await this.queue.add('reports', 'monthly', { userId, month }, {
      timeout: 60000, // 1 minute timeout
      priority: 2,
    });
  }
}

Data Processing

@Injectable()
export class DataService {
  constructor(private queue: QueueService) {}

  async processUpload(fileId: string) {
    await this.queue.addWithRetry('data-processing', 'upload', { fileId }, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
    });
  }
}

Agent Tasks

@Injectable()
export class AgentService {
  constructor(private queue: QueueService) {}

  async runAgentWorkflow(agentId: string, input: string) {
    await this.queue.add('agents', 'execute', { agentId, input }, {
      timeout: 120000, // 2 minutes
      attempts: 2,
    });
  }
}

Advantages

1. Reliable Processing

Redis-backed queues ensure jobs aren't lost even if workers crash.

2. Horizontal Scalability

Add more worker processes to handle increased load.

3. Automatic Retries

Configurable retry logic with exponential or fixed backoff.

4. Job Prioritization

Process high-priority jobs first.

5. Delayed Execution

Schedule jobs to run in the future.

6. Distributed Cron

Combine with @hazeljs/cron for scalable scheduled tasks.

Best Practices

  1. Separate Worker Processes – Run workers in separate processes for better isolation
  2. Set Timeouts – Always set timeouts for long-running jobs
  3. Use Retries Wisely – Configure appropriate retry attempts and backoff
  4. Monitor Failures – Set up alerts for failed jobs
  5. Clean Up Completed Jobs – Use removeOnComplete to avoid Redis bloat
  6. Use Priorities – Prioritize critical jobs (payments, notifications)

Recipes

Recipe: Email Queue with Processor

// File: src/email/email.queue.ts
import { Service } from '@hazeljs/core';
import { QueueService, Process } from '@hazeljs/queue';

@Service()
export class EmailQueue {
  constructor(private readonly queue: QueueService) {}

  async sendWelcomeEmail(userId: string, email: string) {
    await this.queue.add('email', {
      type: 'welcome',
      userId,
      email,
    });
  }

  @Process('email')
  async processEmail(job: { data: { type: string; userId: string; email: string } }) {
    const { type, email } = job.data;
    // Send email via your email provider
    console.log(`Sending ${type} email to ${email}`);
  }
}

Recipe: Register Queue Module with Redis

// File: src/app.module.ts
import { HazelModule } from '@hazeljs/core';
import { QueueModule } from '@hazeljs/queue';
import { EmailQueue } from './email/email.queue';

@HazelModule({
  imports: [
    QueueModule.register({
      connection: { host: 'localhost', port: 6379 },
      defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
    }),
  ],
  providers: [EmailQueue],
})
export class AppModule {}