# Event hub providers
DuckFlux supports three event hub backends for `emit` and `wait.event` steps. All three implement the same `EventHub` interface — you can swap backends without changing your workflow YAML.
| Backend | Package | Infrastructure | Replay | Cross-process |
|---|---|---|---|---|
| In-memory | @duckflux/core (built-in) | None | Yes | No |
| NATS JetStream | @duckflux/hub-nats | NATS server | No | Yes |
| Redis Streams | @duckflux/hub-redis | Redis server | Yes | Yes |
## EventHub interface

All backends implement this interface:
```ts
interface EventHub {
  publish(event: string, payload: unknown): Promise<void>;
  publishAndWaitAck(event: string, payload: unknown, timeoutMs: number): Promise<void>;
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
  close(): Promise<void>;
}
```
```ts
interface EventEnvelope {
  name: string;
  payload: unknown;
}
```

The hub is optional — you only need one if your workflow uses `emit` or `wait.event` steps. When absent, those steps throw at runtime.
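Since `subscribe` returns an `AsyncIterable`, a `for await` loop with an `AbortController` is the natural way to consume and stop a subscription. A self-contained sketch (the `fakeEvents` generator below is a stand-in for a real hub's `subscribe`, not part of the API):

```ts
// Stand-in for hub.subscribe(): yields envelopes until the signal aborts.
async function* fakeEvents(
  signal: AbortSignal,
): AsyncIterable<{ name: string; payload: unknown }> {
  let n = 0;
  while (!signal.aborted) {
    yield { name: "tick", payload: n++ };
  }
}

const controller = new AbortController();
const seen: unknown[] = [];

for await (const env of fakeEvents(controller.signal)) {
  seen.push(env.payload);
  if (seen.length === 3) controller.abort(); // ends the loop on the next check
}
// seen is [0, 1, 2]
```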
## Backends

### In-memory

The default backend. Events are held in-process; no external infrastructure is needed.
Characteristics:
- Replay: subscribers that connect after `publish` still receive the event — the hub buffers all events until `close()`.
- Fan-out: multiple `subscribe()` calls on the same event each receive every message independently.
- Single-process only: events do not cross process boundaries.
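The replay behaviour can be illustrated with a toy model (this is a sketch of the buffering semantics, not the actual `MemoryHub` implementation):

```ts
// Toy model of the in-memory hub: every published event is buffered,
// so a subscriber that connects late still sees earlier events.
type Envelope = { name: string; payload: unknown };

class ToyMemoryHub {
  private buffer: Envelope[] = [];

  publish(name: string, payload: unknown): void {
    this.buffer.push({ name, payload });
  }

  // Replays the full buffer for the requested event name.
  subscribe(name: string): Envelope[] {
    return this.buffer.filter((e) => e.name === name);
  }
}

const hub = new ToyMemoryHub();
hub.publish("order.done", { id: 1 });

// Subscribing *after* publish still yields the event.
const replayed = hub.subscribe("order.done");
// replayed.length === 1
```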
#### Library usage

```ts
import { MemoryHub, executeWorkflow } from "@duckflux/core";

const hub = new MemoryHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();
```

#### CLI usage

```sh
quack run workflow.yaml
# --event-backend defaults to "memory", no flag required
quack run workflow.yaml --event-backend=memory
```

### NATS JetStream

Uses NATS JetStream for distributed event delivery.
Characteristics:
- No replay: subscribers only receive events published after they subscribe (ephemeral ordered consumers).
- Fan-out: each `NatsHub` instance gets an independent consumer — multiple instances on the same server each receive every message.
- Stream names: NATS JetStream does not allow dots in stream names, so DuckFlux replaces dots with underscores internally (`order.payment.done` → stream `order_payment_done`). Subjects keep their original dot notation.
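The stream-name mapping in the last point is simple enough to sketch (the function name is illustrative, not DuckFlux's internal API):

```ts
// NATS JetStream forbids dots in stream names, so a subject like
// "order.payment.done" maps to the stream "order_payment_done".
// Subjects themselves keep their dot notation.
function subjectToStreamName(subject: string): string {
  return subject.replace(/\./g, "_");
}

const stream = subjectToStreamName("order.payment.done");
// stream === "order_payment_done"
```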
#### Installation

```sh
# Bun
bun add @duckflux/hub-nats

# npm
npm install @duckflux/hub-nats
```

#### Library usage

```ts
import { NatsHub } from "@duckflux/hub-nats";
import { executeWorkflow } from "@duckflux/core";

const hub = await NatsHub.create({
  url: "nats://localhost:4222",
  stream: "duckflux-events", // optional — this is the default
});

const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();
```

#### CLI usage

```sh
quack run workflow.yaml \
  --event-backend=nats \
  --nats-url=nats://localhost:4222

# With a custom stream name
quack run workflow.yaml \
  --event-backend=nats \
  --nats-url=nats://localhost:4222 \
  --nats-stream=my-stream
```

#### Configuration reference
| Option | CLI flag | Type | Default | Description |
|---|---|---|---|---|
| `url` | `--nats-url` | string | — | NATS server URL. Required. |
| `stream` | `--nats-stream` | string | `duckflux-events` | JetStream stream name. |
#### Running a local NATS server

```sh
# Docker
docker run -p 4222:4222 nats:latest -js

# Or with nats-server installed
nats-server -js
```

### Redis Streams

Uses Redis Streams (`XADD` / `XREADGROUP`) for persistent, distributed event delivery.
Characteristics:
- Replay: the hub reads from stream position `0`, so subscribers receive events published before they connected.
- Fan-out: hubs with different `consumerGroup` names each receive all messages independently. Hubs with the same consumer group share the load (competing consumers).
- Persistence: messages remain in the Redis stream after consumption and can be re-read by new consumer groups.
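The consumer-group semantics can be modelled in a few lines. This is a toy simulation of how Redis Streams delivers messages, not the `RedisHub` code:

```ts
// Toy model of Redis Streams consumer groups:
// - consumers in the SAME group compete: each message goes to one of them;
// - DIFFERENT groups are independent: each group sees every message.
type Message = { id: number; payload: unknown };

class ToyStream {
  private messages: Message[] = [];
  private cursors = new Map<string, number>(); // per-group read position

  add(payload: unknown): void {
    this.messages.push({ id: this.messages.length, payload });
  }

  // Delivers the group's next unread message to whichever consumer asks first.
  readGroup(group: string): Message | undefined {
    const pos = this.cursors.get(group) ?? 0;
    const msg = this.messages[pos];
    if (msg) this.cursors.set(group, pos + 1);
    return msg;
  }
}

const stream = new ToyStream();
stream.add("a");
stream.add("b");

// Two consumers in the same group share the load...
const first = stream.readGroup("workers");  // gets "a"
const second = stream.readGroup("workers"); // gets "b"

// ...while a different group independently re-reads from position 0.
const other = stream.readGroup("auditors"); // gets "a" again
```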
#### Installation

```sh
# Bun
bun add @duckflux/hub-redis

# npm
npm install @duckflux/hub-redis
```

#### Library usage

```ts
import { RedisHub } from "@duckflux/hub-redis";
import { executeWorkflow } from "@duckflux/core";

const hub = await RedisHub.create({
  addr: "localhost:6379",  // optional — this is the default
  db: 0,                   // optional — this is the default
  consumerGroup: "my-app", // optional — defaults to "duckflux"
});

const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();
```

#### CLI usage

```sh
quack run workflow.yaml \
  --event-backend=redis \
  --redis-addr=localhost:6379

# With custom options
quack run workflow.yaml \
  --event-backend=redis \
  --redis-addr=redis.internal:6379 \
  --redis-db=2
```

#### Configuration reference
| Option | CLI flag | Type | Default | Description |
|---|---|---|---|---|
| `addr` | `--redis-addr` | string | `localhost:6379` | Redis server address (`host:port`). |
| `db` | `--redis-db` | number | `0` | Redis database number. |
| `consumerGroup` | — | string | `duckflux` | Consumer group name. Different groups get independent fan-out. |
#### Running a local Redis server

```sh
# Docker
docker run -p 6379:6379 redis:latest

# Or with redis-server installed
redis-server
```

## Custom backends

You can implement your own backend by satisfying the `EventHub` interface from `@duckflux/core` — for example, to integrate with Kafka, an in-house broker, or a test double.
See Implementing a custom event hub for the full interface contract, a minimal working implementation, and a testing pattern.
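For orientation, a minimal in-memory implementation of the interface could look like the sketch below. The interface declarations are copied from this page; everything else (the class name, the wake-up mechanism) is illustrative, and error handling, ack timeouts, and backpressure are omitted:

```ts
interface EventEnvelope {
  name: string;
  payload: unknown;
}

interface EventHub {
  publish(event: string, payload: unknown): Promise<void>;
  publishAndWaitAck(event: string, payload: unknown, timeoutMs: number): Promise<void>;
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
  close(): Promise<void>;
}

// Buffers all events (replay) and fans out to every subscriber.
class SketchHub implements EventHub {
  private buffer: EventEnvelope[] = [];
  private waiters: Array<() => void> = [];
  private closed = false;

  async publish(event: string, payload: unknown): Promise<void> {
    this.buffer.push({ name: event, payload });
    this.waiters.splice(0).forEach((wake) => wake()); // wake idle subscribers
  }

  // Delivery is immediate in-process, so "ack" is just a publish here.
  async publishAndWaitAck(event: string, payload: unknown, _timeoutMs: number): Promise<void> {
    await this.publish(event, payload);
  }

  async *subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope> {
    let cursor = 0;
    while (!this.closed && !signal?.aborted) {
      while (cursor < this.buffer.length) {
        const env = this.buffer[cursor++];
        if (env.name === event) yield env;
      }
      // Sleep until the next publish() or close() wakes us.
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
  }

  async close(): Promise<void> {
    this.closed = true;
    this.waiters.splice(0).forEach((wake) => wake());
  }
}

const hub = new SketchHub();
await hub.publish("greeting", { msg: "hi" });

let first: EventEnvelope | undefined;
for await (const env of hub.subscribe("greeting")) {
  first = env; // the buffered event is replayed to this late subscriber
  break;
}
await hub.close();
```

A real backend would map `publish` to its broker's produce call and `subscribe` to a consumer loop; the same shape fits Kafka or any in-house broker.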
## Passing a hub in library mode

The hub is injected via `ExecuteOptions` and propagated automatically through the entire execution — including sub-workflows, loops, parallel branches, and conditionals.
```ts
import { executeWorkflow } from "@duckflux/core";

const result = await executeWorkflow(
  workflow,
  inputs,
  basePath,
  { hub }, // <-- inject here
);
```

Always call `hub.close()` when execution is complete to release connections and internal buffers.
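Because the workflow can throw, wrapping the call in `try`/`finally` guarantees the cleanup. A sketch with a stand-in hub (only `close()` matters for the pattern):

```ts
// Stand-in for a real hub; only close() matters for this pattern.
class FakeHub {
  closed = false;
  async close(): Promise<void> {
    this.closed = true;
  }
}

async function runWithHub(hub: FakeHub, work: () => Promise<void>): Promise<void> {
  try {
    await work(); // e.g. executeWorkflow(workflow, inputs, basePath, { hub })
  } finally {
    await hub.close(); // always release connections and internal buffers
  }
}

const hub = new FakeHub();
await runWithHub(hub, async () => {
  throw new Error("boom");
}).catch(() => {
  /* the workflow error still propagates; swallowed here for the demo */
});
// hub.closed === true even though the workflow failed
```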
### Sub-workflows

When a workflow calls a sub-workflow, the same hub instance is passed down, so a parent can wait for an event emitted by a child:

```yaml
# parent.yaml
flow:
  - call: child.yaml
  - wait:
      event: child.done
      timeout: 10s
output: event.result
```

```yaml
# child.yaml
flow:
  - type: emit
    event: child.done
    payload:
      result: '"hello from child"'
```

```ts
const hub = new MemoryHub();
const result = await executeWorkflow(parentWorkflow, {}, basePath, { hub });
// result.output === "hello from child"
await hub.close();
```

## Choosing a backend
Section titled “Choosing a backend”| Scenario | Recommended backend |
|---|---|
| Local development, single process | memory |
| Multiple processes on the same machine | redis (simple) or nats |
| Distributed systems, microservices | nats or redis |
| Need event replay for late subscribers | memory or redis |
| Low-latency, high-throughput messaging | nats |
| Events must survive process restarts | redis |