HazelJS PubSub Package
@hazeljs/pubsub provides Google Cloud Pub/Sub integration for HazelJS with typed publishing, decorator-based consumers, and first-class acknowledgement control.
Quick Reference
- Purpose: @hazeljs/pubsub helps you publish and consume Pub/Sub messages inside HazelJS modules using decorators and DI services.
- When to use: Use @hazeljs/pubsub when your architecture already uses Google Cloud Pub/Sub for async communication, event fan-out, or cross-service decoupling.
- Key concepts: PubSubModule, PubSubPublisherService, @PubSubConsumer(), @PubSubSubscribe(), ack()/nack(), optional subscription auto-create.
- Dependencies: @hazeljs/core, @google-cloud/pubsub.
- Common patterns: Register PubSubModule.forRoot(...) -> decorate consumer classes and methods -> inject PubSubPublisherService where you publish events.
- Common mistakes: Forgetting IAM/subscription permissions; enabling auto-create without topic; not handling nack/retry behavior for transient failures.
Purpose
Distributed systems need reliable event delivery, fan-out, and background processing. The @hazeljs/pubsub package provides:
- Decorator-Based Consumers - Use @PubSubConsumer and @PubSubSubscribe for declarative message handlers
- Publisher Service - Inject PubSubPublisherService to publish string, buffer, or object payloads
- Ack/Nack Control - Automatic behavior plus manual overrides in each handler payload
- Auto Subscription Bootstrap - Optionally create missing subscriptions at runtime
- Type Safety - Typed options and handler payload contracts
Architecture
graph TD
  A["PubSubModule.forRoot()"] --> B["PUBSUB_CLIENT_TOKEN"]
  B --> C["PubSubPublisherService"]
  B --> D["PubSubSubscriberService"]
  D --> E["@PubSubConsumer + @PubSubSubscribe"]
  E --> F["Subscription Handlers"]
  C --> G["Topic Publish"]
Installation
npm install @hazeljs/pubsub
Quick Start
1. Configure PubSubModule
import { HazelModule } from '@hazeljs/core';
import { PubSubModule } from '@hazeljs/pubsub';

@HazelModule({
  imports: [
    PubSubModule.forRoot({
      projectId: process.env.GCP_PROJECT_ID,
      keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS,
    }),
  ],
})
export class AppModule {}
2. Publish Events
import { Service } from '@hazeljs/core';
import { PubSubPublisherService } from '@hazeljs/pubsub';

@Service()
export class OrderService {
  constructor(private readonly publisher: PubSubPublisherService) {}

  async createOrder(order: { id: string; total: number }) {
    // Publish the order as JSON; attributes carry metadata alongside the payload.
    await this.publisher.publishJson('orders-topic', order, {
      attributes: {
        source: 'order-service',
        version: 'v1',
      },
      orderingKey: order.id,
    });
  }
}
3. Consume with Decorators
import { Service } from '@hazeljs/core';
import {
  PubSubConsumer,
  PubSubSubscribe,
  type PubSubSubscriptionHandlerPayload,
} from '@hazeljs/pubsub';

// Class-level defaults for every subscription handler in this consumer.
@PubSubConsumer({
  ackOnSuccess: true,
  nackOnError: true,
  parseJson: true,
})
@Service()
export class OrderConsumer {
  // Method-level subscription metadata and behavior.
  @PubSubSubscribe({
    subscription: 'orders-subscription',
    topic: 'orders-topic',
    autoCreateSubscription: true,
  })
  async handleOrder(payload: PubSubSubscriptionHandlerPayload<{ id: string; total: number }>) {
    console.log('Received order:', payload.data.id, payload.data.total);
  }
}
4. Register Consumers as Providers
import { HazelModule } from '@hazeljs/core';
import { PubSubModule } from '@hazeljs/pubsub';
import { OrderConsumer } from './order.consumer';

@HazelModule({
  imports: [PubSubModule.forRoot({ projectId: process.env.GCP_PROJECT_ID })],
  providers: [OrderConsumer],
})
export class AppModule {}
Acknowledgement Model
Default behavior:
- Successful handler execution -> ack()
- Handler throws -> nack()
Override options:
- Globally at @PubSubConsumer({...})
- Per-subscription at @PubSubSubscribe({...})
- Per-message by returning 'ack' | 'nack' or calling payload.ack() / payload.nack()
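The layering above can be sketched in a few lines of standalone TypeScript. This is not the package's implementation: `AckOptions`, `resolveAckOptions`, and `decideOutcome` are hypothetical names introduced here, and the precedence shown (subscription options over consumer defaults, an explicit handler return value over both) is an assumption drawn from the order of the list.

```typescript
// Hypothetical types for illustration; the real package may define these differently.
type AckDecision = 'ack' | 'nack';

interface AckOptions {
  ackOnSuccess?: boolean;
  nackOnError?: boolean;
}

// Assumption: the built-in defaults are "ack on success" and "nack on error",
// and the more specific option wins over the more general one.
function resolveAckOptions(
  consumer: AckOptions = {},
  subscription: AckOptions = {},
): Required<AckOptions> {
  return {
    ackOnSuccess: subscription.ackOnSuccess ?? consumer.ackOnSuccess ?? true,
    nackOnError: subscription.nackOnError ?? consumer.nackOnError ?? true,
  };
}

// Runs a handler and returns the action to take on the message,
// or undefined when the message should be left unsettled.
async function decideOutcome(
  handler: () => Promise<AckDecision | void>,
  options: Required<AckOptions>,
): Promise<AckDecision | undefined> {
  try {
    const result = await handler();
    // An explicit return value acts as a per-message override.
    if (result === 'ack' || result === 'nack') return result;
    return options.ackOnSuccess ? 'ack' : undefined;
  } catch {
    return options.nackOnError ? 'nack' : undefined;
  }
}
```

With both automatic behaviors switched off, the sketch returns `undefined`, which corresponds to a handler that settles the message itself through `payload.ack()` or `payload.nack()`.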
Async Configuration
import { ConfigService } from '@hazeljs/config';
import { PubSubModule } from '@hazeljs/pubsub';

PubSubModule.forRootAsync({
  useFactory: (config: ConfigService) => ({
    projectId: config.get('GCP_PROJECT_ID'),
    keyFilename: config.get('GOOGLE_APPLICATION_CREDENTIALS'),
    apiEndpoint: config.get('PUBSUB_EMULATOR_HOST'),
  }),
  inject: [ConfigService],
});
Local Emulator
export PUBSUB_EMULATOR_HOST=localhost:8085

PubSubModule.forRoot({
  projectId: 'local-project',
  apiEndpoint: process.env.PUBSUB_EMULATOR_HOST,
});
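One way to keep local and deployed configuration in a single place is a small helper that picks the emulator endpoint when it is set. The sketch below is illustrative only: `PubSubOptionsSketch` and `buildPubSubOptions` are hypothetical names, the option fields mirror the ones used in this document, and the `'local-project'` fallback is an assumption.

```typescript
// Hypothetical option shape mirroring the fields used in this document.
interface PubSubOptionsSketch {
  projectId?: string;
  keyFilename?: string;
  apiEndpoint?: string;
}

// Builds module options from environment-style variables.
// When an emulator host is set, the key file is left out because the
// emulator does not require credentials.
function buildPubSubOptions(
  env: Record<string, string | undefined>,
): PubSubOptionsSketch {
  const emulatorHost = env.PUBSUB_EMULATOR_HOST;
  if (emulatorHost) {
    return {
      projectId: env.GCP_PROJECT_ID ?? 'local-project', // assumed fallback name
      apiEndpoint: emulatorHost,
    };
  }
  return {
    projectId: env.GCP_PROJECT_ID,
    keyFilename: env.GOOGLE_APPLICATION_CREDENTIALS,
  };
}
```

The result could then be passed to `PubSubModule.forRoot(buildPubSubOptions(process.env))`, so the same module definition works against the emulator and against a real project.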
API Reference
PubSubPublisherService
| Method | Description |
|---|---|
| publish(topicName, data, options?) | Publish string, Buffer, or object payloads |
| publishJson(topicName, data, options?) | Publish JSON payload with content-type attribute |
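To make the difference between the two methods concrete, here is a rough sketch of what a JSON publish could do before handing the message to the client: serialize the object to bytes and tag it with a content-type attribute. `encodeJsonMessage`, the two interfaces, and the exact attribute key `'content-type'` are assumptions for illustration, not the package's confirmed behavior.

```typescript
// Hypothetical shapes for illustration only.
interface PublishOptionsSketch {
  attributes?: Record<string, string>;
  orderingKey?: string;
}

interface EncodedMessage {
  data: Uint8Array;
  attributes: Record<string, string>;
  orderingKey?: string;
}

// Serializes a payload to UTF-8 JSON bytes and adds a content-type attribute.
// Caller-supplied attributes are spread last, so they can override the default.
function encodeJsonMessage(
  payload: unknown,
  options: PublishOptionsSketch = {},
): EncodedMessage {
  const json = JSON.stringify(payload);
  if (json === undefined) {
    // JSON.stringify returns undefined for values such as functions.
    throw new TypeError('Payload is not JSON-serializable');
  }
  return {
    data: new TextEncoder().encode(json),
    attributes: { 'content-type': 'application/json', ...options.attributes },
    orderingKey: options.orderingKey,
  };
}
```

A consumer with `parseJson` enabled would perform the reverse step, decoding the bytes and parsing them back into an object.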
Decorators
| Decorator | Description |
|---|---|
| @PubSubConsumer(options?) | Class-level defaults (ackOnSuccess, nackOnError, parseJson, autoCreateSubscription) |
| @PubSubSubscribe(options) | Method-level subscription metadata and behavior |
PubSubModule
| Method | Description |
|---|---|
| forRoot(options?) | Register Pub/Sub client synchronously |
| forRootAsync({ useFactory, inject }) | Register Pub/Sub client from async config |
| registerSubscriptionsFromProvider(provider) | Register subscription handlers from provider instances |
Best Practices
- Design handlers as idempotent because redelivery can happen.
- Use message attributes for metadata (source, tenant, correlation IDs).
- Use ordering keys sparingly, only where strict ordering matters.
- Keep handlers fast and delegate expensive work to queue/worker pipelines.
- Monitor nack rates to detect schema drift or downstream outages.
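The idempotency advice can be illustrated with a small wrapper that skips messages it has already processed. This is a minimal sketch under stated assumptions: `IncomingMessage` and `makeIdempotent` are hypothetical names, `id` stands in for whatever stable identifier your messages carry, and the in-memory `Set` is only suitable for a single process.

```typescript
// Hypothetical minimal message shape; `id` is any stable identifier,
// such as a Pub/Sub message ID or a business key.
interface IncomingMessage<T> {
  id: string;
  data: T;
}

// Wraps a handler so that a message whose ID was already processed is skipped.
// Assumption: an in-memory Set is enough for the example; a real service would
// use a shared store so duplicates are caught across instances and restarts.
function makeIdempotent<T>(
  handler: (data: T) => Promise<void>,
  seen: Set<string> = new Set(),
): (message: IncomingMessage<T>) => Promise<'processed' | 'duplicate'> {
  return async (message) => {
    if (seen.has(message.id)) return 'duplicate';
    await handler(message.data);
    // Record the ID only after success so a failed attempt can be retried.
    seen.add(message.id);
    return 'processed';
  };
}
```

Because the ID is recorded after the handler succeeds, a redelivered message following a failure is processed again, while a redelivery after success is skipped. Two concurrent deliveries of the same ID can still both pass the check, so the underlying work should tolerate that or the store should provide an atomic check-and-set.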
Related Resources
- Queue Package - Redis/BullMQ background jobs
- Kafka Package - Event streaming for Kafka-based systems
- Messaging Package - Channel adapters + AI messaging flows
- Google Pub/Sub Docs