feat(experimental): ScalaUDF and Java UDF support via Janino codegen#4267
mbutrovich wants to merge 93 commits into
There are like 4 Spark SQL test failures that look like they might need updating, but otherwise it's looking good. Not gonna worry about them until we discuss moving forward.
…ted body" on Spark 3.5
# Conflicts: # dev/diffs/3.4.3.diff # dev/diffs/3.5.8.diff
andygrove left a comment
I did a first pass review without using AI and this is looking great! It is disabled by default, so I'd be fine with merging and continuing to iterate. I would like to run some benchmarks at scale.
I'll review again with AI assistance next.
AI review:
This is old from when I was testing with the feature enabled by default. I'll update the description and address the other feedback shortly. Thanks @andygrove!
Thanks @andygrove for the careful review and the AI pass, both turned up real issues. Changes pushed:
The AI review caught two real correctness bugs the previous design had:
The other AI suggestions turned out to be moot:
PR description updated to drop the stale |
…, update user guide
Which issue does this PR close?
Closes #.
Rationale for this change
Builds on the JVM UDF bridge (#4232) and the per-task-attempt-id bridge cache (#4345). Adds CometScalaUDFCodegen, a CometUDF that compiles a specialized Arrow batch kernel per bound ScalaUDF and input schema via Janino. Without it, any plan containing a ScalaUDF falls back to Spark for the enclosing operator.
ScalaUDF whose argument and return types are in the supported surface routes through native, no hand-written CometUDF required.
upper(s), concat(c1, c2), monotonically_increasing_id(), HOFs like transform/filter/array_max) compile into the same per-row loop as the user function.
Opt-in via spark.comet.exec.scalaUDF.codegen.enabled (default false, experimental).
Iceberg ships ScalaUDFs that real workloads hit: IcebergSpark.registerBucketUDF/registerTruncateUDF for partition-aligned predicates, and RewriteDataFiles with sort-strategy=zorder for compaction. With this PR enabled, those run natively and the surrounding project / exchange / sort stay on the Comet path. Without it, the operator falls back and the shuffle gets demoted from CometExchange to CometColumnarExchange.
The dispatcher is one of potentially many CometUDF implementations the bridge can route to. Hand-written CometUDFs for specific expression families remain a parallel path, dispatched by class name from the proto.
What changes are included in this PR?
spark/src/main/scala/org/apache/comet/codegen/, .../udf/codegen/, .../serde/CometScalaUDF.scala
spark/src/test/scala/org/apache/comet/CometCodegen*.scala
spark/src/main/spark-{3.x,4.0,4.1,4.2,4.x}/.../shims/
docs/source/user-guide/latest/scala_java_udfs.md
pr_build_*.yml
Where to focus review
CometBatchKernelCodegen plus the input/output emitters.
CometScalaUDFCodegen.
CometScalaUDF.
CometCodegenAssertions.
How are these changes tested?
CometCodegenSourceSuite: generated-source assertions for each optimization, complex-type shapes, null-guard contract per Struct / Array / Map element and field, and CacheKey discrimination on ArrowColumnSpec.nullable.
CometCodegenSuite: end-to-end correctness across scalar and complex type surfaces, composed UDF trees, subquery reuse, TaskContext propagation, per-task cache isolation, kernel-cache reuse across batches, ScalaUDF as a child of a native Spark expression, maxFields plan-time gate, null-guard contract via array_max(flatten(...)) for Binary / String / Decimal short / Decimal long.
CometCodegenHOFSuite: ArrayTransform / ArrayFilter regressions plus a per-task isolation regression that runs the same HOF query twice and asserts each matches Spark.
CometCodegenFuzzSuite: schema-driven fuzz over random parquet: identity ScalaUDF on every primitive column, cardinality probe on every complex column, per-column array_max element fuzz, array_max(flatten(...)) over Array<Array<primitive>>, array_max(map_keys / map_values(...)) over Map<primitive, primitive>, array_distinct over Array<Struct<primitives>>, randomized decimal sweep across the MAX_LONG_DIGITS=18 boundary at varying null densities.
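For reviewers who want a mental model of the "CacheKey discrimination on ArrowColumnSpec.nullable" and "kernel-cache reuse across batches" checks, here is a rough, self-contained sketch of the idea. It is not the code in this PR: ColumnSpec, KernelKey, and KernelCache are hypothetical stand-ins, the "kernel" is a placeholder string rather than a Janino-compiled class, and the real key likely carries more than shown here. The only point is that nullability is part of key equality, so equal keys reuse one kernel and a differing nullable flag forces a separate compile.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for one input column: Arrow type name plus nullability. */
final class ColumnSpec {
    final String arrowType;
    final boolean nullable;

    ColumnSpec(String arrowType, boolean nullable) {
        this.arrowType = arrowType;
        this.nullable = nullable;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof ColumnSpec)) return false;
        ColumnSpec other = (ColumnSpec) o;
        // Nullability participates in equality, so it discriminates cache keys.
        return arrowType.equals(other.arrowType) && nullable == other.nullable;
    }

    @Override public int hashCode() {
        return Objects.hash(arrowType, nullable);
    }
}

/** Hypothetical cache key: a description of the bound expression plus its input columns. */
final class KernelKey {
    final String boundExpr;
    final List<ColumnSpec> inputs;

    KernelKey(String boundExpr, List<ColumnSpec> inputs) {
        this.boundExpr = boundExpr;
        this.inputs = inputs;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof KernelKey)) return false;
        KernelKey other = (KernelKey) o;
        return boundExpr.equals(other.boundExpr) && inputs.equals(other.inputs);
    }

    @Override public int hashCode() {
        return Objects.hash(boundExpr, inputs);
    }
}

/** Hypothetical cache that "compiles" at most once per distinct key. */
final class KernelCache {
    private final ConcurrentHashMap<KernelKey, String> kernels = new ConcurrentHashMap<>();
    private final AtomicInteger compiles = new AtomicInteger();

    String kernelFor(KernelKey key) {
        // Placeholder for the compile step; a real implementation would
        // generate and compile source here instead of returning a label.
        return kernels.computeIfAbsent(key, k -> "kernel#" + compiles.incrementAndGet());
    }

    int compileCount() {
        return compiles.get();
    }
}

final class KernelCacheDemo {
    public static void main(String[] args) {
        KernelCache cache = new KernelCache();
        KernelKey batch1 = new KernelKey("myUdf(c0)", Arrays.asList(new ColumnSpec("Utf8", false)));
        KernelKey batch2 = new KernelKey("myUdf(c0)", Arrays.asList(new ColumnSpec("Utf8", false)));
        KernelKey nullableInput = new KernelKey("myUdf(c0)", Arrays.asList(new ColumnSpec("Utf8", true)));

        cache.kernelFor(batch1);        // first batch compiles
        cache.kernelFor(batch2);        // equal key, reuses the cached kernel
        cache.kernelFor(nullableInput); // nullable differs, compiles again
        System.out.println(cache.compileCount()); // prints 2
    }
}
```

Keying on nullability lets a kernel for a non-nullable column skip null checks entirely without risking reuse against a nullable column of the same type, which is presumably why the source suite asserts on it.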