diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 5b517663f9c03..1901fda2a55d6 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -334,10 +334,10 @@ impl Morselizer for ParquetMorselizer { /// PrepareFilters /// | /// v -/// LoadPageIndex +/// PruneWithStatistics /// | /// v -/// PruneWithStatistics +/// LoadPageIndex? (skipped when all surviving row groups are fully matched) /// | /// v /// LoadBloomFilters @@ -372,10 +372,10 @@ enum ParquetOpenState { /// Specialize any filters for the actual file schema (only known after /// metadata is loaded) PrepareFilters(Box), - /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/) - LoadPageIndex(BoxFuture<'static, Result>), /// Pruning Row Groups PruneWithStatistics(Box), + /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/) + LoadPageIndex(BoxFuture<'static, Result>), /// Loading bloom filters required for row-group pruning LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters @@ -540,19 +540,31 @@ impl ParquetOpenState { } ParquetOpenState::PrepareFilters(loaded) => { let prepared_filters = loaded.prepare_filters()?; - Ok(ParquetOpenState::LoadPageIndex( - prepared_filters.load_page_index().boxed(), - )) + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + prepared_filters, + ))) + } + ParquetOpenState::PruneWithStatistics(prepared) => { + let prepared_row_groups = (*prepared).prune_row_groups()?; + if should_load_page_index( + prepared_row_groups.prepared.page_pruning_predicate.as_ref(), + &prepared_row_groups.row_groups, + ) { + Ok(ParquetOpenState::LoadPageIndex( + prepared_row_groups.load_page_index().boxed(), + )) + } else { + // Skip page index I/O: no page-pruning predicate, no surviving row + // groups, or every surviving row group is fully matched by row-group + // statistics alone (page index cannot prune further). + Ok(ParquetOpenState::LoadBloomFilters( + prepared_row_groups.load_bloom_filters().boxed(), + )) + } } ParquetOpenState::LoadPageIndex(future) => { Ok(ParquetOpenState::LoadPageIndex(future)) } - ParquetOpenState::PruneWithStatistics(prepared) => { - let prepared_row_groups = prepared.prune_row_groups()?; - Ok(ParquetOpenState::LoadBloomFilters( - prepared_row_groups.load_bloom_filters().boxed(), - )) - } ParquetOpenState::LoadBloomFilters(future) => { Ok(ParquetOpenState::LoadBloomFilters(future)) } @@ -666,9 +678,9 @@ impl MorselPlanner for ParquetMorselPlanner { } ParquetOpenState::LoadPageIndex(future) => { Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PruneWithStatistics(Box::new( - future.await?, - ))) + Ok(ParquetOpenState::LoadBloomFilters( + future.await?.load_bloom_filters().boxed(), + )) }))) } ParquetOpenState::LoadBloomFilters(future) => { @@ -1047,27 +1059,6 @@ impl MetadataLoadedParquetOpen { } impl FiltersPreparedParquetOpen { - /// Load the page index if pruning requires it and metadata did not include it. - async fn load_page_index(mut self) -> Result { - // The page index is not stored inline in the parquet footer so the - // metadata load above may not have read the page index structures yet. - // If we need them for reading and they aren't yet loaded, we need to - // load them now. - if self.page_pruning_predicate.is_some() { - self.loaded.reader_metadata = load_page_index( - self.loaded.reader_metadata, - &mut self.loaded.prepared.async_file_reader, - self.loaded - .options - .clone() - .with_page_index_policy(PageIndexPolicy::Optional), - ) - .await?; - } - - Ok(self) - } - /// Prune row groups using file ranges and parquet metadata. fn prune_row_groups(self) -> Result { let loaded = &self.loaded; @@ -1136,6 +1127,22 @@ impl FiltersPreparedParquetOpen { } impl RowGroupsPrunedParquetOpen { + /// Load the page index if pruning requires it and metadata did not include it. + async fn load_page_index(mut self) -> Result { + self.prepared.loaded.reader_metadata = load_page_index( + self.prepared.loaded.reader_metadata.clone(), + &mut self.prepared.loaded.prepared.async_file_reader, + self.prepared + .loaded + .options + .clone() + .with_page_index_policy(PageIndexPolicy::Optional), + ) + .await?; + + Ok(self) + } + /// Load bloom filters needed for pruning when enabled and a pruning predicate exists. async fn load_bloom_filters(mut self) -> Result { let num_row_groups = self @@ -1566,6 +1573,24 @@ pub(crate) fn build_pruning_predicates( ) } +/// Returns true if the page index must be loaded for page-level pruning. +/// +/// The page index can only prune when at least one surviving row group is not +/// fully matched by row-group statistics alone. +fn should_load_page_index( + page_pruning_predicate: Option<&Arc>, + row_groups: &RowGroupAccessPlanFilter, +) -> bool { + if page_pruning_predicate.is_none() || row_groups.is_empty() { + return false; + } + + let is_fully_matched = row_groups.is_fully_matched(); + !row_groups + .row_group_indexes() + .all(|idx| is_fully_matched[idx]) +} + /// Returns a `ArrowReaderMetadata` with the page index loaded, loading /// it from the underlying `AsyncFileReader` if necessary. async fn load_page_index( @@ -2808,6 +2833,67 @@ mod test { ); } + #[test] + fn should_load_page_index_without_predicate() { + use crate::RowGroupAccessPlanFilter; + let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); + assert!(!should_load_page_index(None, &row_groups)); + } + + #[test] + fn should_load_page_index_when_surviving_row_groups_not_fully_matched() { + use crate::RowGroupAccessPlanFilter; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let predicate = logical2physical(&col("a").gt(lit(50i32)), &schema); + let page_predicate = build_page_pruning_predicate(&predicate, &schema); + let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); + assert!(should_load_page_index(Some(&page_predicate), &row_groups)); + } + + #[tokio::test] + async fn test_page_index_skipped_when_row_groups_fully_matched() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let values: Vec = (1..=100).collect(); + let batch = record_batch!(( + "a", + Int32, + values.iter().map(|v| Some(*v)).collect::>() + )) + .unwrap(); + let props = WriterProperties::builder() + .set_data_page_row_count_limit(10) + .set_write_batch_size(10) + .build(); + let schema = batch.schema(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch], + Some(props), + ) + .await; + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + let predicate = logical2physical(&col("a").is_not_null(), &schema); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_predicate(Arc::clone(&predicate)) + .with_enable_page_index(true) + .with_pushdown_filters(false) + .build(); + + let (_, rows) = + count_batches_and_rows(open_file(&morselizer, file).await.unwrap()).await; + assert_eq!(rows, 100); + } + async fn fully_matched_split_test_file( store: Arc, ) -> (SchemaRef, PartitionedFile) {