Worker Package

The @hazeljs/worker package provides CPU-intensive task offloading via Node.js worker threads. Offload embeddings generation, data transforms, OCR, report generation, and other CPU-heavy work from the main event loop into a managed pool of worker threads.

Purpose

This package is not for HTTP scaling or clustering. It is specifically for:

  • CPU-heavy tasks — Embeddings, ML preprocessing, data transforms
  • File/document processing — OCR, parsing, media processing
  • Report generation — PDFs, exports, aggregations
  • Other workloads that would block the Node.js event loop

Workers and Worker Threads

Node.js runs on a single thread with an event loop. CPU-intensive work (heavy computations, synchronous processing) blocks that thread and stalls your entire application. Worker threads solve this by running code in separate V8 isolates:

  • Isolated execution — Each worker has its own JavaScript context, heap, and event loop
  • Isolated memory — Each worker has its own memory; data crosses the thread boundary via message passing (serialized/deserialized)
  • True parallelism — CPU work in a worker doesn't block the main thread
  • Same process — Workers run in the same process, unlike child_process or clustering

When you call workerExecutor.execute('task-name', payload), the payload is serialized (JSON), sent to a worker, the worker loads your task handler, runs it, and sends the result back. Your main thread stays responsive.
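
The round trip described above can be sketched with Node's built-in worker_threads module directly. This illustrates the mechanism only, not the package's actual implementation; `runInWorker` and the summing task are hypothetical:

```typescript
import { Worker } from 'worker_threads';

// Post a payload to a worker, let it do CPU-bound work, and receive
// the result back. The main thread's event loop stays free meanwhile.
function runInWorker(payload: { n: number }): Promise<number> {
  const workerCode = `
    const { parentPort } = require('worker_threads');
    parentPort.on('message', (msg) => {
      // CPU-bound work happens here, off the main thread
      let sum = 0;
      for (let i = 0; i < msg.n; i++) sum += i;
      parentPort.postMessage(sum);
    });
  `;
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerCode, { eval: true });
    worker.once('message', (result: number) => {
      resolve(result);
      worker.terminate();
    });
    worker.once('error', reject);
    worker.postMessage(payload);
  });
}
```

The package adds task registration, pooling, and lifecycle management on top of this primitive.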

Worker Pool

A worker pool is a fixed set of worker threads that are reused for multiple tasks. Instead of spawning a new worker per task (expensive), the pool:

  • Pre-spawns workers at application startup (based on poolSize, default: os.cpus().length - 1)
  • Distributes tasks using round-robin — each execute() call goes to the next available worker
  • Reuses workers — After a task completes, the worker is ready for the next one
  • Manages lifecycle — Handles SIGTERM/SIGINT, waits for in-flight tasks, then terminates gracefully

The pool size balances parallelism vs. resource usage. More workers = more concurrent CPU work, but more memory. The default leaves one CPU core for the main thread.
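
The two pool behaviours above can be sketched in a few lines. The names here (`defaultPoolSize`, `RoundRobin`) are illustrative, not the package's internals, and the exact clamping of the default size is an assumption:

```typescript
import os from 'os';

// Default sizing rule: leave one core for the main thread,
// but never drop below one worker.
function defaultPoolSize(cpuCount: number = os.cpus().length): number {
  return Math.max(1, cpuCount - 1);
}

// Round-robin distribution: each pick() returns the next worker,
// wrapping around at the end of the pool.
class RoundRobin<T> {
  private next = 0;
  constructor(private readonly workers: T[]) {}
  pick(): T {
    const worker = this.workers[this.next];
    this.next = (this.next + 1) % this.workers.length;
    return worker;
  }
}
```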

Architecture

graph TD
  A["@WorkerTask Decorator<br/>(Marks Classes as Tasks)"] --> B["WorkerRegistry<br/>(Task Name → Handler Path)"]
  B --> C["WorkerPoolManager<br/>(Pool of Worker Threads)"]
  C --> D["WorkerExecutor<br/>(execute API)"]
  D --> E["Main Thread"]
  C --> F["Worker 1"]
  C --> G["Worker 2"]
  C --> H["Worker N"]
  
  style A fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
  style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style C fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style D fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff

Key Components

  1. WorkerExecutor — Main API for executing tasks; inject this into controllers and services
  2. WorkerRegistry — Maps task names to handler module paths; populated by discovery or taskRegistry
  3. WorkerPoolManager — Manages the pool of Node.js Worker instances; round-robin task distribution
  4. @WorkerTask — Class decorator to define task handlers; discovery finds these in the DI container

Installation

npm install @hazeljs/worker @hazeljs/core

Quick Start

1. Define a Task

Create a class with @WorkerTask and a run(payload) method:

import { WorkerTask } from '@hazeljs/worker';

@WorkerTask({
  name: 'generate-embeddings',
  timeout: 15000,
  maxConcurrency: 4,
})
export class GenerateEmbeddingsTask {
  async run(payload: { text: string[] }) {
    return expensiveEmbeddingGeneration(payload.text);
  }
}

2. Configure the Module

Provide a task registry (task name → path to compiled handler):

import { HazelModule } from '@hazeljs/core';
import { WorkerModule } from '@hazeljs/worker';
import path from 'path';

@HazelModule({
  imports: [
    WorkerModule.forRoot({
      taskRegistry: {
        'generate-embeddings': path.join(__dirname, 'dist/tasks/generate-embeddings.task.js'),
      },
      poolSize: 4,
    }),
  ],
  providers: [GenerateEmbeddingsTask],
})
export class AppModule {}

3. Execute Tasks

import { Controller, Get } from '@hazeljs/core';
import { WorkerExecutor } from '@hazeljs/worker';

@Controller('/api')
export class EmbeddingsController {
  constructor(private readonly workerExecutor: WorkerExecutor) {}

  @Get('/embed')
  async embed() {
    const { result, durationMs } = await this.workerExecutor.execute(
      'generate-embeddings',
      { text: ['hello world'] }
    );
    return { embeddings: result, durationMs };
  }
}

Task Path Resolution

Worker threads run in a separate V8 isolate. The worker must load your task code via require(path). You provide paths in one of two ways:

Option A: Explicit taskRegistry

WorkerModule.forRoot({
  taskRegistry: {
    'generate-embeddings': path.join(__dirname, 'dist/tasks/generate-embeddings.task.js'),
    'parse-document': path.join(__dirname, 'dist/tasks/parse-document.task.js'),
  },
})

Option B: Convention (taskDirectory)

WorkerModule.forRoot({
  taskDirectory: path.join(__dirname, 'dist/worker-tasks'),
})

Task name generate-embeddings maps to dist/worker-tasks/generate-embeddings.js. Add your @WorkerTask classes as providers so discovery can find them.
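
The convention amounts to a simple mapping, which could be sketched as follows (`resolveTaskPath` is a hypothetical helper, not part of the package's API):

```typescript
import path from 'path';

// Convention-based resolution: task name `generate-embeddings`
// resolves to `<taskDirectory>/generate-embeddings.js`.
function resolveTaskPath(taskDirectory: string, taskName: string): string {
  return path.join(taskDirectory, `${taskName}.js`);
}
```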

Complete Example

This example shows a full flow: a CPU-heavy PDF text extraction task, module config, and a controller that executes it.

1. Task handler (runs in worker thread)

// src/tasks/extract-pdf-text.task.ts
import { WorkerTask } from '@hazeljs/worker';

@WorkerTask({
  name: 'extract-pdf-text',
  timeout: 30000,
})
export class ExtractPdfTextTask {
  async run(payload: { pdfPath: string }): Promise<{ text: string; pageCount: number }> {
    // CPU-intensive: parse PDF, extract text
    const { parsePdf } = await import('some-pdf-library');
    const doc = await parsePdf(payload.pdfPath);
    return {
      text: doc.getText(),
      pageCount: doc.pageCount,
    };
  }
}

2. App module (register WorkerModule and task)

Point taskRegistry to the compiled handler path (e.g. dist/tasks/ after build):

// src/app.module.ts
import { HazelModule } from '@hazeljs/core';
import { WorkerModule } from '@hazeljs/worker';
import path from 'path';
import { ExtractPdfTextTask } from './tasks/extract-pdf-text.task';

@HazelModule({
  imports: [
    WorkerModule.forRoot({
      taskRegistry: {
        'extract-pdf-text': path.join(__dirname, 'tasks', 'extract-pdf-text.task.js'),
      },
      poolSize: 4,
      timeout: 30000,
    }),
  ],
  providers: [ExtractPdfTextTask],
})
export class AppModule {}

3. Controller (execute from main thread)

// src/pdf.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { WorkerExecutor } from '@hazeljs/worker';

@Controller('/api')
export class PdfController {
  constructor(private readonly workerExecutor: WorkerExecutor) {}

  @Post('/extract')
  async extractText(@Body() body: { pdfPath: string }) {
    const { result, durationMs } = await this.workerExecutor.execute<{
      text: string;
      pageCount: number;
    }>('extract-pdf-text', { pdfPath: body.pdfPath });

    return {
      text: result.text,
      pageCount: result.pageCount,
      durationMs,
    };
  }
}

Flow: Request hits controller → WorkerExecutor.execute() serializes payload → pool picks a worker → worker loads handler, runs run(payload) → result serialized back → controller returns response. The main thread never blocks on PDF parsing.

Module Options

interface WorkerModuleOptions {
  poolSize?: number;              // Default: os.cpus().length - 1
  taskRegistry?: Record<string, string>;
  taskDirectory?: string;
  timeout?: number;               // Default: 30000
  isGlobal?: boolean;             // Default: true
  gracefulShutdownTimeout?: number;  // Default: 10000
}
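
A configuration exercising every option might look like this (values are illustrative, not recommendations; in practice you would typically provide either taskRegistry or taskDirectory, not both):

```typescript
import os from 'os';
import path from 'path';
import { WorkerModule } from '@hazeljs/worker';

WorkerModule.forRoot({
  poolSize: os.cpus().length - 1,   // workers to pre-spawn at startup
  taskRegistry: {
    'generate-embeddings': path.join(__dirname, 'tasks/generate-embeddings.task.js'),
  },
  taskDirectory: path.join(__dirname, 'worker-tasks'), // convention-based alternative
  timeout: 30_000,                  // default per-task timeout (ms)
  isGlobal: true,                   // expose providers app-wide
  gracefulShutdownTimeout: 10_000,  // max wait for in-flight tasks on shutdown
});
```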

@WorkerTask Decorator

@WorkerTask({
  name: string;           // Unique task identifier
  timeout?: number;       // Per-task timeout (ms)
  maxConcurrency?: number; // Per-task concurrency limit
})
export class MyTask {
  async run(payload: TInput): Promise<TOutput> {
    // CPU-intensive work
  }
}

WorkerExecutor API

// Execute a task
const { result, durationMs } = await workerExecutor.execute<T>(
  'task-name',
  payload,
  { timeout?: number }
);

// Check if task exists
workerExecutor.hasTask('task-name');

// List registered tasks
workerExecutor.getTaskNames();

Graceful Shutdown

The worker pool registers SIGTERM/SIGINT handlers. On shutdown, it waits for in-flight tasks (up to gracefulShutdownTimeout), then terminates workers.
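
The shutdown rule can be sketched as a race between the in-flight tasks and the timeout. This is an assumption about the semantics, not the package's code:

```typescript
// Wait for all in-flight tasks to settle, but no longer than
// timeoutMs; after this resolves, workers would be terminated.
function waitWithTimeout<T>(
  inFlight: Promise<T>[],
  timeoutMs: number
): Promise<unknown> {
  const deadline = new Promise((resolve) =>
    setTimeout(resolve, timeoutMs, 'timeout')
  );
  return Promise.race([Promise.allSettled(inFlight), deadline]);
}
```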

Inspector Integration

When @hazeljs/inspector is installed, worker tasks appear at /__hazel/workers. No extra configuration needed.

Error Handling

import {
  WorkerTaskNotFoundError,
  WorkerTaskTimeoutError,
  WorkerExecutionFailedError,
} from '@hazeljs/worker';

try {
  const { result } = await workerExecutor.execute('my-task', payload);
  return result;
} catch (err) {
  if (err instanceof WorkerTaskNotFoundError) {
    // Task not registered
  } else if (err instanceof WorkerTaskTimeoutError) {
    // Task exceeded timeout
  } else if (err instanceof WorkerExecutionFailedError) {
    // Task threw in worker
  }
  throw err;
}

Best Practices

  1. Use for CPU work only — I/O-bound work belongs on the main thread
  2. Keep payloads small — Data is JSON-serialized between threads
  3. Provide taskRegistry — Explicit paths avoid resolution issues
  4. Compile to JS — Point to dist/ output, not .ts source
  5. Handle errors — Wrap execute() in try/catch
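
To keep payloads small, it can help to sanity-check their serialized size before handing them to a worker. `Buffer.byteLength` of the JSON encoding approximates the serialization cost (`payloadBytes` is a hypothetical helper):

```typescript
// Approximate the cost of serializing a payload across the
// thread boundary as its JSON byte length.
function payloadBytes(payload: unknown): number {
  return Buffer.byteLength(JSON.stringify(payload));
}
```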

What's Next?

  • Learn about Queue for Redis-backed job queues
  • Explore Inspector to view worker tasks in the dashboard
  • Check out AI for embeddings and model inference