Generalize join and reduce into navigation-free drivers over pluggable tactics #773
Split join_traces into a general driver, join_with_tactic, that needs only TraceReader of its inputs, and a JoinTactic trait that resolves the per-batch work that arrives on each input. The conventional cursor-based implementation, CursorTactic, becomes one tactic rather than the blessed mechanism. The driver extracts trace batches via the Navigable-free TraceReader::batches_through (renamed from cursor_storage), so building and walking cursors is the tactic's concern, not the operator's. A Fresh marker names which input carried the freshly-arrived batch. It selects the accumulated side to advance by meet before consolidation, and routes the unit to one of two per-direction queues, restoring the fairness discipline whereby a burst on one input cannot starve the other. The cursor machinery (CursorTactic, Deferred, JoinThinker) is bracketed in a private mod cursors. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
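The routing described above can be pictured with a toy model. This is only a sketch under assumptions: the variant names of `Fresh`, the `Unit` struct, and the `FairQueues` type are hypothetical stand-ins, and strict alternation is just one way to get the stated fairness, not necessarily what the driver does.

```rust
use std::collections::VecDeque;

/// Which input carried the freshly-arrived batch (variant names are assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Fresh {
    Input1,
    Input2,
}

/// Hypothetical stand-in for one deferred unit of join work.
#[derive(Debug, PartialEq, Eq)]
pub struct Unit {
    pub fresh: Fresh,
    pub id: u32,
}

/// Two per-direction queues, served alternately (an assumed policy).
#[derive(Default)]
pub struct FairQueues {
    from_input1: VecDeque<Unit>,
    from_input2: VecDeque<Unit>,
    serve_input2_next: bool,
}

impl FairQueues {
    /// Route a unit to the queue of the input that produced it.
    pub fn push(&mut self, unit: Unit) {
        match unit.fresh {
            Fresh::Input1 => self.from_input1.push_back(unit),
            Fresh::Input2 => self.from_input2.push_back(unit),
        }
    }

    /// Pop the next unit, preferring the direction whose turn it is and
    /// falling back to the other direction when the preferred one is empty.
    pub fn pop(&mut self) -> Option<Unit> {
        let (first, second) = if self.serve_input2_next {
            (&mut self.from_input2, &mut self.from_input1)
        } else {
            (&mut self.from_input1, &mut self.from_input2)
        };
        let unit = first.pop_front().or_else(|| second.pop_front());
        if unit.is_some() {
            self.serve_input2_next = !self.serve_input2_next;
        }
        unit
    }
}
```

With three units queued from the first input and one from the second, the second input's unit is served after a single unit from the first, rather than waiting behind the whole burst.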
Split reduce_trace into a general driver, reduce_with_tactic, that needs only TraceReader of its input and Trace of its output, and a ReduceTactic trait that retires each interval of work. The conventional cursor-based implementation, CursorTactic, becomes one tactic in a private mod cursors, mirroring the join split. The driver extracts batches via the Navigable-free TraceReader::batches_through, so building and walking cursors is the tactic's concern, not the operator's. Capabilities stay with the operator and out of the trait: the tactic reasons only about times. retire is presented with the frontier of times the operator currently holds capabilities for, and returns the output batches (each tagged with the time to ship it at) and the new frontier of interesting times; the driver mints capabilities, ships, and downgrades. There is no suspension: one retire runs the whole interval to completion. Reduce's output is bounded by its input, so there is no resource blowup for suspension to relieve, unlike join. The cursor tactic carries the output builder type to call its constructor, which adds a Bu: 'static bound to reduce_abelian and reduce_core; every concrete builder already satisfies it. The per-key history replayer and sort_dedup live inside mod cursors alongside the tactic. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
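To make the division of labor concrete, here is a heavily simplified sketch. Everything in it is hypothetical: `ToyReduceTactic`, `SumTactic`, `drive`, integer times, and the update representation are illustrative assumptions, not the crate's trait or signatures. It only shows the shape: the tactic sees times and returns time-tagged batches plus new interesting times, while the driver alone deals with what it holds.

```rust
use std::collections::BTreeMap;

/// Toy timestamp; real times may be partially ordered.
type Time = u64;
/// Toy output batch: consolidated (key, diff) records.
type Batch = Vec<(String, i64)>;

/// Hypothetical, simplified tactic shape: it reasons only about times.
trait ToyReduceTactic {
    /// `held`: times the operator currently holds capabilities for.
    /// Returns (batches tagged with the time to ship at, new interesting times).
    fn retire(
        &mut self,
        held: &[Time],
        updates: &[(String, Time, i64)],
        upper: Time,
    ) -> (Vec<(Time, Batch)>, Vec<Time>);
}

/// Toy tactic: sums diffs per key at each time before `upper`.
/// It ignores `held`; a real tactic would use it to decide what to ship.
struct SumTactic;

impl ToyReduceTactic for SumTactic {
    fn retire(
        &mut self,
        _held: &[Time],
        updates: &[(String, Time, i64)],
        upper: Time,
    ) -> (Vec<(Time, Batch)>, Vec<Time>) {
        let mut by_time: BTreeMap<Time, BTreeMap<String, i64>> = BTreeMap::new();
        let mut interesting: Vec<Time> = Vec::new();
        for (key, time, diff) in updates {
            if *time < upper {
                *by_time.entry(*time).or_default().entry(key.clone()).or_insert(0) += *diff;
            } else {
                // Not yet retirable: remember the time for a later call.
                interesting.push(*time);
            }
        }
        interesting.sort_unstable();
        interesting.dedup();
        let batches = by_time
            .into_iter()
            .map(|(time, sums)| (time, sums.into_iter().filter(|(_, d)| *d != 0).collect::<Batch>()))
            .filter(|(_, batch)| !batch.is_empty())
            .collect();
        (batches, interesting)
    }
}

/// Toy driver: checks a capability could be minted for each batch,
/// "ships" by returning the batches, then downgrades what it holds.
fn drive(
    tactic: &mut dyn ToyReduceTactic,
    held: &mut Vec<Time>,
    updates: &[(String, Time, i64)],
    upper: Time,
) -> Vec<(Time, Batch)> {
    let (batches, interesting) = tactic.retire(held.as_slice(), updates, upper);
    for (time, _) in &batches {
        assert!(held.iter().any(|h| h <= time), "no held time allows shipping at {}", time);
    }
    *held = interesting;
    batches
}
```

The single `retire` call runs the whole interval, matching the no-suspension design: there is no fuel parameter and nothing is queued for later.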
minimize_keys and minimize_vals tracked the least keys and values across the constituent cursors with a flat_map over get_key / get_val. After the reduce tactic split, the adapter's inner FlattenCompat::next stopped inlining and surfaced as a hot leaf, regressing reduce on reduce-heavy workloads. The cost was lost inlining, not extra work: the algorithm and inputs are unchanged, and an #[inline] hint on the helpers did not help because the un-inlined call sat inside the adapter. Rewriting the two helpers as explicit min-tracking loops, with identical logic, removes the dependence on iterator-adapter inlining surviving the operator boundary. CursorList is shared, so this also de-risks the join path and any future tactic. Behavior is unchanged: the full suite passes and output is bit-identical. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
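The shape of that rewrite can be illustrated on a toy input. This is a sketch, not the crate's code: the function names are hypothetical, and each cursor is reduced to an `Option<u64>` holding its current key (`None` when exhausted). Both functions return the indices of the cursors positioned at the least key.

```rust
use std::cmp::Ordering;

/// Adapter-based version: `flat_map` skips exhausted cursors (`None`).
fn min_indices_flat_map(keys: &[Option<u64>]) -> Vec<usize> {
    let mut min_idx = Vec::new();
    let mut iter = keys
        .iter()
        .enumerate()
        .flat_map(|(idx, &key)| key.map(|k| (k, idx)));
    if let Some((mut min_key, idx)) = iter.next() {
        min_idx.push(idx);
        for (key, idx) in iter {
            match key.cmp(&min_key) {
                Ordering::Less => {
                    min_idx.clear();
                    min_idx.push(idx);
                    min_key = key;
                }
                Ordering::Equal => min_idx.push(idx),
                Ordering::Greater => {}
            }
        }
    }
    min_idx
}

/// Explicit min-tracking loop: same logic, no iterator adapter to inline.
fn min_indices_loop(keys: &[Option<u64>]) -> Vec<usize> {
    let mut min_idx = Vec::new();
    let mut min_key: Option<u64> = None;
    for (idx, key) in keys.iter().enumerate() {
        if let Some(key) = *key {
            match min_key.map(|m| key.cmp(&m)) {
                // First live cursor, or a strictly smaller key: restart the set.
                None | Some(Ordering::Less) => {
                    min_idx.clear();
                    min_idx.push(idx);
                    min_key = Some(key);
                }
                Some(Ordering::Equal) => min_idx.push(idx),
                Some(Ordering::Greater) => {}
            }
        }
    }
    min_idx
}
```

The two versions do the same comparisons in the same order; the loop form simply does not rely on the optimizer flattening the adapter chain.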
JoinTactic, Fresh, join_with_tactic, ReduceTactic, and reduce_with_tactic are not used outside the crate, so they need not be public. Only the established entry points, join_traces and reduce_trace, stay pub. This is reversible to pub the moment a non-cursor tactic consumer wants to implement or drive a tactic from outside the crate. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Hi Frank 👋. I've continued enjoying working with DD (and Claude has been quite helpful when the Rust generics get a bit crazy 😅). I've got a related question about this - I've been wanting an efficient "top-k" implementation for each key in reduce (or even top-1 for simplicity). The implementation for each invocation would be fairly simple (even with partial order timestamps): for each changed key, seek to the first value and emit that. Is this implementation moving in that direction, or would you still recommend building a custom operator for now?
Ah, that's an interesting take, and it wasn't the sort of thing that motivated this work, but it could align! The goal here was mostly to unlock alternate implementations of the same logic, but it might also be a fine way to allow other implementations to slot in when you know you want something different. The intent was definitely to allow one to introduce alternate implementations at a "chunky" enough moment that there wouldn't need to be per-update dispatch. I guess I didn't realize that some of the uses. I would definitely check out whether the framework helps you or not: if your custom operator was hung up on organizing batches and interacting with the trace, perhaps this skeleton helps and you just need to re-do the inner logic. If that was never the problem, perhaps there isn't much difference between this operator and a custom operator (all DD operators are essentially "custom", in that they use the same TD operator builders you would use). In your case, you could imagine wanting to borrow most of the logic, but then rather than fully forming all values for each key, just move the cursor far enough forward to find the first non-zero element. Still a bit tricky, and might want to copy/paste a bit still, but might make sense!
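The "move the cursor far enough forward" idea might look roughly like this for a single key. It is only a sketch under assumptions: the function name is hypothetical, values are plain `u64`, the updates are already sorted by value, and times are ignored (as if diffs had already been accumulated to the time of interest), so the partial-order handling discussed here is not covered.

```rust
/// Scan one key's (value, diff) updates, sorted by value, and stop at the
/// first value whose diffs do not cancel. Later values are never visited.
fn first_nonzero(updates: &[(u64, i64)]) -> Option<(u64, i64)> {
    let mut iter = updates.iter().copied().peekable();
    while let Some((value, mut total)) = iter.next() {
        // Fold in further updates for the same value (e.g. from other batches).
        while let Some(&(next, diff)) = iter.peek() {
            if next != value {
                break;
            }
            total += diff;
            iter.next();
        }
        if total != 0 {
            return Some((value, total));
        }
    }
    None
}
```

The early return is what distinguishes this from fully forming all values for the key: work is proportional to the prefix up to the first surviving value.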
Neat, thanks for the insight. I spiked out a top-k including partial order handling (I combined the total order and partial order implementations for "simplicity"). There's still quite a lot to handle which Claude helped with a lot. I do like the new The computation of |
Summary
Generalizes join and reduce so each depends only on a pluggable tactic, with cursor-based navigation as one tactic rather than the blessed mechanism.
Builds on #771 and #772 (cursor-optional Trace/Batch), so it targets master-next.

Join (695253a7)
join_traces splits into a general driver, join_with_tactic, that needs only TraceReader of its inputs, and a JoinTactic trait that resolves the per-batch work.
The conventional cursor-based implementation, CursorTactic, becomes one tactic in a private mod cursors.
The driver extracts batches via the navigation-free TraceReader::batches_through (renamed from cursor_storage), so building and walking cursors is the tactic's concern.
A Fresh marker names the freshly-arrived input; it selects the accumulated side to advance by meet and routes the unit to one of two per-direction queues, preserving the fairness that keeps a burst on one input from starving the other.

Reduce (5ae94076)
The same shape: reduce_with_tactic is the navigation-free driver, ReduceTactic the trait, CursorTactic the implementor in mod cursors.
Reduce does not suspend: its output is at most linear in its input, so one retire runs the whole interval to completion (no fuel, no queue).
Capabilities stay with the operator and out of the trait: retire is handed the frontier of held times and returns output batches tagged with their times plus the new frontier of interesting times; the driver mints capabilities, ships, and downgrades.

Both JoinTactic and ReduceTactic are object-safe, so a non-cursor tactic could be boxed and selected at runtime.

Tests
cargo build --workspace and cargo test -p differential-dataflow are clean; join, reduce, and scc suites pass; the bfs example runs clean across update rounds.

Open for review
reduce adds a Bu: 'static bound to reduce_abelian / reduce_core (the cursor tactic carries the builder type to call its constructor). It is additive and every concrete builder satisfies it, but it is Mz-facing.
The new tactic traits and drivers are pub; tightening to pub(crate) until a non-cursor consumer exists is an open call.

🤖 Generated with Claude Code