diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 8c4fe20853879..523c4e1492fc6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -495,7 +495,9 @@ public void decodeSingleDictionaryId(
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      values.putLong(offset, dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
+      // 32-bit Decimal target (precision <= 9) is stored in `intData`; `longData` is
+      // unallocated, so use `putInt` with the same narrowing cast as `readValue`/`readValues`.
+      values.putInt(offset, (int) dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 8ed9ef1630eb2..710ab21090ff9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1593,6 +1593,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
+    // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
+    // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
+    // column as a mix of dictionary-encoded and PLAIN pages. The mixed-cardinality values below
+    // (4-value pool + unique-per-row) force that fallback; uniformly low- or high-cardinality
+    // data bypasses the path. INT64 DECIMAL(p<=9) is built via parquet-mr's low-level writer
+    // because Spark's own writer emits INT32 for that case.
+    val schema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int64 v (DECIMAL(9, 2));
+        |}""".stripMargin)
+    def unscaledAt(i: Int): Long = i % 5 match {
+      case 0 => -999999999L
+      case 1 => -1L
+      case 2 => 0L
+      case 3 => 999999999L
+      case _ => i.toLong * 13L - 7L
+    }
+    withTempDir { dir =>
+      val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+      val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = true)
+      val numRecords = 5000
+      (0 until numRecords).foreach { i =>
+        val record = new SimpleGroup(schema)
+        record.add(0, unscaledAt(i))
+        writer.write(record)
+      }
+      writer.close()
+
+      withAllParquetReaders {
+        val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false)
+        val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+        val expected = (0 until numRecords).map { i =>
+          Row(java.math.BigDecimal.valueOf(unscaledAt(i), 2))
+        }
+        checkAnswer(df, expected)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)