From a41984723eac88264c2a9135b0ab6f35fd0ca1d6 Mon Sep 17 00:00:00 2001 From: YangJie Date: Fri, 15 May 2026 10:03:35 +0200 Subject: [PATCH 1/2] [SPARK-56872][SQL] Fix NPE in DowncastLongUpdater.decodeSingleDictionaryId MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `DowncastLongUpdater.decodeSingleDictionaryId` calls `values.putLong(...)`, but `DowncastLongUpdater` is only chosen when the target is a 32-bit Decimal (precision <= 9), whose column vector stores into `intData`, not `longData`. So `putLong` NPEs whenever this path runs. Switch the call to `putInt` with the same `(int) longValue` narrowing cast already used by `readValue` and `readValues`. The bug has been latent since SPARK-35640 (Jun 2021) because the path is only reachable when all three conditions hold: 1. Parquet stores the column as INT64 + DECIMAL(p<=9). Spark's own writer emits INT32 for this case, so the file must come from another writer (Hive, Impala, ...). 2. Spark reads it as a Decimal with precision <= 9. 3. The vectorized reader has to eagerly drain buffered dictionary IDs — typically when parquet-mr writes the column as a mix of dictionary-encoded and PLAIN pages and a non-dict page follows a dict page in the same batch. The normal lazy-dictionary path decodes at row read time via `ParquetDictionary` and never touches this updater method. Without the fix, the new regression test fails with: ``` Cause: java.lang.NullPointerException: at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:393) at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$DowncastLongUpdater.decodeSingleDictionaryId(ParquetVectorUpdaterFactory.java:713) at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdater.decodeDictionaryIds(ParquetVectorUpdater.java:75) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:288) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:406) ... ``` Yes. Reads that previously NPE'd now return correct values. New `ParquetIOSuite` test that writes an INT64 + DECIMAL(9, 2) column via parquet-mr's low-level writer with mix-cardinality data (4-value pool + unique-per-row) to force the dictionary -> PLAIN fallback. Without the fix it reproduces the NPE above; with the fix it passes. Full `ParquetIOSuite` is green locally. No Closes #55890 from LuciferYang/SPARK-downcast-long-dict-fix. Authored-by: YangJie Signed-off-by: Peter Toth (cherry picked from commit 9f17e186e57b962ad5b7a2205912aed91efd80a3) (cherry picked from commit bc519584e6c70231bace1c35a37aeabb8c18a9f9) --- .../parquet/ParquetVectorUpdaterFactory.java | 4 +- .../datasources/parquet/ParquetIOSuite.scala | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 8c4fe20853879..523c4e1492fc6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -495,7 +495,9 @@ public void decodeSingleDictionaryId( WritableColumnVector values, WritableColumnVector dictionaryIds, Dictionary dictionary) { - values.putLong(offset, dictionary.decodeToLong(dictionaryIds.getDictId(offset))); + // 32-bit Decimal target (precision <= 9) is stored in `intData`; `longData` is + // unallocated, so use `putInt` with the same narrowing cast as `readValue`/`readValues`. + values.putInt(offset, (int) dictionary.decodeToLong(dictionaryIds.getDictId(offset))); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 8ed9ef1630eb2..cdc0a57464d22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1593,6 +1593,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } } + + test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") { + // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has + // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the + // column as a mix of dictionary-encoded and PLAIN pages. The mix-cardinality values below + // (4-value pool + unique-per-row) force that fallback; uniformly low- or high-cardinality + // data bypasses the path. INT64 DECIMAL(p<=9) is built via parquet-mr's low-level writer + // because Spark's own writer emits INT32 for that case. + val schema = MessageTypeParser.parseMessageType( + """message root { + | required int64 v (DECIMAL(9, 2)); + |}""".stripMargin) + def unscaledAt(i: Int): Long = i % 5 match { + case 0 => -999_999_999L + case 1 => -1L + case 2 => 0L + case 3 => 999_999_999L + case _ => i.toLong * 13L - 7L + } + withTempDir { dir => + val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet") + val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = true) + val numRecords = 5000 + (0 until numRecords).foreach { i => + val record = new SimpleGroup(schema) + record.add(0, unscaledAt(i)) + writer.write(record) + } + writer.close() + + withAllParquetReaders { + val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false) + val df = spark.read.schema(readSchema).parquet(tablePath.toString) + val expected = (0 until numRecords).map { i => + Row(java.math.BigDecimal.valueOf(unscaledAt(i), 2)) + } + checkAnswer(df, expected) + } + } + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) From 78654c6b676d2ee1158c02c928f1a1da74725c75 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Sat, 16 May 2026 14:33:17 +0800 Subject: [PATCH 2/2] [SPARK-56872][SQL][FOLLOW-UP] Fix Scala 2.12 compilation error in ParquetIOSuite Remove numeric underscore separators (Scala 2.13+ syntax) that cause compilation failure on Scala 2.12. --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cdc0a57464d22..710ab21090ff9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1606,10 +1606,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession | required int64 v (DECIMAL(9, 2)); |}""".stripMargin) def unscaledAt(i: Int): Long = i % 5 match { - case 0 => -999_999_999L + case 0 => -999999999L case 1 => -1L case 2 => 0L - case 3 => 999_999_999L + case 3 => 999999999L case _ => i.toLong * 13L - 7L } withTempDir { dir =>