From f46d52489e4472dbf4a847ba701ee8ba5b88618f Mon Sep 17 00:00:00 2001 From: shruti2522 Date: Fri, 10 Apr 2026 09:59:50 +0000 Subject: [PATCH] feat(parquet): support CDC writer path --- .../datasource-parquet/src/file_format.rs | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c4faedf571f6d..82990ea305398 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -101,6 +101,13 @@ const INITIAL_BUFFER_BYTES: usize = 1048576; /// this size, it is flushed to object store const BUFFER_FLUSH_BYTES: usize = 1024000; +/// CDC requires the sequential writer path because chunker state is maintained +/// inside `ArrowWriter` across row groups +fn should_use_sequential_writer(parquet_opts: &TableParquetOptions) -> bool { + !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.use_content_defined_chunking.is_some() +} + #[derive(Default)] /// Factory struct used to create [ParquetFormat] pub struct ParquetFormatFactory { @@ -1370,11 +1377,7 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - // CDC requires the sequential writer: the chunker state lives in ArrowWriter - // and persists across row groups. The parallel path bypasses ArrowWriter entirely. - if !parquet_opts.global.allow_single_file_parallelism - || parquet_opts.global.use_content_defined_chunking.is_some() - { + if should_use_sequential_writer(parquet_opts) { let mut writer = self .create_async_arrow_writer( &path, @@ -1816,6 +1819,7 @@ mod tests { use parquet::arrow::parquet_to_arrow_schema; use super::*; + use datafusion_common::config::CdcOptions; use parquet::schema::parser::parse_message_type; @@ -2025,4 +2029,30 @@ mod tests { assert_eq!(result, expected_schema); } + + #[test] + fn sequential_when_parallel_off() { + let mut opts = TableParquetOptions::default(); + opts.global.allow_single_file_parallelism = false; + + assert!(should_use_sequential_writer(&opts)); + } + + #[test] + fn sequential_when_cdc_on() { + let mut opts = TableParquetOptions::default(); + opts.global.allow_single_file_parallelism = true; + opts.global.use_content_defined_chunking = Some(CdcOptions::default()); + + assert!(should_use_sequential_writer(&opts)); + } + + #[test] + fn parallel_when_allowed_and_cdc_off() { + let mut opts = TableParquetOptions::default(); + opts.global.allow_single_file_parallelism = true; + opts.global.use_content_defined_chunking = None; + + assert!(!should_use_sequential_writer(&opts)); + } }