HazelJS PubSub Package

npm downloads

@hazeljs/pubsub provides Google Cloud Pub/Sub integration for HazelJS with typed publishing, decorator-based consumers, and first-class acknowledgement control.

Quick Reference

  • Purpose: @hazeljs/pubsub helps you publish and consume Pub/Sub messages inside HazelJS modules using decorators and DI services.
  • When to use: Use @hazeljs/pubsub when your architecture already uses Google Cloud Pub/Sub for async communication, event fan-out, or cross-service decoupling.
  • Key concepts: PubSubModule, PubSubPublisherService, @PubSubConsumer(), @PubSubSubscribe(), ack() / nack(), optional subscription auto-create.
  • Dependencies: @hazeljs/core, @google-cloud/pubsub.
  • Common patterns: Register PubSubModule.forRoot(...) -> decorate consumer classes and methods -> inject PubSubPublisherService where you publish events.
  • Common mistakes: Forgetting IAM/subscription permissions; enabling auto-create without topic; not handling nack/retry behavior for transient failures.

Purpose

Distributed systems need reliable event delivery, fan-out, and background processing. The @hazeljs/pubsub package provides:

  • Decorator-Based Consumers - Use @PubSubConsumer and @PubSubSubscribe for declarative message handlers
  • Publisher Service - Inject PubSubPublisherService to publish string, buffer, or object payloads
  • Ack/Nack Control - Automatic behavior plus manual overrides in each handler payload
  • Auto Subscription Bootstrap - Optionally create missing subscriptions at runtime
  • Type Safety - Typed options and handler payload contracts

Architecture

graph TD
  A["PubSubModule.forRoot()"] --> B["PUBSUB_CLIENT_TOKEN"]
  B --> C["PubSubPublisherService"]
  B --> D["PubSubSubscriberService"]
  D --> E["@PubSubConsumer + @PubSubSubscribe"]
  E --> F["Subscription Handlers"]
  C --> G["Topic Publish"]

Installation

npm install @hazeljs/pubsub

Quick Start

1. Configure PubSubModule

import { HazelModule } from '@hazeljs/core';
import { PubSubModule } from '@hazeljs/pubsub';

@HazelModule({
  imports: [
    PubSubModule.forRoot({
      projectId: process.env.GCP_PROJECT_ID,
      keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS,
    }),
  ],
})
export class AppModule {}

2. Publish Events

import { Service } from '@hazeljs/core';
import { PubSubPublisherService } from '@hazeljs/pubsub';

@Service()
export class OrderService {
  constructor(private readonly publisher: PubSubPublisherService) {}

  async createOrder(order: { id: string; total: number }) {
    await this.publisher.publishJson('orders-topic', order, {
      attributes: {
        source: 'order-service',
        version: 'v1',
      },
      orderingKey: order.id,
    });
  }
}

3. Consume with Decorators

import { Service } from '@hazeljs/core';
import {
  PubSubConsumer,
  PubSubSubscribe,
  type PubSubSubscriptionHandlerPayload,
} from '@hazeljs/pubsub';

@PubSubConsumer({
  ackOnSuccess: true,
  nackOnError: true,
  parseJson: true,
})
@Service()
export class OrderConsumer {
  @PubSubSubscribe({
    subscription: 'orders-subscription',
    topic: 'orders-topic',
    autoCreateSubscription: true,
  })
  async handleOrder(payload: PubSubSubscriptionHandlerPayload<{ id: string; total: number }>) {
    console.log('Received order:', payload.data.id, payload.data.total);
  }
}

4. Register Consumers as Providers

import { HazelModule } from '@hazeljs/core';
import { PubSubModule } from '@hazeljs/pubsub';
import { OrderConsumer } from './order.consumer';

@HazelModule({
  imports: [PubSubModule.forRoot({ projectId: process.env.GCP_PROJECT_ID })],
  providers: [OrderConsumer],
})
export class AppModule {}

Acknowledgement Model

Default behavior:

  • Successful handler execution -> ack()
  • Handler throws -> nack()

Override options:

  • Globally at @PubSubConsumer({...})
  • Per-subscription at @PubSubSubscribe({...})
  • Per-message by returning 'ack' | 'nack' or calling payload.ack() / payload.nack()

Async Configuration

import { ConfigService } from '@hazeljs/config';
import { PubSubModule } from '@hazeljs/pubsub';

PubSubModule.forRootAsync({
  useFactory: (config: ConfigService) => ({
    projectId: config.get('GCP_PROJECT_ID'),
    keyFilename: config.get('GOOGLE_APPLICATION_CREDENTIALS'),
    apiEndpoint: config.get('PUBSUB_EMULATOR_HOST'),
  }),
  inject: [ConfigService],
});

Local Emulator

export PUBSUB_EMULATOR_HOST=localhost:8085
PubSubModule.forRoot({
  projectId: 'local-project',
  apiEndpoint: process.env.PUBSUB_EMULATOR_HOST,
});

API Reference

PubSubPublisherService

MethodDescription
publish(topicName, data, options?)Publish string, Buffer, or object payloads
publishJson(topicName, data, options?)Publish JSON payload with content-type attribute

Decorators

DecoratorDescription
@PubSubConsumer(options?)Class-level defaults (ackOnSuccess, nackOnError, parseJson, autoCreateSubscription)
@PubSubSubscribe(options)Method-level subscription metadata and behavior

PubSubModule

MethodDescription
forRoot(options?)Register Pub/Sub client synchronously
forRootAsync({ useFactory, inject })Register Pub/Sub client from async config
registerSubscriptionsFromProvider(provider)Register subscription handlers from provider instances

Best Practices

  1. Design handlers as idempotent because redelivery can happen.
  2. Use message attributes for metadata (source, tenant, correlation IDs).
  3. Use ordering keys carefully only where strict ordering matters.
  4. Keep handlers fast and delegate expensive work to queue/worker pipelines.
  5. Monitor nack rates to detect schema drift or downstream outages.