diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java index 9bb852987e656..0774b4ad3475b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java @@ -58,6 +58,14 @@ void readValues( * should have already been filled, and fills the non-null slots using dictionary IDs from * `dictionaryIds`, together with Parquet `dictionary`. * + *
The default implementation delegates to {@link #decodeBatch}, which calls + * {@link #decodeSingleDictionaryId} per non-null element. Because this default method's + * bytecode is shared by every implementor, C2 sees a megamorphic call site for + * {@code decodeSingleDictionaryId} and cannot inline the per-element decode. Hot-path + * updaters (e.g. {@code IntegerUpdater}) override this method with an identical one-line + * delegation to {@code decodeBatch}; the per-class bytecode gives C2 a monomorphic call + * site, enabling full inlining of the decode expression. + * * @param total total number slots to process in `values` * @param offset starting offset in `values` * @param values destination value vector @@ -70,11 +78,7 @@ default void decodeDictionaryIds( WritableColumnVector values, WritableColumnVector dictionaryIds, Dictionary dictionary) { - for (int i = offset; i < offset + total; i++) { - if (!values.isNullAt(i)) { - decodeSingleDictionaryId(i, values, dictionaryIds, dictionary); - } - } + decodeBatch(total, offset, values, dictionaryIds, dictionary, this); } /** @@ -91,4 +95,33 @@ void decodeSingleDictionaryId( WritableColumnVector values, WritableColumnVector dictionaryIds, Dictionary dictionary); + + /** + * Batch-decodes dictionary IDs with a no-null fast path. + * + *
When {@code values.hasNull()} is false the per-element {@code isNullAt} check is
+ * skipped entirely. The loop body calls {@code updater.decodeSingleDictionaryId}, which
+ * C2 can devirtualize and inline only if this helper is itself inlined into a caller where
+ * {@code updater} has a known concrete type -- i.e. a per-class override of
+ * {@link #decodeDictionaryIds} rather than the shared default method.
+ */
+ static void decodeBatch(
+ int total, // number of consecutive slots to visit
+ int offset, // index of the first slot; the loops cover [offset, offset + total)
+ WritableColumnVector values, // consulted for null markers, then handed to the updater
+ WritableColumnVector dictionaryIds, // passed through to the updater untouched
+ Dictionary dictionary, // passed through to the updater untouched
+ ParquetVectorUpdater updater) { // supplies the per-slot decodeSingleDictionaryId
+ if (!values.hasNull()) { // fast path: no nulls reported, so skip the isNullAt checks
+ for (int i = offset; i < offset + total; i++) {
+ updater.decodeSingleDictionaryId(i, values, dictionaryIds, dictionary);
+ }
+ } else { // separate loop so the fast path above stays free of null checks
+ for (int i = offset; i < offset + total; i++) {
+ if (!values.isNullAt(i)) { // null slots are left exactly as they were
+ updater.decodeSingleDictionaryId(i, values, dictionaryIds, dictionary);
+ }
+ }
+ }
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index edca5528a190a..f89742f7c7255 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -346,6 +346,16 @@ public void readValue(
values.putInt(offset, valuesReader.readInteger());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
@@ -379,6 +389,16 @@ public void readValue(
values.putLong(offset, valuesReader.readInteger());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
@@ -667,6 +687,16 @@ public void readValue(
values.putLong(offset, valuesReader.readLong());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
@@ -934,6 +964,16 @@ public void readValue(
values.putFloat(offset, valuesReader.readFloat());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
@@ -967,6 +1007,16 @@ public void readValue(
values.putDouble(offset, valuesReader.readFloat());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
@@ -1000,6 +1050,16 @@ public void readValue(
values.putDouble(offset, valuesReader.readDouble());
}
+ @Override // same one-line body as the interface default, duplicated on purpose
+ public void decodeDictionaryIds(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) { // per-class copy is meant to let C2 inline the decode
+ ParquetVectorUpdater.decodeBatch(total, offset, values, dictionaryIds, dictionary, this);
+ }
+
@Override
public void decodeSingleDictionaryId(
int offset,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionaryDecodeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionaryDecodeBenchmark.scala
new file mode 100644
index 0000000000000..e7e2e6881fb1b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionaryDecodeBenchmark.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.time.ZoneOffset
+
+import org.apache.parquet.column.{ColumnDescriptor, Encoding}
+import org.apache.parquet.schema.{LogicalTypeAnnotation, Types}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Type.Repetition
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.types._
+
+/**
+ * Benchmark for `ParquetVectorUpdater.decodeDictionaryIds` -- the second pass of
+ * dictionary-encoded Parquet reads. After `VectorizedRleValuesReader.readIntegers`
+ * populates dictionary IDs and null markers, `decodeDictionaryIds` translates the IDs
+ * into decoded values.
+ *
+ * Coverage:
+ * A. Core primitive Updaters: Integer, Long, Float, Double.
+ * B. Type-converting Updaters: IntegerToLong, FloatToDouble.
+ *
+ * Each group is tested with three null fractions (0%, 10%, 50%) to exercise the
+ * no-null fast path and the per-element null-check path.
+ *
+ * The dictionary is an anonymous `org.apache.parquet.column.Dictionary` backed by
+ * pre-populated arrays (100 entries), matching the production decode-to-xxx methods.
+ * Dictionary IDs are uniform-random in [0, 100).
+ *
+ * JIT note: `decodeDictionaryIds` has two branches (no-null vs has-null). If only one branch
+ * runs during warm-up, C2 may compile the other as an uncommon trap, so the first measured
+ * case that takes it pays for deoptimization. A global pre-warm phase interleaves both
+ * branches for every updater class before any measurement so C2 compiles with balanced profiles.
+ *
+ * To run this benchmark:
+ * {{{
+ * 1. build/sbt "sql/Test/runMain