From 181968e09241c8e75db014c56323b2ed703d3b40 Mon Sep 17 00:00:00 2001 From: shrirangmhalgi Date: Fri, 15 May 2026 09:19:37 -0700 Subject: [PATCH 1/3] [SPARK-51262][SQL] Fix exceptAll after dropDuplicates with subset ReplaceDeduplicateWithAggregate replaces Deduplicate with an Aggregate using First() for non-key columns, creating new attribute exprIds. When RewriteExceptAll ran first in the same optimizer batch, it captured the original exprIds in its Generate node. After ReplaceDeduplicateWithAggregate rewrote the Deduplicate, the Generate still referenced the old exprIds, causing INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND at execution time. Fix: reorder ReplaceDeduplicateWithAggregate before RewriteExceptAll in the Replace Operators batch so Deduplicate is already an Aggregate when RewriteExceptAll processes the plan. --- .../sql/catalyst/optimizer/Optimizer.scala | 4 +-- .../sql/DataFrameSetOperationsSuite.scala | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ddfe80443d561..ec0d182012115 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -214,13 +214,13 @@ abstract class Optimizer(catalogManager: CatalogManager) OptimizeSubqueries, OptimizeOneRowRelationSubquery), Batch("Replace Operators", fixedPoint, + ReplaceDeduplicateWithAggregate, RewriteExceptAll, RewriteIntersectAll, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, - ReplaceDistinctWithAggregate, - ReplaceDeduplicateWithAggregate), + ReplaceDistinctWithAggregate), Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e65942689bc06..ee08b9f856afb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -1642,6 +1642,35 @@ class DataFrameSetOperationsSuite extends SharedSparkSession with AdaptiveSparkP } } } + + test("SPARK-51262: exceptAll after dropDuplicates with subset should not throw") { + val df1 = spark.createDataFrame(Seq( + (1, "a", 100), + (1, "a", 200), + (2, "b", 300) + )).toDF("id", "name", "value") + + val df2 = spark.createDataFrame(Seq( + (1, "a", 100) + )).toDF("id", "name", "value") + + // dropDuplicates with subset keeps one row per (id, name) group + val deduped = df1.dropDuplicates("id", "name") + + // exceptAll should work without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND + val result = deduped.exceptAll(df2) + assert(result.columns === Array("id", "name", "value")) + assert(result.count() === 1) + assert(result.collect().head.getInt(0) === 2) + + // Also verify except (non-all) works + val result2 = deduped.except(df2) + assert(result2.count() === 1) + + // intersectAll should also work + val result3 = deduped.intersectAll(df2) + assert(result3.count() <= 1) + } } case class UnionClass1a(a: Int, b: Long, nested: UnionClass2) From 5f80a4345235446ec9e33aa8f1c70127e70adc25 Mon Sep 17 00:00:00 2001 From: shrirangmhalgi Date: Fri, 15 May 2026 12:15:49 -0700 Subject: [PATCH 2/3] Address review: Add dependency comment and strengthen test assertions --- .../sql/catalyst/optimizer/Optimizer.scala | 4 ++++ .../sql/DataFrameSetOperationsSuite.scala | 23 ++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ec0d182012115..b99cbd0750ce4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -214,6 +214,10 @@ abstract class Optimizer(catalogManager: CatalogManager) OptimizeSubqueries, OptimizeOneRowRelationSubquery), Batch("Replace Operators", fixedPoint, + // SPARK-51262: ReplaceDeduplicateWithAggregate must run before RewriteExceptAll because + // it replaces Deduplicate with Aggregate(First(...)), creating new attribute exprIds. + // If RewriteExceptAll runs first, its Generate node captures stale exprIds that no + // longer exist after the Deduplicate-to-Aggregate rewrite. ReplaceDeduplicateWithAggregate, RewriteExceptAll, RewriteIntersectAll, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index ee08b9f856afb..fe70bd5786772 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -1660,16 +1660,27 @@ class DataFrameSetOperationsSuite extends SharedSparkSession with AdaptiveSparkP // exceptAll should work without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND val result = deduped.exceptAll(df2) assert(result.columns === Array("id", "name", "value")) - assert(result.count() === 1) - assert(result.collect().head.getInt(0) === 2) + val rows = result.collect() + assert(rows.length === 1) + assert(rows.head.getInt(0) === 2) + assert(rows.head.getString(1) === "b") + assert(rows.head.getInt(2) === 300) - // Also verify except (non-all) works + // Also verify except (non-all) works and returns correct values val result2 = deduped.except(df2) - assert(result2.count() === 1) + val rows2 = result2.collect() + assert(rows2.length === 1) + assert(rows2.head.getInt(0) === 2) + assert(rows2.head.getString(1) === "b") + assert(rows2.head.getInt(2) === 300) - // intersectAll should also work + // intersectAll should also work and return the matching row val result3 = deduped.intersectAll(df2) - assert(result3.count() <= 1) + val rows3 = result3.collect() + assert(rows3.length === 1) + assert(rows3.head.getInt(0) === 1) + assert(rows3.head.getString(1) === "a") + assert(rows3.head.getInt(2) === 100) } } From b38ca02ce545836fd57327fa5ea16bf19659c645 Mon Sep 17 00:00:00 2001 From: shrirangmhalgi Date: Fri, 15 May 2026 13:20:09 -0700 Subject: [PATCH 3/3] Use unique test data to avoid non-deterministic First() behavior --- .../sql/DataFrameSetOperationsSuite.scala | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index fe70bd5786772..d838ba4c234f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -1644,43 +1644,40 @@ class DataFrameSetOperationsSuite extends SharedSparkSession with AdaptiveSparkP } test("SPARK-51262: exceptAll after dropDuplicates with subset should not throw") { + // Data where dropDuplicates(subset) produces deterministic results - to avoid test flakiness. val df1 = spark.createDataFrame(Seq( (1, "a", 100), - (1, "a", 200), - (2, "b", 300) + (2, "b", 200), + (3, "c", 300) )).toDF("id", "name", "value") val df2 = spark.createDataFrame(Seq( (1, "a", 100) )).toDF("id", "name", "value") - // dropDuplicates with subset keeps one row per (id, name) group + // dropDuplicates with subset - each (id, name) is already unique so output is deterministic val deduped = df1.dropDuplicates("id", "name") // exceptAll should work without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND val result = deduped.exceptAll(df2) assert(result.columns === Array("id", "name", "value")) - val rows = result.collect() - assert(rows.length === 1) - assert(rows.head.getInt(0) === 2) - assert(rows.head.getString(1) === "b") - assert(rows.head.getInt(2) === 300) + val rows = result.collect().sortBy(_.getInt(0)) + assert(rows.length === 2) + assert(rows(0) === Row(2, "b", 200)) + assert(rows(1) === Row(3, "c", 300)) // Also verify except (non-all) works and returns correct values val result2 = deduped.except(df2) - val rows2 = result2.collect() - assert(rows2.length === 1) - assert(rows2.head.getInt(0) === 2) - assert(rows2.head.getString(1) === "b") - assert(rows2.head.getInt(2) === 300) + val rows2 = result2.collect().sortBy(_.getInt(0)) + assert(rows2.length === 2) + assert(rows2(0) === Row(2, "b", 200)) + assert(rows2(1) === Row(3, "c", 300)) // intersectAll should also work and return the matching row val result3 = deduped.intersectAll(df2) val rows3 = result3.collect() assert(rows3.length === 1) - assert(rows3.head.getInt(0) === 1) - assert(rows3.head.getString(1) === "a") - assert(rows3.head.getInt(2) === 100) + assert(rows3.head === Row(1, "a", 100)) } }