HazelJS Flow Package
@hazeljs/flow is a durable execution graph engine for Node.js — a workflow kernel that handles persistence, retries, timeouts, idempotency, and concurrency. It works standalone or integrated with HazelJS.
Quick Reference
- Purpose: @hazeljs/flow provides durable, resumable workflow execution with a DAG-based graph engine, handling persistence, retries, timeouts, idempotency, and concurrency for multi-step business processes.
- When to use: Use @hazeljs/flow for multi-step business workflows that must survive restarts (order fulfillment, approval flows, onboarding, fraud review). Use @hazeljs/agent instead for LLM-driven decision-making with tools. Use @hazeljs/flow when the workflow steps are known at design time and don't require LLM reasoning.
- Key concepts: Flow graph (DAG), nodes (steps), edges (transitions), durable state persistence, wait/resume, idempotency keys, retries with backoff, timeouts, optional Prisma persistence.
- Dependencies: None required (in-memory by default); optionally @hazeljs/prisma for durable persistence.
- Common patterns: Define flow graph with nodes and edges → start a run → flow engine executes nodes in order → wait for external input (human approval, webhook) → resume → complete.
- Common mistakes: Not using idempotency keys for side-effecting nodes; using in-memory persistence in production (state lost on restart); confusing @hazeljs/flow (deterministic workflows) with @hazeljs/agent (LLM-driven orchestration).
Purpose
Modern applications rely on workflows: multi-step processes that span services, require human input, and must survive restarts (order fulfillment, approval flows, fraud review, onboarding). The @hazeljs/flow package provides:
- In-memory by default — No database or env vars required; add optional Postgres when you need durability
- Execution graphs — Nodes and edges with branching, conditional logic, and first-class WAIT/resume
- Durability — Optional Prisma storage for crash recovery and multi-process safety
- Audit trail — Timeline of events for every run
- Retries, timeouts, idempotency — Built into the engine
Key Features
| Feature | Description |
|---|---|
| Zero config | In-memory storage out of the box; no DATABASE_URL or migrations required |
| Decorator API | @Flow, @Entry, @Node, @Edge — define flows in one file |
| Wait & resume | Return { status: 'wait', reason: '...' }; call resumeRun(runId, payload) when ready |
| Idempotency | Per-node keys so payment/notification steps never run twice |
| Retries & backoff | maxAttempts, backoff ('fixed' or 'exponential'), baseDelayMs, maxDelayMs |
| Timeouts | Per-node timeoutMs so stuck nodes don't block forever |
| Conditional edges | Branch with when(ctx) predicates; deterministic, no silent wrong-path bugs |
| Concurrency safety | Postgres advisory locks (with Prisma) or in-process per-run lock |
Architecture
graph TD
  A["Your App / Flow Runtime"] --> B["FlowEngine"]
  B --> C["Storage"]
  C --> D["In-Memory (default)"]
  C --> E["Prisma (optional)"]
  E --> F["Postgres"]
  B --> G["Flow Definitions"]
  G --> H["@Flow @Node @Edge"]
  B --> I["Runs & Timeline"]
  style A fill:#3b82f6,stroke:#60a5fa,color:#fff
  style B fill:#6366f1,stroke:#818cf8,color:#fff
  style C fill:#8b5cf6,stroke:#a78bfa,color:#fff
  style D fill:#10b981,stroke:#34d399,color:#fff
  style E fill:#f59e0b,stroke:#fbbf24,color:#fff
  style G fill:#ec4899,stroke:#f472b6,color:#fff
- FlowEngine: Registers definitions, starts runs, ticks execution, resumes waiting runs
- Storage: In-memory by default; use createPrismaStorage(prisma) from @hazeljs/flow/prisma for Postgres
- Flow definitions: Built with decorators or the functional flow() builder
Installation
No database required for in-memory mode:
pnpm add @hazeljs/flow
For durable persistence (optional):
pnpm add @hazeljs/flow @prisma/client
Then run the flow package's Prisma migrations (see Persistence).
Quick Start
Define a flow (decorator-based)
import { FlowEngine, Flow, Entry, Node, Edge, buildFlowDefinition } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

@Flow('order-flow', '1.0.0')
class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    // validate input
    return { status: 'ok', output: { orderId: ctx.input.orderId } };
  }

  @Node('charge')
  async charge(ctx: FlowContext): Promise<NodeResult> {
    // charge payment
    return { status: 'ok', output: { charged: true } };
  }
}

const engine = new FlowEngine(); // in-memory storage
await engine.registerDefinition(buildFlowDefinition(OrderFlow));

const { runId } = await engine.startRun({
  flowId: 'order-flow',
  version: '1.0.0',
  input: { orderId: 'ORD-123' },
});

let run = await engine.getRun(runId);
while (run?.status === 'RUNNING') {
  run = await engine.tick(runId);
}
Functional builder (alternative)
import { FlowEngine, flow } from '@hazeljs/flow';

const def = flow('my-flow', '1.0.0')
  .entry('start')
  .node('start', async (ctx) => ({ status: 'ok', output: 1 }))
  .node('end', async (ctx) => ({ status: 'ok', output: ctx.outputs.start }))
  .edge('start', 'end')
  .build();

const engine = new FlowEngine();
await engine.registerDefinition(def);
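Because a flow is described as a DAG of nodes and edges, a definition can be sanity-checked before it is registered. The sketch below runs Kahn's algorithm over a plain graph shape. The PlainGraph type and topoOrder helper are hypothetical names made up for illustration; the structure actually returned by build() is not documented on this page, and the engine may validate definitions differently.

```typescript
// Hypothetical plain-graph shape; not the structure returned by flow().build().
interface PlainGraph {
  nodes: string[];
  edges: Array<{ from: string; to: string }>;
}

// Kahn's algorithm: returns a topological order, or throws when an edge
// references an unknown node or the graph contains a cycle.
function topoOrder(graph: PlainGraph): string[] {
  const indegree = new Map<string, number>();
  const next = new Map<string, string[]>();
  for (const n of graph.nodes) {
    indegree.set(n, 0);
    next.set(n, []);
  }
  for (const { from, to } of graph.edges) {
    const targets = next.get(from);
    const deg = indegree.get(to);
    if (targets === undefined || deg === undefined) {
      throw new Error(`edge ${from} -> ${to} references an unknown node`);
    }
    targets.push(to);
    indegree.set(to, deg + 1);
  }
  // Start from nodes with no incoming edges and peel the graph layer by layer.
  const ready = graph.nodes.filter((n) => indegree.get(n) === 0);
  const order: string[] = [];
  while (ready.length > 0) {
    const node = ready.shift() as string;
    order.push(node);
    for (const to of next.get(node) ?? []) {
      const deg = (indegree.get(to) ?? 0) - 1;
      indegree.set(to, deg);
      if (deg === 0) ready.push(to);
    }
  }
  if (order.length !== graph.nodes.length) throw new Error('graph contains a cycle');
  return order;
}

const order = topoOrder({
  nodes: ['start', 'end'],
  edges: [{ from: 'start', to: 'end' }],
}); // ['start', 'end']
```

A check like this catches misspelled node names in edges early, before a run is started.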
Persistence
By default the engine uses in-memory storage. For crash recovery and multi-process safety, use Postgres via the Prisma adapter:
- Install: pnpm add @prisma/client
- Set DATABASE_URL and run migrations from the flow package's prisma/schema
- Create the engine with Prisma storage:
import { FlowEngine, buildFlowDefinition } from '@hazeljs/flow';
import { createPrismaStorage, createFlowPrismaClient } from '@hazeljs/flow/prisma';
const prisma = createFlowPrismaClient(); // uses DATABASE_URL
const engine = new FlowEngine({ storage: createPrismaStorage(prisma) });
// same registerDefinition, startRun, tick...
The migration SQL in @hazeljs/flow's prisma/migrations/ is only needed when you use this adapter.
Wait and resume (human-in-the-loop)
Pause a run for external input (approval, webhook, payment callback):
@Node('await-approval')
async awaitApproval(ctx: FlowContext): Promise<NodeResult> {
  return { status: 'wait', reason: 'awaiting_approval' };
}
The run is persisted (or held in memory). When the approval arrives, resume it:
await engine.resumeRun(runId, { approved: true });
No polling or custom state tables. With Flow Runtime, your UI or approval service calls POST /v1/runs/:runId/resume.
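To make the wait/resume lifecycle concrete, here is a minimal in-memory sketch of the state transitions involved. It is a toy model written for this page, not the engine's implementation: the ToyRun type, the tick and resume helpers, and the WAITING and COMPLETED status names are assumptions (only RUNNING and the { status: 'wait' } result shape appear above).

```typescript
// Toy model of wait/resume. Not @hazeljs/flow internals; all names are illustrative.
type ToyResult =
  | { status: 'ok'; output: unknown }
  | { status: 'wait'; reason: string };

type ToyStep = (resumePayload: unknown) => ToyResult;

interface ToyRun {
  status: 'RUNNING' | 'WAITING' | 'COMPLETED'; // WAITING/COMPLETED are assumed names
  cursor: number;                  // index of the next step to execute
  waitReason?: string | undefined;
  resumePayload?: unknown;         // payload handed over by resume()
  outputs: unknown[];
}

// Execute one step; park the run if the step asks to wait.
function tick(run: ToyRun, steps: ToyStep[]): ToyRun {
  const step = steps[run.cursor];
  if (run.status !== 'RUNNING' || step === undefined) return run;
  const result = step(run.resumePayload);
  if (result.status === 'wait') {
    return { ...run, status: 'WAITING', waitReason: result.reason };
  }
  const cursor = run.cursor + 1;
  return {
    ...run,
    cursor,
    outputs: [...run.outputs, result.output],
    resumePayload: undefined,
    waitReason: undefined,
    status: cursor >= steps.length ? 'COMPLETED' : 'RUNNING',
  };
}

// Attach the external payload and make the run runnable again.
function resume(run: ToyRun, payload: unknown): ToyRun {
  if (run.status !== 'WAITING') throw new Error('run is not waiting');
  return { ...run, status: 'RUNNING', resumePayload: payload, waitReason: undefined };
}

// The approval step waits until a payload is present, then emits it as its output.
const steps: ToyStep[] = [
  () => ({ status: 'ok', output: 'validated' }),
  (payload) =>
    payload === undefined
      ? { status: 'wait', reason: 'awaiting_approval' }
      : { status: 'ok', output: payload },
];

let run: ToyRun = { status: 'RUNNING', cursor: 0, outputs: [] };
while (run.status === 'RUNNING') run = tick(run, steps); // stops once the run is WAITING
const parked = run;
run = resume(run, { approved: true });
while (run.status === 'RUNNING') run = tick(run, steps); // continues to COMPLETED
```

The point of the sketch is that a waiting run is just stored state: nothing executes until a resume call supplies the payload, which is why no polling loop is needed.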
Idempotency
Prevent duplicate side effects (e.g. charging a card twice) with per-node idempotency keys:
@Node('charge', { idempotencyKey: (ctx) => `order:${ctx.input.orderId}:charge` })
async charge(ctx: FlowContext): Promise<NodeResult> {
  // only runs once per key; cached output reused on retry
  return { status: 'ok', output: { charged: true } };
}
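The idea behind an idempotency key can be sketched as a cache that stores a node's output under its key and returns the stored output instead of re-running the side effect. This is an illustration only: IdempotencyCache and runOnce are hypothetical names, and the engine's actual storage and key handling are not described on this page.

```typescript
// Illustrative idempotency cache. Not the @hazeljs/flow implementation.
class IdempotencyCache<T> {
  private readonly results = new Map<string, T>();

  // Run `effect` only if `key` has not completed before; otherwise reuse the stored output.
  runOnce(key: string, effect: () => T): T {
    if (this.results.has(key)) {
      return this.results.get(key) as T;
    }
    const output = effect();
    this.results.set(key, output); // recorded only after the effect returned without throwing
    return output;
  }
}

let chargeCalls = 0;
const cache = new IdempotencyCache<{ charged: boolean }>();
const charge = () => {
  chargeCalls += 1; // stands in for the real payment call
  return { charged: true };
};

const orderId = 'ORD-123';
const key = `order:${orderId}:charge`;
const first = cache.runOnce(key, charge);
const retry = cache.runOnce(key, charge); // same key: cached output, no second charge
const other = cache.runOnce('order:ORD-456:charge', charge); // new key: effect runs again
```

Deriving the key from stable business data (here the order ID) rather than from the attempt number is what makes a retried node map back to the same cache entry.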
Retries and timeouts
Attach a retry policy and timeout to any node:
@Node('call-api', {
  retry: { maxAttempts: 3, backoff: 'exponential', baseDelayMs: 1000, maxDelayMs: 10000 },
  timeoutMs: 5000,
})
async callApi(ctx: FlowContext): Promise<NodeResult> {
  // ...
}
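To get a feel for how these retry options interact, the following sketch computes a delay schedule from a policy. The doubling formula (baseDelayMs times 2 to the power of attempt minus 1, capped at maxDelayMs) is a common convention and an assumption here; the page does not state the exact formula the engine uses, and retryDelayMs and retrySchedule are hypothetical helpers.

```typescript
// Sketch of a retry delay schedule. The formula is an assumption, not documented engine behavior.
interface RetryPolicy {
  maxAttempts: number;
  backoff: 'fixed' | 'exponential';
  baseDelayMs: number;
  maxDelayMs?: number;
}

// Delay to wait after failed attempt number `attempt` (1-based).
function retryDelayMs(policy: RetryPolicy, attempt: number): number {
  const raw =
    policy.backoff === 'exponential'
      ? policy.baseDelayMs * 2 ** (attempt - 1) // 1x, 2x, 4x, ...
      : policy.baseDelayMs;
  return Math.min(raw, policy.maxDelayMs ?? Number.POSITIVE_INFINITY);
}

// Delays between attempts: one fewer than maxAttempts, since nothing follows the last attempt.
function retrySchedule(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < policy.maxAttempts; attempt++) {
    delays.push(retryDelayMs(policy, attempt));
  }
  return delays;
}

const policy: RetryPolicy = {
  maxAttempts: 3,
  backoff: 'exponential',
  baseDelayMs: 1000,
  maxDelayMs: 10000,
};
const schedule = retrySchedule(policy); // [1000, 2000]
const capped = retryDelayMs(policy, 6); // 32000 capped to 10000
```

Under these assumptions, the policy shown above would wait 1 second and then 2 seconds between its three attempts, and maxDelayMs only starts to matter from the fifth attempt onward.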
Conditional edges (branching)
Branch on run state with when(ctx):
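// Predicates are mutually exclusive, so at most one conditional edge matches a given score
@Edge('score', 'approve', { when: (ctx) => (ctx.outputs.score as number) < 30 })
@Edge('score', 'review', {
  when: (ctx) => (ctx.outputs.score as number) >= 30 && (ctx.outputs.score as number) < 70,
})
@Edge('score', 'reject')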
Edges are evaluated by priority; multiple matches at the same priority yield AMBIGUOUS_EDGE.
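The rule can be illustrated with a toy resolver that picks the next node from a list of edges. This is a sketch under stated assumptions, not the engine's algorithm: the ToyEdge shape, the convention that a higher priority number wins, and the treatment of an edge without a predicate as always matching are all hypothetical.

```typescript
// Toy edge resolver. Priority ordering (higher wins) and the fallback rule are assumptions.
interface ToyCtx {
  outputs: Record<string, unknown>;
}
interface ToyEdge {
  from: string;
  to: string;
  priority?: number;               // assumed: higher number is preferred
  when?: (ctx: ToyCtx) => boolean; // assumed: no predicate means "always matches"
}

function nextNode(edges: ToyEdge[], from: string, ctx: ToyCtx): string | undefined {
  const matches = edges.filter((e) => e.from === from && (e.when ? e.when(ctx) : true));
  if (matches.length === 0) return undefined; // no outgoing edge matched
  const top = Math.max(...matches.map((e) => e.priority ?? 0));
  const winners = matches.filter((e) => (e.priority ?? 0) === top);
  if (winners.length > 1) throw new Error('AMBIGUOUS_EDGE');
  return winners[0]?.to;
}

const score = (ctx: ToyCtx) => ctx.outputs['score'] as number;
const edges: ToyEdge[] = [
  { from: 'score', to: 'approve', priority: 1, when: (ctx) => score(ctx) < 30 },
  { from: 'score', to: 'review', priority: 1, when: (ctx) => score(ctx) >= 30 && score(ctx) < 70 },
  { from: 'score', to: 'reject', priority: 0 }, // unconditional fallback at lower priority
];
const low = nextNode(edges, 'score', { outputs: { score: 12 } });  // 'approve'
const mid = nextNode(edges, 'score', { outputs: { score: 55 } });  // 'review'
const high = nextNode(edges, 'score', { outputs: { score: 91 } }); // 'reject'

// Overlapping predicates at the same priority are rejected rather than silently picking one.
const overlapping: ToyEdge[] = [
  { from: 'score', to: 'approve', when: (ctx) => score(ctx) < 30 },
  { from: 'score', to: 'review', when: (ctx) => score(ctx) < 70 },
];
let ambiguous = false;
try {
  nextNode(overlapping, 'score', { outputs: { score: 12 } });
} catch (err) {
  ambiguous = (err as Error).message === 'AMBIGUOUS_EDGE';
}
```

Failing loudly on a tie is the design choice worth noting: a score of 12 satisfies both overlapping predicates, and raising an error forces the author to disambiguate instead of letting declaration order decide.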
Run as an HTTP service
Use @hazeljs/flow-runtime to run flows behind a REST API, or invoke it from your app with runFlowRuntime({ flows, port, databaseUrl?, services }) so you don't reimplement the server.
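When the runtime is exposed over HTTP, a client resumes a waiting run through the POST /v1/runs/:runId/resume route mentioned under Wait and resume. The helper below only builds such a request so it can be inspected or passed to fetch. The base URL, the buildResumeRequest name, and sending the payload directly as the JSON body are assumptions; check the Flow Runtime documentation for the actual request schema.

```typescript
// Builds (does not send) a resume request. Base URL and body shape are assumptions.
interface ResumeRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

function buildResumeRequest(baseUrl: string, runId: string, payload: unknown): ResumeRequest {
  const base = baseUrl.replace(/\/+$/, ''); // tolerate trailing slashes
  return {
    url: `${base}/v1/runs/${encodeURIComponent(runId)}/resume`,
    init: {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify(payload),
    },
  };
}

const req = buildResumeRequest('http://localhost:3000/', 'run_123', { approved: true });
// To send it from an approval service: await fetch(req.url, req.init);
```

Keeping request construction separate from sending makes the route easy to unit test without a running server.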
When to use it
Good fit: Order processing, approval workflows, fraud/review queues, onboarding, AI agent orchestration, document workflows, multi-step integrations with retries and audit.
Less ideal: Simple one-off jobs (use Cron or a queue), real-time streaming (use WebSocket), high-throughput event sourcing (use Kafka).
What's next?
- Flow Runtime: Standalone HTTP service and runFlowRuntime() for your app
- Prisma: Optional persistence adapter uses Prisma; same patterns for app data
- Cron: For scheduled jobs; use Flow for multi-step workflows with wait/resume
Recipes
Recipe: Order Processing Workflow
// File: src/flows/order.flow.ts
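import { Flow, Entry, Node, Edge } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

@Flow('order-processing', '1.0.0')
export class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge-payment')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    const items = ctx.input.items as unknown[];
    if (!items.length) throw new Error('Order has no items');
    return { status: 'ok', output: { validated: true } };
  }

  @Node('charge-payment')
  @Edge('wait-for-shipment')
  async chargePayment(ctx: FlowContext): Promise<NodeResult> {
    // Call payment service
    return { status: 'ok', output: { paymentId: 'PAY-123', charged: true } };
  }

  @Node('wait-for-shipment')
  @Edge('send-confirmation')
  async waitForShipment(ctx: FlowContext): Promise<NodeResult> {
    // Pause until shipment.confirmed arrives; resume with engine.resumeRun(runId, payload).
    // The 48h shipment deadline must be enforced by the caller; no wait timeout option is shown on this page.
    return { status: 'wait', reason: 'shipment.confirmed' };
  }

  @Node('send-confirmation')
  async sendConfirmation(ctx: FlowContext): Promise<NodeResult> {
    // Send email notification
    return { status: 'ok', output: { confirmed: true } };
  }
}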
Recipe: Start a Flow from a Controller
// File: src/orders/order.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { FlowEngine } from '@hazeljs/flow';

@Controller('orders')
export class OrderController {
  constructor(private readonly engine: FlowEngine) {}

  @Post()
  async createOrder(@Body() body: { items: any[] }) {
    // flowId and version must match the registered definition ('1.0.0' is assumed here)
    const { runId } = await this.engine.startRun({
      flowId: 'order-processing',
      version: '1.0.0',
      input: { orderId: `ORD-${Date.now()}`, items: body.items },
    });
    const run = await this.engine.getRun(runId);
    return { runId, status: run?.status };
  }
}
Related Resources
- Flow Runtime Package – HTTP service for flow execution
- Prisma Package – Database persistence for flows
- Cron Package – Scheduled workflow triggers
- Queue Package – Background job processing
- hazeljs-flow-example – Full workflow examples