[SPARK-51262][SQL] Fix exceptAll after dropDuplicates with subset #55905
shrirangmhalgi wants to merge 3 commits into
Conversation
ReplaceDeduplicateWithAggregate replaces Deduplicate with an Aggregate using First() for non-key columns, creating new attribute exprIds. When RewriteExceptAll ran first in the same optimizer batch, it captured the original exprIds in its Generate node. After ReplaceDeduplicateWithAggregate rewrote the Deduplicate, the Generate still referenced the old exprIds, causing INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND at execution time. Fix: reorder ReplaceDeduplicateWithAggregate before RewriteExceptAll in the Replace Operators batch so Deduplicate is already an Aggregate when RewriteExceptAll processes the plan.
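To make the ordering dependency concrete, here is a toy sketch of the failure mode. The classes below (`Attr`, `ExprIdDemo`, `replaceDeduplicate`, `resolve`) are hypothetical stand-ins, not Spark's real Catalyst internals: one "rule" mints fresh exprIds for non-key columns (as `ReplaceDeduplicateWithAggregate` does via `First()` aliases), and a "node" that captured references before that rewrite can no longer resolve them, mirroring the `INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND` failure.

```scala
object ExprIdDemo {
  // Minimal stand-in for an attribute reference: a name plus a unique exprId.
  case class Attr(name: String, exprId: Long)

  private var nextId = 1000L
  private def freshId(): Long = { nextId += 1; nextId }

  // Analogue of ReplaceDeduplicateWithAggregate: key columns keep their
  // exprIds, non-key columns are wrapped in First(...) aliases and so
  // receive brand-new exprIds.
  def replaceDeduplicate(output: Seq[Attr], keys: Set[String]): Seq[Attr] =
    output.map(a => if (keys.contains(a.name)) a else a.copy(exprId = freshId()))

  // Analogue of the Generate node built by RewriteExceptAll: the references
  // it captured must resolve against the child's *current* output by exprId.
  def resolve(captured: Seq[Attr], current: Seq[Attr]): Either[String, Seq[Attr]] = {
    val byId = current.map(a => a.exprId -> a).toMap
    val missing = captured.filterNot(a => byId.contains(a.exprId))
    if (missing.isEmpty) Right(captured.map(a => byId(a.exprId)))
    else Left("ATTRIBUTE_NOT_FOUND: " +
      missing.map(a => s"${a.name}#${a.exprId}").mkString(", "))
  }

  def main(args: Array[String]): Unit = {
    val dedupOutput = Seq(Attr("id", 1), Attr("name", 2), Attr("value", 3))

    // Buggy order: references are captured first, then the Deduplicate is
    // rewritten and "value" gets a fresh exprId, so resolution fails.
    val capturedEarly = dedupOutput
    val rewritten = replaceDeduplicate(dedupOutput, Set("id", "name"))
    println(resolve(capturedEarly, rewritten)) // Left(ATTRIBUTE_NOT_FOUND: value#3)

    // Fixed order: rewrite the Deduplicate first, then capture references.
    println(resolve(rewritten, rewritten).isRight) // true
  }
}
```

Running `main` shows the captured reference `value#3` failing to resolve once the rewrite has replaced it with a fresh exprId, while capturing after the rewrite succeeds, which is exactly the effect of moving `ReplaceDeduplicateWithAggregate` ahead of `RewriteExceptAll` in the batch.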
@holdenk / @dongjoon-hyun Could you please review?
```scala
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate,
ReplaceDeduplicateWithAggregate),
```
Can we document this dependency relation?
Thank you @holdenk for the review. I added a comment explaining the dependency.
```scala
assert(result.count() === 1)
assert(result.collect().head.getInt(0) === 2)

// Also verify except (non-all) works
val result2 = deduped.except(df2)
assert(result2.count() === 1)

// intersectAll should also work
val result3 = deduped.intersectAll(df2)
assert(result3.count() <= 1)
```
A bit silly but it might be nice to check that the correct values survive, not just the expected number of values ;)
Good call - updated the test to assert actual row values (id, name, value) for all three operations. Thanks!
I realized that the test DataFrame could cause non-deterministic rows to be picked up. To avoid test flakiness, I modified the test data to produce deterministic results, keeping all DataFrame rows unique. 😊
What changes were proposed in this pull request?
Reorder `ReplaceDeduplicateWithAggregate` before `RewriteExceptAll` in the "Replace Operators" optimizer batch.

Why are the changes needed?

`dropDuplicates("id", "name").exceptAll(other)` throws `INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND` at execution time. The root cause is that `RewriteExceptAll` captures attribute references from `left.output` before `ReplaceDeduplicateWithAggregate` has replaced the Deduplicate node with an `Aggregate(First(...))`. The `First()` alias creates new exprIds that don't match what `RewriteExceptAll` baked into its Generate node.

Does this PR introduce any user-facing change?
Yes.
`exceptAll` (and `intersectAll`) now work correctly after `dropDuplicates` with a column subset.

How was this patch tested?
Added a test in `DataFrameSetOperationsSuite` verifying `exceptAll`, `except`, and `intersectAll` after `dropDuplicates(subset)`.

Was this patch authored or co-authored using generative AI tooling?
Yes.