From 50fe005f9f4541891f9a9231e68c4aa6c60f2f2a Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 20 Mar 2026 11:41:45 +0100 Subject: [PATCH 1/4] Cap NDV at row count in joins, filters, and with_fetch --- datafusion/common/src/stats.rs | 47 +++++++++++++++- .../tests/custom_sources_cases/statistics.rs | 21 +++---- datafusion/physical-plan/src/filter.rs | 55 +++++++++++++++++-- datafusion/physical-plan/src/joins/utils.rs | 31 ++++++++++- 4 files changed, 136 insertions(+), 18 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a212122401f98..a48bca486c391 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -594,6 +594,10 @@ impl Statistics { } Precision::Absent => Precision::Absent, }; + // NDV can never exceed the number of rows + if let Some(&rows) = self.num_rows.get_value() { + cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows)); + } cs }) .collect(); @@ -2169,7 +2173,8 @@ mod tests { result_col_stats.sum_value, Precision::Inexact(ScalarValue::Int32(Some(123456))) ); - assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789)); + // NDV is capped at the new row count (250) since 789 > 250 + assert_eq!(result_col_stats.distinct_count, Precision::Inexact(250)); } #[test] @@ -2280,6 +2285,46 @@ mod tests { assert_eq!(result.total_byte_size, Precision::Inexact(800)); } + #[test] + fn test_with_fetch_caps_ndv_at_row_count() { + // NDV=500 but after LIMIT 10, NDV should be capped at 10 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 0, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(10) + ); + } + + #[test] + fn test_with_fetch_ndv_below_row_count_unchanged() { + // NDV=5 and LIMIT 10: NDV should stay at 5 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(5), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 0, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + } + #[test] fn test_try_merge_iter_basic() { let schema = Arc::new(Schema::new(vec![ diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index e38238f861739..22e0db663a3d4 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -281,17 +281,18 @@ async fn sql_limit() -> Result<()> { let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is smaller than the original number of lines we mark the statistics as inexact + // and cap NDV at the new row count + let limit_stats = physical_plan.partition_statistics(None)?; + assert_eq!(limit_stats.num_rows, Precision::Exact(5)); + // c1: NDV=2 stays at 2 (already below limit of 5) assert_eq!( - Statistics { - num_rows: Precision::Exact(5), - column_statistics: stats - .column_statistics - .iter() - .map(|c| c.clone().to_inexact()) - .collect(), - total_byte_size: Precision::Absent - }, - *physical_plan.partition_statistics(None)? + limit_stats.column_statistics[0].distinct_count, + Precision::Inexact(2) + ); + // c2: NDV=13 capped to 5 (the limit row count) + assert_eq!( + limit_stats.column_statistics[1].distinct_count, + Precision::Inexact(5) ); let df = ctx diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8720d5f7d223b..c1ff48cb66ec3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -341,6 +341,7 @@ impl FilterExec { schema, &input_stats.column_statistics, analysis_ctx.boundaries, + num_rows.get_value().copied(), ); Ok(Statistics { num_rows, @@ -781,6 +782,7 @@ fn collect_new_statistics( schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec, + filtered_num_rows: Option, ) -> Vec { analysis_boundaries .into_iter() @@ -816,11 +818,17 @@ fn collect_new_statistics( let min_value = interval_bound_to_precision(lower, is_single_value); let max_value = interval_bound_to_precision(upper, is_single_value); // When the interval collapses to a single value (equality - // predicate), the column has exactly 1 distinct value + // predicate), the column has exactly 1 distinct value. + // Otherwise, cap NDV at the filtered row count. let capped_distinct_count = if is_single_value { Precision::Exact(1) } else { - distinct_count.to_inexact() + match filtered_num_rows { + Some(rows) => { + distinct_count.to_inexact().min(&Precision::Inexact(rows)) + } + None => distinct_count.to_inexact(), + } }; ColumnStatistics { null_count: input_column_stats[idx].null_count.to_inexact(), @@ -2398,10 +2406,12 @@ mod tests { statistics.column_statistics[0].distinct_count, Precision::Exact(1) ); - // b > 10 narrows to [11, 50] but doesn't collapse + // b > 10 narrows to [11, 50] but doesn't collapse to a single value. + // The combined selectivity of a=42 (1/80) and c=7 (1/150) on 100 rows + // computes num_rows = 1, so NDV is capped at the row count: min(40, 1) = 1. assert_eq!( statistics.column_statistics[1].distinct_count, - Precision::Inexact(40) + Precision::Inexact(1) ); // c = 7 collapses to single value assert_eq!( @@ -2639,4 +2649,41 @@ mod tests { assert_eq!(out_schema.field(1).name(), "tokens"); Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> { + // Table: a: min=1, max=100, distinct_count=80, 100 rows + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // a <= 10 => ~10 rows out of 100 + let predicate: Arc = + binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?; + + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + + let statistics = filter.partition_statistics(None)?; + // Filter estimates ~10 rows (selectivity = 10/100) + assert_eq!(statistics.num_rows, Precision::Inexact(10)); + // NDV should be capped at the filtered row count (10), not the original 80 + let ndv = &statistics.column_statistics[0].distinct_count; + assert!( + ndv.get_value().copied() <= Some(10), + "Expected NDV <= 10 (filtered row count), got {ndv:?}" + ); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 39a380eed9d41..73be8c12475e1 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -709,7 +709,13 @@ fn max_distinct_count( stats: &ColumnStatistics, ) -> Precision { match &stats.distinct_count { - &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => dc, + &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => { + // NDV can never exceed the number of rows + match num_rows { + Precision::Absent => dc, + _ => dc.min(num_rows).to_inexact(), + } + } _ => { // The number can never be greater than the number of rows we have // minus the nulls (since they don't count as distinct values). @@ -2392,6 +2398,22 @@ mod tests { (10, Inexact(1), Inexact(10), Absent, Absent), Some(Inexact(0)), ), + // NDV > num_rows: distinct count should be capped at row count + ( + (5, Inexact(1), Inexact(100), Inexact(50), Absent), + (10, Inexact(1), Inexact(100), Inexact(50), Absent), + // max_distinct_count caps: left NDV=min(50,5)=5, right NDV=min(50,10)=10 + // cardinality = (5 * 10) / max(5, 10) = 50 / 10 = 5 + Some(Inexact(5)), + ), + // NDV > num_rows on one side only + ( + (3, Inexact(1), Inexact(100), Inexact(100), Absent), + (10, Inexact(1), Inexact(100), Inexact(5), Absent), + // max_distinct_count caps: left NDV=min(100,3)=3, right NDV=min(5,10)=5 + // cardinality = (3 * 10) / max(3, 5) = 30 / 5 = 6 + Some(Inexact(6)), + ), ]; for (left_info, right_info, expected_cardinality) in cases { @@ -2531,11 +2553,14 @@ mod tests { // y: min=0, max=100, distinct=None // // Join on a=c, b=d (ignore x/y) + // Right column d has NDV=2500 but only 2000 rows, so NDV is capped + // to 2000. join_selectivity = max(500, 2000) = 2000. + // Inner cardinality = (1000 * 2000) / 2000 = 1000 let cases = vec![ - (JoinType::Inner, 800), + (JoinType::Inner, 1000), (JoinType::Left, 1000), (JoinType::Right, 2000), - (JoinType::Full, 2200), + (JoinType::Full, 2000), ]; let left_col_stats = vec![ From b7e8e16411d9ff09abd992d2882d76677150c2c6 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Wed, 1 Apr 2026 11:00:34 +0200 Subject: [PATCH 2/4] Add with_fetch NDV capping tests with skip --- datafusion/common/src/stats.rs | 44 ++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a48bca486c391..254e437b69cc0 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -2305,6 +2305,50 @@ mod tests { ); } + #[test] + fn test_with_fetch_caps_ndv_with_skip() { + // 1000 rows, NDV=500, OFFSET 5 LIMIT 10 + // with_fetch computes num_rows = min(1000 - 5, 10) = 10 + // NDV should be capped at 10 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 5, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(10) + ); + } + + #[test] + fn test_with_fetch_caps_ndv_with_large_skip() { + // 1000 rows, NDV=500, OFFSET 995 LIMIT 100 + // with_fetch computes num_rows = min(1000 - 995, 100) = 5 + // NDV should be capped at 5 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(100), 995, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(5)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + } + #[test] fn test_with_fetch_ndv_below_row_count_unchanged() { // NDV=5 and LIMIT 10: NDV should stay at 5 From 9ff12fbd1986a53209e08fc43a35146a4e8e9d15 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 10 Apr 2026 18:48:29 +0200 Subject: [PATCH 3/4] Preserve NDV precision in max_distinct_count when cap is not triggered --- datafusion/physical-plan/src/joins/utils.rs | 80 ++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 73be8c12475e1..b0ecf7b789f6b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -713,7 +713,13 @@ fn max_distinct_count( // NDV can never exceed the number of rows match num_rows { Precision::Absent => dc, - _ => dc.min(num_rows).to_inexact(), + _ => { + if dc.get_value() <= num_rows.get_value() { + dc + } else { + num_rows.to_inexact() + } + } } } _ => { @@ -3195,4 +3201,76 @@ mod tests { assert_eq!(cmp_nl.compare(0, 0), Ordering::Greater); assert_eq!(cmp_nl.compare(1, 1), Ordering::Less); } + + #[test] + fn test_max_distinct_count_preserves_precision_when_not_capped() { + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Exact(5), + ..Default::default() + } + ), + Exact(5) + ); + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Inexact(5), + ..Default::default() + } + ), + Inexact(5) + ); + // Inexact num_rows does not affect an exact NDV that is within bounds + assert_eq!( + max_distinct_count( + &Inexact(10), + &ColumnStatistics { + distinct_count: Exact(5), + ..Default::default() + } + ), + Exact(5) + ); + } + + #[test] + fn test_max_distinct_count_demotes_to_inexact_when_capped() { + // Exact NDV > Exact num_rows is an illegal state (NDV <= num_rows is a + // mathematical invariant), but the code handles it defensively by + // capping and demoting to inexact + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Exact(15), + ..Default::default() + } + ), + Inexact(10) + ); + assert_eq!( + max_distinct_count( + &Inexact(10), + &ColumnStatistics { + distinct_count: Exact(15), + ..Default::default() + } + ), + Inexact(10) + ); + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Inexact(15), + ..Default::default() + } + ), + Inexact(10) + ); + } } From f3ea230a0297d1cae79fec6897ee87b414813791 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 10 Apr 2026 18:48:29 +0200 Subject: [PATCH 4/4] Pass Precision to collect_new_statistics instead of raw usize --- datafusion/physical-plan/src/filter.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c1ff48cb66ec3..f8bb9a141bde3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -341,7 +341,10 @@ impl FilterExec { schema, &input_stats.column_statistics, analysis_ctx.boundaries, - num_rows.get_value().copied(), + match &num_rows { + Precision::Absent => None, + p => Some(*p), + }, ); Ok(Statistics { num_rows, @@ -782,7 +785,7 @@ fn collect_new_statistics( schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec, - filtered_num_rows: Option, + filtered_num_rows: Option>, ) -> Vec { analysis_boundaries .into_iter() @@ -824,9 +827,7 @@ fn collect_new_statistics( Precision::Exact(1) } else { match filtered_num_rows { - Some(rows) => { - distinct_count.to_inexact().min(&Precision::Inexact(rows)) - } + Some(rows) => distinct_count.to_inexact().min(&rows), None => distinct_count.to_inexact(), } };