Skip to content

feat(experimental): ScalaUDF and Java UDF support via Janino codegen#4267

Open
mbutrovich wants to merge 93 commits into
apache:mainfrom
mbutrovich:codegen_scala_udf
Open

feat(experimental): ScalaUDF and Java UDF support via Janino codegen#4267
mbutrovich wants to merge 93 commits into
apache:mainfrom
mbutrovich:codegen_scala_udf

Conversation

@mbutrovich
Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich commented May 8, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

Builds on the JVM UDF bridge (#4232) and the per-task-attempt-id bridge cache (#4345). Adds CometScalaUDFCodegen, a CometUDF that compiles a specialized Arrow batch kernel per bound ScalaUDF and input schema via Janino. Without it, any plan containing a ScalaUDF falls back to Spark for the enclosing operator.

  • Any ScalaUDF whose argument and return types are in the supported surface routes through native, no hand-written CometUDF required.
  • The whole argument tree binds together, so Catalyst sub-expressions (upper(s), concat(c1, c2), monotonically_increasing_id(), HOFs like transform / filter / array_max) compile into the same per-row loop as the user function.
  • Surrounding native operators stay native. The UDF is no longer a whole-operator fallback boundary.

Opt-in via spark.comet.exec.scalaUDF.codegen.enabled (default false, experimental).

Iceberg ships ScalaUDFs that real workloads hit: IcebergSpark.registerBucketUDF / registerTruncateUDF for partition-aligned predicates, and RewriteDataFiles with sort-strategy=zorder for compaction. With this PR enabled, those run natively and the surrounding project / exchange / sort stay on the Comet path. Without it, the operator falls back and the shuffle gets demoted from CometExchange to CometColumnarExchange.

The dispatcher is one of potentially many CometUDF implementations the bridge can route to. Hand-written CometUDFs for specific expression families remain a parallel path, dispatched by class name from the proto.

What changes are included in this PR?

Area Where
Core codegen + dispatcher + serde spark/src/main/scala/org/apache/comet/codegen/, .../udf/codegen/, .../serde/CometScalaUDF.scala
Tests 4 suites + shared assertions under spark/src/test/scala/org/apache/comet/CometCodegen*.scala
Cross-version shims spark/src/main/spark-{3.x,4.0,4.1,4.2,4.x}/.../shims/
Native (Rust) FFI / planner / jni-bridge cleanup
Docs docs/source/user-guide/latest/scala_java_udfs.md
CI New suite names in pr_build_*.yml

Where to focus review

  • Codegen template: CometBatchKernelCodegen plus the input/output emitters.
  • Dispatcher lifetime, caching, synchronization: CometScalaUDFCodegen.
  • Plan-time gating and bound-tree serialization: CometScalaUDF.
  • Test helpers: CometCodegenAssertions.

How are these changes tested?

  • CometCodegenSourceSuite: generated-source assertions for each optimization, complex-type shapes, null-guard contract per Struct / Array / Map element and field, and CacheKey discrimination on ArrowColumnSpec.nullable.
  • CometCodegenSuite: end-to-end correctness across scalar and complex type surfaces, composed UDF trees, subquery reuse, TaskContext propagation, per-task cache isolation, kernel-cache reuse across batches, ScalaUDF as a child of a native Spark expression, maxFields plan-time gate, null-guard contract via array_max(flatten(...)) for Binary / String / Decimal short / Decimal long.
  • CometCodegenHOFSuite: ArrayTransform / ArrayFilter regressions plus a per-task isolation regression that runs the same HOF query twice and asserts each matches Spark.
  • CometCodegenFuzzSuite: schema-driven fuzz over random parquet: identity ScalaUDF on every primitive column, cardinality probe on every complex column, per-column array_max element fuzz, array_max(flatten(...)) over Array<Array<primitive>>, array_max(map_keys / map_values(...)) over Map<primitive, primitive>, array_distinct over Array<Struct<primitives>>, randomized decimal sweep across the MAX_LONG_DIGITS=18 boundary at varying null densities.

@mbutrovich
Copy link
Copy Markdown
Contributor Author

There are like 4 Spark SQL test failures that look like they might need updating, but otherwise it's looking good. Not gonna worry about them until we discuss moving forward.

Comment thread docs/source/user-guide/latest/scala_java_udfs.md Outdated
Copy link
Copy Markdown
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a first pass review without using AI and this is looking great! It is disabled by default so I'd be fine with merging and keep iterating. I would like to run some benchmarks at scale.

I'll review again with AI assistance next.

@andygrove
Copy link
Copy Markdown
Member

andygrove commented May 19, 2026

AI review:

  • On CometScalaUDFCodegen.scala:149 (ensureKernel):
    "Could you walk me through what happens to Rand and MonotonicallyIncreasingID state when a partition has batches with different nullability profiles? The cache key includes per-batch nullable(v) =
    v.getNullCount != 0, so a nullability flip inside the partition will land on a different activeKey, drop the existing activeKernel, and re-init(partitionId) the new one. If batches flip back and forth,
    each transition reseeds XORShiftRandom and resets the ID counter, which could produce duplicate rand sequences or overlapping IDs. Is there a test that pins a single partition's rand /
    monotonically_increasing_id output across a nullability flip?"
  • On CometScalaUDFCodegen.scala:213 (nullable(v)):
    "Arrow Java's getNullCountcan return-1for vectors whose null count hasn't been computed yet (depending on how the vector was constructed). On the FFI import path from native, is there a
    guaranteegetNullCountis materialized before we observe it here? A-1would compare!= 0 and the kernel would compile as nullable, which is harmless. The opposite direction (0` returned for a vector that does
    have nulls) is what would worry me."
  • On CometScalaUDF.scala:62 (expr.collect { case a: AttributeReference => a }.distinct):
    "AttributeReference equality includes exprId so this should dedupe correctly, but it might be worth a comment noting that the resulting ordinal order is determined by tree traversal order (so the executor
    side has to recompute the same bound positions from the same tree)."
  • On CometBatchKernelCodegen.scala:212-218 (freshReferences):
    "The freshReferences closure captures boundExpr and inputSchema and re-runs generateSource each time. The doc says this is microseconds versus milliseconds for Janino compile, which is fine, but if a
    partition flips between two kernels often this becomes per-flip work. Worth a follow-up TODO to cache the references array per cache entry and only regenerate when ScalaUDF stateful encoders force it?"
  • On the PR description (Spark test diffs bullet):
    "The PR description mentions dev/diffs/*.diff updates totaling 198 lines, but I don't see them in the current diff (looks like commit a159357 rolled them back). If Matt's earlier comment about "4 Spark
    SQL test failures" still applies, those diff updates might want to come back. If those failures resolved themselves with the rebase, the bullet in the description can drop."
  • General (no specific anchor):
    "All CI green, very thorough test coverage, well-documented internal invariants. Disabled by default makes the experimental landing low-risk. Once the rand/MonotonicallyIncreasingID lifetime question is
    settled (either with a test or with a code clarification) I think this is good to merge and iterate from main."

@mbutrovich
Copy link
Copy Markdown
Contributor Author

mbutrovich commented May 19, 2026

  • On the PR description (Spark test diffs bullet):
    "The PR description mentions dev/diffs/*.diff updates totaling 198 lines, but I don't see them in the current diff (looks like commit a159357 rolled them back). If Matt's earlier comment about "4 Spark
    SQL test failures" still applies, those diff updates might want to come back. If those failures resolved themselves with the rebase, the bullet in the description can drop."

This is old from when I was testing with the feature enabled by default. I'll update the description and address the other feedback shortly. Thanks @andygrove!

@mbutrovich
Copy link
Copy Markdown
Contributor Author

Thanks @andygrove for the careful review and the AI pass, both turned up real issues.

Changes pushed:

  • Doc: scalar UDF intro now links to Spark's scalar UDF guide and avoids the temporally fragile "no longer falls back" phrasing.
  • Dispatcher: stopped deriving spec nullability from per-batch getNullCount. The cache key is now a function of the bound expression bytes and the schema-stable input shapes only. BoundReference.nullable (Catalyst's schema-tracked flag, baked into the serialized bytes on the driver) is the sole source of nullability information, so schema-declared non-null columns still get full isNullAt elision via Spark's own doGenCode.
  • Dispatcher: replaced the single activeKernel slot with a kernel instance stashed in each CacheEntry. The instance is init(partitionId)'d once at compile time and reused for every batch of that (expression, schema). Removed ensureKernel, rewriteBoundReferences, and the active-slot bookkeeping.

The AI review caught two real correctness bugs the previous design had:

  • A nullability flip mid-partition (one batch has nulls, the next does not) reset the kernel and replayed any per-partition stateful counters (MonotonicallyIncreasingID, Rand's XORShiftRandom). Reproduced with a 200-row single-partition parquet at batch size 8 with a null range; the new test pins the invariant.
  • A plan with two distinct ScalaUDFs in one operator thrashed the single active-kernel slot per batch and reset state on every flip. Reproduced with two UDFs each wrapping monotonically_increasing_id(); new test pins it. Confirmed the same shape also fires when one UDF is applied to two columns of differing schema nullability (different BoundReference ordinals produce different cache keys), covered by a third test.

The other AI suggestions turned out to be moot:

  • getNullCount == -1 no longer matters since we no longer read getNullCount.
  • The freshReferences per-flip cost is no longer a concern since the kernel is instantiated exactly once per cache entry rather than per partition-flip.
  • The AttributeReference.collect.distinct ordering note: the existing comment already documents the load-bearing invariant (ordinals align with the data args we ship), and the framing the AI suggested would have been slightly misleading about what the executor does.

PR description updated to drop the stale dev/diffs/*.diff bullet (artifact of when this was enabled by default).

@mbutrovich mbutrovich requested a review from andygrove May 20, 2026 00:59
@mbutrovich mbutrovich marked this pull request as ready for review May 20, 2026 12:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In progress

Development

Successfully merging this pull request may close these issues.

2 participants