diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..22401b2ec675b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2560,10 +2560,12 @@ name = "datafusion-pruning" version = "53.0.0" dependencies = [ "arrow", + "async-trait", "datafusion-common", "datafusion-datasource", "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-functions-nested", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -2571,6 +2573,7 @@ dependencies = [ "insta", "itertools 0.14.0", "log", + "tokio", ] [[package]] diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs index e11a303f442a4..515dad7a51e17 100644 --- a/datafusion-examples/examples/data_io/parquet_index.rs +++ b/datafusion-examples/examples/data_io/parquet_index.rs @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex { } /// return the row counts for each file - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { Some(self.row_counts_ref().clone()) } diff --git a/datafusion-examples/examples/query_planning/pruning.rs b/datafusion-examples/examples/query_planning/pruning.rs index 33f3f8428a77f..7fdc4a7952d68 100644 --- a/datafusion-examples/examples/query_planning/pruning.rs +++ b/datafusion-examples/examples/query_planning/pruning.rs @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { // In this example, we know nothing about the number of rows in each file None } diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 27148de59a544..ebae23f0723a1 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -95,15 +95,17 @@ pub trait PruningStatistics { /// [`UInt64Array`]: arrow::array::UInt64Array fn null_counts(&self, column: &Column) -> Option; - /// Return the number of rows for the named column in each container - /// as an [`UInt64Array`]. + /// Return the number of rows in each container as an [`UInt64Array`]. + /// + /// Row counts are container-level (not column-specific) — the value + /// is the same regardless of which column is being considered. /// /// See [`Self::min_values`] for when to return `None` and null values. /// /// Note: the returned array must contain [`Self::num_containers`] rows /// /// [`UInt64Array`]: arrow::array::UInt64Array - fn row_counts(&self, column: &Column) -> Option; + fn row_counts(&self) -> Option; /// Returns [`BooleanArray`] where each row represents information known /// about specific literal `values` in a column. @@ -265,7 +267,7 @@ impl PruningStatistics for PartitionPruningStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -398,11 +400,7 @@ impl PruningStatistics for PrunableStatistics { } } - fn row_counts(&self, column: &Column) -> Option { - // If the column does not exist in the schema, return None - if self.schema.index_of(column.name()).is_err() { - return None; - } + fn row_counts(&self) -> Option { if self .statistics .iter() @@ -502,9 +500,9 @@ impl PruningStatistics for CompositePruningStatistics { None } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self) -> Option { for stats in &self.statistics { - if let Some(array) = stats.row_counts(column) { + if let Some(array) = stats.row_counts() { return Some(array); } } @@ -566,9 +564,9 @@ mod tests { // Partition values don't know anything about nulls or row counts assert!(partition_stats.null_counts(&column_a).is_none()); - assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.row_counts().is_none()); assert!(partition_stats.null_counts(&column_b).is_none()); - assert!(partition_stats.row_counts(&column_b).is_none()); + assert!(partition_stats.row_counts().is_none()); // Min/max values are the same as the partition values let min_values_a = @@ -709,9 +707,9 @@ mod tests { // Partition values don't know anything about nulls or row counts assert!(partition_stats.null_counts(&column_a).is_none()); - assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.row_counts().is_none()); assert!(partition_stats.null_counts(&column_b).is_none()); - assert!(partition_stats.row_counts(&column_b).is_none()); + assert!(partition_stats.row_counts().is_none()); // Min/max values are all missing assert!(partition_stats.min_values(&column_a).is_none()); @@ -814,13 +812,13 @@ mod tests { assert_eq!(null_counts_b, expected_null_counts_b); // Row counts are the same as the statistics - let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap()) + let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); let expected_row_counts_a = vec![Some(100), Some(200)]; assert_eq!(row_counts_a, expected_row_counts_a); - let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap()) + let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -845,7 +843,7 @@ mod tests { // This is debatable, personally I think `row_count` should not take a `Column` as an argument // at all since all columns should have the same number of rows. // But for now we just document the current behavior in this test. - let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()) + let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -853,12 +851,13 @@ mod tests { assert_eq!(row_counts_c, expected_row_counts_c); assert!(pruning_stats.contained(&column_c, &values).is_none()); - // Test with a column that doesn't exist + // Test with a column that doesn't exist — column-specific stats + // return None, but row_counts is container-level and still available let column_d = Column::new_unqualified("d"); assert!(pruning_stats.min_values(&column_d).is_none()); assert!(pruning_stats.max_values(&column_d).is_none()); assert!(pruning_stats.null_counts(&column_d).is_none()); - assert!(pruning_stats.row_counts(&column_d).is_none()); + assert!(pruning_stats.row_counts().is_some()); assert!(pruning_stats.contained(&column_d, &values).is_none()); } @@ -886,8 +885,8 @@ mod tests { assert!(pruning_stats.null_counts(&column_b).is_none()); // Row counts are all missing - assert!(pruning_stats.row_counts(&column_a).is_none()); - assert!(pruning_stats.row_counts(&column_b).is_none()); + assert!(pruning_stats.row_counts().is_none()); + assert!(pruning_stats.row_counts().is_none()); // Contained values are all empty let values = HashSet::from([ScalarValue::from(1i32)]); @@ -1027,13 +1026,11 @@ mod tests { let expected_null_counts_col_x = vec![Some(0), Some(10)]; assert_eq!(null_counts_col_x, expected_null_counts_col_x); - // Test row counts - only available from file statistics - assert!(composite_stats.row_counts(&part_a).is_none()); - let row_counts_col_x = - as_uint64_array(&composite_stats.row_counts(&col_x).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + // Test row counts — container-level, available from file statistics + let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts = vec![Some(100), Some(200)]; assert_eq!(row_counts_col_x, expected_row_counts); @@ -1046,12 +1043,13 @@ mod tests { // File statistics don't implement contained assert!(composite_stats.contained(&col_x, &values).is_none()); - // Non-existent column should return None for everything + // Non-existent column should return None for column-specific stats, + // but row_counts is container-level and still available let non_existent = Column::new_unqualified("non_existent"); assert!(composite_stats.min_values(&non_existent).is_none()); assert!(composite_stats.max_values(&non_existent).is_none()); assert!(composite_stats.null_counts(&non_existent).is_none()); - assert!(composite_stats.row_counts(&non_existent).is_none()); + assert!(composite_stats.row_counts().is_some()); assert!(composite_stats.contained(&non_existent, &values).is_none()); // Verify num_containers matches @@ -1155,7 +1153,7 @@ mod tests { let expected_null_counts = vec![Some(0), Some(5)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap()) + let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -1195,11 +1193,10 @@ mod tests { let expected_null_counts = vec![Some(10), Some(20)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = - as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts = vec![Some(1000), Some(2000)]; assert_eq!(row_counts, expected_row_counts); } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a212122401f98..02f1f79ef8135 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -27,8 +27,8 @@ use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. -#[derive(Clone, PartialEq, Eq, Default, Copy)] -pub enum Precision { +#[derive(Clone, Default, Copy)] +pub enum Precision { /// The exact value is known. Used for guaranteeing correctness. /// /// Comes from definitive sources such as: @@ -60,7 +60,7 @@ pub enum Precision { Absent, } -impl Precision { +impl Precision { /// If we have some value (exact or inexact), it returns that value. /// Otherwise, it returns `None`. pub fn get_value(&self) -> Option<&T> { @@ -75,7 +75,7 @@ impl Precision { pub fn map(self, f: F) -> Precision where F: Fn(T) -> U, - U: Debug + Clone + PartialEq + Eq + PartialOrd, + U: Debug + Clone, { match self { Precision::Exact(val) => Precision::Exact(f(val)), @@ -94,6 +94,16 @@ impl Precision { } } + /// Demotes the precision state from exact to inexact (if present). + pub fn to_inexact(self) -> Self { + match self { + Precision::Exact(value) => Precision::Inexact(value), + _ => self, + } + } +} + +impl Precision { /// Returns the maximum of two (possibly inexact) values, conservatively /// propagating exactness information. If one of the input values is /// [`Precision::Absent`], the result is `Absent` too. @@ -127,14 +137,6 @@ impl Precision { (_, _) => Precision::Absent, } } - - /// Demotes the precision state from exact to inexact (if present). - pub fn to_inexact(self) -> Self { - match self { - Precision::Exact(value) => Precision::Inexact(value), - _ => self, - } - } } impl Precision { @@ -318,7 +320,23 @@ impl Precision { } } -impl Debug for Precision { +impl PartialEq for Precision +where + T: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => a == b, + (Precision::Inexact(a), Precision::Inexact(b)) => a == b, + (Precision::Absent, Precision::Absent) => true, + _ => false, + } + } +} + +impl Eq for Precision {} + +impl Debug for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), @@ -328,7 +346,7 @@ impl Debug for Precision { } } -impl Display for Precision { +impl Display for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index 194e6e94fba3a..baef36ce147d4 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -509,7 +509,7 @@ impl PruningStatistics for PagesPruningStatistics<'_> { } } - fn row_counts(&self, _column: &datafusion_common::Column) -> Option { + fn row_counts(&self) -> Option { match self.converter.data_page_row_counts( self.offset_index, self.row_group_metadatas, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 7a2ed8f2777e3..3f254c9f55282 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use super::{ParquetAccessPlan, ParquetFileMetrics}; -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray, UInt64Array}; use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; @@ -536,7 +536,7 @@ impl PruningStatistics for BloomFilterStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -626,13 +626,13 @@ impl PruningStatistics for RowGroupPruningStatistics<'_> { .map(|counts| Arc::new(counts) as ArrayRef) } - fn row_counts(&self, column: &Column) -> Option { - // row counts are the same for all columns in a row group - self.statistics_converter(column) - .and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?)) - .ok() - .flatten() - .map(|counts| Arc::new(counts) as ArrayRef) + fn row_counts(&self) -> Option { + // Row counts are container-level — read directly from row group metadata. + let counts: UInt64Array = self + .metadata_iter() + .map(|rg| Some(rg.num_rows() as u64)) + .collect(); + Some(Arc::new(counts) as ArrayRef) } fn contained( diff --git a/datafusion/pruning/Cargo.toml b/datafusion/pruning/Cargo.toml index e6f4bb6f273c9..9bf2cc8321320 100644 --- a/datafusion/pruning/Cargo.toml +++ b/datafusion/pruning/Cargo.toml @@ -17,9 +17,13 @@ workspace = true [dependencies] arrow = { workspace = true } +async-trait = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-datasource = { workspace = true } +datafusion-expr = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true, default-features = true } +datafusion-functions-aggregate = { workspace = true, default-features = true } +datafusion-functions-nested = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } @@ -30,3 +34,4 @@ datafusion-expr = { workspace = true } datafusion-functions-nested = { workspace = true } insta = { workspace = true } itertools = { workspace = true } +tokio = { workspace = true } diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index be17f29eaafa0..b7f82e72d30f1 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -19,9 +19,16 @@ mod file_pruner; mod pruning_predicate; +mod statistics; pub use file_pruner::FilePruner; pub use pruning_predicate::{ PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns, UnhandledPredicateHook, build_pruning_predicate, }; +pub use statistics::{ + ResolvedStatistics, StatisticsSource, stat_column_distinct_count, + stat_column_distinct_count_with_filter, stat_column_max, stat_column_max_with_filter, + stat_column_min, stat_column_min_with_filter, stat_column_null_count, + stat_column_null_count_with_filter, stat_row_count, stat_row_count_with_filter, +}; diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 978d79b1f2fb0..d541d6d8b7419 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ - array::{ArrayRef, BooleanArray, new_null_array}, + array::{ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; @@ -521,54 +521,99 @@ impl PruningPredicate { &self, statistics: &S, ) -> Result> { - let mut builder = BoolVecBuilder::new(statistics.num_containers()); + let resolved = crate::statistics::resolve_all_sync( + statistics, + &self.all_required_expressions(), + ); + self.evaluate(&resolved) + } + + /// Evaluates the pruning predicate against pre-resolved statistics. + /// + /// This is the sync evaluation phase of the two-phase + /// resolve-then-evaluate pattern. Statistics are resolved ahead of + /// time (possibly asynchronously via [`StatisticsSource`]) into a + /// [`ResolvedStatistics`] cache, then this method evaluates the + /// predicate against that cache synchronously. + /// + /// Returns the same `true`/`false` semantics as [`Self::prune`]: + /// - `true`: There MAY be rows that match the predicate + /// - `false`: There are no rows that could possibly match + /// + /// Missing entries in `resolved` are treated as unknown (null arrays), + /// which is conservative — the container will not be pruned. + /// + /// [`StatisticsSource`]: crate::StatisticsSource + /// [`ResolvedStatistics`]: crate::ResolvedStatistics + pub fn evaluate( + &self, + resolved: &crate::statistics::ResolvedStatistics, + ) -> Result> { + let mut builder = BoolVecBuilder::new(resolved.num_containers()); - // Try to prove the predicate can't be true for the containers based on - // literal guarantees + // Phase 1: Literal guarantees (InList lookups) for literal_guarantee in &self.literal_guarantees { let LiteralGuarantee { column, guarantee, literals, } = literal_guarantee; - if let Some(results) = statistics.contained(column, literals) { - match guarantee { - // `In` means the values in the column must be one of the - // values in the set for the predicate to evaluate to true. - // If `contained` returns false, that means the column is - // not any of the values so we can prune the container - Guarantee::In => builder.combine_array(&results), - // `NotIn` means the values in the column must not be - // any of the values in the set for the predicate to - // evaluate to true. If `contained` returns true, it means the - // column is only in the set of values so we can prune the - // container - Guarantee::NotIn => { - builder.combine_array(&arrow::compute::not(&results)?) - } - } - // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate) - // can return early without evaluating the rest of predicates. + + // Build the InList Expr that corresponds to this guarantee + let in_list_expr = literal_guarantee_to_in_list( + column, + literals, + matches!(guarantee, Guarantee::NotIn), + ); + + if let Some(array) = resolved.get(&in_list_expr) + && let Some(bool_arr) = array.as_any().downcast_ref::() + { + builder.combine_array(bool_arr); if builder.check_all_pruned() { return Ok(builder.build()); } } } - // Next, try to prove the predicate can't be true for the containers based - // on min/max values - - // build a RecordBatch that contains the min/max values in the - // appropriate statistics columns for the min/max predicate - let statistics_batch = - build_statistics_record_batch(statistics, &self.required_columns)?; - - // Evaluate the pruning predicate on that record batch and append any results to the builder + // Phase 2: Min/max/null_count/row_count predicate + let statistics_batch = build_statistics_record_batch_from_resolved( + resolved, + &self.required_columns, + )?; builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); Ok(builder.build()) } + /// Returns all expressions needed to fully evaluate this predicate, + /// including both aggregate statistics and literal guarantee InLists. + /// + /// Pass these to [`StatisticsSource::expression_statistics`] or + /// [`ResolvedStatistics::resolve`] to pre-fetch the needed data. + /// + /// [`StatisticsSource::expression_statistics`]: crate::StatisticsSource::expression_statistics + /// [`ResolvedStatistics::resolve`]: crate::ResolvedStatistics::resolve + pub fn all_required_expressions(&self) -> Vec { + let mut exprs = Vec::new(); + + // Aggregate stats from RequiredColumns + for (column, statistics_type, _field) in self.required_columns.iter() { + exprs.push(stat_type_to_expr(column, *statistics_type)); + } + + // Literal guarantee InList expressions + for lg in &self.literal_guarantees { + exprs.push(literal_guarantee_to_in_list( + &lg.column, + &lg.literals, + matches!(lg.guarantee, Guarantee::NotIn), + )); + } + + exprs + } + /// Return a reference to the input schema pub fn schema(&self) -> &SchemaRef { &self.schema @@ -913,10 +958,12 @@ impl From> for RequiredColumns { /// -------+-------- /// 5 | 1000 /// ``` +#[cfg(test)] fn build_statistics_record_batch( statistics: &S, required_columns: &RequiredColumns, ) -> Result { + use arrow::array::new_null_array; let mut arrays = Vec::::new(); // For each needed statistics column: for (column, statistics_type, stat_field) in required_columns.iter() { @@ -929,7 +976,7 @@ fn build_statistics_record_batch( StatisticsType::Min => statistics.min_values(&column), StatisticsType::Max => statistics.max_values(&column), StatisticsType::NullCount => statistics.null_counts(&column), - StatisticsType::RowCount => statistics.row_counts(&column), + StatisticsType::RowCount => statistics.row_counts(), }; let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); @@ -1943,6 +1990,79 @@ fn wrap_null_count_check_expr( ))) } +/// Convert a [`StatisticsType`] + column into the corresponding logical expression. +fn stat_type_to_expr( + column: &phys_expr::Column, + stat_type: StatisticsType, +) -> datafusion_expr::Expr { + use crate::statistics::{ + stat_column_max, stat_column_min, stat_column_null_count, stat_row_count, + }; + match stat_type { + StatisticsType::Min => stat_column_min(column.name()), + StatisticsType::Max => stat_column_max(column.name()), + StatisticsType::NullCount => stat_column_null_count(column.name()), + StatisticsType::RowCount => stat_row_count(), + } +} + +/// Convert a [`LiteralGuarantee`] into an `Expr::InList`. +fn literal_guarantee_to_in_list( + column: &Column, + literals: &HashSet, + negated: bool, +) -> datafusion_expr::Expr { + datafusion_expr::Expr::InList(datafusion_expr::expr::InList::new( + Box::new(datafusion_expr::Expr::Column(column.clone())), + literals + .iter() + .map(|s| datafusion_expr::Expr::Literal(s.clone(), None)) + .collect(), + negated, + )) +} + +/// Build a statistics [`RecordBatch`] from a [`crate::ResolvedStatistics`] cache, +/// looking up each required column's expression and falling back to null +/// arrays for missing entries. +fn build_statistics_record_batch_from_resolved( + resolved: &crate::statistics::ResolvedStatistics, + required_columns: &RequiredColumns, +) -> Result { + let mut arrays = Vec::::new(); + let num_containers = resolved.num_containers(); + + for (column, statistics_type, stat_field) in required_columns.iter() { + let data_type = stat_field.data_type(); + let stat_expr = stat_type_to_expr(column, *statistics_type); + + let array = resolved.get_or_null(&stat_expr, data_type); + + assert_eq_or_internal_err!( + num_containers, + array.len(), + "mismatched statistics length. Expected {}, got {}", + num_containers, + array.len() + ); + + let array = arrow::compute::cast(&array, data_type)?; + arrays.push(array); + } + + let schema = Arc::new(required_columns.schema()); + let mut options = RecordBatchOptions::default(); + options.row_count = Some(num_containers); + + trace!( + "Creating statistics batch from resolved for {required_columns:#?} with {arrays:#?}" + ); + + RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| { + plan_datafusion_err!("Can not create statistics record batch: {err}") + }) +} + #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum StatisticsType { Min, @@ -2300,11 +2420,10 @@ mod tests { .unwrap_or(None) } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self) -> Option { self.stats - .get(column) - .map(|container_stats| container_stats.row_counts()) - .unwrap_or(None) + .values() + .find_map(|container_stats| container_stats.row_counts()) } fn contained( @@ -2342,7 +2461,7 @@ mod tests { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -5443,4 +5562,113 @@ mod tests { "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1"; assert_eq!(res.to_string(), expected); } + + /// Test that evaluate() produces the same results as prune() for basic predicates + #[test] + fn test_evaluate_matches_prune() { + // i > 5 with 3 containers + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32( + vec![Some(1), Some(6), Some(3)], // min + vec![Some(4), Some(10), Some(8)], // max + ), + ); + + let expr = col("i").gt(lit(5i32)); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); + + let prune_result = p.prune(&statistics).unwrap(); + let resolved = crate::statistics::resolve_all_sync( + &statistics, + &p.all_required_expressions(), + ); + let evaluate_result = p.evaluate(&resolved).unwrap(); + + assert_eq!(prune_result, evaluate_result); + // Container 0: max=4, 4 > 5 is false → prune + // Container 1: max=10, 10 > 5 is true → keep + // Container 2: max=8, 8 > 5 is true → keep + assert_eq!(evaluate_result, vec![false, true, true]); + } + + /// Test evaluate with null counts and row counts + #[test] + fn test_evaluate_with_null_counts() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32(vec![Some(0), Some(0)], vec![Some(0), Some(0)]) + .with_null_counts(vec![Some(10), Some(0)]) + .with_row_counts(vec![Some(10), Some(10)]), + ); + + // i = 0: first container is all nulls, should be pruned + let expr = col("i").eq(lit(0i32)); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); + + let prune_result = p.prune(&statistics).unwrap(); + let resolved = crate::statistics::resolve_all_sync( + &statistics, + &p.all_required_expressions(), + ); + let evaluate_result = p.evaluate(&resolved).unwrap(); + + assert_eq!(prune_result, evaluate_result); + } + + /// Test evaluate with missing cache entries (should produce null → conservative keep) + #[test] + fn test_evaluate_missing_cache_entries() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let _statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32(vec![Some(1), Some(6)], vec![Some(4), Some(10)]), + ); + + let expr = col("i").gt(lit(5i32)); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); + + // Empty resolved stats — everything should be kept (conservative) + let resolved = crate::statistics::ResolvedStatistics::new_empty(2); + let evaluate_result = p.evaluate(&resolved).unwrap(); + assert_eq!(evaluate_result, vec![true, true]); + } + + /// Test that all_required_expressions() generates the right Expr types + #[test] + fn test_all_required_expressions() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let expr = col("i").eq(lit(5i32)); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); + + let exprs = p.all_required_expressions(); + // i = 5 requires: min(i), max(i), count(*) filter (where i is null), + // count(*) filter (where i is not null) + assert!( + exprs.len() >= 2, + "Expected at least min and max, got {}", + exprs.len() + ); + + // Check that we have min and max expressions + let expr_strings: Vec = exprs.iter().map(|e| e.to_string()).collect(); + assert!( + expr_strings.iter().any(|s| s.contains("min")), + "Expected min expr in {expr_strings:?}" + ); + assert!( + expr_strings.iter().any(|s| s.contains("max")), + "Expected max expr in {expr_strings:?}" + ); + } } diff --git a/datafusion/pruning/src/statistics.rs b/datafusion/pruning/src/statistics.rs new file mode 100644 index 0000000000000..315eae872284b --- /dev/null +++ b/datafusion/pruning/src/statistics.rs @@ -0,0 +1,767 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, new_null_array}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_common::{Column, ScalarValue}; +use datafusion_expr::{Expr, ExprFunctionExt}; +use std::collections::{HashMap, HashSet}; + +use datafusion_common::error::DataFusionError; + +// ── Well-known statistics expression helpers ──────────────────────────── +// +// These helpers produce the canonical [`Expr`] forms that +// [`StatisticsSource`] implementations should recognise when they +// receive expressions from [`PruningPredicate::all_required_expressions`]. +// +// [`PruningPredicate::all_required_expressions`]: crate::PruningPredicate::all_required_expressions + +/// Create a `min(column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the minimum value of `column` in each +/// container (row group, file, partition, etc.). +/// +/// ```text +/// min(column) +/// ``` +/// +/// See also [`stat_column_min_with_filter`] for a filtered variant. +pub fn stat_column_min(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr]) +} + +/// Create a `min(column) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the minimum value of `column` in each +/// container, considering only the rows that satisfy `filter`. +/// +/// ```text +/// min(column) FILTER (WHERE filter) +/// ``` +/// +/// This is useful when a data source tracks per-partition or per-segment +/// statistics and the caller wants to scope the min to a subset of rows. +pub fn stat_column_min_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr]) + .filter(filter) + .build() + .expect("building filtered min stat expr") +} + +/// Create a `max(column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the maximum value of `column` in each +/// container (row group, file, partition, etc.). +/// +/// ```text +/// max(column) +/// ``` +/// +/// See also [`stat_column_max_with_filter`] for a filtered variant. +pub fn stat_column_max(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr]) +} + +/// Create a `max(column) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the maximum value of `column` in each +/// container, considering only the rows that satisfy `filter`. +/// +/// ```text +/// max(column) FILTER (WHERE filter) +/// ``` +/// +/// This is useful when a data source tracks per-partition or per-segment +/// statistics and the caller wants to scope the max to a subset of rows. +pub fn stat_column_max_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::max_udaf() + .call(vec![col_expr]) + .filter(filter) + .build() + .expect("building filtered max stat expr") +} + +/// Create a `count(*) FILTER (WHERE column IS NULL)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of NULL values in `column` +/// in each container. +/// +/// ```text +/// count(*) FILTER (WHERE column IS NULL) +/// ``` +/// +/// See also [`stat_column_null_count_with_filter`] for combining the +/// null check with an additional predicate. +pub fn stat_column_null_count(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + count_star() + .filter(Expr::IsNull(Box::new(col_expr))) + .build() + .expect("building null count stat expr") +} + +/// Create a `count(*) FILTER (WHERE column IS NULL AND …)` statistics +/// expression. +/// +/// Returns an [`Expr`] requesting the number of NULL values in `column` +/// in each container, restricted to rows that also satisfy `filter`. +/// The resulting filter is the conjunction of `column IS NULL` and the +/// provided predicate. +/// +/// ```text +/// count(*) FILTER (WHERE column IS NULL AND filter) +/// ``` +pub fn stat_column_null_count_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + let combined = Expr::IsNull(Box::new(col_expr)).and(filter); + count_star() + .filter(combined) + .build() + .expect("building filtered null count stat expr") +} + +/// Create a `count(*)` statistics expression for total row count. +/// +/// Returns an [`Expr`] requesting the total number of rows in each +/// container, regardless of null values. Unlike the column-specific +/// helpers, this expression is not tied to any particular column. +/// +/// ```text +/// count(*) +/// ``` +/// +/// See also [`stat_row_count_with_filter`] for a filtered variant. +pub fn stat_row_count() -> Expr { + count_star() +} + +/// Create a `count(*) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of rows in each container +/// that satisfy `filter`. This can represent, for example, the number +/// of rows in a particular partition or the number of rows matching a +/// deletion vector. +/// +/// ```text +/// count(*) FILTER (WHERE filter) +/// ``` +pub fn stat_row_count_with_filter(filter: Expr) -> Expr { + count_star() + .filter(filter) + .build() + .expect("building filtered row count stat expr") +} + +/// Create a `count(DISTINCT column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of distinct (unique) +/// non-null values of `column` in each container. This is useful for +/// cardinality estimation and join-order planning. +/// +/// ```text +/// count(DISTINCT column) +/// ``` +/// +/// See also [`stat_column_distinct_count_with_filter`] for a filtered +/// variant. +pub fn stat_column_distinct_count(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::count::count_udaf() + .call(vec![col_expr]) + .distinct() + .build() + .expect("building distinct count stat expr") +} + +/// Create a `count(DISTINCT column) FILTER (WHERE …)` statistics +/// expression. +/// +/// Returns an [`Expr`] requesting the number of distinct non-null values +/// of `column` in each container, restricted to rows that satisfy +/// `filter`. +/// +/// ```text +/// count(DISTINCT column) FILTER (WHERE filter) +/// ``` +pub fn stat_column_distinct_count_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::count::count_udaf() + .call(vec![col_expr]) + .distinct() + .filter(filter) + .build() + .expect("building filtered distinct count stat expr") +} + +/// Builds a bare `count(*)` aggregate expression (no filter, no distinct). +fn count_star() -> Expr { + datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) +} + +/// A source of runtime statistical information for pruning. +/// +/// This trait accepts a set of [`Expr`] expressions and returns +/// statistics for those expressions that can be used for pruning. +/// +/// It is up to implementors to determine how to collect these statistics. +/// Some example use cases include: +/// 1. Matching on basic expressions like `min(column)` or `max(column)` +/// and returning statistics from file metadata. +/// 2. Sampling data at runtime to get more accurate statistics. +/// 3. Querying an external metastore for statistics. +/// +/// # Supported expression types +/// +/// The following expression types are meaningful for pruning: +/// +/// - **Aggregate functions**: `min(column)`, `max(column)`, +/// `count(*) FILTER (WHERE column IS NULL)`, `count(*)`, +/// `count(DISTINCT column)`, and filtered variants of all of these. +/// - **InList**: `column IN (v1, v2, ...)` — see [InList semantics] below. +/// +/// Use the `stat_*` helper functions ([`stat_column_min`], +/// [`stat_column_max`], [`stat_column_null_count`], [`stat_row_count`], +/// [`stat_column_distinct_count`], and their `_with_filter` variants) to +/// construct these expressions in a canonical form. +/// +/// Implementors return `None` for any expression they cannot answer. +/// +/// # InList semantics +/// +/// For `column IN (v1, v2, ..., vN)`, the returned `BooleanArray` has one +/// entry per container with three-valued logic: +/// +/// - `true` — the column in this container ONLY contains values in +/// `{v1, ..., vN}`. Every row in the container satisfies the `IN` +/// predicate (assuming non-null values; see below). +/// - `false` — the column in this container contains NONE of the values +/// in `{v1, ..., vN}`. No row can satisfy the `IN` predicate, so the +/// container can be pruned. +/// - `null` — it is not known whether the column contains any of the +/// values. The container cannot be pruned. +/// +/// ## Null handling +/// +/// - **Null values in the column**: SQL `IN` returns `NULL` when the +/// column value is `NULL`, regardless of the list contents. Containers +/// where the column has null values should return `null` (unknown) +/// unless the implementation can determine that all non-null values +/// still satisfy or violate the predicate. +/// - **Null values in the list** (`column IN (1, NULL, 3)`): Per SQL +/// semantics, `x IN (1, NULL, 3)` returns `TRUE` if `x` is 1 or 3, +/// `NULL` if `x` is any other non-null value (because `x = NULL` is +/// unknown), and `NULL` if `x` is `NULL`. Null literals in the list +/// therefore weaken pruning — a container can no longer return `false` +/// unless it can prove the column has no values at all. +/// - **`NOT IN` with nulls** (`column NOT IN (1, NULL, 3)`): This can +/// never return `TRUE` for non-null column values because `x != NULL` +/// is always unknown. A container can only be pruned if it is known +/// to contain exclusively values in the list. +#[async_trait::async_trait] +pub trait StatisticsSource: Send + Sync { + /// Returns the number of containers (row groups, files, etc.) that + /// statistics are provided for. All returned arrays must have this length. + fn num_containers(&self) -> usize; + + /// Returns statistics for each expression, or `None` for expressions + /// that cannot be answered. + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result>, DataFusionError>; +} + +/// Blanket implementation of [`StatisticsSource`] for types that implement +/// [`PruningStatistics`]. +/// +/// This allows any type that implements [`PruningStatistics`] to be used as +/// a [`StatisticsSource`] without needing to implement the trait directly. +/// +/// The implementation matches on expressions that can be directly answered +/// by the underlying [`PruningStatistics`]: +/// - `min(column)` → [`PruningStatistics::min_values`] +/// - `max(column)` → [`PruningStatistics::max_values`] +/// - `count(*)` → [`PruningStatistics::row_counts`] (total row count) +/// - `count(*) FILTER (WHERE column IS NULL)` → [`PruningStatistics::null_counts`] +/// - `column IN (lit1, lit2, ...)` → [`PruningStatistics::contained`] +/// +/// Any other expressions return `None`. +#[async_trait::async_trait] +impl StatisticsSource for T { + fn num_containers(&self) -> usize { + PruningStatistics::num_containers(self) + } + + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result>, DataFusionError> { + Ok(expressions + .iter() + .map(|expr| resolve_expression_sync(self, expr)) + .collect()) + } +} + +/// Pre-resolved statistics cache. Created asynchronously via +/// [`StatisticsSource`], evaluated synchronously by +/// [`PruningPredicate::evaluate`]. +/// +/// Keyed by [`Expr`] so that a single cache can serve multiple +/// [`PruningPredicate`](crate::PruningPredicate) instances (e.g., after dynamic filter changes +/// rebuild the predicate but reuse the same resolved stats). +/// Missing entries are treated as unknown — safe for pruning +/// (the predicate will conservatively keep the container). +/// +/// [`PruningPredicate::evaluate`]: crate::PruningPredicate::evaluate +pub struct ResolvedStatistics { + num_containers: usize, + cache: HashMap, +} + +impl ResolvedStatistics { + /// Create an empty cache with no resolved statistics. + /// All lookups will return `None`, causing `evaluate()` to use + /// null arrays (conservative — no pruning). + pub fn new_empty(num_containers: usize) -> Self { + Self { + num_containers, + cache: HashMap::new(), + } + } + + /// Resolve statistics for the given expressions from an async source. + pub async fn resolve( + source: &(impl StatisticsSource + ?Sized), + expressions: &[Expr], + ) -> Result { + let num_containers = source.num_containers(); + let arrays = source.expression_statistics(expressions).await?; + let cache = expressions + .iter() + .zip(arrays) + .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a))) + .collect(); + Ok(Self { + num_containers, + cache, + }) + } + + /// Look up a resolved expression. Returns `None` if not in cache. + pub fn get(&self, expr: &Expr) -> Option<&ArrayRef> { + self.cache.get(expr) + } + + /// Look up a resolved expression, returning a null array of the given + /// type if the expression is not in the cache. + pub fn get_or_null( + &self, + expr: &Expr, + data_type: &arrow::datatypes::DataType, + ) -> ArrayRef { + self.cache + .get(expr) + .cloned() + .unwrap_or_else(|| new_null_array(data_type, self.num_containers)) + } + + /// Returns the number of containers these statistics cover. + pub fn num_containers(&self) -> usize { + self.num_containers + } +} + +/// Resolve a single expression synchronously against a [`PruningStatistics`] impl. +pub(crate) fn resolve_expression_sync( + stats: &(impl PruningStatistics + ?Sized), + expr: &Expr, +) -> Option { + match expr { + Expr::AggregateFunction(func) => resolve_aggregate_function(stats, func), + Expr::InList(in_list) => resolve_in_list(stats, in_list), + _ => None, + } +} + +/// Resolve all expressions synchronously against a [`PruningStatistics`] impl, +/// returning a [`ResolvedStatistics`] cache. +pub(crate) fn resolve_all_sync( + stats: &(impl PruningStatistics + ?Sized), + expressions: &[Expr], +) -> ResolvedStatistics { + let num_containers = stats.num_containers(); + let cache = expressions + .iter() + .filter_map(|expr| { + resolve_expression_sync(stats, expr).map(|arr| (expr.clone(), arr)) + }) + .collect(); + ResolvedStatistics { + num_containers, + cache, + } +} + +/// Resolve an aggregate function expression against [`PruningStatistics`]. +fn resolve_aggregate_function( + stats: &(impl PruningStatistics + ?Sized), + func: &datafusion_expr::expr::AggregateFunction, +) -> Option { + let name = func.func.name(); + + match name { + "min" | "MIN" => { + if func.params.filter.is_some() { + return None; + } + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.min_values(col); + } + } + "max" | "MAX" => { + if func.params.filter.is_some() { + return None; + } + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.max_values(col); + } + } + "count" | "COUNT" => { + if let Some(filter) = &func.params.filter { + // count(*) FILTER (WHERE col IS NULL) → null_counts + if let Expr::IsNull(inner) = filter.as_ref() + && let Expr::Column(col) = inner.as_ref() + { + return stats.null_counts(col); + } + } else { + // count(*) without filter → total row count + return stats.row_counts(); + } + } + _ => {} + } + + None +} + +/// Resolve an `IN` list expression against [`PruningStatistics::contained`]. +/// +/// Only supports `column IN (literal, literal, ...)`. Returns `None` for +/// expressions with non-column left-hand sides or non-literal list items. +/// +/// For `NOT IN`, the result is inverted: `true` becomes `false` and vice +/// versa, while `null` stays `null`. See the [InList semantics] section +/// on [`StatisticsSource`] for details on null handling. +fn resolve_in_list( + stats: &(impl PruningStatistics + ?Sized), + in_list: &datafusion_expr::expr::InList, +) -> Option { + // Only support `column IN (literal, literal, ...)` + let Expr::Column(col) = in_list.expr.as_ref() else { + return None; + }; + + let mut values = HashSet::with_capacity(in_list.list.len()); + for item in &in_list.list { + match item { + Expr::Literal(scalar, _) => { + values.insert(scalar.clone()); + } + _ => return None, // non-literal in list, can't resolve + } + } + + let result = stats.contained(col, &values)?; + if in_list.negated { + // NOT IN: invert the contained result + let inverted = arrow::compute::not(&result).ok()?; + Some(std::sync::Arc::new(inverted) as ArrayRef) + } else { + Some(std::sync::Arc::new(result) as ArrayRef) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{BooleanArray, Int64Array, UInt64Array}; + use arrow::datatypes::DataType; + use datafusion_common::pruning::PruningStatistics; + use datafusion_expr::ExprFunctionExt; + use std::sync::Arc; + + /// A simple mock PruningStatistics for testing. + #[derive(Debug)] + struct MockStats { + min: ArrayRef, + max: ArrayRef, + null_counts: ArrayRef, + row_counts: ArrayRef, + contained_result: Option, + } + + impl MockStats { + fn new() -> Self { + Self { + min: Arc::new(Int64Array::from(vec![Some(1), Some(10)])), + max: Arc::new(Int64Array::from(vec![Some(5), Some(20)])), + null_counts: Arc::new(UInt64Array::from(vec![0, 2])), + row_counts: Arc::new(UInt64Array::from(vec![100, 100])), + contained_result: None, + } + } + + fn with_contained(mut self, result: BooleanArray) -> Self { + self.contained_result = Some(result); + self + } + } + + impl PruningStatistics for MockStats { + fn min_values(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.min)) + } + fn max_values(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.max)) + } + fn num_containers(&self) -> usize { + self.min.len() + } + fn null_counts(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.null_counts)) + } + fn row_counts(&self) -> Option { + Some(Arc::clone(&self.row_counts)) + } + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + self.contained_result.clone() + } + } + + fn col_expr(name: &str) -> Expr { + Expr::Column(Column::new_unqualified(name)) + } + + #[test] + fn test_resolve_min() { + let stats = MockStats::new(); + let expr = + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + assert_eq!(arr.len(), 2); + } + + #[test] + fn test_resolve_max() { + let stats = MockStats::new(); + let expr = + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + assert_eq!(arr.len(), 2); + } + + #[test] + fn test_resolve_count_null() { + let stats = MockStats::new(); + let expr = datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) + .filter(Expr::IsNull(Box::new(col_expr("a")))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + } + + #[test] + fn test_resolve_row_count() { + let stats = MockStats::new(); + // count(*) without filter → row_counts (total row count) + let expr = stat_row_count(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let uint_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(uint_arr.value(0), 100); + assert_eq!(uint_arr.value(1), 100); + } + + #[test] + fn test_resolve_count_not_null_returns_none() { + // count(*) FILTER (WHERE col IS NOT NULL) is no longer a recognized + // statistics expression — it should return None. + let stats = MockStats::new(); + let expr = datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) + .filter(Expr::IsNotNull(Box::new(col_expr("a")))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_unsupported_returns_none() { + let stats = MockStats::new(); + // A plain column is not a supported expression for stats + let result = resolve_expression_sync(&stats, &col_expr("a")); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_min_with_filter_returns_none() { + let stats = MockStats::new(); + // min(a) FILTER (WHERE a > 0) — not supported + let expr = datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr("a")]) + .filter(col_expr("a").gt(Expr::Literal(ScalarValue::Int64(Some(0)), None))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_in_list() { + let stats = MockStats::new() + .with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let expr = Expr::InList(datafusion_expr::expr::InList::new( + Box::new(col_expr("a")), + vec![ + Expr::Literal(ScalarValue::Int64(Some(1)), None), + Expr::Literal(ScalarValue::Int64(Some(2)), None), + ], + false, + )); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let bool_arr = arr.as_any().downcast_ref::().unwrap(); + assert!(bool_arr.value(0)); + assert!(!bool_arr.value(1)); + } + + #[test] + fn test_resolve_not_in_list() { + let stats = MockStats::new() + .with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let expr = Expr::InList(datafusion_expr::expr::InList::new( + Box::new(col_expr("a")), + vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)], + true, // negated + )); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let bool_arr = arr.as_any().downcast_ref::().unwrap(); + // Inverted: true→false, false→true + assert!(!bool_arr.value(0)); + assert!(bool_arr.value(1)); + } + + #[test] + fn test_resolve_all_sync_builds_cache() { + let stats = MockStats::new(); + let exprs = vec![ + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]), + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]), + col_expr("unsupported"), // should be missing from cache + ]; + + let resolved = resolve_all_sync(&stats, &exprs); + assert_eq!(resolved.num_containers(), 2); + assert!(resolved.get(&exprs[0]).is_some()); // min + assert!(resolved.get(&exprs[1]).is_some()); // max + assert!(resolved.get(&exprs[2]).is_none()); // unsupported + } + + #[test] + fn test_resolved_statistics_get_or_null() { + let stats = MockStats::new(); + let min_expr = + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]); + let resolved = resolve_all_sync(&stats, std::slice::from_ref(&min_expr)); + + // Existing entry + let arr = resolved.get_or_null(&min_expr, &DataType::Int64); + assert_eq!(arr.len(), 2); + assert_eq!(arr.null_count(), 0); + + // Missing entry → null array + let missing = col_expr("missing"); + let arr = resolved.get_or_null(&missing, &DataType::Int32); + assert_eq!(arr.len(), 2); + assert_eq!(arr.null_count(), 2); + } + + #[tokio::test] + async fn test_resolved_statistics_resolve_async() { + let stats = MockStats::new(); + let exprs = vec![ + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]), + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]), + ]; + + let resolved = ResolvedStatistics::resolve(&stats, &exprs).await.unwrap(); + assert_eq!(resolved.num_containers(), 2); + assert!(resolved.get(&exprs[0]).is_some()); + assert!(resolved.get(&exprs[1]).is_some()); + } + + #[test] + fn test_new_empty_resolved_statistics() { + let resolved = ResolvedStatistics::new_empty(5); + assert_eq!(resolved.num_containers(), 5); + let expr = col_expr("any"); + assert!(resolved.get(&expr).is_none()); + } + + #[test] + fn test_stat_helpers_match_resolved_expressions() { + // Verify that the helper functions produce expressions that are + // correctly resolved by the blanket PruningStatistics → StatisticsSource impl. + let stats = MockStats::new(); + + let min = stat_column_min("a"); + assert!(resolve_expression_sync(&stats, &min).is_some()); + + let max = stat_column_max("a"); + assert!(resolve_expression_sync(&stats, &max).is_some()); + + let nulls = stat_column_null_count("a"); + assert!(resolve_expression_sync(&stats, &nulls).is_some()); + + let rows = stat_row_count(); + assert!(resolve_expression_sync(&stats, &rows).is_some()); + } +}