[SPARK-50593][SQL] SPJ: Support truncate transform via generalized ReducibleFunction API #55885
metanil wants to merge 3 commits into
Conversation
transform.children.size == 1 && isReference(transform.children.head)
// TransformExpression.collectLeaves() only returns column references, not literals.
// We need exactly one column reference per transform.
transform.collectLeaves().size == 1
[P1] This widens support from transforms with one direct reference child to any transform whose collectLeaves() returns one leaf. Because Transform arguments can themselves be nested Transforms, V2ExpressionUtils can materialize shapes like outer(years(k)) and outer(days(k)). The new gate accepts both, keyPositions later maps them only by leaf k, and TransformExpression.isSameFunction compares only the outer function name plus literals. That can make storage partitionings with different nested child semantics look compatible and let SPJ skip a required shuffle, which can drop join matches. Keep the old direct-reference constraint, or compare the full non-literal child semantics before admitting these transforms.
[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
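A minimal self-contained sketch of the stricter check suggested here, using a toy expression model (`Ref`, `Lit`, and `Trans` are illustrative stand-ins, not Spark's `Expression` classes): compare the full non-literal child structure, not only the leaf references, so nested transforms with different semantics are rejected.

```java
// Toy model of the stricter compatibility check: structural comparison
// of the non-literal children rather than leaf-only matching.
import java.util.List;

public class ShapeCheck {
    // Minimal expression model: a column reference, a literal, or a named transform.
    sealed interface Expr permits Ref, Lit, Trans {}
    record Ref(String name) implements Expr {}
    record Lit(Object value) implements Expr {}
    record Trans(String fn, List<Expr> children) implements Expr {}

    // Literals are compared separately (by the existing literal check), so two
    // literals count as shape-compatible here; everything else must match.
    static boolean sameNonLiteralShape(Expr a, Expr b) {
        if (a instanceof Ref && b instanceof Ref) return true; // leaf mapping via keyPositions
        if (a instanceof Lit && b instanceof Lit) return true;
        if (a instanceof Trans ta && b instanceof Trans tb) {
            if (!ta.fn().equals(tb.fn()) || ta.children().size() != tb.children().size()) {
                return false;
            }
            for (int i = 0; i < ta.children().size(); i++) {
                if (!sameNonLiteralShape(ta.children().get(i), tb.children().get(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }

    // outer(years(k)) vs outer(days(k)): same outer name, same single leaf k,
    // but different nested semantics -- the structural check rejects them.
    static boolean outerYearsVsDays() {
        Expr a = new Trans("outer", List.of(new Trans("years", List.of(new Ref("k")))));
        Expr b = new Trans("outer", List.of(new Trans("days",  List.of(new Ref("k")))));
        return sameNonLiteralShape(a, b);
    }
}
```

Under a leaf-only gate both shapes pass (one leaf `k` each); the structural check distinguishes them while still accepting identical nesting on both sides.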
private def extractParameters(expr: TransformExpression): ReducibleParameters = {
  import scala.jdk.CollectionConverters._
  val values = expr.literalChildren.map {
    case Literal(value, _) => value.asInstanceOf[AnyRef]
[P1] extractParameters forwards raw Catalyst Literal.value objects into ReducibleParameters. For string literals, Spark stores UTF8String, while the new public API documents string parameters and exposes getString() as a java.lang.String cast. A connector implementing the documented string-parameter case will get ClassCastException from getString(0) or be forced to depend on Spark internals. Convert literal values to connector-facing external values by dataType before constructing ReducibleParameters.
[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
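A hedged sketch of the suggested conversion, with `Utf8Stub` as a toy stand-in for Spark's internal `UTF8String` (inside Spark the analogous step is `CatalystTypeConverters.convertToScala`, driven by the literal's `dataType`): map internal values to their external form before building the parameter container, so the documented `getString()` cast holds.

```java
// Convert internal literal representations to connector-facing external
// values before packing them into ReducibleParameters (names here are
// illustrative stand-ins, not Spark internals).
import java.nio.charset.StandardCharsets;

public class LiteralConversion {
    // Stand-in for Spark's internal UTF8String representation.
    record Utf8Stub(byte[] bytes) {
        String asJavaString() { return new String(bytes, StandardCharsets.UTF_8); }
    }

    // Map internal values to external ones by type; a connector's
    // getString(0) then really receives a java.lang.String.
    static Object toExternal(Object value) {
        if (value instanceof Utf8Stub s) return s.asJavaString();
        return value; // boxed ints, longs, etc. are already external
    }

    // A string literal arriving in its internal form comes out as a plain String.
    static Object demo() {
        return toExternal(new Utf8Stub("abc".getBytes(StandardCharsets.UTF_8)));
    }
}
```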
val thisParams = extractParameters(thisExpr)
val otherParams = extractParameters(otherExpr)

val res = if (!thisParams.isEmpty && !otherParams.isEmpty) {
[P2] The new generalized reducer API accepts ReducibleParameters on both sides, and this file even models zero-literal transforms as ReducibleParameters([]). But this dispatch only invokes the generalized overload when both sides are non-empty. Mixed cases such as parameterized-vs-zero-parameter transforms instead fall back to reducer(otherFunction), so connectors that correctly implement the new generalized overload for those cases are never invoked; the default legacy path can even throw UnsupportedOperationException. If mixed arity is meant to be unsupported, the new API/docs should say that explicitly. Otherwise this should dispatch through the generalized overload whenever either side wants that path.
[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
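A toy sketch of the proposed dispatch rule (`Params` and the string results are illustrative, not the real signatures): take the generalized overload whenever either side carries parameters, and reserve the legacy `reducer(otherFunction)` path for the fully parameterless case.

```java
// Dispatch whenever EITHER side has parameters, so mixed
// parameterized-vs-zero-parameter pairs still reach connectors that
// implement the new generalized overload.
import java.util.List;

public class ReducerDispatch {
    // Minimal stand-in for the new parameter container.
    record Params(List<Object> values) {
        boolean isEmpty() { return values.isEmpty(); }
    }

    static String dispatch(Params thisParams, Params otherParams) {
        if (!thisParams.isEmpty() || !otherParams.isEmpty()) {
            return "generalized"; // reducer(ReducibleParameters, fn, ReducibleParameters)
        }
        return "legacy";          // reducer(otherFunction)
    }

    // truncate(3) on one side, an unparameterized transform on the other:
    // under the && gate this would fall back to the legacy path.
    static String mixedCase() {
        return dispatch(new Params(List.of(3)), new Params(List.of()));
    }

    static String parameterlessCase() {
        return dispatch(new Params(List.of()), new Params(List.of()));
    }
}
```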
What changes were proposed in this pull request?
This PR adds Storage Partitioned Join (SPJ) support for the `truncate` partition transform. The approach generalizes the `ReducibleFunction` API to accept arbitrary parameters via a new `ReducibleParameters` container, so SPJ can reason about any parameterized transform (bucket, truncate, future ones) through one code path.

Key changes:

- New `ReducibleParameters` in `org.apache.spark.sql.connector.catalog.functions` — a typed parameter container.
- New `ReducibleFunction.reducer(ReducibleParameters, ReducibleFunction, ReducibleParameters)`. The old `reducer(int, ..., int)` is marked `@Deprecated` but preserved as the default fallback, so existing connector implementations (e.g., Iceberg 1.10.0) continue to work unchanged.
- `TransformExpression` refactor: literal parameters (e.g., bucket `numBuckets`, truncate `width`) now live inside `children` rather than a bespoke `numBucketsOpt: Option[Int]` field. `collectLeaves()` is overridden to filter out literal parameters and return only column references.
- An `extractParameters` helper on `TransformExpression` that extracts `ReducibleParameters` from literal children and delegates to the new API; compatibility checks (`isCompatible`, `reducers`) work uniformly for bucket, truncate, etc.

Why are the changes needed?
Today a join on tables partitioned by `truncate(col, N)` always shuffles, even when both sides share identical partitioning. The write side was fixed by SPARK-40295 (Allow v2 functions with literal args in write distribution and ordering), but the read/join side was never enabled.

Previous work in #49211 (@szehon-ho) explored direct support for transforms with literal arguments by adjusting the SPJ paths to recognize them. This PR instead generalizes the reducer API so the compatibility check is function-agnostic, with a default method that delegates to the deprecated single-int signature for backward compatibility.
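To make the reducibility claim concrete, here is a small worked check (plain arithmetic, not Spark code) that width-3 and width-5 truncation both coarsen cleanly to their least common multiple, width 15, which is what lets a reducer align the two partitionings:

```java
// truncate(x, w) = w * floor(x / w). Re-truncating a width-w key to any
// multiple k*w yields exactly truncate(x, k*w), so both sides can be
// reduced to lcm(3, 5) = 15 and their partition keys then line up.
public class TruncateReduction {
    static int truncate(int x, int w) { return x - Math.floorMod(x, w); }

    static int gcd(int a, int b) { return b == 0 ? a : gcd(b, a % b); }
    static int lcm(int a, int b) { return a / gcd(a, b) * b; }

    // Check that reducing width-3 and width-5 keys to width 15 agrees with
    // truncating the raw value directly to width 15 (negatives included,
    // since floorMod gives floor semantics).
    static boolean reducesConsistently() {
        int common = lcm(3, 5); // 15
        for (int x = -40; x <= 40; x++) {
            if (truncate(truncate(x, 3), common) != truncate(x, common)) return false;
            if (truncate(truncate(x, 5), common) != truncate(x, common)) return false;
        }
        return true;
    }
}
```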
Does this PR introduce any user-facing change?
Yes, for connector/catalog authors:
- New public class `ReducibleParameters`.
- New `ReducibleFunction.reducer(ReducibleParameters, ...)` with a default that delegates to the deprecated single-int signature for backward compatibility.
- New `LegacyBucketFunction` test fixture.

For end users, queries joining tables partitioned by compatible `truncate` transforms (identical widths, or reducible pairs like `truncate(3)` and `truncate(5)`) now avoid a shuffle via SPJ.

How was this patch tested?
5 new tests in `KeyGroupedPartitioningSuite`.

cc @szehon-ho @aokolnychyi @sunchao @peter-toth
Was this patch authored or co-authored using generative AI tooling?
Yes — used only for test cases and Javadoc/Scaladoc comments.
Generated-by: Claude Code (Opus 4.7)