
Event hub providers

DuckFlux supports three event hub backends for emit and wait steps. All three implement the same EventHub interface — you can swap backends without changing your workflow YAML.

| Backend | Package | Infrastructure | Replay | Cross-process |
| --- | --- | --- | --- | --- |
| In-memory | `@duckflux/core` (built-in) | None | Yes | No |
| NATS JetStream | `@duckflux/hub-nats` | NATS server | No | Yes |
| Redis Streams | `@duckflux/hub-redis` | Redis server | Yes | Yes |

All backends implement this interface:

```ts
interface EventHub {
  publish(event: string, payload: unknown): Promise<void>;
  publishAndWaitAck(event: string, payload: unknown, timeoutMs: number): Promise<void>;
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
  close(): Promise<void>;
}

interface EventEnvelope {
  name: string;
  payload: unknown;
}
```
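To make the contract concrete, here is a minimal hypothetical backend (`TinyHub` is illustrative, not a DuckFlux export) that buffers everything it publishes and replays the buffer to subscribers, similar in spirit to the built-in in-memory backend:

```ts
// Shape matching the EventEnvelope interface above.
type EventEnvelope = { name: string; payload: unknown };

// Hypothetical minimal backend -- NOT part of @duckflux/core.
// Buffers every event so subscribers that arrive late still replay history.
class TinyHub {
  private buffer: EventEnvelope[] = [];

  async publish(event: string, payload: unknown): Promise<void> {
    this.buffer.push({ name: event, payload });
  }

  async publishAndWaitAck(event: string, payload: unknown, _timeoutMs: number): Promise<void> {
    // In-process, publishing completes synchronously, so the ack is immediate.
    await this.publish(event, payload);
  }

  async *subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope> {
    // Sketch only: replays buffered events, then ends. A real backend would
    // also keep yielding events published after the subscription started.
    for (const envelope of this.buffer) {
      if (signal?.aborted) return;
      if (envelope.name === event) yield envelope;
    }
  }

  async close(): Promise<void> {
    this.buffer.length = 0; // drop internal buffers
  }
}
```

Note the simplification in `subscribe()`: it only replays what is already buffered, whereas a production backend must also deliver future events to open subscriptions.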

The hub is optional — you only need one if your workflow uses emit or wait.event steps. When absent, those steps will throw at runtime.


In-memory

The default backend. Events are held in-process; no external infrastructure is needed.

Characteristics:

  • Replay: subscribers that connect after publish still receive the event — the hub buffers all events until close().
  • Fan-out: multiple subscribe() calls on the same event each receive every message independently.
  • Single-process only: events do not cross process boundaries.
```ts
import { MemoryHub, executeWorkflow } from "@duckflux/core";

const hub = new MemoryHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();
```

```sh
quack run workflow.yaml
# --event-backend defaults to "memory", no flag required
quack run workflow.yaml --event-backend=memory
```

The hub is injected via ExecuteOptions and propagated automatically through the entire execution — including sub-workflows, loops, parallel branches, and conditionals.

```ts
import { executeWorkflow } from "@duckflux/core";

const result = await executeWorkflow(
  workflow,
  inputs,
  basePath,
  { hub }, // <-- inject here
);
```

Always call hub.close() when execution is complete to release connections and internal buffers.
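One way to guarantee that even when a step throws is a try/finally around the execution; a self-contained sketch, where `StubHub` and `runWorkflowWithHub` are hypothetical stand-ins that only model the `close()` contract:

```ts
// Hypothetical stand-in for a hub -- only the close() contract matters here.
class StubHub {
  closed = false;
  async close(): Promise<void> {
    this.closed = true;
  }
}

// Hypothetical wrapper: run some work with a hub, closing it no matter what.
async function runWorkflowWithHub(
  work: (hub: StubHub) => Promise<unknown>,
): Promise<unknown> {
  const hub = new StubHub();
  try {
    return await work(hub); // execution may throw
  } finally {
    await hub.close(); // always runs, releasing connections and buffers
  }
}
```

The same shape applies to a real `MemoryHub` (or any other backend): create, run inside `try`, close in `finally`.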

When a workflow calls a sub-workflow, the same hub instance is passed down. This means a parent can wait for an event emitted by a child:

parent.yaml

```yaml
flow:
  - call: child.yaml
  - wait:
      event: child.done
      timeout: 10s
      output: event.result
```

child.yaml

```yaml
flow:
  - type: emit
    event: child.done
    payload:
      result: '"hello from child"'
```

```ts
const hub = new MemoryHub();
const result = await executeWorkflow(parentWorkflow, {}, basePath, { hub });
// result.output === "hello from child"
await hub.close();
```
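The wait semantics above (the first matching event resolves the step; otherwise it fails after the timeout) can be modeled as a race between the subscription and a timer. A self-contained sketch with hypothetical names, not the DuckFlux internals:

```ts
// Shape matching the EventEnvelope interface above.
type EventEnvelope = { name: string; payload: unknown };

// Hypothetical helper: resolve with the first event from the stream,
// or reject once timeoutMs elapses -- whichever happens first.
async function waitForEvent(
  events: AsyncIterable<EventEnvelope>,
  timeoutMs: number,
): Promise<EventEnvelope> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  try {
    const first = (async () => {
      for await (const envelope of events) return envelope; // first event wins
      throw new Error("stream ended before a matching event arrived");
    })();
    return await Promise.race([first, timeout]);
  } finally {
    clearTimeout(timer); // don't leave a pending timer behind
  }
}
```

Clearing the timer in `finally` matters: without it, a won race would leave the timeout pending, keeping the process alive and later rejecting unobserved.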

| Scenario | Recommended backend |
| --- | --- |
| Local development, single process | memory |
| Multiple processes on the same machine | redis (simple) or nats |
| Distributed systems, microservices | nats or redis |
| Need event replay for late subscribers | memory or redis |
| Low-latency, high-throughput messaging | nats |
| Events must survive process restarts | redis |