Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,23 @@ impl ScalarValue {
(Self::Float64(Some(l)), Self::Float64(Some(r))) => {
Some((l - r).abs().round() as _)
}
(Self::Date32(Some(l)), Self::Date32(Some(r))) => Some(l.abs_diff(*r) as _),
(Self::Date64(Some(l)), Self::Date64(Some(r))) => Some(l.abs_diff(*r) as _),
(Self::TimestampSecond(Some(l), _), Self::TimestampSecond(Some(r), _)) => {
Some(l.abs_diff(*r) as _)
}
(
Self::TimestampMillisecond(Some(l), _),
Self::TimestampMillisecond(Some(r), _),
) => Some(l.abs_diff(*r) as _),
(
Self::TimestampMicrosecond(Some(l), _),
Self::TimestampMicrosecond(Some(r), _),
) => Some(l.abs_diff(*r) as _),
(
Self::TimestampNanosecond(Some(l), _),
Self::TimestampNanosecond(Some(r), _),
) => Some(l.abs_diff(*r) as _),
(
Self::Decimal128(Some(l), lprecision, lscale),
Self::Decimal128(Some(r), rprecision, rscale),
Expand Down Expand Up @@ -8766,6 +8783,42 @@ mod tests {
ScalarValue::Decimal256(Some(10.into()), 1, 0),
5,
),
// Temporal types
(
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(10)),
10,
),
(
ScalarValue::Date32(Some(10)),
ScalarValue::Date32(Some(0)),
10,
),
(
ScalarValue::Date64(Some(1000)),
ScalarValue::Date64(Some(5000)),
4000,
),
(
ScalarValue::TimestampSecond(Some(100), None),
ScalarValue::TimestampSecond(Some(200), None),
100,
),
(
ScalarValue::TimestampMillisecond(Some(1000), None),
ScalarValue::TimestampMillisecond(Some(5000), None),
4000,
),
(
ScalarValue::TimestampMicrosecond(Some(0), None),
ScalarValue::TimestampMicrosecond(Some(1_000_000), None),
1_000_000,
),
(
ScalarValue::TimestampNanosecond(Some(1_000_000_000), None),
ScalarValue::TimestampNanosecond(Some(2_000_000_000), None),
1_000_000_000,
),
];
for (lhs, rhs, expected) in cases.iter() {
let distance = lhs.distance(rhs).unwrap();
Expand Down Expand Up @@ -8828,8 +8881,6 @@ mod tests {
ScalarValue::Boolean(Some(true)),
ScalarValue::Boolean(Some(false)),
),
(ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1))),
(ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1))),
(
ScalarValue::Decimal128(Some(123), 5, 5),
ScalarValue::Decimal128(Some(120), 5, 3),
Expand Down
32 changes: 16 additions & 16 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,16 @@ mod test {
// - null_count = 0 (partition values from paths are never null)
// - min/max are the merged partition values across files in the group
// - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
// - distinct_count = Inexact(1) per partition file (single partition value per file),
// preserved via max() when merging stats across partitions
// - distinct_count = Inexact(max_date - min_date + 1), derived from the
// date range via interval analysis for temporal types
let date32_byte_size = num_rows * 4;
let distinct_dates = (max_date - min_date + 1) as usize;
column_stats.push(ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))),
min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(distinct_dates),
byte_size: Precision::Exact(date32_byte_size),
});
}
Expand Down Expand Up @@ -583,7 +584,7 @@ mod test {
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(4),
byte_size: Precision::Absent,
},
// column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4]
Expand Down Expand Up @@ -617,7 +618,7 @@ mod test {
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(4),
byte_size: Precision::Absent,
},
// column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2]
Expand Down Expand Up @@ -1256,7 +1257,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(2),
byte_size: Precision::Exact(8),
},
ColumnStatistics::new_unknown(), // window column
Expand Down Expand Up @@ -1284,7 +1285,7 @@ mod test {
DATE_2025_03_03,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(2),
byte_size: Precision::Exact(8),
},
ColumnStatistics::new_unknown(), // window column
Expand Down Expand Up @@ -1421,8 +1422,7 @@ mod test {
byte_size: Precision::Exact(16),
},
// Left date column: all partitions (2025-03-01..2025-03-04)
// NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value,
// and merging takes max as a conservative lower bound
// NDV is Inexact(4) derived from the date range via interval analysis
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(
Expand All @@ -1432,7 +1432,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(4),
byte_size: Precision::Exact(16),
},
// Right id column: partition 0 only (id 3..4)
Expand All @@ -1445,7 +1445,7 @@ mod test {
byte_size: Precision::Exact(8),
},
// Right date column: partition 0 only (2025-03-01..2025-03-02)
// NDV is Inexact(1) from the single Hive partition's date value
// NDV is Inexact(2) derived from the date range via interval analysis
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(
Expand All @@ -1455,7 +1455,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(2),
byte_size: Precision::Exact(8),
},
],
Expand Down Expand Up @@ -1507,7 +1507,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(2),
byte_size: Precision::Exact(8),
},
// Right id column: partition 0 only (id 3..4)
Expand All @@ -1529,7 +1529,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(2),
byte_size: Precision::Exact(8),
},
],
Expand Down Expand Up @@ -1581,7 +1581,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(4),
byte_size: Precision::Exact(16),
},
// Right id column: all partitions (id 1..4)
Expand All @@ -1603,7 +1603,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
distinct_count: Precision::Inexact(1),
distinct_count: Precision::Inexact(4),
byte_size: Precision::Exact(16),
},
],
Expand Down
27 changes: 26 additions & 1 deletion datafusion/expr-common/src/interval_arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ impl Interval {
/// when the calculated cardinality does not fit in an `u64`.
pub fn cardinality(&self) -> Option<u64> {
let data_type = self.data_type();
if data_type.is_integer() {
if data_type.is_integer() || data_type.is_temporal() {
self.upper.distance(&self.lower).map(|diff| diff as u64)
} else if data_type.is_floating() {
// Negative numbers are sorted in the reverse order. To
Expand Down Expand Up @@ -3958,6 +3958,31 @@ mod tests {
)?;
assert_eq!(interval.cardinality().unwrap(), 2);

// Temporal types
let interval = Interval::try_new(
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(10)),
)?;
assert_eq!(interval.cardinality().unwrap(), 11);

let interval = Interval::try_new(
ScalarValue::Date64(Some(1000)),
ScalarValue::Date64(Some(5000)),
)?;
assert_eq!(interval.cardinality().unwrap(), 4001);

let interval = Interval::try_new(
ScalarValue::TimestampSecond(Some(100), None),
ScalarValue::TimestampSecond(Some(200), None),
)?;
assert_eq!(interval.cardinality().unwrap(), 101);

let interval = Interval::try_new(
ScalarValue::TimestampNanosecond(Some(1_000_000_000), None),
ScalarValue::TimestampNanosecond(Some(2_000_000_000), None),
)?;
assert_eq!(interval.cardinality().unwrap(), 1_000_000_001);

Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-expr/src/intervals/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool {
| &DataType::UInt8
| &DataType::Float64
| &DataType::Float32
| &DataType::Date32
| &DataType::Date64
| &DataType::Timestamp(_, _)
)
}

Expand Down
46 changes: 46 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2577,6 +2577,52 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_filter_statistics_equality_timestamp_ndv() -> Result<()> {
    // Input: a single non-null nanosecond-timestamp column "ts" with
    // min = 1_000_000_000, max = 2_000_000_000, and an inexact NDV of 500.
    let ts_type = DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None);
    let schema = Schema::new(vec![Field::new("ts", ts_type, false)]);

    let ts_stats = ColumnStatistics {
        min_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
            Some(1_000_000_000),
            None,
        )),
        max_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
            Some(2_000_000_000),
            None,
        )),
        distinct_count: Precision::Inexact(500),
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(8000),
            column_statistics: vec![ts_stats],
        },
        schema.clone(),
    ));

    // Predicate `ts = 1_500_000_000`: an equality comparison against a
    // constant that lies inside the column's [min, max] range.
    let rhs = Literal::new(ScalarValue::TimestampNanosecond(Some(1_500_000_000), None));
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("ts", 0)),
        Operator::Eq,
        Arc::new(rhs),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);

    // Equality against a single constant should collapse the estimated
    // distinct count of the timestamp column to exactly one value.
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(
        statistics.column_statistics[0].distinct_count,
        Precision::Exact(1)
    );
    Ok(())
}

/// Regression test: ProjectionExec on top of a FilterExec that already has
/// an explicit projection must not panic when `try_swapping_with_projection`
/// attempts to swap the two nodes.
Expand Down
Loading