[AURON #1840] Preserve collect_set first-occurrence order#2285
[AURON #1840] Preserve collect_set first-occurrence order#2285peter941221 wants to merge 7 commits into
Conversation
46ef225 to
f092709
Compare
|
Rebased this onto current master and kept the fix minimal. The change still just removes the size-based swap in AccSet::merge and adds order assertions for the merge cases. New CI is running on f092709. |
|
do we really need to keep this order? if i dont understand wrong, this order we be discarded in bucket-merge spill strategy even if we maintain it in updating |
|
@richox thanks for calling out the spill path. I checked that path and added a focused regression for it. The spill bucket merge still comes back through the same I also checked the spill round-trip itself. To make that concrete, I pushed So I think we still need this fix for the spill path too. |
There was a problem hiding this comment.
Pull request overview
This PR aims to match Spark’s collect_set no-shuffle behavior by preserving first-occurrence ordering during accumulator merges, avoiding RHS-first ordering caused by size-based swapping.
Changes:
- Removed
AccSet::merge’s size-based accumulator swap to preserve encounter order. - Added regression tests to validate first-occurrence order, including a spill/unspill merge path.
- Added a source-compatible overload for
NativeHelper.getDefaultNativeMetricsto support an in-progress keyed migration.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala | Adds a compatibility overload delegating to the keyed metrics API. |
| native-engine/datafusion-ext-plans/src/agg/collect.rs | Adjusts AccSet::merge behavior and expands tests to cover ordering expectations. |
Comments suppressed due to low confidence (1)
native-engine/datafusion-ext-plans/src/agg/collect.rs:567
AccSet::mergedrainsother.setbut leavesother.list.rawintact. This breaksAccSetinvariants (list contains values not reflected in the set), can cause merged-away values to be serialized/spilled later viaAccSetColumn::save_raw, and makesAccSetColumn::mem_usedaccounting incorrect becausemerge_itemssubtractsother_value_mem_sizeeven though the underlying list memory is still retained. Also, forInternalSet::Huge, iterating the hash table does not preserve first-occurrence order; sorting by the stored position offset restores encounter order for large sets.
pub fn merge(&mut self, other: &mut Self) {
for pos_len in std::mem::take(&mut other.set).into_iter() {
self.append_raw(other.list.ref_raw(pos_len));
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — the two regression tests are nicely targeted: ..._when_rhs_is_larger hits exactly the branch the old swap changed, and ..._after_rhs_spill answers the spill-path question directly. A couple of questions inline.
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for the quick turnaround — dropping the metrics overload in 5b0ccd6e and naming the collect functions structs case in the PR body both look right. One new thing inline on the merge path.
What changed
AccSet::mergeno longer replays RHS uniques in hash-bucket order once the RHS partial accumulator has already promoted toInternalSet::Huge.The merge path now drains the RHS positions, sorts them by the stored raw-byte offset, and replays them in first-occurrence order before appending into the LHS accumulator. It also drains the RHS raw bytes at the same time, so the merged-away accumulator does not keep stale payload bytes after
merge.This PR adds three focused regressions in
native-engine/datafusion-ext-plans/src/agg/collect.rs:test_acc_set_merge_preserves_first_occurrence_order_when_rhs_is_largertest_acc_set_merge_preserves_first_occurrence_order_when_rhs_becomes_hugetest_acc_set_merge_preserves_first_occurrence_order_after_rhs_spillWhy
This is fixing a concrete Spark-compat failure from #1840, not a cosmetic ordering preference.
The failing no-shuffle scenario is
AuronDataFrameAggregateSuite->collect functions structs. In that case Spark seesa = 1, 2, 3in first-occurrence order and returns[1, 2, 3], while the old size-based swap could make the larger RHS partial accumulator lead the merge and return[2, 3, 1]instead.The same merge boundary also had a second gap on the large-set path. After
convert_to_huge_if_needed,HashTable::into_iter()walks buckets rather than insertion order, so replaying RHS uniques straight fromHugecould still reorder the appended suffix once a group had 4 or more distinct values.The spill path still comes back through the same merge boundary:
AggTable::output->partial_merge->merge_items->AccSet::mergeSo the PR keeps first-occurrence order at the merge point for the larger-RHS case, the
Hugecase, and the spill round-trip case.Testing
git diff --checkcargo +nightly-2025-05-09 test --locked --manifest-path native-engine/datafusion-ext-plans/Cargo.toml test_acc_set_merge_preserves_first_occurrence_order_when_rhs_becomes_huge -- --nocapturecargo +nightly-2025-05-09 test --locked --manifest-path native-engine/datafusion-ext-plans/Cargo.toml test_acc_set_merge_preserves_first_occurrence_order_after_rhs_spill -- --nocapturerdkafka-sysfailing to configurelibrdkafkawith%1 is not a valid Win32 application. (os error 193)