Worker Package
The @hazeljs/worker package provides CPU-intensive task offloading via Node.js worker threads. Offload embeddings generation, data transforms, OCR, report generation, and other CPU-heavy work from the main event loop into a managed pool of worker threads.
Purpose
This package is not for HTTP scaling or clustering. It is specifically for:
- CPU-heavy tasks — Embeddings, ML preprocessing, data transforms
- File/document processing — OCR, parsing, media processing
- Report generation — PDFs, exports, aggregations
- Other workloads that would block the Node.js event loop
Workers and Worker Threads
Node.js runs your JavaScript on a single thread with an event loop. CPU-intensive work (heavy computations, synchronous processing) blocks that thread and stalls your entire application. Worker threads solve this by running code in separate V8 isolates:
- Isolated execution — Each worker has its own JavaScript context, heap, and event loop
- No shared memory by default — Data is passed via message passing (serialized/deserialized)
- True parallelism — CPU work in a worker doesn't block the main thread
- Same process — Workers run in the same process, unlike child_process or clustering
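To make the isolation model concrete, here is a minimal sketch that uses Node's built-in worker_threads module directly, without @hazeljs/worker. The function name sumSquaresInWorker and the inline worker source are illustrative assumptions, and the package's internals may well differ.

```typescript
import { Worker } from 'node:worker_threads';

// Worker source kept as a string and run with `eval: true`
// so the sketch fits in a single file.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads');
  // CPU-bound loop: sum of squares from 1 to n.
  let sum = 0;
  for (let i = 1; i <= workerData.n; i++) sum += i * i;
  parentPort.postMessage(sum);
`;

// Runs the loop in a separate thread; the calling thread only awaits a message.
function sumSquaresInWorker(n: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { n } });
    worker.once('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
    });
  });
}
```

This spawns one worker per call, which is exactly the cost a pool avoids by reusing threads.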
When you call workerExecutor.execute('task-name', payload), the payload is serialized as JSON and sent to a worker. The worker loads your task handler, runs it, and sends the result back, so your main thread stays responsive.
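Because the payload crosses the thread boundary as JSON, only JSON-representable data survives the trip. The sketch below simulates that boundary with a plain JSON.stringify/JSON.parse round trip. The helper name roundTrip is hypothetical, and the sketch assumes the package applies no custom serialization on top of JSON.

```typescript
// Simulates what a JSON-based thread boundary does to a payload.
function roundTrip<T>(payload: T): unknown {
  return JSON.parse(JSON.stringify(payload));
}

const sent = {
  text: ['hello world'],
  createdAt: new Date(0),      // Date becomes an ISO string
  normalize: (s: string) => s, // functions are dropped
  limit: undefined,            // undefined properties are dropped
};

const received = roundTrip(sent) as Record<string, unknown>;
// `received` keeps only `text` and `createdAt`, and `createdAt` is now a string.
```

In practice this means a task's run(payload) should expect plain objects, arrays, strings, numbers, booleans, and null.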
Worker Pool
A worker pool is a fixed set of worker threads that are reused for multiple tasks. Instead of spawning a new worker per task (expensive), the pool:
- Pre-spawns workers at application startup (based on poolSize, default: os.cpus().length - 1)
- Distributes tasks using round-robin — each execute() call goes to the next available worker
- Reuses workers — After a task completes, the worker is ready for the next one
- Manages lifecycle — Handles SIGTERM/SIGINT, waits for in-flight tasks, then terminates gracefully
The pool size balances parallelism vs. resource usage. More workers = more concurrent CPU work, but more memory. The default leaves one CPU core for the main thread.
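Round-robin distribution can be pictured as a counter that wraps around the pool. The class below is a minimal sketch of that idea, not the package's WorkerPoolManager; the name RoundRobin and its pick() method are assumptions made for illustration.

```typescript
// Cycles through worker indices 0..poolSize-1, wrapping around.
class RoundRobin {
  private next = 0;
  private readonly poolSize: number;

  constructor(poolSize: number) {
    if (!Number.isInteger(poolSize) || poolSize < 1) {
      throw new RangeError('poolSize must be a positive integer');
    }
    this.poolSize = poolSize;
  }

  // Returns the index of the worker that should receive the next task.
  pick(): number {
    const index = this.next;
    this.next = (this.next + 1) % this.poolSize;
    return index;
  }
}

const picker = new RoundRobin(3);
const assignments = Array.from({ length: 7 }, () => picker.pick());
// assignments: [0, 1, 2, 0, 1, 2, 0]
```

Pure round-robin ignores how long each task runs, so a slow task can queue work behind it while other workers sit idle. That trade-off buys a very cheap scheduling decision.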
Architecture
graph TD
  A["@WorkerTask Decorator<br/>(Marks Classes as Tasks)"] --> B["WorkerRegistry<br/>(Task Name → Handler Path)"]
  B --> C["WorkerPoolManager<br/>(Pool of Worker Threads)"]
  C --> D["WorkerExecutor<br/>(execute API)"]
  D --> E["Main Thread"]
  C --> F["Worker 1"]
  C --> G["Worker 2"]
  C --> H["Worker N"]
  style A fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
  style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style C fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style D fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
Key Components
- WorkerExecutor — Main API for executing tasks; inject this into controllers and services
- WorkerRegistry — Maps task names to handler module paths; populated by discovery or taskRegistry
- WorkerPoolManager — Manages the pool of Node.js Worker instances; round-robin task distribution
- @WorkerTask — Class decorator to define task handlers; discovery finds these in the DI container
Installation
npm install @hazeljs/worker @hazeljs/core
Quick Start
1. Define a Task
Create a class with @WorkerTask and a run(payload) method:
import { WorkerTask } from '@hazeljs/worker';

@WorkerTask({
  name: 'generate-embeddings',
  timeout: 15000,
  maxConcurrency: 4,
})
export class GenerateEmbeddingsTask {
  async run(payload: { text: string[] }) {
    // expensiveEmbeddingGeneration stands in for your own CPU-bound function
    return expensiveEmbeddingGeneration(payload.text);
  }
}
2. Configure the Module
Provide a task registry (task name → path to compiled handler):
import { HazelModule } from '@hazeljs/core';
import { WorkerModule } from '@hazeljs/worker';
import path from 'path';
// Adjust this path to wherever the task class from step 1 lives
import { GenerateEmbeddingsTask } from './tasks/generate-embeddings.task';

@HazelModule({
  imports: [
    WorkerModule.forRoot({
      taskRegistry: {
        'generate-embeddings': path.join(__dirname, 'dist/tasks/generate-embeddings.task.js'),
      },
      poolSize: 4,
    }),
  ],
  providers: [GenerateEmbeddingsTask],
})
export class AppModule {}
3. Execute Tasks
import { Controller, Get } from '@hazeljs/core';
import { WorkerExecutor } from '@hazeljs/worker';

@Controller('/api')
export class EmbeddingsController {
  constructor(private readonly workerExecutor: WorkerExecutor) {}

  @Get('/embed')
  async embed() {
    const { result, durationMs } = await this.workerExecutor.execute(
      'generate-embeddings',
      { text: ['hello world'] }
    );
    return { embeddings: result, durationMs };
  }
}
Task Path Resolution
Worker threads run in a separate V8 isolate. The worker must load your task code via require(path). You provide paths in one of two ways:
Option A: Explicit taskRegistry
WorkerModule.forRoot({
  taskRegistry: {
    'generate-embeddings': path.join(__dirname, 'dist/tasks/generate-embeddings.task.js'),
    'parse-document': path.join(__dirname, 'dist/tasks/parse-document.task.js'),
  },
})
Option B: Convention (taskDirectory)
WorkerModule.forRoot({
  taskDirectory: path.join(__dirname, 'dist/worker-tasks'),
})
Task name generate-embeddings maps to dist/worker-tasks/generate-embeddings.js. Add your @WorkerTask classes as providers so discovery can find them.
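The naming convention amounts to joining the task directory with the task name plus a .js extension. The helper below is a hypothetical sketch of that mapping, not the package's resolver; the name resolveTaskPath and the task-name validation are assumptions added for illustration.

```typescript
import { join } from 'node:path';

// Hypothetical helper mirroring the documented convention:
// <taskDirectory>/<task-name>.js
function resolveTaskPath(taskDirectory: string, taskName: string): string {
  // Extra safety check in this sketch: reject names that could
  // escape the task directory (for example '../secrets').
  if (!/^[a-z0-9][a-z0-9-]*$/i.test(taskName)) {
    throw new Error(`Invalid task name: ${taskName}`);
  }
  return join(taskDirectory, `${taskName}.js`);
}

const handlerPath = resolveTaskPath('dist/worker-tasks', 'generate-embeddings');
// On POSIX systems: 'dist/worker-tasks/generate-embeddings.js'
```

With this convention the file name must match the task name exactly, which is why a renamed task also needs a renamed file.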
Complete Example
This example shows a full flow: a CPU-heavy PDF text extraction task, module config, and a controller that executes it.
1. Task handler (runs in worker thread)
// src/tasks/extract-pdf-text.task.ts
import { WorkerTask } from '@hazeljs/worker';

@WorkerTask({
  name: 'extract-pdf-text',
  timeout: 30000,
})
export class ExtractPdfTextTask {
  async run(payload: { pdfPath: string }): Promise<{ text: string; pageCount: number }> {
    // CPU-intensive: parse PDF, extract text
    const { parsePdf } = await import('some-pdf-library');
    const doc = await parsePdf(payload.pdfPath);
    return {
      text: doc.getText(),
      pageCount: doc.pageCount,
    };
  }
}
2. App module (register WorkerModule and task)
Point taskRegistry to the compiled handler path (e.g. dist/tasks/ after build):
// src/app.module.ts
import { HazelModule } from '@hazeljs/core';
import { WorkerModule } from '@hazeljs/worker';
import path from 'path';
import { ExtractPdfTextTask } from './tasks/extract-pdf-text.task';

@HazelModule({
  imports: [
    WorkerModule.forRoot({
      taskRegistry: {
        'extract-pdf-text': path.join(__dirname, 'tasks', 'extract-pdf-text.task.js'),
      },
      poolSize: 4,
      timeout: 30000,
    }),
  ],
  providers: [ExtractPdfTextTask],
})
export class AppModule {}
3. Controller (execute from main thread)
// src/pdf.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { WorkerExecutor } from '@hazeljs/worker';

@Controller('/api')
export class PdfController {
  constructor(private readonly workerExecutor: WorkerExecutor) {}

  @Post('/extract')
  async extractText(@Body() body: { pdfPath: string }) {
    const { result, durationMs } = await this.workerExecutor.execute<{
      text: string;
      pageCount: number;
    }>('extract-pdf-text', { pdfPath: body.pdfPath });
    return {
      text: result.text,
      pageCount: result.pageCount,
      durationMs,
    };
  }
}
Flow: Request hits controller → WorkerExecutor.execute() serializes payload → pool picks a worker → worker loads handler, runs run(payload) → result serialized back → controller returns response. The main thread never blocks on PDF parsing.
Module Options
interface WorkerModuleOptions {
  poolSize?: number;                 // Default: os.cpus().length - 1
  taskRegistry?: Record<string, string>;
  taskDirectory?: string;
  timeout?: number;                  // Default: 30000
  isGlobal?: boolean;                // Default: true
  gracefulShutdownTimeout?: number;  // Default: 10000
}
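As a rough illustration of how the documented defaults combine with user-supplied options, the sketch below re-declares the interface locally and fills in the defaults from the comments in the listing. The helper withDefaults is hypothetical and not part of the package, and clamping poolSize to at least 1 on single-core machines is an assumption of this sketch.

```typescript
// Local copy of the documented options interface.
interface WorkerModuleOptions {
  poolSize?: number;
  taskRegistry?: Record<string, string>;
  taskDirectory?: string;
  timeout?: number;
  isGlobal?: boolean;
  gracefulShutdownTimeout?: number;
}

// Hypothetical helper: applies the documented defaults to partial options.
// cpuCount would normally come from os.cpus().length.
function withDefaults(options: WorkerModuleOptions, cpuCount: number) {
  return {
    ...options,
    // Documented default is cpuCount - 1; the floor of 1 is this sketch's assumption.
    poolSize: options.poolSize ?? Math.max(1, cpuCount - 1),
    timeout: options.timeout ?? 30000,
    isGlobal: options.isGlobal ?? true,
    gracefulShutdownTimeout: options.gracefulShutdownTimeout ?? 10000,
  };
}

const resolved = withDefaults({ taskDirectory: 'dist/worker-tasks', timeout: 15000 }, 8);
// resolved.poolSize is 7, resolved.timeout is 15000,
// resolved.isGlobal is true, resolved.gracefulShutdownTimeout is 10000
```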
@WorkerTask Decorator
@WorkerTask({
  name: string;            // Unique task identifier
  timeout?: number;        // Per-task timeout (ms)
  maxConcurrency?: number; // Per-task concurrency limit
})
export class MyTask {
  async run(payload: TInput): Promise<TOutput> {
    // CPU-intensive work
  }
}
WorkerExecutor API
// Execute a task
const { result, durationMs } = await workerExecutor.execute<T>(
  'task-name',
  payload,
  { timeout: 5000 } // optional third argument: { timeout?: number }
);

// Check if task exists
workerExecutor.hasTask('task-name');

// List registered tasks
workerExecutor.getTaskNames();
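One way to combine hasTask() and execute() is a guard that skips unknown tasks instead of throwing. The sketch below types the executor structurally, so it does not import the package. ExecutorLike and executeIfRegistered are hypothetical names, and the boolean return type of hasTask() is an assumption based on how it is described here.

```typescript
// Structural subset of the documented WorkerExecutor API.
interface ExecutorLike {
  hasTask(name: string): boolean; // return type assumed
  execute<T>(
    name: string,
    payload: unknown,
    options?: { timeout?: number }
  ): Promise<{ result: T; durationMs: number }>;
}

// Hypothetical helper: resolves to null when the task is not registered.
async function executeIfRegistered<T>(
  executor: ExecutorLike,
  name: string,
  payload: unknown,
  timeout?: number
): Promise<T | null> {
  if (!executor.hasTask(name)) return null;
  const options = timeout === undefined ? undefined : { timeout };
  const { result } = await executor.execute<T>(name, payload, options);
  return result;
}
```

A real WorkerExecutor instance should satisfy ExecutorLike if its methods match the signatures listed in this section.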
Graceful Shutdown
The worker pool registers SIGTERM/SIGINT handlers. On shutdown, it waits for in-flight tasks (up to gracefulShutdownTimeout), then terminates workers.
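The wait-then-terminate behavior can be sketched as a race between "all in-flight tasks settled" and a timer. This is an illustration of the general pattern, not the package's shutdown code; drainInFlight and DrainResult are hypothetical names.

```typescript
type DrainResult = 'drained' | 'timed-out';

// Waits for all in-flight promises to settle, or gives up after timeoutMs.
function drainInFlight(
  inFlight: Promise<unknown>[],
  timeoutMs: number
): Promise<DrainResult> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timedOut = new Promise<DrainResult>((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), timeoutMs);
  });

  // allSettled never rejects, so failed tasks do not abort the drain.
  const drained = Promise.allSettled(inFlight).then((): DrainResult => 'drained');

  return Promise.race([drained, timedOut]).finally(() => {
    // Clear the timer so it does not keep the process alive after draining.
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

A SIGTERM handler built on this pattern would await drainInFlight(pending, gracefulShutdownTimeout) and then terminate the workers regardless of the outcome.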
Inspector Integration
When @hazeljs/inspector is installed, worker tasks appear at /__hazel/workers. No extra configuration needed.
Error Handling
import {
  WorkerTaskNotFoundError,
  WorkerTaskTimeoutError,
  WorkerExecutionFailedError,
} from '@hazeljs/worker';

try {
  const { result } = await workerExecutor.execute('my-task', payload);
  return result;
} catch (err) {
  if (err instanceof WorkerTaskNotFoundError) {
    // Task not registered
  } else if (err instanceof WorkerTaskTimeoutError) {
    // Task exceeded timeout
  } else if (err instanceof WorkerExecutionFailedError) {
    // Task threw in worker
  }
  throw err;
}
Best Practices
- Use for CPU work only — I/O-bound work belongs on the main thread
- Keep payloads small — Data is JSON-serialized between threads
- Provide taskRegistry — Explicit paths avoid resolution issues
- Compile to JS — Point to dist/ output, not .ts source
- Handle errors — Wrap execute() in try/catch
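To act on the "keep payloads small" advice, a guard can measure the serialized size before calling execute(). The helpers below are a hypothetical sketch: the names payloadSizeBytes and assertSmallPayload and the 1 MiB default limit are arbitrary choices for illustration, not package features.

```typescript
// Size in bytes of the payload once JSON-serialized and UTF-8 encoded.
function payloadSizeBytes(payload: unknown): number {
  const json = JSON.stringify(payload);
  // JSON.stringify yields undefined for top-level functions or undefined.
  if (json === undefined) {
    throw new TypeError('Payload is not JSON-serializable');
  }
  return new TextEncoder().encode(json).length;
}

// Hypothetical guard; the 1 MiB default is an arbitrary example threshold.
function assertSmallPayload(payload: unknown, maxBytes = 1024 * 1024): void {
  const size = payloadSizeBytes(payload);
  if (size > maxBytes) {
    throw new RangeError(`Payload is ${size} bytes; limit is ${maxBytes}`);
  }
}
```

When a payload is too large, passing a reference such as a file path (as the PDF example does with pdfPath) keeps the serialized message small.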