HazelJS Flow Package

@hazeljs/flow is a durable execution graph engine for Node.js — a workflow kernel that handles persistence, retries, timeouts, idempotency, and concurrency. It works standalone or integrated with HazelJS.

Quick Reference

  • Purpose: @hazeljs/flow provides durable, resumable workflow execution with a DAG-based graph engine — handling persistence, retries, timeouts, idempotency, and concurrency for multi-step business processes.
  • When to use: Use @hazeljs/flow for multi-step business workflows whose steps are known at design time and must survive restarts (order fulfillment, approval flows, onboarding, fraud review). Use @hazeljs/agent instead when an LLM decides the next step using tools.
  • Key concepts: Flow graph (DAG), nodes (steps), edges (transitions), durable state persistence, wait/resume, idempotency keys, retries with backoff, timeouts, optional Prisma persistence.
  • Dependencies: None required (in-memory by default); optionally @prisma/client with the adapter from @hazeljs/flow/prisma for durable Postgres persistence.
  • Common patterns: Define flow graph with nodes and edges → start a run → flow engine executes nodes in order → wait for external input (human approval, webhook) → resume → complete.
  • Common mistakes: Not using idempotency keys for side-effecting nodes; using in-memory persistence in production (state lost on restart); confusing @hazeljs/flow (deterministic workflows) with @hazeljs/agent (LLM-driven orchestration).

Purpose

Modern applications rely on workflows: multi-step processes that span services, require human input, and must survive restarts (order fulfillment, approval flows, fraud review, onboarding). The @hazeljs/flow package provides:

  • In-memory by default — No database or env vars required; add optional Postgres when you need durability
  • Execution graphs — Nodes and edges with branching, conditional logic, and first-class WAIT/resume
  • Durability — Optional Prisma storage for crash recovery and multi-process safety
  • Audit trail — Timeline of events for every run
  • Retries, timeouts, idempotency — Built into the engine

Key Features

| Feature | Description |
| --- | --- |
| Zero config | In-memory storage out of the box; no DATABASE_URL or migrations required |
| Decorator API | @Flow, @Entry, @Node, @Edge: define flows in one file |
| Wait & resume | Return { status: 'wait', reason: '...' }; call resumeRun(runId, payload) when ready |
| Idempotency | Per-node keys so payment/notification steps never run twice |
| Retries & backoff | maxAttempts, backoff ('fixed' or 'exponential'), baseDelayMs, maxDelayMs |
| Timeouts | Per-node timeoutMs so stuck nodes don't block forever |
| Conditional edges | Branch with when(ctx) predicates; deterministic, no silent wrong-path bugs |
| Concurrency safety | Postgres advisory locks (with Prisma) or in-process per-run lock |

Architecture

graph TD
  A["Your App / Flow Runtime"] --> B["FlowEngine"]
  B --> C["Storage"]
  C --> D["In-Memory (default)"]
  C --> E["Prisma (optional)"]
  E --> F["Postgres"]
  B --> G["Flow Definitions"]
  G --> H["@Flow @Node @Edge"]
  B --> I["Runs & Timeline"]
  style A fill:#3b82f6,stroke:#60a5fa,color:#fff
  style B fill:#6366f1,stroke:#818cf8,color:#fff
  style C fill:#8b5cf6,stroke:#a78bfa,color:#fff
  style D fill:#10b981,stroke:#34d399,color:#fff
  style E fill:#f59e0b,stroke:#fbbf24,color:#fff
  style G fill:#ec4899,stroke:#f472b6,color:#fff
  • FlowEngine — Registers definitions, starts runs, ticks execution, resumes waiting runs
  • Storage — In-memory by default; use createPrismaStorage(prisma) from @hazeljs/flow/prisma for Postgres
  • Flow definitions — Built with decorators or the functional flow() builder
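
The register, start, and tick cycle described above can be pictured with a toy model. The sketch below is not the FlowEngine implementation; every name in it (ToyDefinition, startToyRun, tickToyRun) is hypothetical, handlers are synchronous, and each node has at most one outgoing edge. It only shows the idea that one tick runs one node, records it on a timeline, and follows the edge.

```typescript
// Toy model only: none of these names come from @hazeljs/flow.
type ToyStatus = 'RUNNING' | 'COMPLETED';

interface ToyDefinition {
  entry: string;
  nodes: Record<string, (outputs: Record<string, unknown>) => unknown>;
  edges: Record<string, string | undefined>; // from node id -> next node id
}

interface ToyRun {
  status: ToyStatus;
  current: string | undefined;
  outputs: Record<string, unknown>;
  timeline: string[];
}

function startToyRun(def: ToyDefinition): ToyRun {
  return { status: 'RUNNING', current: def.entry, outputs: {}, timeline: ['run.started'] };
}

// One tick executes exactly one node, records it, then follows the outgoing edge.
function tickToyRun(def: ToyDefinition, run: ToyRun): ToyRun {
  if (run.status !== 'RUNNING' || run.current === undefined) return run;
  const nodeId = run.current;
  const handler = def.nodes[nodeId];
  if (!handler) throw new Error(`Unknown node: ${nodeId}`);
  const outputs = { ...run.outputs, [nodeId]: handler(run.outputs) };
  const next = def.edges[nodeId];
  return {
    status: next === undefined ? 'COMPLETED' : 'RUNNING',
    current: next,
    outputs,
    timeline: [...run.timeline, `node.completed:${nodeId}`],
  };
}

const toyDef: ToyDefinition = {
  entry: 'start',
  nodes: {
    start: () => 1,
    end: (outputs) => (outputs.start as number) + 1,
  },
  edges: { start: 'end' },
};

let toyRun = startToyRun(toyDef);
while (toyRun.status === 'RUNNING') {
  toyRun = tickToyRun(toyDef, toyRun);
}
console.log(toyRun.status, toyRun.outputs, toyRun.timeline);
```

Because each tick returns a complete snapshot of the run, the snapshot is the only thing a storage layer would need to persist between ticks, which is what makes swapping in-memory storage for a database plausible in this model.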

Installation

No database required for in-memory mode:

pnpm add @hazeljs/flow

For durable persistence (optional):

pnpm add @hazeljs/flow @prisma/client

Then run the flow package's Prisma migrations (see Persistence).

Quick Start

Define a flow (decorator-based)

import { FlowEngine, Flow, Entry, Node, Edge, buildFlowDefinition } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

@Flow('order-flow', '1.0.0')
class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    // validate input
    return { status: 'ok', output: { orderId: ctx.input.orderId } };
  }

  @Node('charge')
  async charge(ctx: FlowContext): Promise<NodeResult> {
    // charge payment
    return { status: 'ok', output: { charged: true } };
  }
}

const engine = new FlowEngine();  // in-memory storage
await engine.registerDefinition(buildFlowDefinition(OrderFlow));

const { runId } = await engine.startRun({
  flowId: 'order-flow',
  version: '1.0.0',
  input: { orderId: 'ORD-123' },
});

let run = await engine.getRun(runId);
while (run?.status === 'RUNNING') {
  run = await engine.tick(runId);
}

Functional builder (alternative)

import { FlowEngine, flow } from '@hazeljs/flow';

const def = flow('my-flow', '1.0.0')
  .entry('start')
  .node('start', async (ctx) => ({ status: 'ok', output: 1 }))
  .node('end', async (ctx) => ({ status: 'ok', output: ctx.outputs.start }))
  .edge('start', 'end')
  .build();

const engine = new FlowEngine();
await engine.registerDefinition(def);

Persistence

By default the engine uses in-memory storage. For crash recovery and multi-process safety, use Postgres via the Prisma adapter:

  1. Install: pnpm add @prisma/client
  2. Set DATABASE_URL and run migrations from the flow package's prisma/ schema
  3. Create the engine with Prisma storage:
import { FlowEngine, buildFlowDefinition } from '@hazeljs/flow';
import { createPrismaStorage, createFlowPrismaClient } from '@hazeljs/flow/prisma';

const prisma = createFlowPrismaClient();  // uses DATABASE_URL
const engine = new FlowEngine({ storage: createPrismaStorage(prisma) });
// same registerDefinition, startRun, tick...

The migration SQL in @hazeljs/flow's prisma/migrations/ is only needed when you use this adapter.

Wait and resume (human-in-the-loop)

Pause a run for external input (approval, webhook, payment callback):

@Node('await-approval')
async awaitApproval(ctx: FlowContext): Promise<NodeResult> {
  return { status: 'wait', reason: 'awaiting_approval' };
}

The run is persisted (or held in memory). When the approval arrives, resume it:

await engine.resumeRun(runId, { approved: true });

This removes the need for polling loops or custom state tables. With Flow Runtime, your UI or approval service calls POST /v1/runs/:runId/resume.
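
To make the pause and resume transitions concrete, here is a minimal hand-rolled state machine. It is a sketch under assumptions, not the engine's internals: the 'WAITING' and 'COMPLETED' status names and the applyResult and resume helpers are hypothetical; only 'RUNNING' and the { status: 'wait', reason } result shape appear in the text above.

```typescript
// Illustrative only: a hand-rolled state machine, not the library's internals.
type RunStatus = 'RUNNING' | 'WAITING' | 'COMPLETED';

interface WaitingRun {
  status: RunStatus;
  waitReason?: string;
  resumePayload?: unknown;
}

type StepResult = { status: 'ok' } | { status: 'wait'; reason: string };

// A 'wait' result parks the run; an 'ok' result finishes this one-step toy run.
function applyResult(run: WaitingRun, result: StepResult): WaitingRun {
  if (result.status === 'wait') {
    return { ...run, status: 'WAITING', waitReason: result.reason };
  }
  return { ...run, status: 'COMPLETED' };
}

// Resuming is only legal from WAITING; the payload travels with the run.
function resume(run: WaitingRun, payload: unknown): WaitingRun {
  if (run.status !== 'WAITING') {
    throw new Error(`Cannot resume a run in status ${run.status}`);
  }
  return { status: 'RUNNING', resumePayload: payload };
}

let approvalRun: WaitingRun = { status: 'RUNNING' };
approvalRun = applyResult(approvalRun, { status: 'wait', reason: 'awaiting_approval' });
const parkedStatus = approvalRun.status; // the run is parked here
approvalRun = resume(approvalRun, { approved: true });
console.log(parkedStatus, approvalRun);
```

Rejecting a resume on a run that is not waiting is a design choice in this sketch; it keeps a duplicate approval callback from advancing the run twice.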

Idempotency

Prevent duplicate side effects (e.g. charging a card twice) with per-node idempotency keys:

@Node('charge', { idempotencyKey: (ctx) => `order:${ctx.input.orderId}:charge` })
async charge(ctx: FlowContext): Promise<NodeResult> {
  // only runs once per key; cached output reused on retry
  return { status: 'ok', output: { charged: true } };
}
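
The idea behind a per-node key can be sketched as a lookup before the side effect. The runOnce helper and the in-memory Map below are hypothetical and are not part of @hazeljs/flow; a real deployment would need the key and cached output in durable storage so they survive a restart.

```typescript
// Hypothetical helper, not part of @hazeljs/flow.
const completedEffects = new Map<string, unknown>();

function runOnce<T>(key: string, effect: () => T): T {
  if (completedEffects.has(key)) {
    // Replay the cached output and skip the side effect entirely.
    return completedEffects.get(key) as T;
  }
  const output = effect();
  completedEffects.set(key, output);
  return output;
}

let chargeCalls = 0;
const chargeCard = () => {
  chargeCalls += 1; // stands in for the real payment call
  return { charged: true, attempt: chargeCalls };
};

const chargeKey = `order:${'ORD-123'}:charge`;
const firstCharge = runOnce(chargeKey, chargeCard);
const secondCharge = runOnce(chargeKey, chargeCard); // e.g. a retry of the same node
console.log(chargeCalls, firstCharge === secondCharge);
```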

Retries and timeouts

Attach a retry policy and timeout to any node:

@Node('call-api', {
  retry: { maxAttempts: 3, backoff: 'exponential', baseDelayMs: 1000, maxDelayMs: 10000 },
  timeoutMs: 5000,
})
async callApi(ctx: FlowContext): Promise<NodeResult> {
  // ...
}
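
How the retry fields might combine into a delay can be sketched as a pure function. The option names come from the policy above, but the formula is an assumption: this sketch doubles the delay on each retry for 'exponential' and caps it at maxDelayMs, and the engine's actual schedule (including any jitter) may differ.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoff: 'fixed' | 'exponential';
  baseDelayMs: number;
  maxDelayMs: number;
}

// Delay before retry number `retry` (1 = first retry).
// Assumption: exponential means doubling per retry, capped at maxDelayMs.
function delayBeforeRetry(policy: RetryPolicy, retry: number): number {
  const raw =
    policy.backoff === 'fixed'
      ? policy.baseDelayMs
      : policy.baseDelayMs * 2 ** (retry - 1);
  return Math.min(raw, policy.maxDelayMs);
}

const apiRetry: RetryPolicy = {
  maxAttempts: 3,
  backoff: 'exponential',
  baseDelayMs: 1000,
  maxDelayMs: 10000,
};

// Five retries are computed only to show the cap; maxAttempts: 3 would stop earlier.
const apiDelays = [1, 2, 3, 4, 5].map((n) => delayBeforeRetry(apiRetry, n));
console.log(apiDelays); // [1000, 2000, 4000, 8000, 10000]
```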

Conditional edges (branching)

Branch on run state with when(ctx):

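// Predicates are mutually exclusive, so exactly one edge matches for any score.
@Edge('score', 'approve', { when: (ctx) => (ctx.outputs.score as number) < 30 })
@Edge('score', 'review', { when: (ctx) => { const s = ctx.outputs.score as number; return s >= 30 && s < 70; } })
@Edge('score', 'reject', { when: (ctx) => (ctx.outputs.score as number) >= 70 })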

Edges are evaluated by priority; multiple matches at the same priority yield AMBIGUOUS_EDGE.
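
The priority rule can be illustrated with a small resolver. This is a sketch under assumptions rather than the engine's code: the ToyEdge shape, the numeric priority field, and the convention that a higher number wins are all hypothetical; only the AMBIGUOUS_EDGE outcome for ties is taken from the text above.

```typescript
interface ToyEdge<C> {
  to: string;
  priority: number;           // assumption: higher number wins
  when?: (ctx: C) => boolean; // no predicate means the edge always matches
}

function pickNext<C>(edges: ToyEdge<C>[], ctx: C): string | undefined {
  const matches = edges.filter((e) => (e.when ? e.when(ctx) : true));
  if (matches.length === 0) return undefined;
  const top = Math.max(...matches.map((e) => e.priority));
  const winners = matches.filter((e) => e.priority === top);
  if (winners.length > 1) {
    // Refuse to guess when two edges tie at the highest matching priority.
    throw new Error(`AMBIGUOUS_EDGE: ${winners.map((e) => e.to).join(', ')}`);
  }
  return winners[0].to;
}

type ScoreCtx = { score: number };

const scoreEdges: ToyEdge<ScoreCtx>[] = [
  { to: 'approve', priority: 2, when: (ctx) => ctx.score < 30 },
  { to: 'review', priority: 1, when: (ctx) => ctx.score < 70 },
  { to: 'reject', priority: 0 }, // fallback
];

console.log(pickNext(scoreEdges, { score: 20 })); // approve
console.log(pickNext(scoreEdges, { score: 50 })); // review
console.log(pickNext(scoreEdges, { score: 90 })); // reject
```

With distinct priorities, overlapping predicates such as `< 30` and `< 70` resolve cleanly; giving 'approve' and 'review' the same priority would make a score of 20 throw instead of silently picking one.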

Run as an HTTP service

Use @hazeljs/flow-runtime to run flows behind a REST API, or invoke it from your app with runFlowRuntime({ flows, port, databaseUrl?, services }) so you don't reimplement the server.
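
A client of the runtime could build its resume call roughly as follows. This is a sketch, not a documented client: the buildResumeRequest helper, the base URL, and the choice to send the payload as the raw JSON body are assumptions; only the POST /v1/runs/:runId/resume route is taken from this page.

```typescript
// Hypothetical helper: base URL and JSON body shape are assumptions.
function buildResumeRequest(baseUrl: string, runId: string, payload: unknown) {
  // Trim trailing slashes so the path is not doubled up.
  const root = baseUrl.replace(/\/+$/, '');
  const url = `${root}/v1/runs/${encodeURIComponent(runId)}/resume`;
  const init = {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(payload),
  };
  return { url, init };
}

const resumeReq = buildResumeRequest('http://localhost:3000/', 'run_42', { approved: true });
console.log(resumeReq.url);
// To send it: await fetch(resumeReq.url, resumeReq.init);
```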

When to use it

Good fit: Order processing, approval workflows, fraud/review queues, onboarding, orchestrating AI agent calls as fixed workflow steps, document workflows, multi-step integrations with retries and audit.

Less ideal: Simple one-off jobs (use Cron or a queue), real-time streaming (use WebSocket), high-throughput event sourcing (use Kafka).

What's next?

  • Flow Runtime — Standalone HTTP service and runFlowRuntime() for your app
  • Prisma — Optional persistence adapter uses Prisma; same patterns for app data
  • Cron — For scheduled jobs; use Flow for multi-step workflows with wait/resume

Recipes

Recipe: Order Processing Workflow

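// File: src/flows/order.flow.ts
import { Flow, Entry, Node, Edge } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

// The version string is an illustrative value.
@Flow('order-processing', '1.0.0')
export class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge-payment')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    const { orderId, items } = ctx.input as { orderId: string; items: unknown[] };
    if (!items.length) throw new Error('Order has no items');
    return { status: 'ok', output: { orderId, validated: true } };
  }

  // The idempotency key keeps a retry from charging the same order twice.
  @Node('charge-payment', { idempotencyKey: (ctx) => `order:${ctx.input.orderId}:charge` })
  @Edge('wait-for-shipment')
  async chargePayment(ctx: FlowContext): Promise<NodeResult> {
    // Call payment service
    return { status: 'ok', output: { paymentId: 'PAY-123', charged: true } };
  }

  // Pauses the run until engine.resumeRun(runId, payload) is called on shipment confirmation.
  // A shipment deadline (e.g. 48h) is not shown; this sketch assumes the caller enforces it.
  @Node('wait-for-shipment')
  @Edge('send-confirmation')
  async waitForShipment(ctx: FlowContext): Promise<NodeResult> {
    return { status: 'wait', reason: 'shipment.confirmed' };
  }

  @Node('send-confirmation')
  async sendConfirmation(ctx: FlowContext): Promise<NodeResult> {
    // Send email notification
    return { status: 'ok', output: { confirmed: true } };
  }
}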

Recipe: Start a Flow from a Controller

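// File: src/orders/order.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { FlowEngine } from '@hazeljs/flow';

@Controller('orders')
export class OrderController {
  constructor(private readonly engine: FlowEngine) {}

  @Post()
  async createOrder(@Body() body: { items: unknown[] }) {
    // flowId and version must match the registered definition ('1.0.0' is an illustrative value).
    const { runId } = await this.engine.startRun({
      flowId: 'order-processing',
      version: '1.0.0',
      input: { orderId: `ORD-${Date.now()}`, items: body.items },
    });
    // Advance the run with engine.tick(runId) as in the Quick Start, here or in a background worker.
    const run = await this.engine.getRun(runId);
    return { runId, status: run?.status };
  }
}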