From 9c50e6d6436a94041035c018e2cfa6cb50fe5a50 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Thu, 9 Apr 2026 20:36:48 +0300 Subject: [PATCH 1/2] feat: extend interval analysis support for temporal types (Date32, Date64, Timestamp) --- datafusion/common/src/scalar/mod.rs | 55 ++++++++++++++++++- .../expr-common/src/interval_arithmetic.rs | 27 ++++++++- .../physical-expr/src/intervals/utils.rs | 3 + datafusion/physical-plan/src/filter.rs | 46 ++++++++++++++++ 4 files changed, 128 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 4511d8db90075..4a8b5a8856399 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2526,6 +2526,23 @@ impl ScalarValue { (Self::Float64(Some(l)), Self::Float64(Some(r))) => { Some((l - r).abs().round() as _) } + (Self::Date32(Some(l)), Self::Date32(Some(r))) => Some(l.abs_diff(*r) as _), + (Self::Date64(Some(l)), Self::Date64(Some(r))) => Some(l.abs_diff(*r) as _), + (Self::TimestampSecond(Some(l), _), Self::TimestampSecond(Some(r), _)) => { + Some(l.abs_diff(*r) as _) + } + ( + Self::TimestampMillisecond(Some(l), _), + Self::TimestampMillisecond(Some(r), _), + ) => Some(l.abs_diff(*r) as _), + ( + Self::TimestampMicrosecond(Some(l), _), + Self::TimestampMicrosecond(Some(r), _), + ) => Some(l.abs_diff(*r) as _), + ( + Self::TimestampNanosecond(Some(l), _), + Self::TimestampNanosecond(Some(r), _), + ) => Some(l.abs_diff(*r) as _), ( Self::Decimal128(Some(l), lprecision, lscale), Self::Decimal128(Some(r), rprecision, rscale), @@ -8766,6 +8783,42 @@ mod tests { ScalarValue::Decimal256(Some(10.into()), 1, 0), 5, ), + // Temporal types + ( + ScalarValue::Date32(Some(0)), + ScalarValue::Date32(Some(10)), + 10, + ), + ( + ScalarValue::Date32(Some(10)), + ScalarValue::Date32(Some(0)), + 10, + ), + ( + ScalarValue::Date64(Some(1000)), + ScalarValue::Date64(Some(5000)), + 4000, + ), + ( + ScalarValue::TimestampSecond(Some(100), None), + ScalarValue::TimestampSecond(Some(200), None), + 100, + ), + ( + ScalarValue::TimestampMillisecond(Some(1000), None), + ScalarValue::TimestampMillisecond(Some(5000), None), + 4000, + ), + ( + ScalarValue::TimestampMicrosecond(Some(0), None), + ScalarValue::TimestampMicrosecond(Some(1_000_000), None), + 1_000_000, + ), + ( + ScalarValue::TimestampNanosecond(Some(1_000_000_000), None), + ScalarValue::TimestampNanosecond(Some(2_000_000_000), None), + 1_000_000_000, + ), ]; for (lhs, rhs, expected) in cases.iter() { let distance = lhs.distance(rhs).unwrap(); @@ -8828,8 +8881,6 @@ mod tests { ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(false)), ), - (ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1))), - (ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1))), ( ScalarValue::Decimal128(Some(123), 5, 5), ScalarValue::Decimal128(Some(120), 5, 3), diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index c0cecad4a35c9..76b05d08eae8b 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -929,7 +929,7 @@ impl Interval { /// when the calculated cardinality does not fit in an `u64`. pub fn cardinality(&self) -> Option { let data_type = self.data_type(); - if data_type.is_integer() { + if data_type.is_integer() || data_type.is_temporal() { self.upper.distance(&self.lower).map(|diff| diff as u64) } else if data_type.is_floating() { // Negative numbers are sorted in the reverse order. To @@ -3958,6 +3958,31 @@ mod tests { )?; assert_eq!(interval.cardinality().unwrap(), 2); + // Temporal types + let interval = Interval::try_new( + ScalarValue::Date32(Some(0)), + ScalarValue::Date32(Some(10)), + )?; + assert_eq!(interval.cardinality().unwrap(), 11); + + let interval = Interval::try_new( + ScalarValue::Date64(Some(1000)), + ScalarValue::Date64(Some(5000)), + )?; + assert_eq!(interval.cardinality().unwrap(), 4001); + + let interval = Interval::try_new( + ScalarValue::TimestampSecond(Some(100), None), + ScalarValue::TimestampSecond(Some(200), None), + )?; + assert_eq!(interval.cardinality().unwrap(), 101); + + let interval = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1_000_000_000), None), + ScalarValue::TimestampNanosecond(Some(2_000_000_000), None), + )?; + assert_eq!(interval.cardinality().unwrap(), 1_000_000_001); + Ok(()) } diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index 3cada63a34ace..dfda190fbef5e 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -104,6 +104,9 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool { | &DataType::UInt8 | &DataType::Float64 | &DataType::Float32 + | &DataType::Date32 + | &DataType::Date64 + | &DataType::Timestamp(_, _) ) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index afe2b0ae810a3..b82366f603b47 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -2572,4 +2572,50 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_equality_timestamp_ndv() -> Result<()> { + // ts: min=1_000_000_000, max=2_000_000_000, ndv=500 + let schema = Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None), + false, + )]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(8000), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::TimestampNanosecond( + Some(1_000_000_000), + None, + )), + max_value: Precision::Inexact(ScalarValue::TimestampNanosecond( + Some(2_000_000_000), + None, + )), + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }, + schema.clone(), + )); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("ts", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::TimestampNanosecond( + Some(1_500_000_000), + None, + ))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } } From 3a328aa6f2c636941c8465c9e216dcb57ed80fdd Mon Sep 17 00:00:00 2001 From: buraksenn Date: Thu, 9 Apr 2026 21:04:30 +0300 Subject: [PATCH 2/2] fix partition statistics tests in optimizer --- .../partition_statistics.rs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 07b92923b6f00..96c80d07efea3 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -154,15 +154,16 @@ mod test { // - null_count = 0 (partition values from paths are never null) // - min/max are the merged partition values across files in the group // - byte_size = num_rows * 4 (Date32 is 4 bytes per row) - // - distinct_count = Inexact(1) per partition file (single partition value per file), - // preserved via max() when merging stats across partitions + // - distinct_count = Inexact(max_date - min_date + 1), derived from the + // date range via interval analysis for temporal types let date32_byte_size = num_rows * 4; + let distinct_dates = (max_date - min_date + 1) as usize; column_stats.push(ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))), min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(distinct_dates), byte_size: Precision::Exact(date32_byte_size), }); } @@ -583,7 +584,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(4), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4] @@ -617,7 +618,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(4), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2] @@ -1256,7 +1257,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(2), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column @@ -1284,7 +1285,7 @@ mod test { DATE_2025_03_03, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(2), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column @@ -1421,8 +1422,7 @@ mod test { byte_size: Precision::Exact(16), }, // Left date column: all partitions (2025-03-01..2025-03-04) - // NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value, - // and merging takes max as a conservative lower bound + // NDV is Inexact(4) derived from the date range via interval analysis ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( @@ -1432,7 +1432,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(4), byte_size: Precision::Exact(16), }, // Right id column: partition 0 only (id 3..4) @@ -1445,7 +1445,7 @@ mod test { byte_size: Precision::Exact(8), }, // Right date column: partition 0 only (2025-03-01..2025-03-02) - // NDV is Inexact(1) from the single Hive partition's date value + // NDV is Inexact(2) derived from the date range via interval analysis ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( @@ -1455,7 +1455,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(2), byte_size: Precision::Exact(8), }, ], @@ -1507,7 +1507,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(2), byte_size: Precision::Exact(8), }, // Right id column: partition 0 only (id 3..4) @@ -1529,7 +1529,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(2), byte_size: Precision::Exact(8), }, ], @@ -1581,7 +1581,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(4), byte_size: Precision::Exact(16), }, // Right id column: all partitions (id 1..4) @@ -1603,7 +1603,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Inexact(1), + distinct_count: Precision::Inexact(4), byte_size: Precision::Exact(16), }, ],