Library usage

@duckflux/core is a fully embeddable runtime library. You can parse, validate, and execute DuckFlux workflows programmatically — no CLI required.

Common use cases:

  • Embedding a workflow engine inside an existing application
  • Running workflows as part of a build pipeline or test suite
  • Implementing a custom orchestrator backed by DuckFlux

The runtime is split into four packages:

Package              Purpose
@duckflux/core       Engine, parser, CEL evaluator, in-memory event hub. The only required package.
@duckflux/runner     CLI binary (quack run, lint, validate). Exposes @duckflux/core as a CLI.
@duckflux/hub-nats   Optional NATS JetStream event hub backend.
@duckflux/hub-redis  Optional Redis Streams event hub backend.

bun add @duckflux/core

import { parseWorkflow, executeWorkflow } from "@duckflux/core";

const yaml = `
name: greet

inputs:
  name:
    type: string
    required: true

flow:
  - type: exec
    as: greeter
    run: sh -c 'name=$(cat -); echo "Hello, $name!"'
    input: workflow.inputs.name

output: greeter.output
`;

const workflow = parseWorkflow(yaml);
const result = await executeWorkflow(workflow, { name: "World" });

console.log(result.output); // "Hello, World!"
console.log(result.success); // true

import { parseWorkflow } from "@duckflux/core";
const workflow = parseWorkflow(yamlString);

parseWorkflow is synchronous and returns a Workflow object. It parses YAML but does not validate — call validateSchema afterwards if needed.

import { parseWorkflowFile } from "@duckflux/core";
const workflow = await parseWorkflowFile("./workflows/process.yaml");

parseWorkflowFile reads and parses a YAML file asynchronously.

interface Workflow {
  id?: string;
  name?: string;
  version?: string | number;
  defaults?: WorkflowDefaults;
  inputs?: Record<string, InputDefinition | null>;
  participants?: Record<string, Participant>;
  flow: FlowStep[];
  output?: WorkflowOutput;
}

Validation is optional but recommended before execution, especially when workflows come from user input or external sources.

validateSchema checks that the workflow conforms to the DuckFlux JSON schema. Synchronous.

import { validateSchema } from "@duckflux/core";

const result = validateSchema(workflow);
if (!result.valid) {
  console.error(result.errors);
}

validateSemantic checks participant references, variable bindings, and imported sub-workflow paths. Async (reads the filesystem for call participants).

import { validateSemantic } from "@duckflux/core";

const result = await validateSemantic(workflow, basePath);
if (!result.valid) {
  console.error(result.errors);
}

validateInputs validates runtime inputs against the workflow’s declared inputs schema.

import { validateInputs } from "@duckflux/core";

const result = validateInputs({ name: "World" }, workflow.inputs);
if (!result.valid) {
  console.error(result.errors);
}

executeWorkflow also validates inputs automatically before running — this function is useful for pre-flight checks.

interface ValidationResult {
  valid: boolean;
  errors: ValidationError[];
}

interface ValidationError {
  path: string;
  message: string;
}

async function executeWorkflow(
  workflow: Workflow,
  inputs: Record<string, unknown> = {},
  basePath: string = process.cwd(),
  options: ExecuteOptions = {},
): Promise<WorkflowResult>

Parameter   Description
workflow    Parsed Workflow object.
inputs      Runtime inputs passed to the workflow. Must satisfy workflow.inputs if defined.
basePath    Root path used to resolve relative file references (call participant paths, exec participant cwd). Defaults to process.cwd().
options     Execution options — see below.

interface ExecuteOptions {
  hub?: EventHub;           // Event hub for emit/wait.event steps.
  cwd?: string;             // Override working directory for exec participants.
  executionNumber?: number; // Execution counter; available as execution.number in CEL. Defaults to 1.
  verbose?: boolean;        // Reserved for CLI use.
  quiet?: boolean;          // Reserved for CLI use.
}

basePath affects two things:

  1. call participants — the sub-workflow path is resolved relative to basePath. When the sub-workflow runs, its own basePath becomes the directory containing that file, so nested relative references work correctly.
  2. exec participant cwd — if cwd is a relative path, it is resolved against basePath.
// Workflow in /app/workflows/main.yaml calls ./helpers/fetch.yaml
await executeWorkflow(workflow, inputs, "/app/workflows");
// ^ basePath

runWorkflowFromFile is a convenience wrapper that parses a file and executes it in one call.

import { runWorkflowFromFile } from "@duckflux/core";

const result = await runWorkflowFromFile("./workflow.yaml", { name: "World" }, {
  cwd: "/workspace",
});

interface WorkflowResult {
  success: boolean;                  // false if any step failed with strategy "fail"
  output: unknown;                   // Resolved workflow output (string, object, null, …)
  steps: Record<string, StepResult>; // Results keyed by step name (the as: field)
  duration: number;                  // Total execution time in milliseconds
}

interface StepResult {
  status: "success" | "failure" | "skipped";
  output: string;         // Raw output (usually a JSON string)
  parsedOutput?: unknown; // Parsed JSON when applicable
  error?: string;         // Error message if status === "failure"
  duration: number;       // Step execution time in ms
  startedAt?: string;     // ISO 8601 timestamp
  finishedAt?: string;    // ISO 8601 timestamp
  retries?: number;       // Retry attempts made
  cwd?: string;           // Working directory used (exec participants)
  httpStatus?: number;    // HTTP status code (http participants)
  responseBody?: string;  // HTTP response body on failure (http participants)
}
const result = await executeWorkflow(workflow, inputs, basePath);

if (!result.success) {
  for (const [name, step] of Object.entries(result.steps)) {
    if (step.status === "failure") {
      console.error(`Step "${name}" failed: ${step.error}`);
    }
  }
}

console.log(`Finished in ${result.duration}ms`);
console.log("Output:", result.output);

executeWorkflow throws in two cases:

Situation                                       What is thrown
Input validation fails                          Error("input validation failed: ...")
A step fails with onError: fail (the default)   WorkflowError
import { WorkflowError } from "@duckflux/core";

try {
  await executeWorkflow(workflow, inputs, basePath);
} catch (err) {
  if (err instanceof WorkflowError) {
    console.error(`Step "${err.stepName}" failed after ${err.retriesAttempted} retries`);
    console.error(`Strategy: ${err.strategy}`);
  }
}

class WorkflowError extends Error {
  stepName: string;         // Name of the step that failed
  strategy: ErrorStrategy;  // Error strategy applied ("fail", "skip", "retry")
  retriesAttempted: number; // Number of retry attempts made
}

Steps configured with onError: skip do not throw — they appear in result.steps with status: "skipped" and execution continues.
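Because skipped steps surface in result.steps rather than throwing, they can be collected with an ordinary scan. A sketch over a hand-built, WorkflowResult-shaped object (the step names and statuses here are hypothetical):

```typescript
type StepStatus = "success" | "failure" | "skipped";

// Hypothetical result.steps after a run where "notify" had onError: skip.
const steps: Record<string, { status: StepStatus }> = {
  fetch: { status: "success" },
  notify: { status: "skipped" },
};

// Collect the names of every skipped step.
const skipped = Object.entries(steps)
  .filter(([, step]) => step.status === "skipped")
  .map(([name]) => name);

console.log(skipped); // ["notify"]
```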


Inject a hub via ExecuteOptions to enable emit and wait.event steps. The same hub instance is automatically propagated to all sub-workflows, loops, and parallel branches.

import { parseWorkflow, executeWorkflow, MemoryHub } from "@duckflux/core";
const hub = new MemoryHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close(); // Always close when done

For distributed use cases, see Event hub providers for NATS and Redis backends.

Because the hub is shared across the entire execution tree, a parent workflow can wait for an event emitted by a child:

parent.yaml

flow:
  - call: child.yaml
  - wait:
      event: child.done
      timeout: 10s

output: event.result

child.yaml

flow:
  - type: emit
    event: child.done
    payload:
      result: '"from child"'
const hub = new MemoryHub();
const result = await runWorkflowFromFile("parent.yaml", {}, { hub });
// result.output === "from child"
await hub.close();

You can implement your own backend — for example, backed by Kafka, an in-house message broker, or a mock for testing — by implementing the EventHub interface.

import type { EventHub, EventEnvelope } from "@duckflux/core";

interface EventHub {
  publish(event: string, payload: unknown): Promise<void>;
  publishAndWaitAck(event: string, payload: unknown, timeoutMs: number): Promise<void>;
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
  close(): Promise<void>;
}

interface EventEnvelope {
  name: string;     // Event name
  payload: unknown; // Event payload (any JSON-serializable value)
}
Method              Required behavior
publish             Deliver payload to all active subscribers of event. Fire-and-forget.
publishAndWaitAck   Same as publish, but wait for broker acknowledgment up to timeoutMs ms before resolving. Throw on timeout.
subscribe           Return an AsyncIterable that yields EventEnvelope objects as they arrive. Stop yielding when signal is aborted or close() is called.
close               Release all connections and internal resources. Stop any active subscribers.
import type { EventHub, EventEnvelope } from "@duckflux/core";

type Listener = (envelope: EventEnvelope) => void;

export class MyCustomHub implements EventHub {
  private listeners = new Map<string, Set<Listener>>();
  private closed = false;

  async publish(event: string, payload: unknown): Promise<void> {
    if (this.closed) throw new Error("hub is closed");
    const envelope: EventEnvelope = { name: event, payload };
    for (const listener of this.listeners.get(event) ?? []) {
      listener(envelope);
    }
  }

  async publishAndWaitAck(event: string, payload: unknown, _timeoutMs: number): Promise<void> {
    await this.publish(event, payload); // no ack in this backend
  }

  async *subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope> {
    if (this.closed) return;
    const queue: EventEnvelope[] = [];
    let resolve: (() => void) | null = null;
    const listener: Listener = (envelope) => {
      queue.push(envelope);
      resolve?.();
      resolve = null;
    };
    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event)!.add(listener);
    const cleanup = () => {
      this.listeners.get(event)?.delete(listener);
      resolve?.();
    };
    signal?.addEventListener("abort", cleanup);
    try {
      while (!this.closed && !signal?.aborted) {
        if (queue.length > 0) {
          yield queue.shift()!;
        } else {
          await new Promise<void>((r) => { resolve = r; });
        }
      }
    } finally {
      cleanup();
      signal?.removeEventListener("abort", cleanup);
    }
  }

  async close(): Promise<void> {
    this.closed = true;
    this.listeners.clear();
  }
}
import { executeWorkflow } from "@duckflux/core";
import { MyCustomHub } from "./my-custom-hub";
const hub = new MyCustomHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();

No other changes are needed — the runtime treats all EventHub implementations identically.
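The example above treats publishAndWaitAck as fire-and-forget; a backend with real broker acknowledgments can meet the timeout contract with Promise.race. withAckTimeout below is a hypothetical helper, not part of @duckflux/core:

```typescript
// Hypothetical helper: races a broker "ack" promise against a timeout,
// as a publishAndWaitAck implementation might.
function withAckTimeout<T>(ack: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`ack timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  // Clear the timer in either outcome so the process can exit promptly.
  return Promise.race([ack, timeout]).finally(() => clearTimeout(timer));
}

// An ack that arrives in time resolves normally…
await withAckTimeout(Promise.resolve("acked"), 1_000);

// …while a slow ack rejects with a timeout error.
await withAckTimeout(new Promise((r) => setTimeout(r, 200)), 10)
  .catch((err) => console.error(err.message)); // "ack timed out after 10ms"
```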

A custom hub is also useful for asserting that specific events were emitted during a test:

class CapturingHub extends MyCustomHub {
  readonly emitted: { event: string; payload: unknown }[] = [];

  override async publish(event: string, payload: unknown): Promise<void> {
    this.emitted.push({ event, payload });
    await super.publish(event, payload);
  }
}

const hub = new CapturingHub();
await executeWorkflow(workflow, inputs, basePath, { hub });

expect(hub.emitted).toContainEqual({
  event: "order.created",
  payload: expect.objectContaining({ orderId: "123" }),
});
await hub.close();
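On the consuming side, whatever the backend, subscribe returns a plain AsyncIterable, so a subscription can be cancelled with the standard AbortController pattern. This self-contained sketch uses a stub generator (eventStream, a stand-in for hub.subscribe) rather than a real hub:

```typescript
// Stand-in for hub.subscribe(event, signal): yields envelopes until the
// signal aborts.
async function* eventStream(signal: AbortSignal) {
  let n = 0;
  while (!signal.aborted) {
    yield { name: "tick", payload: n++ };
  }
}

const controller = new AbortController();
const received: unknown[] = [];

for await (const envelope of eventStream(controller.signal)) {
  received.push(envelope.payload);
  if (received.length === 3) controller.abort(); // stop the subscription
}

console.log(received); // [0, 1, 2]
```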