Generalize join and reduce into navigation-free drivers over pluggable tactics #773

Merged: frankmcsherry merged 4 commits into TimelyDataflow:master-next from frankmcsherry:join-tactics on Jun 29, 2026

Conversation

@frankmcsherry

Member

Summary

Generalizes join and reduce so each depends only on a pluggable tactic, with cursor-based navigation as one tactic rather than the blessed mechanism.

Builds on #771 and #772 (cursor-optional Trace/Batch), so it targets master-next.

Join (695253a7)

join_traces splits into a general driver, join_with_tactic, that needs only TraceReader of its inputs, and a JoinTactic trait that resolves the per-batch work.
The conventional cursor-based implementation, CursorTactic, becomes one tactic in a private mod cursors.
The driver extracts batches via the navigation-free TraceReader::batches_through (renamed from cursor_storage), so building and walking cursors is the tactic's concern.
A Fresh marker names the freshly-arrived input; it selects the accumulated side to advance by meet and routes the unit to one of two per-direction queues, preserving the fairness that keeps a burst on one input from starving the other.
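The two-queue routing can be sketched as follows. This is a minimal illustration, not the crate's code: `Fresh` here is a hypothetical two-variant enum, `Queues` is an invented helper, and strict alternation between the queues is an assumed fairness policy (the PR only states that a burst on one input must not starve the other).

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the PR's `Fresh` marker: which input carried the new batch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Fresh {
    Left,
    Right,
}

/// Hypothetical driver-side state: one queue of deferred work per direction.
pub struct Queues<W> {
    left: VecDeque<W>,
    right: VecDeque<W>,
    /// The direction to prefer on the next pop (assumed round-robin policy).
    next: Fresh,
}

impl<W> Queues<W> {
    pub fn new() -> Self {
        Queues { left: VecDeque::new(), right: VecDeque::new(), next: Fresh::Left }
    }

    /// Routes a unit of work to the queue named by its `Fresh` marker.
    pub fn push(&mut self, fresh: Fresh, work: W) {
        match fresh {
            Fresh::Left => self.left.push_back(work),
            Fresh::Right => self.right.push_back(work),
        }
    }

    /// Pops the next unit, alternating directions whenever both queues have work.
    pub fn pop(&mut self) -> Option<(Fresh, W)> {
        let order = match self.next {
            Fresh::Left => [Fresh::Left, Fresh::Right],
            Fresh::Right => [Fresh::Right, Fresh::Left],
        };
        for side in order {
            let queue = match side {
                Fresh::Left => &mut self.left,
                Fresh::Right => &mut self.right,
            };
            if let Some(work) = queue.pop_front() {
                // Prefer the other direction next time.
                self.next = match side {
                    Fresh::Left => Fresh::Right,
                    Fresh::Right => Fresh::Left,
                };
                return Some((side, work));
            }
        }
        None
    }
}
```

With three units queued on the left and one on the right, the right unit is served second rather than last, which is the starvation-avoidance property described above.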

Reduce (5ae94076)

The same shape: reduce_with_tactic is the navigation-free driver, ReduceTactic the trait, CursorTactic the implementor in mod cursors.
Reduce does not suspend: its output is at most linear in its input, so one retire runs the whole interval to completion (no fuel, no queue).
Capabilities stay with the operator and out of the trait — retire is handed the frontier of held times and returns output batches tagged with their times plus the new frontier of interesting times; the driver mints capabilities, ships, and downgrades.

Both JoinTactic and ReduceTactic are object-safe, so a non-cursor tactic could be boxed and selected at runtime.
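A toy model of that division of labor, with a boxed tactic chosen at runtime, might look like the sketch below. Everything in it is an assumption made for illustration: `ToyReduceTactic`, `SumTactic`, `DistinctTactic`, `select`, and `drive` are invented names, times are plain `u64` values, and batches are vectors of `(key, diff)` pairs. The real trait works over traces, batches, and frontiers and has a different signature.

```rust
use std::collections::BTreeMap;

/// An output batch of (key, diff) updates, tagged with the time to ship it at.
pub type Tagged = (u64, Vec<(String, i64)>);

/// Hypothetical, simplified stand-in for a reduce tactic; not the crate's trait.
/// It has no generic methods and takes `&mut self`, so it is object-safe.
pub trait ToyReduceTactic {
    /// `held` lists the times the driver currently holds capabilities for.
    /// Returns tagged output batches and the new frontier of interesting times.
    fn retire(&mut self, held: &[u64], input: &[(String, i64)]) -> (Vec<Tagged>, Vec<u64>);
}

/// Accumulates diffs per key, dropping keys whose diffs cancel to zero.
fn consolidate(input: &[(String, i64)]) -> BTreeMap<String, i64> {
    let mut sums = BTreeMap::new();
    for (key, diff) in input {
        *sums.entry(key.clone()).or_insert(0i64) += *diff;
    }
    sums.retain(|_, sum| *sum != 0);
    sums
}

pub struct SumTactic;
impl ToyReduceTactic for SumTactic {
    fn retire(&mut self, held: &[u64], input: &[(String, i64)]) -> (Vec<Tagged>, Vec<u64>) {
        // Ship at the least held time; nothing remains interesting afterwards.
        match held.iter().min() {
            Some(&time) => (vec![(time, consolidate(input).into_iter().collect())], Vec::new()),
            None => (Vec::new(), Vec::new()),
        }
    }
}

pub struct DistinctTactic;
impl ToyReduceTactic for DistinctTactic {
    fn retire(&mut self, held: &[u64], input: &[(String, i64)]) -> (Vec<Tagged>, Vec<u64>) {
        match held.iter().min() {
            Some(&time) => {
                // Emit each key with a positive accumulated diff exactly once.
                let batch: Vec<(String, i64)> = consolidate(input)
                    .into_iter()
                    .filter(|(_, sum)| *sum > 0)
                    .map(|(key, _)| (key, 1))
                    .collect();
                (vec![(time, batch)], Vec::new())
            }
            None => (Vec::new(), Vec::new()),
        }
    }
}

/// Runtime selection: possible only because the trait is object-safe.
pub fn select(name: &str) -> Box<dyn ToyReduceTactic> {
    match name {
        "distinct" => Box::new(DistinctTactic),
        _ => Box::new(SumTactic),
    }
}

/// Toy driver: the tactic sees only times; the driver owns the "capabilities".
pub fn drive(tactic: &mut dyn ToyReduceTactic, held: &mut Vec<u64>, input: &[(String, i64)]) -> Vec<Tagged> {
    let (outputs, interesting) = tactic.retire(held, input);
    // "Mint": every output time must be reachable from some held time.
    for (time, _) in &outputs {
        assert!(held.iter().any(|h| h <= time), "no capability for output time");
    }
    // "Downgrade": retain only the new interesting times.
    *held = interesting;
    outputs
}
```

The point of the shape is that dispatch through `dyn` happens once per `retire` call, not once per update.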

Tests

cargo build --workspace and cargo test -p differential-dataflow are clean; join, reduce, and scc suites pass; the bfs example runs clean across update rounds.

Open for review

  • reduce adds a Bu: 'static bound to reduce_abelian/reduce_core (the cursor tactic carries the builder type to call its constructor). It is additive and every concrete builder satisfies it, but it is Mz-facing.
  • The tactic traits and drivers are pub; tightening to pub(crate) until a non-cursor consumer exists is an open call.

🤖 Generated with Claude Code

frankmcsherry and others added 4 commits June 28, 2026 15:19
Split join_traces into a general driver, join_with_tactic, that needs only
TraceReader of its inputs, and a JoinTactic trait that resolves the per-batch
work that arrives on each input.
The conventional cursor-based implementation, CursorTactic, becomes one tactic
rather than the blessed mechanism.

The driver extracts trace batches via the Navigable-free
TraceReader::batches_through (renamed from cursor_storage), so building and
walking cursors is the tactic's concern, not the operator's.

A Fresh marker names which input carried the freshly-arrived batch.
It selects the accumulated side to advance by meet before consolidation, and
routes the unit to one of two per-direction queues, restoring the fairness
discipline whereby a burst on one input cannot starve the other.

The cursor machinery (CursorTactic, Deferred, JoinThinker) is bracketed in a
private mod cursors.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Split reduce_trace into a general driver, reduce_with_tactic, that needs only
TraceReader of its input and Trace of its output, and a ReduceTactic trait that
retires each interval of work.
The conventional cursor-based implementation, CursorTactic, becomes one tactic
in a private mod cursors, mirroring the join split.

The driver extracts batches via the Navigable-free TraceReader::batches_through,
so building and walking cursors is the tactic's concern, not the operator's.

Capabilities stay with the operator and out of the trait: the tactic reasons
only about times.
retire is presented with the frontier of times the operator currently holds
capabilities for, and returns the output batches (each tagged with the time to
ship it at) and the new frontier of interesting times; the driver mints
capabilities, ships, and downgrades.

There is no suspension: one retire runs the whole interval to completion.
Reduce's output is bounded by its input, so there is no resource blowup for
suspension to relieve, unlike join.

The cursor tactic carries the output builder type to call its constructor,
which adds a Bu: 'static bound to reduce_abelian and reduce_core; every concrete
builder already satisfies it.

The per-key history replayer and sort_dedup live inside mod cursors alongside
the tactic.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

minimize_keys and minimize_vals tracked the least keys and values across the
constituent cursors with a flat_map over get_key / get_val.
After the reduce tactic split, the adapter's inner FlattenCompat::next stopped
inlining and surfaced as a hot leaf, regressing reduce on reduce-heavy
workloads.
The cost was lost inlining, not extra work: the algorithm and inputs are
unchanged, and an #[inline] hint on the helpers did not help because the
un-inlined call sat inside the adapter.

Rewriting the two helpers as explicit min-tracking loops, with identical logic,
removes the dependence on iterator-adapter inlining surviving the operator
boundary.
CursorList is shared, so this also de-risks the join path and any future tactic.

Behavior is unchanged: the full suite passes and output is bit-identical.
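
For illustration only, the two styles can be contrasted on a stand-in problem: given each cursor's current key as an `Option<u64>` (a hypothetical substitute for `get_key`), find the indices of the cursors positioned at the least key. The function names and the `Option<u64>` representation are assumptions of this sketch, not the code in the commit.

```rust
use std::cmp::Ordering;

/// Adapter style: a `flat_map` filters out exhausted cursors before the scan.
pub fn min_indices_adapter(keys: &[Option<u64>]) -> Vec<usize> {
    let mut result = Vec::new();
    let mut iter = keys
        .iter()
        .enumerate()
        .flat_map(|(idx, key)| key.map(|key| (idx, key)));
    if let Some((idx, key)) = iter.next() {
        let mut min_key = key;
        result.push(idx);
        for (idx, key) in iter {
            match key.cmp(&min_key) {
                Ordering::Less => {
                    result.clear();
                    result.push(idx);
                    min_key = key;
                }
                Ordering::Equal => result.push(idx),
                Ordering::Greater => {}
            }
        }
    }
    result
}

/// Explicit loop with identical logic and no iterator adapter to inline.
pub fn min_indices_loop(keys: &[Option<u64>]) -> Vec<usize> {
    let mut result = Vec::new();
    let mut min_key: Option<u64> = None;
    for (idx, key) in keys.iter().enumerate() {
        // Skip exhausted cursors.
        let Some(key) = *key else { continue };
        match min_key.map(|min| key.cmp(&min)) {
            // First key seen, or a strictly smaller key: restart the index list.
            None | Some(Ordering::Less) => {
                result.clear();
                result.push(idx);
                min_key = Some(key);
            }
            Some(Ordering::Equal) => result.push(idx),
            Some(Ordering::Greater) => {}
        }
    }
    result
}
```

Both functions return the same indices for any input; the difference is only in how much the optimizer has to see through.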

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

JoinTactic, Fresh, join_with_tactic, ReduceTactic, and reduce_with_tactic are
not used outside the crate, so they need not be public.
Only the established entry points, join_traces and reduce_trace, stay pub.

This is reversible to pub the moment a non-cursor tactic consumer wants to
implement or drive a tactic from outside the crate.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@frankmcsherry frankmcsherry merged commit 204906a into TimelyDataflow:master-next Jun 29, 2026
6 checks passed
@frankmcsherry frankmcsherry deleted the join-tactics branch June 29, 2026 01:41
@oli-w

oli-w commented Jun 29, 2026

Hi Frank 👋. I've continued enjoying working with DD (and Claude has been quite helpful when the Rust generics get a bit crazy 😅).

I've got a related question about this - I've been wanting an efficient "top-k" implementation for each key in reduce (or even top-1 for simplicity). The implementation for each invocation would be fairly simple (even with partial order timestamps): for each changed key, seek to the first value and emit that (value, 1) pair as output (plus edge cases which may require further value stepping if the diffs for a given timestamp cancel out after consolidation). This avoids having to collect all (value, diff) pairs for each changed key into a slice, only to grab the first one (imagine ~1 million distinct values per key).
I see there was already an issue you created about this a while ago - #29

Is this implementation moving in that direction, or would you still recommend building a custom operator for now?

@frankmcsherry

Member Author

Ah, that's an interesting take, and it wasn't the sort of thing that motivated this work, but it could align!

The goal here was mostly to unlock alternate implementations of the same logic, but it might also be a fine way to allow other implementations to slot in when you know you want something different. The *Tactic types are dyn-safe, and they are intended to be boxed and passed in; you could certainly imagine slotting in a TopK specialized implementation, when you know that this is the right thing to do. Similarly, perhaps: if you know that a timestamp is totally ordered, you could hand in a different (usually easier) implementation that doesn't worry about the interesting times.

The intent was definitely to allow one to introduce alternate implementations at a "chunky" enough moment that there wouldn't need to be per-update dispatch. I guess I didn't realize some of the uses.

I would definitely check out whether the framework helps you or not: if your custom operator was hung up on organizing batches and interacting with the trace, perhaps this skeleton helps and you just need to re-do the inner logic. If that was never the problem, perhaps there isn't much difference between this operator and a custom operator (all DD operators are essentially "custom", in that they use the same TD operator builders you would use). In your case, you could imagine wanting to borrow most of the logic, but then rather than fully forming all values for each key, just move the cursor far enough forward to find the first non-zero element. Still a bit tricky, and might want to copy/paste a bit still, but might make sense!
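
A minimal sketch of the "move the cursor only far enough" idea, under simplifying assumptions: the updates for one key arrive as an iterator of `(value, diff)` pairs sorted by value, already restricted to a single time of interest, so partially ordered timestamps are ignored here. The function name `top1` and this flat representation are hypothetical and stand in for walking a real cursor.

```rust
/// Returns the first value whose accumulated diff is non-zero, pulling from
/// the iterator only until that value is settled (plus one lookahead item).
pub fn top1<V: Eq, I: IntoIterator<Item = (V, i64)>>(updates: I) -> Option<V> {
    // The value currently being accumulated, with its running diff.
    let mut current: Option<(V, i64)> = None;
    for (value, diff) in updates {
        if let Some((cur, sum)) = current.as_mut() {
            if *cur == value {
                // Same value as before: keep accumulating.
                *sum += diff;
                continue;
            }
        }
        // A new value begins, so the previous one is settled.
        if let Some((cur, sum)) = current.take() {
            if sum != 0 {
                return Some(cur);
            }
        }
        current = Some((value, diff));
    }
    // The input ended: the last value is settled too.
    current.filter(|(_, sum)| *sum != 0).map(|(value, _)| value)
}
```

If the first value's diffs cancel, the scan steps to the next value, which is the edge case mentioned earlier in the thread; once a non-zero value is found, the rest of the input is never read.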

@oli-w

oli-w commented Jun 30, 2026

Neat, thanks for the insight. I spiked out a top-k including partial order handling (I combined the total order and partial order implementations for "simplicity"). There's still quite a lot to handle which Claude helped with a lot. I do like the new *Tactic mental model - given these input+output batches and time interval, produce the relevant output batches.

The computation of new_topk, old_topk, and then calling emit_topk_diff reads quite naturally. There's still quite a lot of boilerplate, but the ReduceTactic is a helpful abstraction over writing a raw operator and doing capability management. Here's the code for reference: https://github.com/oli-w/dd-topk-spike/blob/main/src/min_tactic.rs#L180
