Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 123 additions & 37 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ impl Morselizer for ParquetMorselizer {
/// PrepareFilters
/// |
/// v
/// LoadPageIndex
/// PruneWithStatistics
/// |
/// v
/// PruneWithStatistics
/// LoadPageIndex? (skipped when all surviving row groups are fully matched)
/// |
/// v
/// LoadBloomFilters
Expand Down Expand Up @@ -372,10 +372,10 @@ enum ParquetOpenState {
/// Specialize any filters for the actual file schema (only known after
/// metadata is loaded)
PrepareFilters(Box<MetadataLoadedParquetOpen>),
/// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/)
LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
/// Pruning Row Groups
PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
/// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/)
LoadPageIndex(BoxFuture<'static, Result<RowGroupsPrunedParquetOpen>>),
/// Loading bloom filters required for row-group pruning
LoadBloomFilters(BoxFuture<'static, Result<BloomFiltersLoadedParquetOpen>>),
/// Pruning with preloaded Bloom Filters
Expand Down Expand Up @@ -540,19 +540,31 @@ impl ParquetOpenState {
}
ParquetOpenState::PrepareFilters(loaded) => {
let prepared_filters = loaded.prepare_filters()?;
Ok(ParquetOpenState::LoadPageIndex(
prepared_filters.load_page_index().boxed(),
))
Ok(ParquetOpenState::PruneWithStatistics(Box::new(
prepared_filters,
)))
}
ParquetOpenState::PruneWithStatistics(prepared) => {
let prepared_row_groups = (*prepared).prune_row_groups()?;
if should_load_page_index(
prepared_row_groups.prepared.page_pruning_predicate.as_ref(),
&prepared_row_groups.row_groups,
) {
Ok(ParquetOpenState::LoadPageIndex(
prepared_row_groups.load_page_index().boxed(),
))
} else {
// Skip page index I/O: no page-pruning predicate, no surviving row
// groups, or every surviving row group is fully matched by row-group
// statistics alone (page index cannot prune further).
Ok(ParquetOpenState::LoadBloomFilters(
prepared_row_groups.load_bloom_filters().boxed(),
))
}
}
ParquetOpenState::LoadPageIndex(future) => {
Ok(ParquetOpenState::LoadPageIndex(future))
}
ParquetOpenState::PruneWithStatistics(prepared) => {
let prepared_row_groups = prepared.prune_row_groups()?;
Ok(ParquetOpenState::LoadBloomFilters(
prepared_row_groups.load_bloom_filters().boxed(),
))
}
ParquetOpenState::LoadBloomFilters(future) => {
Ok(ParquetOpenState::LoadBloomFilters(future))
}
Expand Down Expand Up @@ -666,9 +678,9 @@ impl MorselPlanner for ParquetMorselPlanner {
}
ParquetOpenState::LoadPageIndex(future) => {
Ok(Some(Self::schedule_io(async move {
Ok(ParquetOpenState::PruneWithStatistics(Box::new(
future.await?,
)))
Ok(ParquetOpenState::LoadBloomFilters(
future.await?.load_bloom_filters().boxed(),
))
})))
}
ParquetOpenState::LoadBloomFilters(future) => {
Expand Down Expand Up @@ -1047,27 +1059,6 @@ impl MetadataLoadedParquetOpen {
}

impl FiltersPreparedParquetOpen {
/// Load the page index if pruning requires it and metadata did not include it.
async fn load_page_index(mut self) -> Result<Self> {
// The page index is not stored inline in the parquet footer so the
// metadata load above may not have read the page index structures yet.
// If we need them for reading and they aren't yet loaded, we need to
// load them now.
if self.page_pruning_predicate.is_some() {
self.loaded.reader_metadata = load_page_index(
self.loaded.reader_metadata,
&mut self.loaded.prepared.async_file_reader,
self.loaded
.options
.clone()
.with_page_index_policy(PageIndexPolicy::Optional),
)
.await?;
}

Ok(self)
}

/// Prune row groups using file ranges and parquet metadata.
fn prune_row_groups(self) -> Result<RowGroupsPrunedParquetOpen> {
let loaded = &self.loaded;
Expand Down Expand Up @@ -1136,6 +1127,22 @@ impl FiltersPreparedParquetOpen {
}

impl RowGroupsPrunedParquetOpen {
/// Load the page index if pruning requires it and metadata did not include it.
async fn load_page_index(mut self) -> Result<Self> {
self.prepared.loaded.reader_metadata = load_page_index(
self.prepared.loaded.reader_metadata.clone(),
&mut self.prepared.loaded.prepared.async_file_reader,
self.prepared
.loaded
.options
.clone()
.with_page_index_policy(PageIndexPolicy::Optional),
)
.await?;

Ok(self)
}

/// Load bloom filters needed for pruning when enabled and a pruning predicate exists.
async fn load_bloom_filters(mut self) -> Result<BloomFiltersLoadedParquetOpen> {
let num_row_groups = self
Expand Down Expand Up @@ -1566,6 +1573,24 @@ pub(crate) fn build_pruning_predicates(
)
}

/// Returns true if the page index must be loaded for page-level pruning.
///
/// The page index can only prune when at least one surviving row group is not
/// fully matched by row-group statistics alone.
fn should_load_page_index(
page_pruning_predicate: Option<&Arc<PagePruningAccessPlanFilter>>,
row_groups: &RowGroupAccessPlanFilter,
) -> bool {
if page_pruning_predicate.is_none() || row_groups.is_empty() {
return false;
}

let is_fully_matched = row_groups.is_fully_matched();
!row_groups
.row_group_indexes()
.all(|idx| is_fully_matched[idx])
}

/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
/// it from the underlying `AsyncFileReader` if necessary.
async fn load_page_index<T: AsyncFileReader>(
Expand Down Expand Up @@ -2808,6 +2833,67 @@ mod test {
);
}

#[test]
fn should_load_page_index_without_predicate() {
use crate::RowGroupAccessPlanFilter;
let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
assert!(!should_load_page_index(None, &row_groups));
}

#[test]
fn should_load_page_index_when_surviving_row_groups_not_fully_matched() {
use crate::RowGroupAccessPlanFilter;
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let predicate = logical2physical(&col("a").gt(lit(50i32)), &schema);
let page_predicate = build_page_pruning_predicate(&predicate, &schema);
let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
assert!(should_load_page_index(Some(&page_predicate), &row_groups));
}

#[tokio::test]
async fn test_page_index_skipped_when_row_groups_fully_matched() {
use parquet::file::properties::WriterProperties;

let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let values: Vec<i32> = (1..=100).collect();
let batch = record_batch!((
"a",
Int32,
values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
))
.unwrap();
let props = WriterProperties::builder()
.set_data_page_row_count_limit(10)
.set_write_batch_size(10)
.build();
let schema = batch.schema();
let data_len = write_parquet_batches(
Arc::clone(&store),
"test.parquet",
vec![batch],
Some(props),
)
.await;

let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_len).unwrap(),
);
let predicate = logical2physical(&col("a").is_not_null(), &schema);

let morselizer = ParquetMorselizerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_predicate(Arc::clone(&predicate))
.with_enable_page_index(true)
.with_pushdown_filters(false)
.build();

let (_, rows) =
count_batches_and_rows(open_file(&morselizer, file).await.unwrap()).await;
assert_eq!(rows, 100);
}

async fn fully_matched_split_test_file(
store: Arc<dyn ObjectStore>,
) -> (SchemaRef, PartitionedFile) {
Expand Down
Loading