# Library usage
@duckflux/core is a fully embeddable runtime library. You can parse, validate, and execute DuckFlux workflows programmatically — no CLI required.
Common use cases:
- Embedding a workflow engine inside an existing application
- Running workflows as part of a build pipeline or test suite
- Implementing a custom orchestrator backed by DuckFlux
## Packages

The runtime is split into four packages:
| Package | Purpose |
|---|---|
| `@duckflux/core` | Engine, parser, CEL evaluator, in-memory event hub. The only required package. |
| `@duckflux/runner` | CLI binary (`quack run`, `lint`, `validate`). Wraps `@duckflux/core` in a CLI. |
| `@duckflux/hub-nats` | Optional NATS JetStream event hub backend. |
| `@duckflux/hub-redis` | Optional Redis Streams event hub backend. |
## Installation

```sh
bun add @duckflux/core
# or: npm install @duckflux/core
```
## Quick start

```ts
import { parseWorkflow, executeWorkflow } from "@duckflux/core";

const yaml = `name: greet
inputs:
  name:
    type: string
    required: true
flow:
  - type: exec
    as: greeter
    run: sh -c 'name=$(cat -); echo "Hello, $name!"'
    input: workflow.inputs.name
output: greeter.output`;

const workflow = parseWorkflow(yaml);
const result = await executeWorkflow(workflow, { name: "World" });

console.log(result.output); // "Hello, World!"
console.log(result.success); // true
```
## Parsing workflows

### From a YAML string

```ts
import { parseWorkflow } from "@duckflux/core";

const workflow = parseWorkflow(yamlString);
```

`parseWorkflow` is synchronous and returns a `Workflow` object. It parses YAML but does not validate — call `validateSchema` afterwards if needed.
### From a file

```ts
import { parseWorkflowFile } from "@duckflux/core";

const workflow = await parseWorkflowFile("./workflows/process.yaml");
```

`parseWorkflowFile` reads and parses a YAML file asynchronously.
### Workflow type

```ts
interface Workflow {
  id?: string;
  name?: string;
  version?: string | number;
  defaults?: WorkflowDefaults;
  inputs?: Record<string, InputDefinition | null>;
  participants?: Record<string, Participant>;
  flow: FlowStep[];
  output?: WorkflowOutput;
}
```
## Validation

Validation is optional but recommended before execution, especially when workflows come from user input or external sources.
### Schema validation

Checks that the workflow conforms to the DuckFlux JSON schema. Synchronous.

```ts
import { validateSchema } from "@duckflux/core";

const result = validateSchema(workflow);
if (!result.valid) {
  console.error(result.errors);
}
```
### Semantic validation

Checks participant references, variable bindings, and imported sub-workflow paths. Async (reads the filesystem for `call` participants).

```ts
import { validateSemantic } from "@duckflux/core";

const result = await validateSemantic(workflow, basePath);
if (!result.valid) {
  console.error(result.errors);
}
```
### Input validation

Validates runtime inputs against the workflow's declared inputs schema.

```ts
import { validateInputs } from "@duckflux/core";

const result = validateInputs({ name: "World" }, workflow.inputs);
if (!result.valid) {
  console.error(result.errors);
}
```

`executeWorkflow` also validates inputs automatically before running — this function is useful for pre-flight checks.
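All three validators return the same result shape, so a pre-flight check can run them in sequence and report every error path at once. A minimal sketch: the result types are mirrored locally for illustration, and `preflightErrors` is an illustrative helper, not part of `@duckflux/core`.

```typescript
// Shapes mirrored from this page's ValidationResult type.
interface ValidationError {
  path: string;
  message: string;
}

interface ValidationResult {
  valid: boolean;
  errors: ValidationError[];
}

// Collects errors from several validation passes into printable lines.
function preflightErrors(results: ValidationResult[]): string[] {
  return results
    .flatMap((result) => (result.valid ? [] : result.errors))
    .map((error) => `${error.path}: ${error.message}`);
}

// Hand-written results for the demo; in real use these would come from
// validateSchema, validateSemantic, and validateInputs.
const lines = preflightErrors([
  { valid: true, errors: [] },
  { valid: false, errors: [{ path: "flow[0].as", message: "duplicate step name" }] },
]);

console.log(lines); // ["flow[0].as: duplicate step name"]
```

With the real API this might look like `preflightErrors([validateSchema(wf), await validateSemantic(wf, basePath), validateInputs(inputs, wf.inputs)])`.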
### ValidationResult type

```ts
interface ValidationResult {
  valid: boolean;
  errors: ValidationError[];
}

interface ValidationError {
  path: string;
  message: string;
}
```
## Executing workflows

### executeWorkflow

```ts
async function executeWorkflow(
  workflow: Workflow,
  inputs: Record<string, unknown> = {},
  basePath: string = process.cwd(),
  options: ExecuteOptions = {},
): Promise<WorkflowResult>
```

| Parameter | Description |
|---|---|
| `workflow` | Parsed `Workflow` object. |
| `inputs` | Runtime inputs passed to the workflow. Must satisfy `workflow.inputs` if defined. |
| `basePath` | Root path used to resolve relative file references (`call` participant paths, `exec` participant `cwd`). Defaults to `process.cwd()`. |
| `options` | Execution options — see below. |
### ExecuteOptions

```ts
interface ExecuteOptions {
  hub?: EventHub;           // Event hub for emit/wait.event steps.
  cwd?: string;             // Override working directory for exec participants.
  executionNumber?: number; // Execution counter; available as execution.number in CEL. Defaults to 1.
  verbose?: boolean;        // Reserved for CLI use.
  quiet?: boolean;          // Reserved for CLI use.
}
```

### basePath and relative paths

`basePath` affects two things:

- `call` participants — the sub-workflow path is resolved relative to `basePath`. When the sub-workflow runs, its own `basePath` becomes the directory containing that file, so nested relative references work correctly.
- `exec` participant `cwd` — if `cwd` is a relative path, it is resolved against `basePath`.
```ts
// Workflow in /app/workflows/main.yaml calls ./helpers/fetch.yaml
await executeWorkflow(workflow, inputs, "/app/workflows");
//                                      ^ basePath
```
### runWorkflowFromFile

Convenience wrapper that parses a file and executes it in one call.

```ts
import { runWorkflowFromFile } from "@duckflux/core";

const result = await runWorkflowFromFile("./workflow.yaml", { name: "World" }, {
  cwd: "/workspace",
});
```
## Reading results

### WorkflowResult

```ts
interface WorkflowResult {
  success: boolean;                  // false if any step failed with strategy "fail"
  output: unknown;                   // Resolved workflow output (string, object, null, …)
  steps: Record<string, StepResult>; // Results keyed by step name (the as: field)
  duration: number;                  // Total execution time in milliseconds
}
```

### StepResult

```ts
interface StepResult {
  status: "success" | "failure" | "skipped";
  output: string;         // Raw output (usually a JSON string)
  parsedOutput?: unknown; // Parsed JSON when applicable
  error?: string;         // Error message if status === "failure"
  duration: number;       // Step execution time in ms
  startedAt?: string;     // ISO 8601 timestamp
  finishedAt?: string;    // ISO 8601 timestamp
  retries?: number;       // Retry attempts made
  cwd?: string;           // Working directory used (exec participants)
  httpStatus?: number;    // HTTP status code (http participants)
  responseBody?: string;  // HTTP response body on failure (http participants)
}
```
### Example

```ts
const result = await executeWorkflow(workflow, inputs, basePath);

if (!result.success) {
  for (const [name, step] of Object.entries(result.steps)) {
    if (step.status === "failure") {
      console.error(`Step "${name}" failed: ${step.error}`);
    }
  }
}

console.log(`Finished in ${result.duration}ms`);
console.log("Output:", result.output);
```
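Because `result.steps` is a plain record, summarizing a run is ordinary data wrangling. A sketch assuming the `StepResult` shape documented on this page; `summarize` is an illustrative name, not a library export.

```typescript
// Minimal slice of the StepResult shape documented above.
interface StepLike {
  status: "success" | "failure" | "skipped";
  duration: number;
}

// Tallies step outcomes and finds the slowest step by duration.
function summarize(steps: Record<string, StepLike>): {
  counts: Record<StepLike["status"], number>;
  slowest: string | null;
} {
  const counts = { success: 0, failure: 0, skipped: 0 };
  let slowest: string | null = null;
  let max = -Infinity;
  for (const [name, step] of Object.entries(steps)) {
    counts[step.status] += 1;
    if (step.duration > max) {
      max = step.duration;
      slowest = name;
    }
  }
  return { counts, slowest };
}

const report = summarize({
  fetch: { status: "success", duration: 120 },
  transform: { status: "failure", duration: 15 },
});

console.log(report.counts.failure); // 1
console.log(report.slowest); // "fetch"
```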
## Error handling

`executeWorkflow` throws in two cases:

| Situation | What is thrown |
|---|---|
| Input validation fails | `Error("input validation failed: ...")` |
| A step fails with `onError: fail` (the default) | `WorkflowError` |
### WorkflowError

```ts
import { WorkflowError } from "@duckflux/core";

try {
  await executeWorkflow(workflow, inputs, basePath);
} catch (err) {
  if (err instanceof WorkflowError) {
    console.error(`Step "${err.stepName}" failed after ${err.retriesAttempted} retries`);
    console.error(`Strategy: ${err.strategy}`);
  }
}
```

```ts
class WorkflowError extends Error {
  stepName: string;         // Name of the step that failed
  strategy: ErrorStrategy;  // Error strategy applied ("fail", "skip", "retry")
  retriesAttempted: number; // Number of retry attempts made
}
```

Steps configured with `onError: skip` do not throw — they appear in `result.steps` with `status: "skipped"` and execution continues.
## Using an event hub

Inject a hub via `ExecuteOptions` to enable `emit` and `wait.event` steps. The same hub instance is automatically propagated to all sub-workflows, loops, and parallel branches.
```ts
import { parseWorkflow, executeWorkflow, MemoryHub } from "@duckflux/core";

const hub = new MemoryHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close(); // Always close when done
```

For distributed use cases, see Event hub providers for NATS and Redis backends.
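If execution throws, the hub still needs closing, so a `try`/`finally` wrapper is a useful pattern. A sketch: `withHub` and `Closable` are illustrative names, not part of `@duckflux/core`.

```typescript
// Anything with an async close() qualifies; MemoryHub and the optional
// NATS/Redis backends all fit this shape.
interface Closable {
  close(): Promise<void>;
}

// Runs a callback with the hub and closes it afterwards, even on failure.
async function withHub<H extends Closable, T>(
  hub: H,
  run: (hub: H) => Promise<T>,
): Promise<T> {
  try {
    return await run(hub);
  } finally {
    await hub.close(); // runs on success and on thrown errors alike
  }
}

// With the real API, usage might look like:
// await withHub(new MemoryHub(), (hub) =>
//   executeWorkflow(workflow, inputs, basePath, { hub }),
// );
```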
### Sub-workflow event propagation

Because the hub is shared across the entire execution tree, a parent workflow can wait for an event emitted by a child:

`parent.yaml`:

```yaml
flow:
  - call: child.yaml
  - wait:
      event: child.done
      timeout: 10s
output: event.result
```

`child.yaml`:

```yaml
flow:
  - type: emit
    event: child.done
    payload:
      result: '"from child"'
```

```ts
const hub = new MemoryHub();
const result = await runWorkflowFromFile("parent.yaml", {}, { hub });
// result.output === "from child"
await hub.close();
```
Section titled “Implementing a custom event hub”You can implement your own backend — for example, backed by Kafka, an in-house message broker, or a mock for testing — by implementing the EventHub interface.
The interface
### The interface

```ts
import type { EventHub, EventEnvelope } from "@duckflux/core";

interface EventHub {
  publish(event: string, payload: unknown): Promise<void>;
  publishAndWaitAck(event: string, payload: unknown, timeoutMs: number): Promise<void>;
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
  close(): Promise<void>;
}

interface EventEnvelope {
  name: string;     // Event name
  payload: unknown; // Event payload (any JSON-serializable value)
}
```
### Contract

| Method | Required behavior |
|---|---|
| `publish` | Deliver `payload` to all active subscribers of `event`. Fire-and-forget. |
| `publishAndWaitAck` | Same as `publish`, but wait for broker acknowledgment up to `timeoutMs` ms before resolving. Throw on timeout. |
| `subscribe` | Return an `AsyncIterable` that yields `EventEnvelope` objects as they arrive. Stop yielding when `signal` is aborted or `close()` is called. |
| `close` | Release all connections and internal resources. Stop any active subscribers. |
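The `AbortSignal` in the contract is what makes timeouts composable: a consumer stops a subscription by aborting. A sketch of a wait-with-timeout built on any hub's `subscribe`; `waitForEvent` and the fake hub are illustrative, not part of `@duckflux/core`.

```typescript
// Envelope shape mirrored from the interface above.
interface EventEnvelope {
  name: string;
  payload: unknown;
}

// Only the subscribe half of the EventHub contract is needed here.
interface Subscribable {
  subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope>;
}

// Resolves with the first matching event, or throws after timeoutMs.
async function waitForEvent(
  hub: Subscribable,
  event: string,
  timeoutMs: number,
): Promise<EventEnvelope> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    for await (const envelope of hub.subscribe(event, controller.signal)) {
      return envelope; // first event wins
    }
    // The iterator ended without yielding: the abort (timeout) fired.
    throw new Error(`timed out waiting for "${event}"`);
  } finally {
    clearTimeout(timer);
    controller.abort(); // stop the subscription in all cases
  }
}

// Demo against a fake hub that emits one event immediately.
const fakeHub: Subscribable = {
  async *subscribe(event) {
    yield { name: event, payload: { ok: true } };
  },
};

const envelope = await waitForEvent(fakeHub, "build.done", 1_000);
console.log(envelope.name); // "build.done"
```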
### Minimal example

```ts
import type { EventHub, EventEnvelope } from "@duckflux/core";

type Listener = (envelope: EventEnvelope) => void;

export class MyCustomHub implements EventHub {
  private listeners = new Map<string, Set<Listener>>();
  private closed = false;

  async publish(event: string, payload: unknown): Promise<void> {
    if (this.closed) throw new Error("hub is closed");
    const envelope: EventEnvelope = { name: event, payload };
    for (const listener of this.listeners.get(event) ?? []) {
      listener(envelope);
    }
  }

  async publishAndWaitAck(event: string, payload: unknown, _timeoutMs: number): Promise<void> {
    await this.publish(event, payload); // no ack in this backend
  }

  async *subscribe(event: string, signal?: AbortSignal): AsyncIterable<EventEnvelope> {
    if (this.closed) return;

    const queue: EventEnvelope[] = [];
    let resolve: (() => void) | null = null;

    const listener: Listener = (envelope) => {
      queue.push(envelope);
      resolve?.();
      resolve = null;
    };

    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event)!.add(listener);

    const cleanup = () => {
      this.listeners.get(event)?.delete(listener);
      resolve?.();
    };
    signal?.addEventListener("abort", cleanup);

    try {
      while (!this.closed && !signal?.aborted) {
        if (queue.length > 0) {
          yield queue.shift()!;
        } else {
          await new Promise<void>((r) => {
            resolve = r;
          });
        }
      }
    } finally {
      cleanup();
      signal?.removeEventListener("abort", cleanup);
    }
  }

  async close(): Promise<void> {
    this.closed = true;
    this.listeners.clear();
  }
}
```
### Using your custom hub

```ts
import { executeWorkflow } from "@duckflux/core";
import { MyCustomHub } from "./my-custom-hub";

const hub = new MyCustomHub();
const result = await executeWorkflow(workflow, inputs, basePath, { hub });
await hub.close();
```

No other changes are needed — the runtime treats all `EventHub` implementations identically.
### Testing tip

A custom hub is also useful for asserting that specific events were emitted during a test:

```ts
class CapturingHub extends MyCustomHub {
  readonly emitted: { event: string; payload: unknown }[] = [];

  override async publish(event: string, payload: unknown): Promise<void> {
    this.emitted.push({ event, payload });
    await super.publish(event, payload);
  }
}

const hub = new CapturingHub();
await executeWorkflow(workflow, inputs, basePath, { hub });

expect(hub.emitted).toContainEqual({
  event: "order.created",
  payload: expect.objectContaining({ orderId: "123" }),
});

await hub.close();
```