From 91fe94e7d65fbe22d78e7f64f1ee4d4bc65aa0e3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 25 Mar 2026 15:31:35 -0500 Subject: [PATCH 1/5] feat(pruning): add StatisticsSource trait with two-phase resolve/evaluate API Introduces a new expression-based statistics API for pruning that separates async data resolution from sync predicate evaluation. - StatisticsSource trait: accepts &[Expr], returns Vec> - ResolvedStatistics: HashMap cache for pre-resolved stats - PruningPredicate::evaluate(): sync evaluation against pre-resolved cache - PruningPredicate::all_required_expressions(): exposes needed Expr list - Blanket impl bridges existing PruningStatistics implementations - prune() refactored to delegate through resolve_all_sync + evaluate This enables async statistics sources (external metastores, runtime sampling) while keeping the evaluation path synchronous for use in Stream::poll_next() contexts like EarlyStoppingStream. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 3 + datafusion/pruning/Cargo.toml | 5 + datafusion/pruning/src/lib.rs | 2 + datafusion/pruning/src/pruning_predicate.rs | 293 +++++++++-- datafusion/pruning/src/statistics.rs | 544 ++++++++++++++++++++ 5 files changed, 817 insertions(+), 30 deletions(-) create mode 100644 datafusion/pruning/src/statistics.rs diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..22401b2ec675b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2560,10 +2560,12 @@ name = "datafusion-pruning" version = "53.0.0" dependencies = [ "arrow", + "async-trait", "datafusion-common", "datafusion-datasource", "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-functions-nested", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -2571,6 +2573,7 @@ dependencies = [ "insta", "itertools 0.14.0", "log", + "tokio", ] [[package]] diff --git a/datafusion/pruning/Cargo.toml b/datafusion/pruning/Cargo.toml 
index e6f4bb6f273c9..9bf2cc8321320 100644 --- a/datafusion/pruning/Cargo.toml +++ b/datafusion/pruning/Cargo.toml @@ -17,9 +17,13 @@ workspace = true [dependencies] arrow = { workspace = true } +async-trait = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-datasource = { workspace = true } +datafusion-expr = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true, default-features = true } +datafusion-functions-aggregate = { workspace = true, default-features = true } +datafusion-functions-nested = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } @@ -30,3 +34,4 @@ datafusion-expr = { workspace = true } datafusion-functions-nested = { workspace = true } insta = { workspace = true } itertools = { workspace = true } +tokio = { workspace = true } diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index be17f29eaafa0..4fa95cc211a97 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -19,9 +19,11 @@ mod file_pruner; mod pruning_predicate; +mod statistics; pub use file_pruner::FilePruner; pub use pruning_predicate::{ PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns, UnhandledPredicateHook, build_pruning_predicate, }; +pub use statistics::{ResolvedStatistics, StatisticsSource}; diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 978d79b1f2fb0..033b3c3157bab 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ - array::{ArrayRef, BooleanArray, new_null_array}, + array::{ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, 
RecordBatchOptions}, }; @@ -41,6 +41,7 @@ use datafusion_common::{ ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, }; +use datafusion_expr::ExprFunctionExt; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee}; @@ -521,54 +522,97 @@ impl PruningPredicate { &self, statistics: &S, ) -> Result> { - let mut builder = BoolVecBuilder::new(statistics.num_containers()); + let resolved = crate::statistics::resolve_all_sync( + statistics, + &self.all_required_expressions(), + ); + self.evaluate(&resolved) + } + + /// Evaluates the pruning predicate against pre-resolved statistics. + /// + /// This is the sync evaluation phase of the two-phase + /// resolve-then-evaluate pattern. Statistics are resolved ahead of + /// time (possibly asynchronously via [`StatisticsSource`]) into a + /// [`ResolvedStatistics`] cache, then this method evaluates the + /// predicate against that cache synchronously. + /// + /// Returns the same `true`/`false` semantics as [`Self::prune`]: + /// - `true`: There MAY be rows that match the predicate + /// - `false`: There are no rows that could possibly match + /// + /// Missing entries in `resolved` are treated as unknown (null arrays), + /// which is conservative — the container will not be pruned. 
+ /// + /// [`StatisticsSource`]: crate::StatisticsSource + /// [`ResolvedStatistics`]: crate::ResolvedStatistics + pub fn evaluate( + &self, + resolved: &crate::statistics::ResolvedStatistics, + ) -> Result> { + let mut builder = BoolVecBuilder::new(resolved.num_containers()); - // Try to prove the predicate can't be true for the containers based on - // literal guarantees + // Phase 1: Literal guarantees (InList lookups) for literal_guarantee in &self.literal_guarantees { let LiteralGuarantee { column, guarantee, literals, } = literal_guarantee; - if let Some(results) = statistics.contained(column, literals) { - match guarantee { - // `In` means the values in the column must be one of the - // values in the set for the predicate to evaluate to true. - // If `contained` returns false, that means the column is - // not any of the values so we can prune the container - Guarantee::In => builder.combine_array(&results), - // `NotIn` means the values in the column must not be - // any of the values in the set for the predicate to - // evaluate to true. If `contained` returns true, it means the - // column is only in the set of values so we can prune the - // container - Guarantee::NotIn => { - builder.combine_array(&arrow::compute::not(&results)?) - } - } - // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate) - // can return early without evaluating the rest of predicates. 
+ + // Build the InList Expr that corresponds to this guarantee + let in_list_expr = literal_guarantee_to_in_list( + column, + literals, + matches!(guarantee, Guarantee::NotIn), + ); + + if let Some(array) = resolved.get(&in_list_expr) + && let Some(bool_arr) = array.as_any().downcast_ref::() + { + builder.combine_array(bool_arr); if builder.check_all_pruned() { return Ok(builder.build()); } } } - // Next, try to prove the predicate can't be true for the containers based - // on min/max values - - // build a RecordBatch that contains the min/max values in the - // appropriate statistics columns for the min/max predicate + // Phase 2: Min/max/null_count/row_count predicate let statistics_batch = - build_statistics_record_batch(statistics, &self.required_columns)?; - - // Evaluate the pruning predicate on that record batch and append any results to the builder + build_statistics_record_batch_from_resolved(resolved, &self.required_columns)?; builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); Ok(builder.build()) } + /// Returns all expressions needed to fully evaluate this predicate, + /// including both aggregate statistics and literal guarantee InLists. + /// + /// Pass these to [`StatisticsSource::expression_statistics`] or + /// [`ResolvedStatistics::resolve`] to pre-fetch the needed data. 
+ /// + /// [`StatisticsSource::expression_statistics`]: crate::StatisticsSource::expression_statistics + /// [`ResolvedStatistics::resolve`]: crate::ResolvedStatistics::resolve + pub fn all_required_expressions(&self) -> Vec { + let mut exprs = Vec::new(); + + // Aggregate stats from RequiredColumns + for (column, statistics_type, _field) in self.required_columns.iter() { + exprs.push(stat_type_to_expr(column, *statistics_type)); + } + + // Literal guarantee InList expressions + for lg in &self.literal_guarantees { + exprs.push(literal_guarantee_to_in_list( + &lg.column, + &lg.literals, + matches!(lg.guarantee, Guarantee::NotIn), + )); + } + + exprs + } + /// Return a reference to the input schema pub fn schema(&self) -> &SchemaRef { &self.schema @@ -913,10 +957,12 @@ impl From> for RequiredColumns { /// -------+-------- /// 5 | 1000 /// ``` +#[cfg(test)] fn build_statistics_record_batch( statistics: &S, required_columns: &RequiredColumns, ) -> Result { + use arrow::array::new_null_array; let mut arrays = Vec::::new(); // For each needed statistics column: for (column, statistics_type, stat_field) in required_columns.iter() { @@ -1943,6 +1989,96 @@ fn wrap_null_count_check_expr( ))) } +/// Convert a [`StatisticsType`] + column into the corresponding logical [`Expr`]. 
+fn stat_type_to_expr( + column: &phys_expr::Column, + stat_type: StatisticsType, +) -> datafusion_expr::Expr { + use datafusion_expr::Expr as LExpr; + let col_expr = LExpr::Column(Column::new_unqualified(column.name())); + match stat_type { + StatisticsType::Min => { + datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr]) + } + StatisticsType::Max => { + datafusion_functions_aggregate::min_max::max_udaf() + .call(vec![col_expr]) + } + StatisticsType::NullCount => { + let count_expr = datafusion_functions_aggregate::count::count_udaf() + .call(vec![LExpr::Literal(ScalarValue::Boolean(Some(true)), None)]); + count_expr + .filter(LExpr::IsNull(Box::new(col_expr))) + .build() + .expect("building count filter expr") + } + StatisticsType::RowCount => { + let count_expr = datafusion_functions_aggregate::count::count_udaf() + .call(vec![LExpr::Literal(ScalarValue::Boolean(Some(true)), None)]); + count_expr + .filter(LExpr::IsNotNull(Box::new(col_expr))) + .build() + .expect("building count filter expr") + } + } +} + +/// Convert a [`LiteralGuarantee`] into an `Expr::InList`. +fn literal_guarantee_to_in_list( + column: &Column, + literals: &HashSet, + negated: bool, +) -> datafusion_expr::Expr { + datafusion_expr::Expr::InList(datafusion_expr::expr::InList::new( + Box::new(datafusion_expr::Expr::Column(column.clone())), + literals + .iter() + .map(|s| datafusion_expr::Expr::Literal(s.clone(), None)) + .collect(), + negated, + )) +} + +/// Build a statistics [`RecordBatch`] from a [`ResolvedStatistics`] cache, +/// looking up each required column's expression and falling back to null +/// arrays for missing entries. 
+fn build_statistics_record_batch_from_resolved( + resolved: &crate::statistics::ResolvedStatistics, + required_columns: &RequiredColumns, +) -> Result { + let mut arrays = Vec::::new(); + let num_containers = resolved.num_containers(); + + for (column, statistics_type, stat_field) in required_columns.iter() { + let data_type = stat_field.data_type(); + let stat_expr = stat_type_to_expr(column, *statistics_type); + + let array = resolved.get_or_null(&stat_expr, data_type); + + assert_eq_or_internal_err!( + num_containers, + array.len(), + "mismatched statistics length. Expected {}, got {}", + num_containers, + array.len() + ); + + let array = arrow::compute::cast(&array, data_type)?; + arrays.push(array); + } + + let schema = Arc::new(required_columns.schema()); + let mut options = RecordBatchOptions::default(); + options.row_count = Some(num_containers); + + trace!("Creating statistics batch from resolved for {required_columns:#?} with {arrays:#?}"); + + RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| { + plan_datafusion_err!("Can not create statistics record batch: {err}") + }) +} + #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum StatisticsType { Min, @@ -5443,4 +5579,101 @@ mod tests { "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1"; assert_eq!(res.to_string(), expected); } + + /// Test that evaluate() produces the same results as prune() for basic predicates + #[test] + fn test_evaluate_matches_prune() { + // i > 5 with 3 containers + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32( + vec![Some(1), Some(6), Some(3)], // min + vec![Some(4), Some(10), Some(8)], // max + ), + ); + + let expr = col("i").gt(lit(5i32)); + let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + + let prune_result = p.prune(&statistics).unwrap(); + let resolved = 
crate::statistics::resolve_all_sync( + &statistics, + &p.all_required_expressions(), + ); + let evaluate_result = p.evaluate(&resolved).unwrap(); + + assert_eq!(prune_result, evaluate_result); + // Container 0: max=4, 4 > 5 is false → prune + // Container 1: max=10, 10 > 5 is true → keep + // Container 2: max=8, 8 > 5 is true → keep + assert_eq!(evaluate_result, vec![false, true, true]); + } + + /// Test evaluate with null counts and row counts + #[test] + fn test_evaluate_with_null_counts() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32( + vec![Some(0), Some(0)], + vec![Some(0), Some(0)], + ) + .with_null_counts(vec![Some(10), Some(0)]) + .with_row_counts(vec![Some(10), Some(10)]), + ); + + // i = 0: first container is all nulls, should be pruned + let expr = col("i").eq(lit(0i32)); + let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + + let prune_result = p.prune(&statistics).unwrap(); + let resolved = crate::statistics::resolve_all_sync( + &statistics, + &p.all_required_expressions(), + ); + let evaluate_result = p.evaluate(&resolved).unwrap(); + + assert_eq!(prune_result, evaluate_result); + } + + /// Test evaluate with missing cache entries (should produce null → conservative keep) + #[test] + fn test_evaluate_missing_cache_entries() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32( + vec![Some(1), Some(6)], + vec![Some(4), Some(10)], + ), + ); + + let expr = col("i").gt(lit(5i32)); + let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + + // Empty resolved stats — everything should be kept (conservative) + let resolved = crate::statistics::ResolvedStatistics::new_empty(2); + let evaluate_result = p.evaluate(&resolved).unwrap(); + assert_eq!(evaluate_result, 
vec![true, true]); + } + + /// Test that all_required_expressions() generates the right Expr types + #[test] + fn test_all_required_expressions() { + let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); + let expr = col("i").eq(lit(5i32)); + let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + + let exprs = p.all_required_expressions(); + // i = 5 requires: min(i), max(i), count(*) filter (where i is null), + // count(*) filter (where i is not null) + assert!(exprs.len() >= 2, "Expected at least min and max, got {}", exprs.len()); + + // Check that we have min and max expressions + let expr_strings: Vec = exprs.iter().map(|e| e.to_string()).collect(); + assert!(expr_strings.iter().any(|s| s.contains("min")), "Expected min expr in {:?}", expr_strings); + assert!(expr_strings.iter().any(|s| s.contains("max")), "Expected max expr in {:?}", expr_strings); + } } diff --git a/datafusion/pruning/src/statistics.rs b/datafusion/pruning/src/statistics.rs new file mode 100644 index 0000000000000..a5809b8b87375 --- /dev/null +++ b/datafusion/pruning/src/statistics.rs @@ -0,0 +1,544 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{new_null_array, ArrayRef}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_expr::Expr; +use std::collections::{HashMap, HashSet}; + +use datafusion_common::error::DataFusionError; + +/// A source of runtime statistical information for pruning. +/// +/// This trait accepts a set of [`Expr`] expressions and returns +/// statistics for those expressions that can be used for pruning. +/// +/// It is up to implementors to determine how to collect these statistics. +/// Some example use cases include: +/// 1. Matching on basic expressions like `min(column)` or `max(column)` +/// and returning statistics from file metadata. +/// 2. Sampling data at runtime to get more accurate statistics. +/// 3. Querying an external metastore for statistics. +/// +/// # Supported expression types +/// +/// The following expression types are meaningful for pruning: +/// +/// - **Aggregate functions**: `min(column)`, `max(column)`, +/// `count(*) FILTER (WHERE column IS NULL)`, +/// `count(*) FILTER (WHERE column IS NOT NULL)` +/// - **InList**: `column IN (v1, v2, ...)` — see [InList semantics] below. +/// +/// Implementors return `None` for any expression they cannot answer. +/// +/// # InList semantics +/// +/// For `column IN (v1, v2, ..., vN)`, the returned `BooleanArray` has one +/// entry per container with three-valued logic: +/// +/// - `true` — the column in this container ONLY contains values in +/// `{v1, ..., vN}`. Every row in the container satisfies the `IN` +/// predicate (assuming non-null values; see below). +/// - `false` — the column in this container contains NONE of the values +/// in `{v1, ..., vN}`. No row can satisfy the `IN` predicate, so the +/// container can be pruned. +/// - `null` — it is not known whether the column contains any of the +/// values. The container cannot be pruned. 
+/// +/// ## Null handling +/// +/// - **Null values in the column**: SQL `IN` returns `NULL` when the +/// column value is `NULL`, regardless of the list contents. Containers +/// where the column has null values should return `null` (unknown) +/// unless the implementation can determine that all non-null values +/// still satisfy or violate the predicate. +/// - **Null values in the list** (`column IN (1, NULL, 3)`): Per SQL +/// semantics, `x IN (1, NULL, 3)` returns `TRUE` if `x` is 1 or 3, +/// `NULL` if `x` is any other non-null value (because `x = NULL` is +/// unknown), and `NULL` if `x` is `NULL`. Null literals in the list +/// therefore weaken pruning — a container can no longer return `false` +/// unless it can prove the column has no values at all. +/// - **`NOT IN` with nulls** (`column NOT IN (1, NULL, 3)`): This can +/// never return `TRUE` for non-null column values because `x != NULL` +/// is always unknown. A container can only be pruned if it is known +/// to contain exclusively values in the list. +#[async_trait::async_trait] +pub trait StatisticsSource: Send + Sync { + /// Returns the number of containers (row groups, files, etc.) that + /// statistics are provided for. All returned arrays must have this length. + fn num_containers(&self) -> usize; + + /// Returns statistics for each expression, or `None` for expressions + /// that cannot be answered. + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result>, DataFusionError>; +} + +/// Blanket implementation of [`StatisticsSource`] for types that implement +/// [`PruningStatistics`]. +/// +/// This allows any type that implements [`PruningStatistics`] to be used as +/// a [`StatisticsSource`] without needing to implement the trait directly. 
+/// +/// The implementation matches on expressions that can be directly answered +/// by the underlying [`PruningStatistics`]: +/// - `min(column)` → [`PruningStatistics::min_values`] +/// - `max(column)` → [`PruningStatistics::max_values`] +/// - `count(*) FILTER (WHERE column IS NOT NULL)` → [`PruningStatistics::row_counts`] +/// - `count(*) FILTER (WHERE column IS NULL)` → [`PruningStatistics::null_counts`] +/// - `column IN (lit1, lit2, ...)` → [`PruningStatistics::contained`] +/// +/// Any other expressions return `None`. +#[async_trait::async_trait] +impl StatisticsSource for T { + fn num_containers(&self) -> usize { + PruningStatistics::num_containers(self) + } + + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result>, DataFusionError> { + Ok(expressions + .iter() + .map(|expr| resolve_expression_sync(self, expr)) + .collect()) + } +} + +/// Pre-resolved statistics cache. Created asynchronously via +/// [`StatisticsSource`], evaluated synchronously by +/// [`PruningPredicate::evaluate`]. +/// +/// Keyed by [`Expr`] so that a single cache can serve multiple +/// [`PruningPredicate`] instances (e.g., after dynamic filter changes +/// rebuild the predicate but reuse the same resolved stats). +/// Missing entries are treated as unknown — safe for pruning +/// (the predicate will conservatively keep the container). +/// +/// [`PruningPredicate::evaluate`]: crate::PruningPredicate::evaluate +pub struct ResolvedStatistics { + num_containers: usize, + cache: HashMap, +} + +impl ResolvedStatistics { + /// Create an empty cache with no resolved statistics. + /// All lookups will return `None`, causing `evaluate()` to use + /// null arrays (conservative — no pruning). + pub fn new_empty(num_containers: usize) -> Self { + Self { + num_containers, + cache: HashMap::new(), + } + } + + /// Resolve statistics for the given expressions from an async source. 
+ pub async fn resolve( + source: &(impl StatisticsSource + ?Sized), + expressions: &[Expr], + ) -> Result { + let num_containers = source.num_containers(); + let arrays = source.expression_statistics(expressions).await?; + let cache = expressions + .iter() + .zip(arrays) + .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a))) + .collect(); + Ok(Self { + num_containers, + cache, + }) + } + + /// Look up a resolved expression. Returns `None` if not in cache. + pub fn get(&self, expr: &Expr) -> Option<&ArrayRef> { + self.cache.get(expr) + } + + /// Look up a resolved expression, returning a null array of the given + /// type if the expression is not in the cache. + pub fn get_or_null( + &self, + expr: &Expr, + data_type: &arrow::datatypes::DataType, + ) -> ArrayRef { + self.cache + .get(expr) + .cloned() + .unwrap_or_else(|| new_null_array(data_type, self.num_containers)) + } + + /// Returns the number of containers these statistics cover. + pub fn num_containers(&self) -> usize { + self.num_containers + } +} + +/// Resolve a single expression synchronously against a [`PruningStatistics`] impl. +pub(crate) fn resolve_expression_sync( + stats: &(impl PruningStatistics + ?Sized), + expr: &Expr, +) -> Option { + match expr { + Expr::AggregateFunction(func) => resolve_aggregate_function(stats, func), + Expr::InList(in_list) => resolve_in_list(stats, in_list), + _ => None, + } +} + +/// Resolve all expressions synchronously against a [`PruningStatistics`] impl, +/// returning a [`ResolvedStatistics`] cache. +pub(crate) fn resolve_all_sync( + stats: &(impl PruningStatistics + ?Sized), + expressions: &[Expr], +) -> ResolvedStatistics { + let num_containers = stats.num_containers(); + let cache = expressions + .iter() + .filter_map(|expr| { + resolve_expression_sync(stats, expr).map(|arr| (expr.clone(), arr)) + }) + .collect(); + ResolvedStatistics { + num_containers, + cache, + } +} + +/// Resolve an aggregate function expression against [`PruningStatistics`]. 
+fn resolve_aggregate_function( + stats: &(impl PruningStatistics + ?Sized), + func: &datafusion_expr::expr::AggregateFunction, +) -> Option { + use datafusion_functions_aggregate::count::Count; + use datafusion_functions_aggregate::min_max::{Max, Min}; + + let udaf = func.func.inner(); + + if udaf.as_any().downcast_ref::().is_some() { + // min(column) — reject if there's a filter + if func.params.filter.is_some() { + return None; + } + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.min_values(col); + } + } else if udaf.as_any().downcast_ref::().is_some() { + // max(column) — reject if there's a filter + if func.params.filter.is_some() { + return None; + } + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.max_values(col); + } + } else if udaf.as_any().downcast_ref::().is_some() + && let Some(filter) = &func.params.filter + { + match filter.as_ref() { + // count(*) FILTER (WHERE col IS NOT NULL) → row_counts + Expr::IsNotNull(inner) => { + if let Expr::Column(col) = inner.as_ref() { + return stats.row_counts(col); + } + } + // count(*) FILTER (WHERE col IS NULL) → null_counts + Expr::IsNull(inner) => { + if let Expr::Column(col) = inner.as_ref() { + return stats.null_counts(col); + } + } + _ => {} + } + } + + None +} + +/// Resolve an `IN` list expression against [`PruningStatistics::contained`]. +/// +/// Only supports `column IN (literal, literal, ...)`. Returns `None` for +/// expressions with non-column left-hand sides or non-literal list items. +/// +/// For `NOT IN`, the result is inverted: `true` becomes `false` and vice +/// versa, while `null` stays `null`. See the [InList semantics] section +/// on [`StatisticsSource`] for details on null handling. 
+fn resolve_in_list( + stats: &(impl PruningStatistics + ?Sized), + in_list: &datafusion_expr::expr::InList, +) -> Option { + // Only support `column IN (literal, literal, ...)` + let Expr::Column(col) = in_list.expr.as_ref() else { + return None; + }; + + let mut values = HashSet::with_capacity(in_list.list.len()); + for item in &in_list.list { + match item { + Expr::Literal(scalar, _) => { + values.insert(scalar.clone()); + } + _ => return None, // non-literal in list, can't resolve + } + } + + let result = stats.contained(col, &values)?; + if in_list.negated { + // NOT IN: invert the contained result + let inverted = arrow::compute::not(&result).ok()?; + Some(std::sync::Arc::new(inverted) as ArrayRef) + } else { + Some(std::sync::Arc::new(result) as ArrayRef) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{BooleanArray, Int64Array, UInt64Array}; + use arrow::datatypes::DataType; + use datafusion_common::pruning::PruningStatistics; + use datafusion_common::{Column, ScalarValue}; + use datafusion_expr::ExprFunctionExt; + use std::sync::Arc; + + /// A simple mock PruningStatistics for testing. 
+ #[derive(Debug)] + struct MockStats { + min: ArrayRef, + max: ArrayRef, + null_counts: ArrayRef, + row_counts: ArrayRef, + contained_result: Option, + } + + impl MockStats { + fn new() -> Self { + Self { + min: Arc::new(Int64Array::from(vec![Some(1), Some(10)])), + max: Arc::new(Int64Array::from(vec![Some(5), Some(20)])), + null_counts: Arc::new(UInt64Array::from(vec![0, 2])), + row_counts: Arc::new(UInt64Array::from(vec![100, 100])), + contained_result: None, + } + } + + fn with_contained(mut self, result: BooleanArray) -> Self { + self.contained_result = Some(result); + self + } + } + + impl PruningStatistics for MockStats { + fn min_values(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.min)) + } + fn max_values(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.max)) + } + fn num_containers(&self) -> usize { + self.min.len() + } + fn null_counts(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.null_counts)) + } + fn row_counts(&self, _column: &Column) -> Option { + Some(Arc::clone(&self.row_counts)) + } + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + self.contained_result.clone() + } + } + + fn col_expr(name: &str) -> Expr { + Expr::Column(Column::new_unqualified(name)) + } + + #[test] + fn test_resolve_min() { + let stats = MockStats::new(); + let expr = datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr("a")]); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + assert_eq!(arr.len(), 2); + } + + #[test] + fn test_resolve_max() { + let stats = MockStats::new(); + let expr = datafusion_functions_aggregate::min_max::max_udaf() + .call(vec![col_expr("a")]); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + assert_eq!(arr.len(), 2); + } + + #[test] + fn test_resolve_count_null() { + let stats = MockStats::new(); + let expr = 
datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Wildcard { + qualifier: None, + options: Box::default(), + }]) + .filter(Expr::IsNull(Box::new(col_expr("a")))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + } + + #[test] + fn test_resolve_count_not_null() { + let stats = MockStats::new(); + let expr = datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Wildcard { + qualifier: None, + options: Box::default(), + }]) + .filter(Expr::IsNotNull(Box::new(col_expr("a")))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + } + + #[test] + fn test_resolve_unsupported_returns_none() { + let stats = MockStats::new(); + // A plain column is not a supported expression for stats + let result = resolve_expression_sync(&stats, &col_expr("a")); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_min_with_filter_returns_none() { + let stats = MockStats::new(); + // min(a) FILTER (WHERE a > 0) — not supported + let expr = datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr("a")]) + .filter(col_expr("a").gt(Expr::Literal(ScalarValue::Int64(Some(0)), None))) + .build() + .unwrap(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_in_list() { + let stats = + MockStats::new().with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let expr = Expr::InList(datafusion_expr::expr::InList::new( + Box::new(col_expr("a")), + vec![ + Expr::Literal(ScalarValue::Int64(Some(1)), None), + Expr::Literal(ScalarValue::Int64(Some(2)), None), + ], + false, + )); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let bool_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(bool_arr.value(0), true); + assert_eq!(bool_arr.value(1), false); + } + + #[test] + fn 
test_resolve_not_in_list() { + let stats = + MockStats::new().with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let expr = Expr::InList(datafusion_expr::expr::InList::new( + Box::new(col_expr("a")), + vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)], + true, // negated + )); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let bool_arr = arr.as_any().downcast_ref::().unwrap(); + // Inverted: true→false, false→true + assert_eq!(bool_arr.value(0), false); + assert_eq!(bool_arr.value(1), true); + } + + #[test] + fn test_resolve_all_sync_builds_cache() { + let stats = MockStats::new(); + let exprs = vec![ + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]), + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]), + col_expr("unsupported"), // should be missing from cache + ]; + + let resolved = resolve_all_sync(&stats, &exprs); + assert_eq!(resolved.num_containers(), 2); + assert!(resolved.get(&exprs[0]).is_some()); // min + assert!(resolved.get(&exprs[1]).is_some()); // max + assert!(resolved.get(&exprs[2]).is_none()); // unsupported + } + + #[test] + fn test_resolved_statistics_get_or_null() { + let stats = MockStats::new(); + let min_expr = + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]); + let resolved = resolve_all_sync(&stats, &[min_expr.clone()]); + + // Existing entry + let arr = resolved.get_or_null(&min_expr, &DataType::Int64); + assert_eq!(arr.len(), 2); + assert_eq!(arr.null_count(), 0); + + // Missing entry → null array + let missing = col_expr("missing"); + let arr = resolved.get_or_null(&missing, &DataType::Int32); + assert_eq!(arr.len(), 2); + assert_eq!(arr.null_count(), 2); + } + + #[tokio::test] + async fn test_resolved_statistics_resolve_async() { + let stats = MockStats::new(); + let exprs = vec![ + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]), 
+ datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]), + ]; + + let resolved = ResolvedStatistics::resolve(&stats, &exprs).await.unwrap(); + assert_eq!(resolved.num_containers(), 2); + assert!(resolved.get(&exprs[0]).is_some()); + assert!(resolved.get(&exprs[1]).is_some()); + } + + #[test] + fn test_new_empty_resolved_statistics() { + let resolved = ResolvedStatistics::new_empty(5); + assert_eq!(resolved.num_containers(), 5); + let expr = col_expr("any"); + assert!(resolved.get(&expr).is_none()); + } +} From 314acd500b312534b4c9128fa0c8fdd652a772fb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 25 Mar 2026 16:41:50 -0500 Subject: [PATCH 2/5] fix: address CI lint warnings (clippy, rustdoc, deprecated Wildcard) - Fix broken intra-doc links for Expr, ResolvedStatistics, PruningPredicate - Replace deprecated Expr::Wildcard with Expr::Literal in count expressions - Fix clippy: collapsible if, bool_assert_comparison, uninlined_format_args, cloned_ref_to_slice_refs - Fix unused variable warning in test Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/pruning/src/pruning_predicate.rs | 70 ++++++++++++--------- datafusion/pruning/src/statistics.rs | 40 +++++------- 2 files changed, 59 insertions(+), 51 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 033b3c3157bab..e1dcff24197b2 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -578,8 +578,10 @@ impl PruningPredicate { } // Phase 2: Min/max/null_count/row_count predicate - let statistics_batch = - build_statistics_record_batch_from_resolved(resolved, &self.required_columns)?; + let statistics_batch = build_statistics_record_batch_from_resolved( + resolved, + &self.required_columns, + )?; builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); Ok(builder.build()) @@ -1989,7 
+1991,7 @@ fn wrap_null_count_check_expr( ))) } -/// Convert a [`StatisticsType`] + column into the corresponding logical [`Expr`]. +/// Convert a [`StatisticsType`] + column into the corresponding logical expression. fn stat_type_to_expr( column: &phys_expr::Column, stat_type: StatisticsType, @@ -1998,12 +2000,10 @@ fn stat_type_to_expr( let col_expr = LExpr::Column(Column::new_unqualified(column.name())); match stat_type { StatisticsType::Min => { - datafusion_functions_aggregate::min_max::min_udaf() - .call(vec![col_expr]) + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr]) } StatisticsType::Max => { - datafusion_functions_aggregate::min_max::max_udaf() - .call(vec![col_expr]) + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr]) } StatisticsType::NullCount => { let count_expr = datafusion_functions_aggregate::count::count_udaf() @@ -2040,7 +2040,7 @@ fn literal_guarantee_to_in_list( )) } -/// Build a statistics [`RecordBatch`] from a [`ResolvedStatistics`] cache, +/// Build a statistics [`RecordBatch`] from a [`crate::ResolvedStatistics`] cache, /// looking up each required column's expression and falling back to null /// arrays for missing entries. 
fn build_statistics_record_batch_from_resolved( @@ -2072,7 +2072,9 @@ fn build_statistics_record_batch_from_resolved( let mut options = RecordBatchOptions::default(); options.row_count = Some(num_containers); - trace!("Creating statistics batch from resolved for {required_columns:#?} with {arrays:#?}"); + trace!( + "Creating statistics batch from resolved for {required_columns:#?} with {arrays:#?}" + ); RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| { plan_datafusion_err!("Can not create statistics record batch: {err}") @@ -5589,12 +5591,14 @@ mod tests { "i", ContainerStats::new_i32( vec![Some(1), Some(6), Some(3)], // min - vec![Some(4), Some(10), Some(8)], // max + vec![Some(4), Some(10), Some(8)], // max ), ); let expr = col("i").gt(lit(5i32)); - let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); let prune_result = p.prune(&statistics).unwrap(); let resolved = crate::statistics::resolve_all_sync( @@ -5616,17 +5620,16 @@ mod tests { let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); let statistics = TestStatistics::new().with( "i", - ContainerStats::new_i32( - vec![Some(0), Some(0)], - vec![Some(0), Some(0)], - ) - .with_null_counts(vec![Some(10), Some(0)]) - .with_row_counts(vec![Some(10), Some(10)]), + ContainerStats::new_i32(vec![Some(0), Some(0)], vec![Some(0), Some(0)]) + .with_null_counts(vec![Some(10), Some(0)]) + .with_row_counts(vec![Some(10), Some(10)]), ); // i = 0: first container is all nulls, should be pruned let expr = col("i").eq(lit(0i32)); - let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); let prune_result = p.prune(&statistics).unwrap(); let resolved = crate::statistics::resolve_all_sync( @@ -5642,16 
+5645,15 @@ mod tests { #[test] fn test_evaluate_missing_cache_entries() { let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); - let statistics = TestStatistics::new().with( + let _statistics = TestStatistics::new().with( "i", - ContainerStats::new_i32( - vec![Some(1), Some(6)], - vec![Some(4), Some(10)], - ), + ContainerStats::new_i32(vec![Some(1), Some(6)], vec![Some(4), Some(10)]), ); let expr = col("i").gt(lit(5i32)); - let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); // Empty resolved stats — everything should be kept (conservative) let resolved = crate::statistics::ResolvedStatistics::new_empty(2); @@ -5664,16 +5666,28 @@ fn test_all_required_expressions() { let schema = Schema::new(vec![Field::new("i", DataType::Int32, true)]); let expr = col("i").eq(lit(5i32)); - let p = PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)).unwrap(); + let p = + PruningPredicate::try_new(logical2physical(&expr, &schema), Arc::new(schema)) + .unwrap(); let exprs = p.all_required_expressions(); // i = 5 requires: min(i), max(i), count(*) filter (where i is null), // count(*) filter (where i is not null) - assert!(exprs.len() >= 2, "Expected at least min and max, got {}", exprs.len()); + assert!( + exprs.len() >= 2, + "Expected at least min and max, got {}", + exprs.len() + ); // Check that we have min and max expressions let expr_strings: Vec<String> = exprs.iter().map(|e| e.to_string()).collect(); - assert!(expr_strings.iter().any(|s| s.contains("min")), "Expected min expr in {:?}", expr_strings); - assert!(expr_strings.iter().any(|s| s.contains("max")), "Expected max expr in {:?}", expr_strings); + assert!( + expr_strings.iter().any(|s| s.contains("min")), + "Expected min expr in {expr_strings:?}" + ); + assert!( + expr_strings.iter().any(|s| s.contains("max")), + "Expected max 
expr in {expr_strings:?}" + ); } } diff --git a/datafusion/pruning/src/statistics.rs b/datafusion/pruning/src/statistics.rs index a5809b8b87375..f965c23cef60b 100644 --- a/datafusion/pruning/src/statistics.rs +++ b/datafusion/pruning/src/statistics.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{new_null_array, ArrayRef}; +use arrow::array::{ArrayRef, new_null_array}; use datafusion_common::pruning::PruningStatistics; use datafusion_expr::Expr; use std::collections::{HashMap, HashSet}; @@ -127,7 +127,7 @@ impl StatisticsSource for T { /// [`PruningPredicate::evaluate`]. /// /// Keyed by [`Expr`] so that a single cache can serve multiple -/// [`PruningPredicate`] instances (e.g., after dynamic filter changes +/// [`PruningPredicate`](crate::PruningPredicate) instances (e.g., after dynamic filter changes /// rebuild the predicate but reuse the same resolved stats). /// Missing entries are treated as unknown — safe for pruning /// (the predicate will conservatively keep the container). 
@@ -377,8 +377,8 @@ mod tests { #[test] fn test_resolve_min() { let stats = MockStats::new(); - let expr = datafusion_functions_aggregate::min_max::min_udaf() - .call(vec![col_expr("a")]); + let expr = + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]); let result = resolve_expression_sync(&stats, &expr); assert!(result.is_some()); let arr = result.unwrap(); @@ -388,8 +388,8 @@ mod tests { #[test] fn test_resolve_max() { let stats = MockStats::new(); - let expr = datafusion_functions_aggregate::min_max::max_udaf() - .call(vec![col_expr("a")]); + let expr = + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr("a")]); let result = resolve_expression_sync(&stats, &expr); assert!(result.is_some()); let arr = result.unwrap(); @@ -400,10 +400,7 @@ mod tests { fn test_resolve_count_null() { let stats = MockStats::new(); let expr = datafusion_functions_aggregate::count::count_udaf() - .call(vec![Expr::Wildcard { - qualifier: None, - options: Box::default(), - }]) + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) .filter(Expr::IsNull(Box::new(col_expr("a")))) .build() .unwrap(); @@ -415,10 +412,7 @@ mod tests { fn test_resolve_count_not_null() { let stats = MockStats::new(); let expr = datafusion_functions_aggregate::count::count_udaf() - .call(vec![Expr::Wildcard { - qualifier: None, - options: Box::default(), - }]) + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) .filter(Expr::IsNotNull(Box::new(col_expr("a")))) .build() .unwrap(); @@ -449,8 +443,8 @@ mod tests { #[test] fn test_resolve_in_list() { - let stats = - MockStats::new().with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let stats = MockStats::new() + .with_contained(BooleanArray::from(vec![Some(true), Some(false)])); let expr = Expr::InList(datafusion_expr::expr::InList::new( Box::new(col_expr("a")), vec![ @@ -463,14 +457,14 @@ mod tests { assert!(result.is_some()); let arr = result.unwrap(); let bool_arr = 
arr.as_any().downcast_ref::().unwrap(); - assert_eq!(bool_arr.value(0), true); - assert_eq!(bool_arr.value(1), false); + assert!(bool_arr.value(0)); + assert!(!bool_arr.value(1)); } #[test] fn test_resolve_not_in_list() { - let stats = - MockStats::new().with_contained(BooleanArray::from(vec![Some(true), Some(false)])); + let stats = MockStats::new() + .with_contained(BooleanArray::from(vec![Some(true), Some(false)])); let expr = Expr::InList(datafusion_expr::expr::InList::new( Box::new(col_expr("a")), vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)], @@ -481,8 +475,8 @@ mod tests { let arr = result.unwrap(); let bool_arr = arr.as_any().downcast_ref::().unwrap(); // Inverted: true→false, false→true - assert_eq!(bool_arr.value(0), false); - assert_eq!(bool_arr.value(1), true); + assert!(!bool_arr.value(0)); + assert!(bool_arr.value(1)); } #[test] @@ -506,7 +500,7 @@ mod tests { let stats = MockStats::new(); let min_expr = datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr("a")]); - let resolved = resolve_all_sync(&stats, &[min_expr.clone()]); + let resolved = resolve_all_sync(&stats, std::slice::from_ref(&min_expr)); // Existing entry let arr = resolved.get_or_null(&min_expr, &DataType::Int64); From ed5d432f336f6654a5a6174ba05058800453a53b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:21:20 -0500 Subject: [PATCH 3/5] add change --- datafusion/common/src/stats.rs | 46 +++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a212122401f98..02f1f79ef8135 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -27,8 +27,8 @@ use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. 
-#[derive(Clone, PartialEq, Eq, Default, Copy)] -pub enum Precision { +#[derive(Clone, Default, Copy)] +pub enum Precision { /// The exact value is known. Used for guaranteeing correctness. /// /// Comes from definitive sources such as: @@ -60,7 +60,7 @@ pub enum Precision { Absent, } -impl Precision { +impl Precision { /// If we have some value (exact or inexact), it returns that value. /// Otherwise, it returns `None`. pub fn get_value(&self) -> Option<&T> { @@ -75,7 +75,7 @@ impl Precision { pub fn map(self, f: F) -> Precision where F: Fn(T) -> U, - U: Debug + Clone + PartialEq + Eq + PartialOrd, + U: Debug + Clone, { match self { Precision::Exact(val) => Precision::Exact(f(val)), @@ -94,6 +94,16 @@ impl Precision { } } + /// Demotes the precision state from exact to inexact (if present). + pub fn to_inexact(self) -> Self { + match self { + Precision::Exact(value) => Precision::Inexact(value), + _ => self, + } + } +} + +impl Precision { /// Returns the maximum of two (possibly inexact) values, conservatively /// propagating exactness information. If one of the input values is /// [`Precision::Absent`], the result is `Absent` too. @@ -127,14 +137,6 @@ impl Precision { (_, _) => Precision::Absent, } } - - /// Demotes the precision state from exact to inexact (if present). 
- pub fn to_inexact(self) -> Self { - match self { - Precision::Exact(value) => Precision::Inexact(value), - _ => self, - } - } } impl Precision { @@ -318,7 +320,23 @@ impl Precision { } } -impl Debug for Precision { +impl PartialEq for Precision +where + T: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => a == b, + (Precision::Inexact(a), Precision::Inexact(b)) => a == b, + (Precision::Absent, Precision::Absent) => true, + _ => false, + } + } +} + +impl Eq for Precision {} + +impl Debug for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), @@ -328,7 +346,7 @@ impl Debug for Precision { } } -impl Display for Precision { +impl Display for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), From d64de6d90915a1c143333fde71ff0ccc40134ef9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 4 Apr 2026 10:50:19 -0500 Subject: [PATCH 4/5] fix(pruning): use count(*) for row count and add statistics expression helpers The row count expression was incorrectly encoded as `count(*) FILTER (WHERE column IS NOT NULL)`, which external StatisticsSource implementations would interpret as the non-null count rather than the total row count. This could cause the all-null guard (`null_count != row_count`) to fail, preventing valid containers from being pruned. Changes: - Row count is now represented as `count(*)` (no filter, no column), which unambiguously means total rows in a container. - Fix resolve_aggregate_function to match by function name instead of downcasting via as_any() (which did not compile). - Add public stat_* helper functions for constructing canonical statistics expressions: min, max, null_count, row_count, distinct_count, and filtered variants of each. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/pruning/src/lib.rs | 7 +- datafusion/pruning/src/pruning_predicate.rs | 41 +-- datafusion/pruning/src/statistics.rs | 311 +++++++++++++++++--- 3 files changed, 290 insertions(+), 69 deletions(-) diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index 4fa95cc211a97..b7f82e72d30f1 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -26,4 +26,9 @@ pub use pruning_predicate::{ PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns, UnhandledPredicateHook, build_pruning_predicate, }; -pub use statistics::{ResolvedStatistics, StatisticsSource}; +pub use statistics::{ + ResolvedStatistics, StatisticsSource, stat_column_distinct_count, + stat_column_distinct_count_with_filter, stat_column_max, stat_column_max_with_filter, + stat_column_min, stat_column_min_with_filter, stat_column_null_count, + stat_column_null_count_with_filter, stat_row_count, stat_row_count_with_filter, +}; diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index e1dcff24197b2..3465084a72aaf 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -41,7 +41,6 @@ use datafusion_common::{ ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, }; -use datafusion_expr::ExprFunctionExt; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee}; @@ -1996,31 +1995,14 @@ fn stat_type_to_expr( column: &phys_expr::Column, stat_type: StatisticsType, ) -> datafusion_expr::Expr { - use datafusion_expr::Expr as LExpr; - let col_expr = LExpr::Column(Column::new_unqualified(column.name())); + use crate::statistics::{ + stat_column_max, stat_column_min, stat_column_null_count, stat_row_count, + }; match stat_type { - 
StatisticsType::Min => { - datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr]) - } - StatisticsType::Max => { - datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr]) - } - StatisticsType::NullCount => { - let count_expr = datafusion_functions_aggregate::count::count_udaf() - .call(vec![LExpr::Literal(ScalarValue::Boolean(Some(true)), None)]); - count_expr - .filter(LExpr::IsNull(Box::new(col_expr))) - .build() - .expect("building count filter expr") - } - StatisticsType::RowCount => { - let count_expr = datafusion_functions_aggregate::count::count_udaf() - .call(vec![LExpr::Literal(ScalarValue::Boolean(Some(true)), None)]); - count_expr - .filter(LExpr::IsNotNull(Box::new(col_expr))) - .build() - .expect("building count filter expr") - } + StatisticsType::Min => stat_column_min(column.name()), + StatisticsType::Max => stat_column_max(column.name()), + StatisticsType::NullCount => stat_column_null_count(column.name()), + StatisticsType::RowCount => stat_row_count(), } } @@ -2438,11 +2420,12 @@ mod tests { .unwrap_or(None) } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self, _column: &Column) -> Option { + // Row count is container-level, not column-specific. + // Return row counts from any column that has them. 
self.stats - .get(column) - .map(|container_stats| container_stats.row_counts()) - .unwrap_or(None) + .values() + .find_map(|container_stats| container_stats.row_counts()) } fn contained( diff --git a/datafusion/pruning/src/statistics.rs b/datafusion/pruning/src/statistics.rs index f965c23cef60b..09cc0e860df44 100644 --- a/datafusion/pruning/src/statistics.rs +++ b/datafusion/pruning/src/statistics.rs @@ -17,11 +17,208 @@ use arrow::array::{ArrayRef, new_null_array}; use datafusion_common::pruning::PruningStatistics; -use datafusion_expr::Expr; +use datafusion_common::{Column, ScalarValue}; +use datafusion_expr::{Expr, ExprFunctionExt}; use std::collections::{HashMap, HashSet}; use datafusion_common::error::DataFusionError; +// ── Well-known statistics expression helpers ──────────────────────────── +// +// These helpers produce the canonical [`Expr`] forms that +// [`StatisticsSource`] implementations should recognise when they +// receive expressions from [`PruningPredicate::all_required_expressions`]. +// +// [`PruningPredicate::all_required_expressions`]: crate::PruningPredicate::all_required_expressions + +/// Create a `min(column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the minimum value of `column` in each +/// container (row group, file, partition, etc.). +/// +/// ```text +/// min(column) +/// ``` +/// +/// See also [`stat_column_min_with_filter`] for a filtered variant. +pub fn stat_column_min(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::min_udaf().call(vec![col_expr]) +} + +/// Create a `min(column) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the minimum value of `column` in each +/// container, considering only the rows that satisfy `filter`. 
+/// +/// ```text +/// min(column) FILTER (WHERE filter) +/// ``` +/// +/// This is useful when a data source tracks per-partition or per-segment +/// statistics and the caller wants to scope the min to a subset of rows. +pub fn stat_column_min_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::min_udaf() + .call(vec![col_expr]) + .filter(filter) + .build() + .expect("building filtered min stat expr") +} + +/// Create a `max(column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the maximum value of `column` in each +/// container (row group, file, partition, etc.). +/// +/// ```text +/// max(column) +/// ``` +/// +/// See also [`stat_column_max_with_filter`] for a filtered variant. +pub fn stat_column_max(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::max_udaf().call(vec![col_expr]) +} + +/// Create a `max(column) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the maximum value of `column` in each +/// container, considering only the rows that satisfy `filter`. +/// +/// ```text +/// max(column) FILTER (WHERE filter) +/// ``` +/// +/// This is useful when a data source tracks per-partition or per-segment +/// statistics and the caller wants to scope the max to a subset of rows. +pub fn stat_column_max_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::min_max::max_udaf() + .call(vec![col_expr]) + .filter(filter) + .build() + .expect("building filtered max stat expr") +} + +/// Create a `count(*) FILTER (WHERE column IS NULL)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of NULL values in `column` +/// in each container. 
+/// +/// ```text +/// count(*) FILTER (WHERE column IS NULL) +/// ``` +/// +/// See also [`stat_column_null_count_with_filter`] for combining the +/// null check with an additional predicate. +pub fn stat_column_null_count(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + count_star() + .filter(Expr::IsNull(Box::new(col_expr))) + .build() + .expect("building null count stat expr") +} + +/// Create a `count(*) FILTER (WHERE column IS NULL AND …)` statistics +/// expression. +/// +/// Returns an [`Expr`] requesting the number of NULL values in `column` +/// in each container, restricted to rows that also satisfy `filter`. +/// The resulting filter is the conjunction of `column IS NULL` and the +/// provided predicate. +/// +/// ```text +/// count(*) FILTER (WHERE column IS NULL AND filter) +/// ``` +pub fn stat_column_null_count_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + let combined = Expr::IsNull(Box::new(col_expr)).and(filter); + count_star() + .filter(combined) + .build() + .expect("building filtered null count stat expr") +} + +/// Create a `count(*)` statistics expression for total row count. +/// +/// Returns an [`Expr`] requesting the total number of rows in each +/// container, regardless of null values. Unlike the column-specific +/// helpers, this expression is not tied to any particular column. +/// +/// ```text +/// count(*) +/// ``` +/// +/// See also [`stat_row_count_with_filter`] for a filtered variant. +pub fn stat_row_count() -> Expr { + count_star() +} + +/// Create a `count(*) FILTER (WHERE …)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of rows in each container +/// that satisfy `filter`. This can represent, for example, the number +/// of rows in a particular partition or the number of rows matching a +/// deletion vector. 
+/// +/// ```text +/// count(*) FILTER (WHERE filter) +/// ``` +pub fn stat_row_count_with_filter(filter: Expr) -> Expr { + count_star() + .filter(filter) + .build() + .expect("building filtered row count stat expr") +} + +/// Create a `count(DISTINCT column)` statistics expression. +/// +/// Returns an [`Expr`] requesting the number of distinct (unique) +/// non-null values of `column` in each container. This is useful for +/// cardinality estimation and join-order planning. +/// +/// ```text +/// count(DISTINCT column) +/// ``` +/// +/// See also [`stat_column_distinct_count_with_filter`] for a filtered +/// variant. +pub fn stat_column_distinct_count(column: &str) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::count::count_udaf() + .call(vec![col_expr]) + .distinct() + .build() + .expect("building distinct count stat expr") +} + +/// Create a `count(DISTINCT column) FILTER (WHERE …)` statistics +/// expression. +/// +/// Returns an [`Expr`] requesting the number of distinct non-null values +/// of `column` in each container, restricted to rows that satisfy +/// `filter`. +/// +/// ```text +/// count(DISTINCT column) FILTER (WHERE filter) +/// ``` +pub fn stat_column_distinct_count_with_filter(column: &str, filter: Expr) -> Expr { + let col_expr = Expr::Column(Column::new_unqualified(column)); + datafusion_functions_aggregate::count::count_udaf() + .call(vec![col_expr]) + .distinct() + .filter(filter) + .build() + .expect("building filtered distinct count stat expr") +} + +/// Builds a bare `count(*)` aggregate expression (no filter, no distinct). +fn count_star() -> Expr { + datafusion_functions_aggregate::count::count_udaf() + .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) +} + /// A source of runtime statistical information for pruning. 
/// /// This trait accepts a set of [`Expr`] expressions and returns @@ -39,10 +236,15 @@ use datafusion_common::error::DataFusionError; /// The following expression types are meaningful for pruning: /// /// - **Aggregate functions**: `min(column)`, `max(column)`, -/// `count(*) FILTER (WHERE column IS NULL)`, -/// `count(*) FILTER (WHERE column IS NOT NULL)` +/// `count(*) FILTER (WHERE column IS NULL)`, `count(*)`, +/// `count(DISTINCT column)`, and filtered variants of all of these. /// - **InList**: `column IN (v1, v2, ...)` — see [InList semantics] below. /// +/// Use the `stat_*` helper functions ([`stat_column_min`], +/// [`stat_column_max`], [`stat_column_null_count`], [`stat_row_count`], +/// [`stat_column_distinct_count`], and their `_with_filter` variants) to +/// construct these expressions in a canonical form. +/// /// Implementors return `None` for any expression they cannot answer. /// /// # InList semantics @@ -100,7 +302,7 @@ pub trait StatisticsSource: Send + Sync { /// by the underlying [`PruningStatistics`]: /// - `min(column)` → [`PruningStatistics::min_values`] /// - `max(column)` → [`PruningStatistics::max_values`] -/// - `count(*) FILTER (WHERE column IS NOT NULL)` → [`PruningStatistics::row_counts`] +/// - `count(*)` → [`PruningStatistics::row_counts`] (total row count) /// - `count(*) FILTER (WHERE column IS NULL)` → [`PruningStatistics::null_counts`] /// - `column IN (lit1, lit2, ...)` → [`PruningStatistics::contained`] /// @@ -227,45 +429,43 @@ fn resolve_aggregate_function( stats: &(impl PruningStatistics + ?Sized), func: &datafusion_expr::expr::AggregateFunction, ) -> Option { - use datafusion_functions_aggregate::count::Count; - use datafusion_functions_aggregate::min_max::{Max, Min}; + let name = func.func.name(); - let udaf = func.func.inner(); - - if udaf.as_any().downcast_ref::().is_some() { - // min(column) — reject if there's a filter - if func.params.filter.is_some() { - return None; - } - if let Some(Expr::Column(col)) = 
func.params.args.first() { - return stats.min_values(col); - } - } else if udaf.as_any().downcast_ref::().is_some() { - // max(column) — reject if there's a filter - if func.params.filter.is_some() { - return None; - } - if let Some(Expr::Column(col)) = func.params.args.first() { - return stats.max_values(col); + match name { + "min" | "MIN" => { + if func.params.filter.is_some() { + return None; + } + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.min_values(col); + } } - } else if udaf.as_any().downcast_ref::().is_some() - && let Some(filter) = &func.params.filter - { - match filter.as_ref() { - // count(*) FILTER (WHERE col IS NOT NULL) → row_counts - Expr::IsNotNull(inner) => { - if let Expr::Column(col) = inner.as_ref() { - return stats.row_counts(col); - } + "max" | "MAX" => { + if func.params.filter.is_some() { + return None; } - // count(*) FILTER (WHERE col IS NULL) → null_counts - Expr::IsNull(inner) => { - if let Expr::Column(col) = inner.as_ref() { + if let Some(Expr::Column(col)) = func.params.args.first() { + return stats.max_values(col); + } + } + "count" | "COUNT" => { + if let Some(filter) = &func.params.filter { + // count(*) FILTER (WHERE col IS NULL) → null_counts + if let Expr::IsNull(inner) = filter.as_ref() + && let Expr::Column(col) = inner.as_ref() + { return stats.null_counts(col); } + } else { + // count(*) without filter → total row count. + // PruningStatistics::row_counts takes a column parameter + // but the value is container-level (not column-specific), + // so all implementations return the same result regardless + // of column. 
+ return stats.row_counts(&Column::new_unqualified("")); } - _ => {} } + _ => {} } None @@ -314,7 +514,6 @@ mod tests { use arrow::array::{BooleanArray, Int64Array, UInt64Array}; use arrow::datatypes::DataType; use datafusion_common::pruning::PruningStatistics; - use datafusion_common::{Column, ScalarValue}; use datafusion_expr::ExprFunctionExt; use std::sync::Arc; @@ -409,7 +608,22 @@ mod tests { } #[test] - fn test_resolve_count_not_null() { + fn test_resolve_row_count() { + let stats = MockStats::new(); + // count(*) without filter → row_counts (total row count) + let expr = stat_row_count(); + let result = resolve_expression_sync(&stats, &expr); + assert!(result.is_some()); + let arr = result.unwrap(); + let uint_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(uint_arr.value(0), 100); + assert_eq!(uint_arr.value(1), 100); + } + + #[test] + fn test_resolve_count_not_null_returns_none() { + // count(*) FILTER (WHERE col IS NOT NULL) is no longer a recognized + // statistics expression — it should return None. let stats = MockStats::new(); let expr = datafusion_functions_aggregate::count::count_udaf() .call(vec![Expr::Literal(ScalarValue::Boolean(Some(true)), None)]) @@ -417,7 +631,7 @@ mod tests { .build() .unwrap(); let result = resolve_expression_sync(&stats, &expr); - assert!(result.is_some()); + assert!(result.is_none()); } #[test] @@ -535,4 +749,23 @@ mod tests { let expr = col_expr("any"); assert!(resolved.get(&expr).is_none()); } + + #[test] + fn test_stat_helpers_match_resolved_expressions() { + // Verify that the helper functions produce expressions that are + // correctly resolved by the blanket PruningStatistics → StatisticsSource impl. 
+ let stats = MockStats::new(); + + let min = stat_column_min("a"); + assert!(resolve_expression_sync(&stats, &min).is_some()); + + let max = stat_column_max("a"); + assert!(resolve_expression_sync(&stats, &max).is_some()); + + let nulls = stat_column_null_count("a"); + assert!(resolve_expression_sync(&stats, &nulls).is_some()); + + let rows = stat_row_count(); + assert!(resolve_expression_sync(&stats, &rows).is_some()); + } } From dc7d68f371f8fad5ab1b4739319c9f3a85f27bcf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 4 Apr 2026 12:34:40 -0500 Subject: [PATCH 5/5] fix(pruning): remove column param from PruningStatistics::row_counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Row counts are container-level (same for all columns), but the trait method forced a `column: &Column` parameter. Every implementation either ignored it or used it unnecessarily (e.g. the Parquet impl constructed a StatisticsConverter just to call row_group_row_counts, which doesn't need the column). This caused problems for the new StatisticsSource API where the row count expression is `count(*)` — the blanket impl had no column to pass. Breaking change: `PruningStatistics::row_counts(&self, column: &Column)` is now `PruningStatistics::row_counts(&self)`. Also fixes RowGroupPruningStatistics to read row counts directly from row group metadata instead of routing through StatisticsConverter, and removes PrunableStatistics' column-exists validation for row counts. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../examples/data_io/parquet_index.rs | 2 +- .../examples/query_planning/pruning.rs | 2 +- datafusion/common/src/pruning.rs | 71 +++++++++---------- .../datasource-parquet/src/page_filter.rs | 2 +- .../src/row_group_filter.rs | 18 ++--- datafusion/pruning/src/pruning_predicate.rs | 8 +-- datafusion/pruning/src/statistics.rs | 10 +-- 7 files changed, 52 insertions(+), 61 deletions(-) diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs index e11a303f442a4..515dad7a51e17 100644 --- a/datafusion-examples/examples/data_io/parquet_index.rs +++ b/datafusion-examples/examples/data_io/parquet_index.rs @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex { } /// return the row counts for each file - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { Some(self.row_counts_ref().clone()) } diff --git a/datafusion-examples/examples/query_planning/pruning.rs b/datafusion-examples/examples/query_planning/pruning.rs index 33f3f8428a77f..7fdc4a7952d68 100644 --- a/datafusion-examples/examples/query_planning/pruning.rs +++ b/datafusion-examples/examples/query_planning/pruning.rs @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { // In this example, we know nothing about the number of rows in each file None } diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 27148de59a544..ebae23f0723a1 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -95,15 +95,17 @@ pub trait PruningStatistics { /// [`UInt64Array`]: arrow::array::UInt64Array fn null_counts(&self, column: &Column) -> Option; - /// Return the number of rows for the named column in each container - /// as an [`UInt64Array`]. 
+ /// Return the number of rows in each container as an [`UInt64Array`]. + /// + /// Row counts are container-level (not column-specific) — the value + /// is the same regardless of which column is being considered. /// /// See [`Self::min_values`] for when to return `None` and null values. /// /// Note: the returned array must contain [`Self::num_containers`] rows /// /// [`UInt64Array`]: arrow::array::UInt64Array - fn row_counts(&self, column: &Column) -> Option; + fn row_counts(&self) -> Option; /// Returns [`BooleanArray`] where each row represents information known /// about specific literal `values` in a column. @@ -265,7 +267,7 @@ impl PruningStatistics for PartitionPruningStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -398,11 +400,7 @@ impl PruningStatistics for PrunableStatistics { } } - fn row_counts(&self, column: &Column) -> Option { - // If the column does not exist in the schema, return None - if self.schema.index_of(column.name()).is_err() { - return None; - } + fn row_counts(&self) -> Option { if self .statistics .iter() @@ -502,9 +500,9 @@ impl PruningStatistics for CompositePruningStatistics { None } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self) -> Option { for stats in &self.statistics { - if let Some(array) = stats.row_counts(column) { + if let Some(array) = stats.row_counts() { return Some(array); } } @@ -566,9 +564,9 @@ mod tests { // Partition values don't know anything about nulls or row counts assert!(partition_stats.null_counts(&column_a).is_none()); - assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.row_counts().is_none()); assert!(partition_stats.null_counts(&column_b).is_none()); - assert!(partition_stats.row_counts(&column_b).is_none()); + assert!(partition_stats.row_counts().is_none()); // Min/max values are the same as the partition values let min_values_a = @@ -709,9 +707,9 @@ mod tests { // 
Partition values don't know anything about nulls or row counts
         assert!(partition_stats.null_counts(&column_a).is_none());
-        assert!(partition_stats.row_counts(&column_a).is_none());
+        assert!(partition_stats.row_counts().is_none());
         assert!(partition_stats.null_counts(&column_b).is_none());
-        assert!(partition_stats.row_counts(&column_b).is_none());
+        assert!(partition_stats.row_counts().is_none());
 
         // Min/max values are all missing
         assert!(partition_stats.min_values(&column_a).is_none());
@@ -814,13 +812,13 @@
         assert_eq!(null_counts_b, expected_null_counts_b);
 
         // Row counts are the same as the statistics
-        let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
+        let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap())
             .unwrap()
             .into_iter()
             .collect::<Vec<_>>();
         let expected_row_counts_a = vec![Some(100), Some(200)];
         assert_eq!(row_counts_a, expected_row_counts_a);
