Skip to content

A real-world example

This example walks through a real agentic pipeline: a planner breaks work into tasks, then a loop pulls each task from a queue, a coder agent implements it, and a reviewer agent checks it. Everything is orchestrated by a single duckflux workflow.


planner
└─ breaks the goal into tasks, stores them in a task queue
loop (up to 3 iterations)
├─ http → fetch next task from the queue
├─ coder → implement the task (Claude Code)
└─ reviewer → review the code (Claude Code)

Each agent runs as an exec participant invoking claude CLI commands. The loop repeats until there are no more tasks or the iteration limit is reached.


id: agentic-coding-pipeline
name: Agentic Coding Pipeline
version: "0.3"
defaults:
timeout: 10m
cwd: ./repo
inputs:
goal:
type: string
required: true
description: "High-level description of what needs to be built"
taskQueueUrl:
type: string
required: true
description: "URL of the task queue API"
maxRounds:
type: integer
default: 3
minimum: 1
maximum: 10
participants:
planner:
type: exec
run: >
claude -p
"You are a software planner. Break the following goal into a list of
discrete coding tasks. Return a JSON array of task objects with fields
'id' and 'description'. Goal: " + workflow.inputs.goal
timeout: 5m
output:
type: array
items:
type: object
required: true
fetchTask:
type: http
url: workflow.inputs.taskQueueUrl + "/next"
method: GET
headers:
Accept: application/json
coder:
type: exec
run: >
claude -p
"You are a senior software engineer. Implement the following task in the
current repository. Apply the changes directly to the codebase.
Task: " + fetchTask.output.description
timeout: 15m
onError: retry
retry:
max: 2
backoff: 10s
reviewer:
type: exec
run: >
claude -p
"You are a code reviewer. Review the changes just made for the following
task. Check for correctness, edge cases, and code quality. Return a JSON
object with fields 'approved' (boolean) and 'feedback' (string).
Task: " + fetchTask.output.description
timeout: 10m
output:
approved:
type: boolean
required: true
feedback:
type: string
flow:
# Step 1 — planner breaks the goal into tasks
- planner
# Step 2 — loop up to maxRounds: fetch → code → review
- loop:
max: workflow.inputs.maxRounds
steps:
- fetchTask
- coder:
input:
task: fetchTask.output.description
- reviewer:
input:
task: fetchTask.output.description
output:
approved: reviewer.output.approved
feedback: reviewer.output.feedback
rounds: loop.iteration

The planner receives the high-level goal from workflow inputs and instructs Claude to break it down into tasks. It returns a JSON array, which the runtime parses automatically — making fields like planner.output[0].id accessible downstream.

- loop:
max: workflow.inputs.maxRounds
steps:
- fetchTask
- coder
- reviewer

Each iteration runs three steps in sequence. The loop runs at most maxRounds times (default: 3). loop.iteration tracks which round is running.

fetchTask:
type: http
url: workflow.inputs.taskQueueUrl + "/next"
method: GET

A simple GET request to a task queue API. The response (a task object with id and description) becomes fetchTask.output, available to the coder and reviewer in the same iteration.

Both are exec participants that invoke claude -p with a prompt built from the task description. The reviewer outputs structured JSON — the runtime parses it, so reviewer.output.approved and reviewer.output.feedback are accessible as typed fields.

The coder has onError: retry with a backoff, so transient failures (model timeouts, flaky shell state) get a second chance before the workflow stops.


A few natural next steps:

Stop when tasks are exhausted — add an until condition to the loop that checks if fetchTask.output is empty:

loop:
until: fetchTask.output == null
max: workflow.inputs.maxRounds

Re-queue rejected tasks — after the reviewer, add an http step that posts the task back to the queue if reviewer.output.approved == false.

Emit progress events — add an emit step inside the loop to broadcast each completed round to external listeners:

- as: notifyRound
type: emit
event: "round.completed"
payload:
round: loop.iteration
approved: reviewer.output.approved
feedback: reviewer.output.feedback