From 1845d8f6c53a2be9ec5cf9ae5d8cd32b1eeff3fc Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Tue, 9 Jun 2026 00:39:38 +0530 Subject: [PATCH 1/3] Skip loading Parquet page index when row-group stats prove it cannot prune. Reorder the opener so row-group statistics pruning runs before the page index load, and skip that I/O when every surviving row group is fully matched. Co-authored-by: Cursor --- datafusion/datasource-parquet/src/opener.rs | 158 +++++++++++++++----- 1 file changed, 121 insertions(+), 37 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 1db055880d7d4..f6e3b96cca157 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -179,10 +179,10 @@ impl Morselizer for ParquetMorselizer { /// PrepareFilters /// | /// v -/// LoadPageIndex +/// PruneWithStatistics /// | /// v -/// PruneWithStatistics +/// LoadPageIndex? (skipped when all surviving row groups are fully matched) /// | /// v /// LoadBloomFilters @@ -217,10 +217,10 @@ enum ParquetOpenState { /// Specialize any filters for the actual file schema (only known after /// metadata is loaded) PrepareFilters(Box), - /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/) - LoadPageIndex(BoxFuture<'static, Result>), /// Pruning Row Groups PruneWithStatistics(Box), + /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/) + LoadPageIndex(BoxFuture<'static, Result>), /// Loading bloom filters required for row-group pruning LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters @@ -378,19 +378,28 @@ impl ParquetOpenState { } ParquetOpenState::PrepareFilters(loaded) => { let prepared_filters = loaded.prepare_filters()?; - Ok(ParquetOpenState::LoadPageIndex( - prepared_filters.load_page_index().boxed(), - )) + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + prepared_filters, + ))) + } + ParquetOpenState::PruneWithStatistics(prepared) => { + let prepared_row_groups = (*prepared).prune_row_groups()?; + if should_load_page_index( + prepared_row_groups.prepared.page_pruning_predicate.as_ref(), + &prepared_row_groups.row_groups, + ) { + Ok(ParquetOpenState::LoadPageIndex( + prepared_row_groups.load_page_index().boxed(), + )) + } else { + Ok(ParquetOpenState::LoadBloomFilters( + prepared_row_groups.load_bloom_filters().boxed(), + )) + } } ParquetOpenState::LoadPageIndex(future) => { Ok(ParquetOpenState::LoadPageIndex(future)) } - ParquetOpenState::PruneWithStatistics(prepared) => { - let prepared_row_groups = prepared.prune_row_groups()?; - Ok(ParquetOpenState::LoadBloomFilters( - prepared_row_groups.load_bloom_filters().boxed(), - )) - } ParquetOpenState::LoadBloomFilters(future) => { Ok(ParquetOpenState::LoadBloomFilters(future)) } @@ -504,9 +513,9 @@ impl MorselPlanner for ParquetMorselPlanner { } ParquetOpenState::LoadPageIndex(future) => { Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PruneWithStatistics(Box::new( - future.await?, - ))) + Ok(ParquetOpenState::LoadBloomFilters( + future.await?.load_bloom_filters().boxed(), + )) }))) } ParquetOpenState::LoadBloomFilters(future) => { @@ -853,27 +862,6 @@ impl MetadataLoadedParquetOpen { } impl FiltersPreparedParquetOpen { - /// Load the page index if pruning requires it and metadata did not include it. - async fn load_page_index(mut self) -> Result { - // The page index is not stored inline in the parquet footer so the - // metadata load above may not have read the page index structures yet. - // If we need them for reading and they aren't yet loaded, we need to - // load them now. - if self.page_pruning_predicate.is_some() { - self.loaded.reader_metadata = load_page_index( - self.loaded.reader_metadata, - &mut self.loaded.prepared.async_file_reader, - self.loaded - .options - .clone() - .with_page_index_policy(PageIndexPolicy::Optional), - ) - .await?; - } - - Ok(self) - } - /// Prune row groups using file ranges and parquet metadata. fn prune_row_groups(self) -> Result { let loaded = &self.loaded; @@ -942,6 +930,22 @@ impl FiltersPreparedParquetOpen { } impl RowGroupsPrunedParquetOpen { + /// Load the page index if pruning requires it and metadata did not include it. + async fn load_page_index(mut self) -> Result { + self.prepared.loaded.reader_metadata = load_page_index( + self.prepared.loaded.reader_metadata.clone(), + &mut self.prepared.loaded.prepared.async_file_reader, + self.prepared + .loaded + .options + .clone() + .with_page_index_policy(PageIndexPolicy::Optional), + ) + .await?; + + Ok(self) + } + /// Load bloom filters needed for pruning when enabled and a pruning predicate exists. async fn load_bloom_filters(mut self) -> Result { let num_row_groups = self @@ -1581,6 +1585,28 @@ pub(crate) fn build_pruning_predicates( ) } +/// Returns true if the page index must be loaded for page-level pruning. +/// +/// The page index can only prune when at least one surviving row group is not +/// fully matched by row-group statistics alone. +fn should_load_page_index( + page_pruning_predicate: Option<&Arc>, + row_groups: &RowGroupAccessPlanFilter, +) -> bool { + if page_pruning_predicate.is_none() { + return false; + } + + if row_groups.is_empty() { + return false; + } + + let is_fully_matched = row_groups.is_fully_matched(); + !row_groups + .row_group_indexes() + .all(|idx| is_fully_matched[idx]) +} + /// Returns a `ArrowReaderMetadata` with the page index loaded, loading /// it from the underlying `AsyncFileReader` if necessary. async fn load_page_index( @@ -2712,4 +2738,62 @@ mod test { "without page index all rows are returned" ); } + + #[test] + fn should_load_page_index_without_predicate() { + use crate::RowGroupAccessPlanFilter; + let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); + assert!(!should_load_page_index(None, &row_groups)); + } + + #[test] + fn should_load_page_index_when_surviving_row_groups_not_fully_matched() { + use crate::RowGroupAccessPlanFilter; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let predicate = logical2physical(&col("a").gt(lit(50i32)), &schema); + let page_predicate = build_page_pruning_predicate(&predicate, &schema); + let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); + assert!(should_load_page_index(Some(&page_predicate), &row_groups)); + } + + #[tokio::test] + async fn test_page_index_skipped_when_row_groups_fully_matched() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let values: Vec = (1..=100).collect(); + let batch = record_batch!(( + "a", + Int32, + values.iter().map(|v| Some(*v)).collect::>() + )) + .unwrap(); + let props = WriterProperties::builder() + .set_data_page_row_count_limit(10) + .set_write_batch_size(10) + .build(); + let schema = batch.schema(); + let data_size = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch], + Some(props), + ) + .await; + + let file = PartitionedFile::new("test.parquet".to_string(), data_size as u64); + let predicate = logical2physical(&col("a").is_not_null(), &schema); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_predicate(Arc::clone(&predicate)) + .with_enable_page_index(true) + .with_pushdown_filters(false) + .build(); + + let (_, rows) = + count_batches_and_rows(open_file(&morselizer, file).await.unwrap()).await; + assert_eq!(rows, 100); + } } From 63234231188909296653717b1d7f724c38473734 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Tue, 9 Jun 2026 23:18:06 +0530 Subject: [PATCH 2/3] Combine early-return conditions in should_load_page_index. Co-authored-by: Cursor --- datafusion/datasource-parquet/src/opener.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f6e3b96cca157..e1ea7a51f08e6 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1593,11 +1593,7 @@ fn should_load_page_index( page_pruning_predicate: Option<&Arc>, row_groups: &RowGroupAccessPlanFilter, ) -> bool { - if page_pruning_predicate.is_none() { - return false; - } - - if row_groups.is_empty() { + if page_pruning_predicate.is_none() || row_groups.is_empty() { return false; } From e256fc5cfb2ac3f6f41c386256f2fc7b5cca9f3c Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Tue, 9 Jun 2026 23:29:33 +0530 Subject: [PATCH 3/3] Document when page index loading is skipped in the opener. Co-authored-by: Cursor --- datafusion/datasource-parquet/src/opener.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index e1ea7a51f08e6..a66eddcb987e1 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -392,6 +392,9 @@ impl ParquetOpenState { prepared_row_groups.load_page_index().boxed(), )) } else { + // Skip page index I/O: no page-pruning predicate, no surviving row + // groups, or every surviving row group is fully matched by row-group + // statistics alone (page index cannot prune further). Ok(ParquetOpenState::LoadBloomFilters( prepared_row_groups.load_bloom_filters().boxed(), ))