-        let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
+        let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap())
             .unwrap()
             .into_iter()
             .collect::<Vec<_>>();
@@ -845,7 +843,7 @@
-        // This is debatable, personally I think `row_count` should not take a `Column` as an argument
-        // at all since all columns should have the same number of rows.
-        // But for now we just document the current behavior in this test.
+        // `row_counts` is container-level and no longer takes a `Column`,
+        // so the count observed "for" column `c` is simply the overall row
+        // count of each container — the same as for any other column.
- let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()) + let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -853,12 +851,13 @@ mod tests { assert_eq!(row_counts_c, expected_row_counts_c); assert!(pruning_stats.contained(&column_c, &values).is_none()); - // Test with a column that doesn't exist + // Test with a column that doesn't exist — column-specific stats + // return None, but row_counts is container-level and still available let column_d = Column::new_unqualified("d"); assert!(pruning_stats.min_values(&column_d).is_none()); assert!(pruning_stats.max_values(&column_d).is_none()); assert!(pruning_stats.null_counts(&column_d).is_none()); - assert!(pruning_stats.row_counts(&column_d).is_none()); + assert!(pruning_stats.row_counts().is_some()); assert!(pruning_stats.contained(&column_d, &values).is_none()); } @@ -886,8 +885,8 @@ mod tests { assert!(pruning_stats.null_counts(&column_b).is_none()); // Row counts are all missing - assert!(pruning_stats.row_counts(&column_a).is_none()); - assert!(pruning_stats.row_counts(&column_b).is_none()); + assert!(pruning_stats.row_counts().is_none()); + assert!(pruning_stats.row_counts().is_none()); // Contained values are all empty let values = HashSet::from([ScalarValue::from(1i32)]); @@ -1027,13 +1026,11 @@ mod tests { let expected_null_counts_col_x = vec![Some(0), Some(10)]; assert_eq!(null_counts_col_x, expected_null_counts_col_x); - // Test row counts - only available from file statistics - assert!(composite_stats.row_counts(&part_a).is_none()); - let row_counts_col_x = - as_uint64_array(&composite_stats.row_counts(&col_x).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + // Test row counts — container-level, available from file statistics + let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts = vec![Some(100), Some(200)]; 
assert_eq!(row_counts_col_x, expected_row_counts); @@ -1046,12 +1043,13 @@ mod tests { // File statistics don't implement contained assert!(composite_stats.contained(&col_x, &values).is_none()); - // Non-existent column should return None for everything + // Non-existent column should return None for column-specific stats, + // but row_counts is container-level and still available let non_existent = Column::new_unqualified("non_existent"); assert!(composite_stats.min_values(&non_existent).is_none()); assert!(composite_stats.max_values(&non_existent).is_none()); assert!(composite_stats.null_counts(&non_existent).is_none()); - assert!(composite_stats.row_counts(&non_existent).is_none()); + assert!(composite_stats.row_counts().is_some()); assert!(composite_stats.contained(&non_existent, &values).is_none()); // Verify num_containers matches @@ -1155,7 +1153,7 @@ mod tests { let expected_null_counts = vec![Some(0), Some(5)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap()) + let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -1195,11 +1193,10 @@ mod tests { let expected_null_counts = vec![Some(10), Some(20)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = - as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts = vec![Some(1000), Some(2000)]; assert_eq!(row_counts, expected_row_counts); } diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index 194e6e94fba3a..baef36ce147d4 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -509,7 +509,7 @@ impl PruningStatistics for 
PagesPruningStatistics<'_> { } } - fn row_counts(&self, _column: &datafusion_common::Column) -> Option { + fn row_counts(&self) -> Option { match self.converter.data_page_row_counts( self.offset_index, self.row_group_metadatas, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 7a2ed8f2777e3..3f254c9f55282 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use super::{ParquetAccessPlan, ParquetFileMetrics}; -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray, UInt64Array}; use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; @@ -536,7 +536,7 @@ impl PruningStatistics for BloomFilterStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -626,13 +626,13 @@ impl PruningStatistics for RowGroupPruningStatistics<'_> { .map(|counts| Arc::new(counts) as ArrayRef) } - fn row_counts(&self, column: &Column) -> Option { - // row counts are the same for all columns in a row group - self.statistics_converter(column) - .and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?)) - .ok() - .flatten() - .map(|counts| Arc::new(counts) as ArrayRef) + fn row_counts(&self) -> Option { + // Row counts are container-level — read directly from row group metadata. 
+ let counts: UInt64Array = self + .metadata_iter() + .map(|rg| Some(rg.num_rows() as u64)) + .collect(); + Some(Arc::new(counts) as ArrayRef) } fn contained( diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 3465084a72aaf..d541d6d8b7419 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -976,7 +976,7 @@ fn build_statistics_record_batch( StatisticsType::Min => statistics.min_values(&column), StatisticsType::Max => statistics.max_values(&column), StatisticsType::NullCount => statistics.null_counts(&column), - StatisticsType::RowCount => statistics.row_counts(&column), + StatisticsType::RowCount => statistics.row_counts(), }; let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); @@ -2420,9 +2420,7 @@ mod tests { .unwrap_or(None) } - fn row_counts(&self, _column: &Column) -> Option { - // Row count is container-level, not column-specific. - // Return row counts from any column that has them. + fn row_counts(&self) -> Option { self.stats .values() .find_map(|container_stats| container_stats.row_counts()) @@ -2463,7 +2461,7 @@ mod tests { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } diff --git a/datafusion/pruning/src/statistics.rs b/datafusion/pruning/src/statistics.rs index 09cc0e860df44..315eae872284b 100644 --- a/datafusion/pruning/src/statistics.rs +++ b/datafusion/pruning/src/statistics.rs @@ -457,12 +457,8 @@ fn resolve_aggregate_function( return stats.null_counts(col); } } else { - // count(*) without filter → total row count. - // PruningStatistics::row_counts takes a column parameter - // but the value is container-level (not column-specific), - // so all implementations return the same result regardless - // of column. 
- return stats.row_counts(&Column::new_unqualified("")); + // count(*) without filter → total row count + return stats.row_counts(); } } _ => {} @@ -557,7 +553,7 @@ mod tests { fn null_counts(&self, _column: &Column) -> Option { Some(Arc::clone(&self.null_counts)) } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { Some(Arc::clone(&self.row_counts)) } fn contained(