Data Package
The @hazeljs/data package provides data processing and ETL capabilities for HazelJS applications. It includes declarative pipelines with schema validation, stream processing, built-in transformers, data quality checks, and Apache Flink integration.
Purpose
Building data pipelines typically requires orchestrating multiple steps—normalization, validation, enrichment—with proper error handling and schema enforcement. The @hazeljs/data package simplifies this by providing:
- Declarative Pipelines – Use `@Pipeline`, `@Transform`, and `@Validate` decorators to define ETL flows
- Fluent Schema API – Zod-like validation with `Schema.string()`, `Schema.number()`, `Schema.object()`, `Schema.array()`, `Schema.literal()`, `Schema.union()`, plus `.optional()`, `.nullable()`, `.default()`, `.transform()`, `.refine()`, and full TypeScript inference via `Infer<T>`
- ETL Service – Execute multi-step pipelines with conditional steps, per-step retry/timeout, and a dead letter queue (DLQ)
- PipelineBuilder – Programmatic, immutable pipeline DSL with `.branch()`, `.parallel()`, `.catch()`
- Built-in Transformers – `trimString`, `toLowerCase`, `toUpperCase`, `parseJson`, `stringifyJson`, `pick`, `omit`, `renameKeys`
- Data Quality – `QualityService` with completeness, notNull, uniqueness, range, pattern, referentialIntegrity, profiling, and anomaly detection
- Stream Processing – `StreamProcessor` with tumbling/sliding/session windows and stream joins
- Connectors – `DataSource`/`DataSink` interfaces with `MemorySource`, `MemorySink`, `CsvSource`, `HttpSource`
- PII Safety – `@Mask`, `@Redact`, `@Encrypt`, `@Decrypt` decorators for sensitive data
- Test Utilities – `SchemaFaker`, `PipelineTestHarness`, `MockSource`, `MockSink`
- Flink Integration – Optional Apache Flink deployment for distributed stream processing
Architecture
The package uses a decorator-driven pipeline architecture with service orchestration:
```mermaid
graph TD
    A["@Pipeline Decorator<br/>(Defines Pipeline Class)"] --> B["@Transform / @Validate<br/>(Step Decorators)"]
    B --> C["ETLService<br/>(Orchestrates Execution)"]
    C --> D["PipelineBase.execute()<br/>(Single Entry Point)"]
    E["SchemaValidator<br/>(Fluent Schema)"] --> F["@Validate Step"]
    G["StreamService"] --> H["Batch Processing<br/>(processBatch)"]
    G --> I["StreamProcessor<br/>(Streaming + Windowing)"]
    C --> J["Step 1 → Step 2 → Step 3"]
    style A fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
    style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
    style C fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
    style D fill:#f59e0b,stroke:#fbbf24,stroke-width:2px,color:#fff
```
Key Components
- DataModule – Registers `SchemaValidator`, `ETLService`, `PipelineBuilder`, `StreamService`, `QualityService`, `FlinkService`
- ETLService – Executes pipelines by invoking decorated steps in order; supports `when`, `retry`, `timeoutMs`, `dlq`, `executeBatch`, `onStepComplete`
- PipelineBase – Base class providing `execute()` for pipelines (inject `ETLService`)
- PipelineBuilder – Immutable fluent API for programmatic pipelines with branch, parallel, catch
- SchemaValidator – Validates data against fluent schemas; use `validate()` or `safeValidate()`
- StreamProcessor – In-process streaming with `tumblingWindow`, `slidingWindow`, `sessionWindow`, `joinStreams`
- Decorators – `@Pipeline`, `@Transform`, `@Validate`, `@Stream`, `@Mask`, `@Redact`, `@Encrypt`
Advantages
1. Declarative ETL
Define pipelines with decorators—no manual step wiring or execution logic. Add conditional steps with when, retry with backoff, timeouts, and DLQ for graceful failure handling.
2. Type-Safe Validation
Fluent Schema API with full TypeScript inference (Infer<typeof Schema>). Supports string, number, boolean, date, object, array, literal, union, plus optional, nullable, default, transform, refine, refineAsync.
3. Programmatic Pipelines
Use PipelineBuilder for pipelines defined in code: .addTransform(), .addValidate(), .branch(), .parallel(), .catch(), .toSchema().
4. Data Quality Built-In
QualityService for completeness, notNull, uniqueness, range, pattern, referentialIntegrity, plus profile() for field stats and detectAnomalies() for outlier detection. Quality score 0–100.
5. Batch and Streaming
StreamService for batch; StreamProcessor for in-process streaming with windowing and join. Optional Flink for distributed processing.
Installation
npm install @hazeljs/data @hazeljs/core
Quick Start
1. Import DataModule
import { HazelApp } from '@hazeljs/core';
import { DataModule } from '@hazeljs/data';
const app = new HazelApp({
imports: [DataModule.forRoot()],
});
app.listen(3000);
2. Define a Pipeline
import { Injectable } from '@hazeljs/core';
import {
Pipeline,
PipelineBase,
Transform,
Validate,
ETLService,
Schema,
Infer,
} from '@hazeljs/data';
const OrderSchema = Schema.object({
id: Schema.string().required(),
customerId: Schema.string().required(),
status: Schema.string().oneOf(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
items: Schema.array(
Schema.object({
sku: Schema.string().min(1),
qty: Schema.number().min(1),
price: Schema.number().min(0),
})
),
});
type Order = Infer<typeof OrderSchema>;
@Pipeline('order-processing')
@Injectable()
export class OrderProcessingPipeline extends PipelineBase {
constructor(etlService: ETLService) {
super(etlService);
}
@Transform({ step: 1, name: 'normalize' })
async normalize(data: Record<string, unknown>): Promise<Record<string, unknown>> {
return {
...data,
status: String(data.status).toLowerCase(),
};
}
@Validate({ step: 2, name: 'validate', schema: OrderSchema })
async validate(data: Order): Promise<Order> {
return data;
}
@Transform({ step: 3, name: 'enrich' })
async enrich(data: Order): Promise<Order & { subtotal: number; tax: number; total: number }> {
const items = data.items ?? [];
const subtotal = items.reduce((sum, i) => sum + i.qty * i.price, 0);
const tax = subtotal * 0.1;
return {
...data,
subtotal,
tax,
total: subtotal + tax,
};
}
}
3. Execute from a Controller
import { Controller, Post, Body, Inject } from '@hazeljs/core';
import { OrderProcessingPipeline } from './pipelines/order-processing.pipeline';
@Controller('data')
export class DataController {
constructor(
@Inject(OrderProcessingPipeline) private pipeline: OrderProcessingPipeline
) {}
@Post('pipeline/orders')
async processOrder(@Body() body: unknown) {
const result = await this.pipeline.execute(body);
return { ok: true, data: result };
}
}
Schema Validation
Build schemas with the fluent API. Full type inference via Infer<T>:
import { Schema, Infer, SchemaValidator } from '@hazeljs/data';
const UserSchema = Schema.object({
email: Schema.string().email(),
name: Schema.string().min(1).max(200),
age: Schema.number().min(0).max(150),
role: Schema.string().oneOf(['user', 'admin', 'moderator', 'guest']),
active: Schema.boolean().default(true),
tags: Schema.array(Schema.string()).optional(),
});
type User = Infer<typeof UserSchema>;
// Validate
const result = UserSchema.validate(rawData);
if (result.success) {
const user: User = result.data;
} else {
console.error(result.errors);
}
// Or use SchemaValidator for DI
const validator = new SchemaValidator();
const user = validator.validate(UserSchema, rawData); // throws on failure
const safe = validator.safeValidate(UserSchema, rawData); // returns { success, data } or { success, errors }
Schema Types and Modifiers
| Type | Methods |
|---|---|
| `Schema.string()` | `.email()`, `.url()`, `.min()`, `.max()`, `.uuid()`, `.oneOf()`, `.pattern()`, `.required()`, `.trim()` |
| `Schema.number()` | `.min()`, `.max()`, `.integer()`, `.positive()`, `.negative()`, `.multipleOf()` |
| `Schema.boolean()` | `.default()` |
| `Schema.date()` | `.min()`, `.max()`, `.default()` |
| `Schema.object({...})` | `.strict()`, `.pick()`, `.omit()`, `.extend()` |
| `Schema.array(itemSchema)` | `.min()`, `.max()`, `.nonempty()` |
| `Schema.literal(value)` | Literal values |
| `Schema.union([a, b])` | Discriminated unions |
| Modifiers (all types) | `.optional()`, `.nullable()`, `.default()`, `.transform()`, `.refine()`, `.refineAsync()` |
| Utilities | `.toJsonSchema()`, `Infer<typeof Schema>` |
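Literal and union types compose with the modifiers; a sketch following the fluent API above (the exact `refine` message argument and `default` callback form are assumptions — verify against the package's typings):

```typescript
import { Schema, Infer } from '@hazeljs/data';

// A discriminated union of event shapes, keyed on the literal `kind` field.
const EventSchema = Schema.union([
  Schema.object({ kind: Schema.literal('click'), x: Schema.number(), y: Schema.number() }),
  Schema.object({ kind: Schema.literal('purchase'), amount: Schema.number().positive() }),
]);

// Modifiers compose left to right: validate, then transform, then refine.
const SignupSchema = Schema.object({
  email: Schema.string().email().transform((s) => s.toLowerCase()),
  password: Schema.string().min(8).refine(
    (p) => /[0-9]/.test(p),
    'password must contain a digit' // assumed message argument
  ),
  active: Schema.boolean().default(true),
});

type Signup = Infer<typeof SignupSchema>;
```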
Pipeline Options
Steps support conditional execution, retry, timeout, and DLQ:
@Transform({
step: 2,
name: 'enrich',
when: (data) => (data as { type: string }).type === 'order',
retry: { attempts: 3, delay: 500, backoff: 'exponential' },
timeoutMs: 5000,
dlq: { handler: (item, err, step) => logger.error('DLQ', { item, err, step }) },
})
async enrich(data: Record<string, unknown>) {
return { ...data, enriched: true };
}
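The Key Components list also mentions `executeBatch` and `onStepComplete` on ETLService. A minimal sketch of batch execution with a per-step callback — the option names come from that list, but the exact signature is an assumption, so verify against ETLService's typings:

```typescript
// Assumed shape: executeBatch(pipeline, items, options), with onStepComplete
// fired after each step completes. Verify before relying on this signature.
const results = await etlService.executeBatch(pipeline, orders, {
  onStepComplete: (step: string, output: unknown) => {
    console.log(`step ${step} completed`);
  },
});
```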
PipelineBuilder (Programmatic Pipelines)
Build pipelines in code without decorators:
import { PipelineBuilder } from '@hazeljs/data';
const pipeline = new PipelineBuilder('orders')
.addTransform('normalize', (d) => ({ ...d, email: (d as { email: string }).email?.toLowerCase() }))
.branch(
'classify',
(d) => (d as { type: string }).type === 'premium',
(b) => b.addTransform('enrichPremium', enrichPremium),
(b) => b.addTransform('enrichStandard', enrichStandard)
)
.parallel('enrich', [
(d) => ({ ...d, a: 1 }),
(d) => ({ ...d, b: 2 }),
])
.addValidate('validate', (d) => d)
.catch((data, err) => ({ ...data, error: err.message }));
const result = await pipeline.execute(rawData);
const def = pipeline.toSchema(); // Serializable pipeline definition
Batch and Stream Processing
import { StreamService, ETLService, StreamProcessor } from '@hazeljs/data';
// Batch
const streamService = new StreamService(etlService);
const results = await streamService.processBatch(pipeline, orders);
// Streaming
async function* source() {
yield { v: 1 };
yield { v: 2 };
}
for await (const r of streamService.processStream(pipeline, source())) {
console.log(r);
}
// Windowing (StreamProcessor)
const processor = new StreamProcessor(etlService);
for await (const batch of processor.tumblingWindow(timestampedSource, 60_000)) {
console.log(batch.items, batch.windowStart, batch.windowEnd);
}
// Also: slidingWindow, sessionWindow, joinStreams
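For intuition, the tumbling-window semantics can be illustrated in plain TypeScript. This is not StreamProcessor's implementation, just the bucketing rule tumbling windows follow: an item with timestamp `ts` lands in exactly one fixed-size, non-overlapping bucket starting at `floor(ts / windowMs) * windowMs`.

```typescript
interface Timestamped {
  ts: number; // event timestamp in ms
}

// The window a timestamp belongs to starts at the nearest lower multiple of windowMs.
export function tumblingWindowStart(ts: number, windowMs: number): number {
  return Math.floor(ts / windowMs) * windowMs;
}

// Group items into tumbling windows keyed by window start time.
export function bucketByTumblingWindow<T extends Timestamped>(
  items: T[],
  windowMs: number,
): Map<number, T[]> {
  const windows = new Map<number, T[]>();
  for (const item of items) {
    const start = tumblingWindowStart(item.ts, windowMs);
    const bucket = windows.get(start) ?? [];
    bucket.push(item);
    windows.set(start, bucket);
  }
  return windows;
}
```

Sliding windows differ only in that buckets overlap (an item can belong to several), and session windows close after a gap of inactivity rather than at fixed boundaries.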
Data Quality
import { QualityService } from '@hazeljs/data';
const qualityService = new QualityService();
// Register built-in checks
qualityService.registerCheck('completeness', qualityService.completeness(['id', 'email']));
qualityService.registerCheck('notNull', qualityService.notNull(['id', 'status']));
qualityService.registerCheck('uniqueness', qualityService.uniqueness(['id']));
qualityService.registerCheck('range', qualityService.range('age', { min: 0, max: 120 }));
qualityService.registerCheck('pattern', qualityService.pattern('phone', /^\d{10}$/));
qualityService.registerCheck('ref', qualityService.referentialIntegrity('status', ['active', 'inactive']));
const report = await qualityService.runChecks('users', records);
console.log(report.score, report.passed, report.checks);
// Profiling and anomaly detection
const profile = qualityService.profile('users', records);
const anomalies = qualityService.detectAnomalies(records, ['value'], 2);
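For intuition, a completeness check can be sketched in a few lines. This is illustrative only — it is not QualityService's actual implementation, and the package's scoring formula may differ:

```typescript
// Illustrative completeness score: the percentage of (record, field) pairs
// that are present and non-null, scaled to 0-100.
export function completenessScore(
  records: Record<string, unknown>[],
  fields: string[],
): number {
  if (records.length === 0 || fields.length === 0) return 100;
  let present = 0;
  for (const record of records) {
    for (const field of fields) {
      const value = record[field];
      if (value !== null && value !== undefined) present++;
    }
  }
  return Math.round((present / (records.length * fields.length)) * 100);
}
```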
Connectors
import { MemorySource, MemorySink, CsvSource, CsvSink, HttpSource } from '@hazeljs/data';
// In-memory (for tests)
const source = new MemorySource([{ id: 1 }, { id: 2 }]);
const sink = new MemorySink();
const records = await source.readAll();
await sink.writeBatch(records);
// CSV
const csvSource = new CsvSource({ path: 'data.csv' });
const csvSink = new CsvSink({ path: 'out.csv' });
// HTTP API
const httpSource = new HttpSource({ url: 'https://api.example.com/data' });
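Putting a source, a pipeline, and a sink together might look like this — a sketch using the `readAll()`/`writeBatch()` calls shown above, and assuming a `StreamService` instance and a decorated pipeline are already available:

```typescript
// Sketch: CSV in, pipeline transform, in-memory out.
const csvIn = new CsvSource({ path: 'orders.csv' });
const memOut = new MemorySink();

const raw = await csvIn.readAll();                                 // read every record
const processed = await streamService.processBatch(pipeline, raw); // run the pipeline
await memOut.writeBatch(processed);                                // persist results
```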
PII Decorators
Mask, redact, or encrypt sensitive fields before pipeline steps run:
import { Transform, Mask, Redact, Encrypt } from '@hazeljs/data';
@Transform({ step: 1, name: 'sanitize' })
@Mask({ fields: ['email', 'ssn'], showLast: 4 })
sanitize(data: User) {
return data; // email/ssn already masked
}
@Transform({ step: 2, name: 'redact' })
@Redact({ fields: ['internalId'] })
redact(data: Record<string, unknown>) {
return data; // internalId removed
}
Test Utilities
import { SchemaFaker, PipelineTestHarness, MockSource, MockSink } from '@hazeljs/data';
// Generate fake data from schema
const fake = SchemaFaker.generate(UserSchema);
const many = SchemaFaker.generateMany(UserSchema, 10);
// Pipeline test harness (captures per-step events)
const harness = PipelineTestHarness.create(etlService, pipeline);
const { result, events } = await harness.run(input);
await harness.runAndAssertSuccess(input); // throws if any step failed
// Mock source/sink (alias for MemorySource/MemorySink)
const source = new MockSource([{ x: 1 }]);
const sink = new MockSink();
Built-in Transformers
| Transformer | Description |
|---|---|
| `trimString` | Trim whitespace from strings |
| `toLowerCase` / `toUpperCase` | Case conversion |
| `parseJson` / `stringifyJson` | JSON parsing and serialization |
| `pick` | Select specific keys from objects |
| `omit` | Remove specific keys from objects |
| `renameKeys` | Rename object keys |
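A sketch of composing the built-in transformers with PipelineBuilder. The assumptions here: each transformer is a record-in/record-out function, and `pick`/`omit`/`renameKeys` are configured with a key list or map that returns such a function — verify the exported signatures before use:

```typescript
import { PipelineBuilder, trimString, toLowerCase, pick, renameKeys } from '@hazeljs/data';

// Assumed shapes: pick(keys) and renameKeys(mapping) return transformer functions.
const cleanup = new PipelineBuilder('cleanup')
  .addTransform('trim', trimString)
  .addTransform('lower', toLowerCase)
  .addTransform('select', pick(['id', 'email', 'name']))
  .addTransform('rename', renameKeys({ email: 'emailAddress' }));

const cleaned = await cleanup.execute({ id: '42', email: ' A@B.COM ', name: 'Ada', extra: true });
```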
Flink Configuration
For distributed stream processing with Apache Flink:
DataModule.forRoot({
flink: {
url: process.env.FLINK_REST_URL ?? 'http://localhost:8081',
timeout: 30000,
},
});
Flink Use Case: Real-Time Event Processing
Use `@Stream` and `FlinkService` when you need distributed, fault-tolerant stream processing (e.g. Kafka → transform → Kafka) instead of the in-process `StreamProcessor`. Typical use cases: clickstream enrichment, log aggregation, real-time analytics, or event-driven pipelines that must scale across workers and survive restarts.
Flink integration scope: The @hazeljs/data package only helps you manage Flink jobs (config, deploy, operate). It does not write, execute, or transform data inside Flink; your JAR or SQL does that on the cluster.
Node vs Flink cluster: Your TypeScript pipeline defines the topology and config (source/sink, steps, parallelism). Flink itself runs JVM jobs (Java/Scala). For cluster execution you either deploy via Flink SQL (no JAR) or provide a pre-built Flink JAR that implements the same logic; the package uploads and runs that JAR with the config generated from your pipeline.
How does a HazelJS pipeline get onto the Flink cluster? The package does not compile or translate your Node.js pipeline into a Flink job. It does two things: (1) Builds a description from your @Stream pipeline—job name, parallelism, checkpoint settings, and a job graph (Kafka source/sink topics, step names, operator types like map/filter). Your TypeScript transform logic is not sent to Flink. (2) Uses the Flink REST API to upload a JAR and start a job with that config (name, parallelism, optional program args). So the pipeline you write in Node is a design-time and config source: it produces config + graph you can use to implement the same flow in a Java/Scala Flink job (or in SQL), then deploy that JAR (or SQL) via this package. There is no built-in “HazelJS → Flink” codegen; you bring the JAR or SQL that runs on the cluster.
1. Define a streaming pipeline with @Stream
The pipeline reads from a Kafka topic, runs your @Transform steps, and writes to another topic. Source and sink use the kafka://topic-name form.
import { Injectable } from '@hazeljs/core';
import {
Stream,
PipelineBase,
Transform,
ETLService,
FlinkService,
Schema,
Infer,
} from '@hazeljs/data';
const EventSchema = Schema.object({
eventId: Schema.string().required(),
userId: Schema.string().required(),
eventType: Schema.string().oneOf(['click', 'view', 'purchase']),
timestamp: Schema.number().required(),
payload: Schema.object({}).optional(), // extend with known keys or leave flexible
});
type Event = Infer<typeof EventSchema>;
@Stream({
name: 'user-events-enrichment',
source: 'kafka://user-events-raw',
sink: 'kafka://user-events-enriched',
parallelism: 4,
})
@Injectable()
export class UserEventsStreamPipeline extends PipelineBase {
constructor(etlService: ETLService) {
super(etlService);
}
@Transform({ step: 1, name: 'parse' })
async parse(event: Record<string, unknown>): Promise<Event> {
const parsed = EventSchema.validate(event);
if (!parsed.success) throw new Error(JSON.stringify(parsed.errors));
return parsed.data as Event;
}
@Transform({ step: 2, name: 'enrich' })
async enrich(event: Event): Promise<Event & { processedAt: number }> {
return {
...event,
processedAt: Date.now(),
};
}
@Transform({ step: 3, name: 'filterInvalid' })
async filterInvalid(data: Event & { processedAt: number }) {
if (!data.userId || !data.eventId) return null;
return data;
}
}
Where does this logic run? The parse, enrich, and filterInvalid code above runs only in Node.js—for example when you use StreamProcessor or ETLService to process a stream in-process. It does not run on the Flink cluster. Flink executes JVM (Java/Scala) or SQL; the package does not translate this TypeScript to Flink. So when you deploy to a Flink cluster, the same behavior (parse → validate, enrich with timestamp, filter invalid) must be implemented inside your Flink JAR (e.g. as MapFunction/ProcessFunction) or in Flink SQL. This pipeline class is the design and config source (topology, step names, source/sink); the logic that runs on the cluster is whatever you implement in the JAR or SQL.
How do you actually get Flink’s distributed processing? Distributed execution comes from the job you run on the cluster—your JAR or SQL. Flink schedules that job across task managers, scales with parallelism, does checkpointing and recovery, and reads/writes Kafka in a distributed way. The HazelJS pipeline does not become that job. What the package gives you is: (1) One place to define topology and config (source/sink topics, step order, parallelism) in TypeScript, so your Node app and your Flink job stay aligned. (2) Deployment and operations from Node: upload the JAR, start the job with that config (name, parallelism), list jobs, create savepoints, cancel. (3) Optional in-process use of the same pipeline via StreamProcessor for dev or small-scale streaming. So you “take advantage” of Flink by implementing the pipeline in a JAR (or SQL) and using this package to deploy and manage it with config derived from the Node pipeline; the distribution is entirely in the Flink job you provide.
2. Configure Flink and deploy
Configure the Flink cluster URL (and optional auth) via DataModule.forRoot(), then inject FlinkService to deploy the pipeline. You can override job name, parallelism, or checkpoint interval.
// app.module.ts
DataModule.forRoot({
flink: {
url: process.env.FLINK_REST_URL ?? 'http://localhost:8081',
timeout: 30000,
auth: process.env.FLINK_TOKEN
? { type: 'token', token: process.env.FLINK_TOKEN }
: undefined,
},
});
// In a controller or bootstrap service
@Injectable()
export class StreamDeploymentService {
constructor(
private readonly flinkService: FlinkService,
@Inject(UserEventsStreamPipeline) private readonly pipeline: UserEventsStreamPipeline
) {}
async deploy() {
const result = await this.flinkService.deployStream(this.pipeline, {
jobName: 'user-events-enrichment-v1',
parallelism: 4,
checkpointInterval: 60_000,
});
if (result.jobId) {
console.log('Flink job submitted:', result.jobId, result.webUI);
} else {
console.log('Job config generated (submit with JAR):', result.jobConfig);
}
return result;
}
}
3. Deploy with a JAR (production)
Apache Flink runs JVM jobs (Java/Scala), not Node.js. The Node pipeline you define with @Stream and @Transform is used to generate job config and a job graph (source/sink topics, step names, parallelism). The actual execution on the cluster requires a separate Flink JAR that you build with Java or Scala (e.g. a Flink DataStream job that implements the same topology, or a generic runner that reads the generated config).
Use deployStreamWithJar to upload that JAR to the Flink cluster and start the job with the config (e.g. job name, parallelism) derived from your pipeline. The JAR is not produced from the Node module—it is a pre-built Flink application that you provide.
// pipeline = your @Stream-decorated instance (used for config/graph)
// jarFile = path to your Java/Scala Flink job JAR (built with Maven/Gradle/sbt)
const result = await this.flinkService.deployStreamWithJar(
this.pipeline,
'/path/to/your-flink-job.jar',
{ parallelism: 8 }
);
console.log('Job ID:', result.jobId, 'Web UI:', result.webUI);
4. Monitor and manage jobs
Use FlinkService to check status, list jobs, create savepoints, or cancel jobs.
// List running and completed jobs
const jobs = await this.flinkService.listJobs();
// Check status of a running job
const status = await this.flinkService.getJobStatus(jobId);
console.log(status.state, status.duration);
// Create a savepoint for stateful upgrades
await this.flinkService.createSavepoint(jobId, 's3://bucket/savepoints/');
// Stop with savepoint, then redeploy
await this.flinkService.stopJob(jobId, 's3://bucket/savepoints/sp-1');
5. Flink SQL Gateway (optional)
For SQL-based streaming (e.g. CREATE TABLE + INSERT INTO ... SELECT), use the SQL Gateway and create a session, then submit DDL/DML.
const { operationId, sessionId } = await this.flinkService.deployStreamWithSql(`
CREATE TABLE clicks (
user_id STRING,
page_id STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
INSERT INTO click_summaries
SELECT user_id, COUNT(*), TUMBLE_END(ts, INTERVAL '1' MINUTE)
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' MINUTE);
`);
When to use Flink vs in-process streaming
| Use case | Use |
|---|---|
| Single-node, low latency, simple windows | StreamProcessor (tumbling/sliding/session windows, joinStreams) |
| Distributed, fault-tolerant, Kafka-to-Kafka, scale-out | FlinkService + @Stream pipeline |
| Ad-hoc or SQL-heavy streaming | FlinkService.deployStreamWithSql + Flink SQL Gateway |
Ensure Kafka (or your source/sink) is available and that KAFKA_BOOTSTRAP_SERVERS is set when the job graph is built; the StreamBuilder uses it for the connector properties.
Summary: How the HazelJS pipeline relates to the Flink cluster
- In Node: You define a pipeline with `@Stream` and `@Transform`. That gives you `jobConfig` (name, parallelism, checkpoints) and `jobGraph` (source/sink topics, step names). Your transform code runs only in Node (e.g. via `StreamProcessor` or `ETLService` for in-process streaming).
- On Flink: Execution is always a JVM job (JAR) or Flink SQL. You implement that job yourself (Java/Scala or SQL). The package does not generate runnable Flink code from your pipeline.
- Using the package for deployment: Call `deployStreamWithJar(pipeline, pathToYourJar)` to upload and run your JAR with the config from the pipeline, or `deployStreamWithSql(sql)` to submit SQL. The pipeline instance is the source of config and graph; the JAR or SQL is what actually runs on the cluster.
Related Resources
- Config Package – Environment and configuration for data sources
- Prisma Package – Database access for pipeline outputs
- hazeljs-data-starter – Full example with order and user pipelines