@@ -495,7 +495,9 @@ public void decodeSingleDictionaryId(
         int offset,
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      values.putLong(offset, dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
+      // 32-bit Decimal target (precision <= 9) is stored in `intData`; `longData` is
+      // unallocated, so use `putInt` with the same narrowing cast as `readValue`/`readValues`.
+      values.putInt(offset, (int) dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
     }
   }
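Why the narrowing cast is safe: a 32-bit decimal column (precision <= 9) only ever holds unscaled values in [-999999999, 999999999], which fit comfortably in an `Int`. A standalone sanity-check sketch of that invariant (the `NarrowingCastCheck` name is hypothetical, not part of the patch):

```scala
// Sketch (not part of the patch): narrowing a DECIMAL(9, s) unscaled value from
// Long to Int is lossless, since |unscaled| <= 999999999 < Int.MaxValue (2147483647).
object NarrowingCastCheck extends App {
  val maxUnscaled = 999999999L
  Seq(-maxUnscaled, -1L, 0L, maxUnscaled).foreach { v =>
    val narrowed = v.toInt // same narrowing as the Java `(int)` cast in the updater
    assert(narrowed == v, s"lossy narrowing for $v")
  }
  println("all DECIMAL(9, s) boundary values survive narrowing to Int")
}
```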

@@ -1593,6 +1593,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
+    // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
+    // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
+    // column as a mix of dictionary-encoded and PLAIN pages. The mixed-cardinality values below
+    // (a 4-value pool plus unique-per-row values) force that fallback; uniformly low- or
+    // high-cardinality data bypasses the path. INT64 DECIMAL(p <= 9) is built via parquet-mr's
+    // low-level writer because Spark's own writer emits INT32 for that case.
+    val schema = MessageTypeParser.parseMessageType(
+      """message root {
+        | required int64 v (DECIMAL(9, 2));
+        |}""".stripMargin)
+    def unscaledAt(i: Int): Long = i % 5 match {
+      case 0 => -999999999L
+      case 1 => -1L
+      case 2 => 0L
+      case 3 => 999999999L
+      case _ => i.toLong * 13L - 7L
+    }
+    withTempDir { dir =>
+      val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+      val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = true)
+      val numRecords = 5000
+      (0 until numRecords).foreach { i =>
+        val record = new SimpleGroup(schema)
+        record.add(0, unscaledAt(i))
+        writer.write(record)
+      }
+      writer.close()
+
+      withAllParquetReaders {
+        val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false)
+        val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+        val expected = (0 until numRecords).map { i =>
+          Row(java.math.BigDecimal.valueOf(unscaledAt(i), 2))
+        }
+        checkAnswer(df, expected)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
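To double-check that the writer really produced a mixed dictionary/PLAIN column chunk (rather than staying fully dictionary-encoded), one could inspect the file footer with parquet-mr's metadata API. A sketch, assuming the v1 page format the test helper writes; the `EncodingCheck` name and file path are placeholders:

```scala
// Sketch (not part of the patch): with the v1 writer, a column chunk that fell back
// mid-write lists both a dictionary encoding and PLAIN among its page encodings;
// a chunk that stayed dictionary-encoded would not report PLAIN for data pages.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.Encoding
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object EncodingCheck extends App {
  val path = new Path("/tmp/dec.parquet") // placeholder path
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))
  try {
    val encodings = reader.getFooter.getBlocks.get(0).getColumns.get(0).getEncodings
    println(s"column chunk encodings: $encodings")
    assert(encodings.contains(Encoding.PLAIN), "expected a dictionary-to-PLAIN fallback")
  } finally reader.close()
}
```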