Skip to content

[AURON #1840] Preserve collect_set first-occurrence order#2285

Open
peter941221 wants to merge 7 commits into
apache:masterfrom
peter941221:fix/auron-1840-collect-set-order
Open

[AURON #1840] Preserve collect_set first-occurrence order#2285
peter941221 wants to merge 7 commits into
apache:masterfrom
peter941221:fix/auron-1840-collect-set-order

Conversation

@peter941221

@peter941221 peter941221 commented May 25, 2026

Copy link
Copy Markdown
Contributor

What changed

AccSet::merge no longer replays RHS uniques in hash-bucket order once the RHS partial accumulator has already promoted to InternalSet::Huge.

The merge path now drains the RHS positions, sorts them by the stored raw-byte offset, and replays them in first-occurrence order before appending into the LHS accumulator. It also drains the RHS raw bytes at the same time, so the merged-away accumulator does not keep stale payload bytes after merge.

This PR adds three focused regressions in native-engine/datafusion-ext-plans/src/agg/collect.rs:

  • test_acc_set_merge_preserves_first_occurrence_order_when_rhs_is_larger
  • test_acc_set_merge_preserves_first_occurrence_order_when_rhs_becomes_huge
  • test_acc_set_merge_preserves_first_occurrence_order_after_rhs_spill

Why

This is fixing a concrete Spark-compat failure from #1840, not a cosmetic ordering preference.

The failing no-shuffle scenario is AuronDataFrameAggregateSuite -> collect functions structs. In that case Spark sees a = 1, 2, 3 in first-occurrence order and returns [1, 2, 3], while the old size-based swap could make the larger RHS partial accumulator lead the merge and return [2, 3, 1] instead.

The same merge boundary also had a second gap on the large-set path. After convert_to_huge_if_needed, HashTable::into_iter() walks buckets rather than insertion order, so replaying RHS uniques straight from Huge could still reorder the appended suffix once a group had 4 or more distinct values.

The spill path still comes back through the same merge boundary:
AggTable::output -> partial_merge -> merge_items -> AccSet::merge

So the PR keeps first-occurrence order at the merge point for the larger-RHS case, the Huge case, and the spill round-trip case.

Testing

  • git diff --check
  • WSL pinned-nightly focused regressions:
    • cargo +nightly-2025-05-09 test --locked --manifest-path native-engine/datafusion-ext-plans/Cargo.toml test_acc_set_merge_preserves_first_occurrence_order_when_rhs_becomes_huge -- --nocapture
    • cargo +nightly-2025-05-09 test --locked --manifest-path native-engine/datafusion-ext-plans/Cargo.toml test_acc_set_merge_preserves_first_occurrence_order_after_rhs_spill -- --nocapture
  • Current Windows-side rerun of the same crate is still blocked before the target crate by rdkafka-sys failing to configure librdkafka with %1 is not a valid Win32 application. (os error 193)

@peter941221 peter941221 marked this pull request as ready for review May 29, 2026 10:17
@peter941221 peter941221 force-pushed the fix/auron-1840-collect-set-order branch from 46ef225 to f092709 Compare June 1, 2026 22:43
@peter941221

Copy link
Copy Markdown
Contributor Author

Rebased this onto current master and kept the fix minimal. The change still just removes the size-based swap in AccSet::merge and adds order assertions for the merge cases. New CI is running on f092709.

@richox

richox commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

do we really need to keep this order? if i dont understand wrong, this order we be discarded in bucket-merge spill strategy even if we maintain it in updating

@peter941221

Copy link
Copy Markdown
Contributor Author

@richox thanks for calling out the spill path. I checked that path and added a focused regression for it.

The spill bucket merge still comes back through the same collect_set merge path this PR changes:
AggTable::output bucket merge in agg_table.rs
-> partial_merge
-> merge_items in collect.rs
-> AccSet::merge in collect.rs

I also checked the spill round-trip itself. AccSetColumn::save_raw writes list.raw as-is, and unspill rebuilds it in order through load_raw / append_item, so spill does not discard the internal encounter order by itself.

To make that concrete, I pushed 645b3236, which adds test_acc_set_merge_preserves_first_occurrence_order_after_rhs_spill. It spills the larger RHS accumulator, unspills it, merges it into the LHS, and still gets [1, 2, 3] on the repo's pinned nightly-2025-05-09.

So I think we still need this fix for the spill path too.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to match Spark’s collect_set no-shuffle behavior by preserving first-occurrence ordering during accumulator merges, avoiding RHS-first ordering caused by size-based swapping.

Changes:

  • Removed AccSet::merge’s size-based accumulator swap to preserve encounter order.
  • Added regression tests to validate first-occurrence order, including a spill/unspill merge path.
  • Added a source-compatible overload for NativeHelper.getDefaultNativeMetrics to support an in-progress keyed migration.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File Description
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala Adds a compatibility overload delegating to the keyed metrics API.
native-engine/datafusion-ext-plans/src/agg/collect.rs Adjusts AccSet::merge behavior and expands tests to cover ordering expectations.
Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/agg/collect.rs:567

  • AccSet::merge drains other.set but leaves other.list.raw intact. This breaks AccSet invariants (list contains values not reflected in the set), can cause merged-away values to be serialized/spilled later via AccSetColumn::save_raw, and makes AccSetColumn::mem_used accounting incorrect because merge_items subtracts other_value_mem_size even though the underlying list memory is still retained. Also, for InternalSet::Huge, iterating the hash table does not preserve first-occurrence order; sorting by the stored position offset restores encounter order for large sets.
    pub fn merge(&mut self, other: &mut Self) {
        for pos_len in std::mem::take(&mut other.set).into_iter() {
            self.append_raw(other.list.ref_raw(pos_len));
        }
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@weiqingy weiqingy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking this on — the two regression tests are nicely targeted: ..._when_rhs_is_larger hits exactly the branch the old swap changed, and ..._after_rhs_spill answers the spill-path question directly. A couple of questions inline.

Comment thread spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala Outdated
Comment thread native-engine/datafusion-ext-plans/src/agg/collect.rs
@github-actions github-actions Bot removed the spark label Jun 20, 2026

@weiqingy weiqingy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick turnaround — dropping the metrics overload in 5b0ccd6e and naming the collect functions structs case in the PR body both look right. One new thing inline on the merge path.

Comment thread native-engine/datafusion-ext-plans/src/agg/collect.rs Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants