From 83f7a3b29fb8ccc344b8f7fb11210b66a0f24b44 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 25 Feb 2026 17:46:30 -0800 Subject: [PATCH 1/4] port_ceil_comet_to_df --- datafusion/spark/src/function/math/ceil.rs | 155 +++++++++++++ datafusion/spark/src/function/math/mod.rs | 4 + .../test_files/spark/math/ceil.slt | 210 +++++++++++++++++- 3 files changed, 367 insertions(+), 2 deletions(-) create mode 100644 datafusion/spark/src/function/math/ceil.rs diff --git a/datafusion/spark/src/function/math/ceil.rs b/datafusion/spark/src/function/math/ceil.rs new file mode 100644 index 0000000000000..b888a69a26913 --- /dev/null +++ b/datafusion/spark/src/function/math/ceil.rs @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, +}; +use arrow::compute::kernels::arity::unary; +use arrow::datatypes::DataType; +use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; +// spark semantics + +macro_rules! downcast_compute_op { + ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident, $RESULT:ident) => {{ + let n = $ARRAY.as_any().downcast_ref::<$TYPE>(); + match n { + Some(array) => { + let res: $RESULT = + arrow::compute::kernels::arity::unary(array, |x| x.$FUNC() as i64); + Ok(Arc::new(res)) + } + _ => Err(DataFusionError::Internal(format!( + "Invalid data type for {}", + $NAME + ))), + } + }}; +} + +pub fn spark_ceil(args: &[ColumnarValue]) -> Result { + let value = &args[0]; + match value { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Float32 => { + let result = + downcast_compute_op!(array, "ceil", ceil, Float32Array, Int64Array); + Ok(ColumnarValue::Array(result?)) + } + DataType::Float64 => { + let result = + downcast_compute_op!(array, "ceil", ceil, Float64Array, Int64Array); + Ok(ColumnarValue::Array(result?)) + } + DataType::Int8 => { + let input = array.as_any().downcast_ref::().unwrap(); + let result: Int64Array = unary(input, |x| x as i64); + Ok(ColumnarValue::Array(Arc::new(result))) + } + DataType::Int16 => { + let input = array.as_any().downcast_ref::().unwrap(); + let result: Int64Array = unary(input, |x| x as i64); + Ok(ColumnarValue::Array(Arc::new(result))) + } + DataType::Int32 => { + let input = array.as_any().downcast_ref::().unwrap(); + let result: Int64Array = unary(input, |x| x as i64); + Ok(ColumnarValue::Array(Arc::new(result))) + } + DataType::Int64 => { + // Optimization: Int64 -> Int64 doesn't need conversion, just return same array + Ok(ColumnarValue::Array(Arc::clone(array))) + } + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {other:?} for function ceil", + ))), + }, + ColumnarValue::Scalar(a) => match a { + ScalarValue::Float32(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + a.map(|x| x.ceil() as i64), + ))), + ScalarValue::Float64(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + a.map(|x| x.ceil() as i64), + ))), + ScalarValue::Int8(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + a.map(|x| x as i64), + ))), + ScalarValue::Int16(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + a.map(|x| x as i64), + ))), + ScalarValue::Int32(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + a.map(|x| x as i64), + ))), + ScalarValue::Int64(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64(*a))), + _ => Err(DataFusionError::Internal(format!( + "Unsupported data type {:?} for function ceil", + value.data_type(), + ))), + }, + } +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCiel { + signature: Signature, +} + +impl Default for SparkCiel { + fn default() -> Self { + Self::new() + } +} + +impl SparkCiel { + pub fn new() -> Self { + Self { + signature: Signature::numeric(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkCiel { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "ceil" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + spark_ceil(&args.args) + } +} diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 92d8e90ac372e..7f1067741d4d8 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod abs; +mod ceil; pub mod expm1; pub mod factorial; pub mod hex; @@ -42,6 +43,7 @@ make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); make_udf_function!(trigonometry::SparkCsc, csc); make_udf_function!(trigonometry::SparkSec, sec); make_udf_function!(negative::SparkNegative, negative); +make_udf_function!(ceil::SparkCiel, ceil); pub mod expr_fn { use datafusion_functions::export_functions; @@ -70,6 +72,7 @@ pub mod expr_fn { "Returns the negation of expr (unary minus).", arg1 )); + export_functions!((ceil, "Returns the ceiling of expr.", arg1)); } pub fn functions() -> Vec> { @@ -86,5 +89,6 @@ pub fn functions() -> Vec> { csc(), sec(), negative(), + ceil(), ] } diff --git a/datafusion/sqllogictest/test_files/spark/math/ceil.slt b/datafusion/sqllogictest/test_files/spark/math/ceil.slt index c87a29b61fd49..f6f6e1ef507f2 100644 --- a/datafusion/sqllogictest/test_files/spark/math/ceil.slt +++ b/datafusion/sqllogictest/test_files/spark/math/ceil.slt @@ -38,5 +38,211 @@ ## Original Query: SELECT ceil(5); ## PySpark 3.5.5 Result: {'CEIL(5)': 5, 'typeof(CEIL(5))': 'bigint', 'typeof(5)': 'int'} -#query -#SELECT ceil(5::int); +query I +SELECT ceil(5::int); +---- +5 + +# Additional tests for all supported numeric types + +# Test Float32 +query I +SELECT ceil(125.2345::float); +---- +126 + +query I +SELECT ceil(15.0001::float); +---- +16 + +query I +SELECT ceil(0.1::float); +---- +1 + +query I +SELECT ceil(-0.9::float); +---- +0 + +query I +SELECT ceil(-1.1::float); +---- +-1 + +query I +SELECT ceil(123.0::float); +---- +123 + +query I +SELECT ceil(0.0::float); +---- +0 + +# Test Float64 +query I +SELECT ceil(125.2345::double); +---- +126 + +query I +SELECT ceil(15.0001::double); +---- +16 + +query I +SELECT ceil(0.1::double); +---- +1 + +query I +SELECT ceil(-0.9::double); +---- +0 + +query I +SELECT ceil(-1.1::double); +---- +-1 + +query I +SELECT ceil(123.0::double); +---- +123 + +query I +SELECT ceil(1.9999::double); +---- +2 + +query I +SELECT ceil(-1.9999::double); +---- +-1 + +# Test Int8 (tinyint) +query I +SELECT ceil(5::tinyint); +---- +5 + +query I +SELECT ceil(-1::tinyint); +---- +-1 + +query I +SELECT ceil(0::tinyint); +---- +0 + +query I +SELECT ceil(127::tinyint); +---- +127 + +query I +SELECT ceil(-128::tinyint); +---- +-128 + +# Test Int16 (smallint) +query I +SELECT ceil(100::smallint); +---- +100 + +query I +SELECT ceil(-50::smallint); +---- +-50 + +query I +SELECT ceil(0::smallint); +---- +0 + +query I +SELECT ceil(32767::smallint); +---- +32767 + +query I +SELECT ceil(-32768::smallint); +---- +-32768 + +# Test Int32 (int) +query I +SELECT ceil(1000::int); +---- +1000 + +query I +SELECT ceil(-500::int); +---- +-500 + +query I +SELECT ceil(0::int); +---- +0 + +# Test Int64 (bigint) +query I +SELECT ceil(48::bigint); +---- +48 + +query I +SELECT ceil(-1::bigint); +---- +-1 + +query I +SELECT ceil(0::bigint); +---- +0 + +query I +SELECT ceil(9223372036854775807::bigint); +---- +9223372036854775807 + +query I +SELECT ceil(-9223372036854775808::bigint); +---- +-9223372036854775808 + +# Test NULL values +query I +SELECT ceil(NULL::float); +---- +NULL + +query I +SELECT ceil(NULL::double); +---- +NULL + +query I +SELECT ceil(NULL::tinyint); +---- +NULL + +query I +SELECT ceil(NULL::smallint); +---- +NULL + +query I +SELECT ceil(NULL::int); +---- +NULL + +query I +SELECT ceil(NULL::bigint); +---- +NULL From 548fd184d0112cce0e4f5ed45dfdd96daa31a76c Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 4 Mar 2026 11:46:11 -0800 Subject: [PATCH 2/4] spark_compatible_ceil_function --- datafusion/spark/src/function/math/ceil.rs | 122 ++++++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/math/ceil.rs b/datafusion/spark/src/function/math/ceil.rs index b888a69a26913..b5d51e87d6c00 100644 --- a/datafusion/spark/src/function/math/ceil.rs +++ b/datafusion/spark/src/function/math/ceil.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::cast::AsArray; +use arrow::array::types::Decimal128Type; use arrow::array::{ - Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, + ArrayRef, Decimal128Array, Float32Array, Float64Array, Int8Array, Int16Array, + Int32Array, Int64Array, }; use arrow::compute::kernels::arity::unary; use arrow::datatypes::DataType; @@ -26,7 +29,6 @@ use datafusion_expr::{ }; use std::any::Any; use std::sync::Arc; -// spark semantics macro_rules! downcast_compute_op { ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident, $RESULT:ident) => {{ @@ -78,6 +80,10 @@ pub fn spark_ceil(args: &[ColumnarValue]) -> Result Int64 doesn't need conversion, just return same array Ok(ColumnarValue::Array(Arc::clone(array))) } + DataType::Decimal128(precision, scale) if *scale > 0 => { + let f = decimal_ceil_f(*scale); + make_decimal_array(array, *precision, *scale, &f) + } other => Err(DataFusionError::Internal(format!( "Unsupported data type {other:?} for function ceil", ))), @@ -99,6 +105,13 @@ pub fn spark_ceil(args: &[ColumnarValue]) -> Result Ok(ColumnarValue::Scalar(ScalarValue::Int64(*a))), + ScalarValue::Decimal128(a, precision, scale) if *scale > 0 => { + let f = decimal_ceil_f(*scale); + let result = a.map(f); + Ok(ColumnarValue::Scalar(ScalarValue::Decimal128( + result, *precision, *scale, + ))) + } _ => Err(DataFusionError::Internal(format!( "Unsupported data type {:?} for function ceil", value.data_type(), @@ -107,6 +120,33 @@ pub fn spark_ceil(args: &[ColumnarValue]) -> Result i128, +) -> Result { + let array = array.as_primitive::(); + let result: Decimal128Array = unary(array, f); + let result = result.with_data_type(DataType::Decimal128(precision, scale)); + Ok(ColumnarValue::Array(Arc::new(result))) +} + +/// Returns a closure that computes ceil for decimal values. +#[inline] +fn decimal_ceil_f(scale: i8) -> impl Fn(i128) -> i128 { + let div = 10_i128.pow(scale as u32); + move |x: i128| { + let d = x / div; + let r = x % div; + // Ceiling: round up for positive remainders + (if r > 0 { d + 1 } else { d }) * div + } +} + #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkCiel { signature: Signature, @@ -153,3 +193,81 @@ impl ScalarUDFImpl for SparkCiel { spark_ceil(&args.args) } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Decimal128Array; + use datafusion_common::Result; + use datafusion_common::cast::as_decimal128_array; + + #[test] + fn test_ceil_decimal128_array() -> Result<()> { + let array = Decimal128Array::from(vec![ + Some(12345), // 123.45 + Some(12500), // 125.00 + Some(-12999), // -129.99 + None, + ]) + .with_precision_and_scale(5, 2)?; + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let expected = Decimal128Array::from(vec![ + Some(12400), // 124.00 + Some(12500), // 125.00 + Some(-12900), // -129.00 + None, + ]) + .with_precision_and_scale(5, 2)?; + let actual = as_decimal128_array(&result)?; + assert_eq!(actual, &expected); + Ok(()) + } + + #[test] + fn test_ceil_decimal128_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Decimal128( + Some(567), + 3, + 1, + ))]; // 56.7 + let ColumnarValue::Scalar(ScalarValue::Decimal128(Some(result), 3, 1)) = + spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, 570); // 57.0 + Ok(()) + } + + #[test] + fn test_ceil_decimal128_negative_scalar() -> Result<()> { + // -56.7 should ceil to -56.0 + let args = vec![ColumnarValue::Scalar(ScalarValue::Decimal128( + Some(-567), + 3, + 1, + ))]; + let ColumnarValue::Scalar(ScalarValue::Decimal128(Some(result), 3, 1)) = + spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, -560); // -56.0 + Ok(()) + } + + #[test] + fn test_ceil_decimal128_null_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Decimal128(None, 5, 2))]; + let ColumnarValue::Scalar(ScalarValue::Decimal128(result, 5, 2)) = + spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, None); + Ok(()) + } +} From 251883f2b99fbb70d92314d68aeff7b867d8a145 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 4 Mar 2026 13:09:33 -0800 Subject: [PATCH 3/4] add_tests --- datafusion/spark/src/function/math/ceil.rs | 445 ++++++++++++------ datafusion/spark/src/function/math/mod.rs | 4 +- .../test_files/spark/math/ceil.slt | 6 +- 3 files changed, 313 insertions(+), 142 deletions(-) diff --git a/datafusion/spark/src/function/math/ceil.rs b/datafusion/spark/src/function/math/ceil.rs index b5d51e87d6c00..65a5522c9c571 100644 --- a/datafusion/spark/src/function/math/ceil.rs +++ b/datafusion/spark/src/function/math/ceil.rs @@ -17,210 +17,351 @@ use arrow::array::cast::AsArray; use arrow::array::types::Decimal128Type; -use arrow::array::{ - ArrayRef, Decimal128Array, Float32Array, Float64Array, Int8Array, Int16Array, - Int32Array, Int64Array, -}; +use arrow::array::{Decimal128Array, Int64Array}; use arrow::compute::kernels::arity::unary; -use arrow::datatypes::DataType; -use datafusion_common::{DataFusionError, ScalarValue}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::{DataFusionError, ScalarValue, exec_err, internal_err}; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, }; use std::any::Any; use std::sync::Arc; -macro_rules! downcast_compute_op { - ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident, $RESULT:ident) => {{ - let n = $ARRAY.as_any().downcast_ref::<$TYPE>(); - match n { - Some(array) => { - let res: $RESULT = - arrow::compute::kernels::arity::unary(array, |x| x.$FUNC() as i64); - Ok(Arc::new(res)) - } - _ => Err(DataFusionError::Internal(format!( - "Invalid data type for {}", - $NAME - ))), +/// Spark-compatible `ceil` function. +/// +/// Returns the smallest integer greater than or equal to the input. +/// Unlike standard DataFusion ceil, this returns Int64 for float/integer +/// inputs (matching Spark behavior). +/// +/// # Supported types +/// - Float32, Float64: returns Int64 +/// - Int8, Int16, Int32, Int64: returns Int64 +/// - Decimal128(p, s): returns Decimal128(p, s) (preserves precision and scale) +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCeil { + signature: Signature, +} + +impl Default for SparkCeil { + fn default() -> Self { + Self::new() + } +} + +impl SparkCeil { + pub fn new() -> Self { + Self { + signature: Signature::numeric(1, Volatility::Immutable), } - }}; + } } +impl ScalarUDFImpl for SparkCeil { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "ceil" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> datafusion_common::Result { + internal_err!("return_field_from_args should be called instead") + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> datafusion_common::Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let return_type = match args.arg_fields[0].data_type() { + DataType::Decimal128(p, s) => DataType::Decimal128(*p, *s), + _ => DataType::Int64, + }; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + spark_ceil(&args.args) + } +} + +/// Computes the Spark-compatible ceiling of the input. pub fn spark_ceil(args: &[ColumnarValue]) -> Result { let value = &args[0]; match value { ColumnarValue::Array(array) => match array.data_type() { DataType::Float32 => { - let result = - downcast_compute_op!(array, "ceil", ceil, Float32Array, Int64Array); - Ok(ColumnarValue::Array(result?)) + let input = array.as_primitive::(); + let result: Int64Array = unary(input, |x| x.ceil() as i64); + Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Float64 => { - let result = - downcast_compute_op!(array, "ceil", ceil, Float64Array, Int64Array); - Ok(ColumnarValue::Array(result?)) + let input = array.as_primitive::(); + let result: Int64Array = unary(input, |x| x.ceil() as i64); + Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Int8 => { - let input = array.as_any().downcast_ref::().unwrap(); + let input = array.as_primitive::(); let result: Int64Array = unary(input, |x| x as i64); Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Int16 => { - let input = array.as_any().downcast_ref::().unwrap(); + let input = array.as_primitive::(); let result: Int64Array = unary(input, |x| x as i64); Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Int32 => { - let input = array.as_any().downcast_ref::().unwrap(); + let input = array.as_primitive::(); let result: Int64Array = unary(input, |x| x as i64); Ok(ColumnarValue::Array(Arc::new(result))) } - DataType::Int64 => { - // Optimization: Int64 -> Int64 doesn't need conversion, just return same array - Ok(ColumnarValue::Array(Arc::clone(array))) + DataType::Int64 => Ok(ColumnarValue::Array(Arc::clone(array))), + DataType::Decimal128(precision, scale) => { + if *scale <= 0 { + Ok(ColumnarValue::Array(Arc::clone(array))) + } else { + let f = decimal_ceil_f(*scale); + let input = array.as_primitive::(); + let result: Decimal128Array = unary(input, &f); + let result = + result.with_data_type(DataType::Decimal128(*precision, *scale)); + Ok(ColumnarValue::Array(Arc::new(result))) + } } - DataType::Decimal128(precision, scale) if *scale > 0 => { - let f = decimal_ceil_f(*scale); - make_decimal_array(array, *precision, *scale, &f) + other => { + exec_err!("Unsupported data type {other:?} for function ceil") } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {other:?} for function ceil", - ))), }, - ColumnarValue::Scalar(a) => match a { - ScalarValue::Float32(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( - a.map(|x| x.ceil() as i64), + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Float32(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + v.map(|x| x.ceil() as i64), ))), - ScalarValue::Float64(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( - a.map(|x| x.ceil() as i64), + ScalarValue::Float64(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + v.map(|x| x.ceil() as i64), ))), - ScalarValue::Int8(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( - a.map(|x| x as i64), + ScalarValue::Int8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + v.map(|x| x as i64), ))), - ScalarValue::Int16(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( - a.map(|x| x as i64), + ScalarValue::Int16(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + v.map(|x| x as i64), ))), - ScalarValue::Int32(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( - a.map(|x| x as i64), + ScalarValue::Int32(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + v.map(|x| x as i64), ))), - ScalarValue::Int64(a) => Ok(ColumnarValue::Scalar(ScalarValue::Int64(*a))), - ScalarValue::Decimal128(a, precision, scale) if *scale > 0 => { - let f = decimal_ceil_f(*scale); - let result = a.map(f); - Ok(ColumnarValue::Scalar(ScalarValue::Decimal128( - result, *precision, *scale, - ))) + ScalarValue::Int64(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int64(*v))), + ScalarValue::Decimal128(v, precision, scale) => { + if *scale <= 0 { + Ok(ColumnarValue::Scalar(ScalarValue::Decimal128( + *v, *precision, *scale, + ))) + } else { + let f = decimal_ceil_f(*scale); + let result = v.map(f); + Ok(ColumnarValue::Scalar(ScalarValue::Decimal128( + result, *precision, *scale, + ))) + } + } + other => { + exec_err!( + "Unsupported data type {:?} for function ceil", + other.data_type() + ) } - _ => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function ceil", - value.data_type(), - ))), }, } } -/// Computes ceil for a Decimal128 array, preserving the scale. -/// Divides by 10^scale, takes ceiling, then multiplies back. -#[inline] -fn make_decimal_array( - array: &ArrayRef, - precision: u8, - scale: i8, - f: &dyn Fn(i128) -> i128, -) -> Result { - let array = array.as_primitive::(); - let result: Decimal128Array = unary(array, f); - let result = result.with_data_type(DataType::Decimal128(precision, scale)); - Ok(ColumnarValue::Array(Arc::new(result))) -} - /// Returns a closure that computes ceil for decimal values. #[inline] fn decimal_ceil_f(scale: i8) -> impl Fn(i128) -> i128 { - let div = 10_i128.pow(scale as u32); + let divisor = 10_i128.pow(scale as u32); move |x: i128| { - let d = x / div; - let r = x % div; - // Ceiling: round up for positive remainders - (if r > 0 { d + 1 } else { d }) * div + let quotient = x / divisor; + let remainder = x % divisor; + if remainder > 0 { + (quotient + 1) * divisor + } else { + quotient * divisor + } } } -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkCiel { - signature: Signature, -} +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + Array, Decimal128Array, Float32Array, Float64Array, Int8Array, Int64Array, + }; + use datafusion_common::Result; + use datafusion_common::cast::{as_decimal128_array, as_int64_array}; -impl Default for SparkCiel { - fn default() -> Self { - Self::new() + #[test] + fn test_ceil_float32_array() -> Result<()> { + let array = Float32Array::from(vec![ + Some(1.1), + Some(1.9), + Some(-1.1), + Some(-1.9), + Some(0.0), + None, + ]); + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let result = as_int64_array(&result)?; + assert_eq!(result.value(0), 2); + assert_eq!(result.value(1), 2); + assert_eq!(result.value(2), -1); + assert_eq!(result.value(3), -1); + assert_eq!(result.value(4), 0); + assert!(result.is_null(5)); + Ok(()) } -} -impl SparkCiel { - pub fn new() -> Self { - Self { - signature: Signature::numeric(1, Volatility::Immutable), - } + #[test] + fn test_ceil_float32_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Float32(Some(1.5)))]; + let ColumnarValue::Scalar(ScalarValue::Int64(Some(result))) = spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, 2); + Ok(()) } -} -impl ScalarUDFImpl for SparkCiel { - fn as_any(&self) -> &dyn Any { - self + #[test] + fn test_ceil_float64_array() -> Result<()> { + let array = Float64Array::from(vec![ + Some(1.1), + Some(1.9), + Some(-1.1), + Some(-1.9), + Some(0.0), + Some(123.0), + None, + ]); + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let result = as_int64_array(&result)?; + assert_eq!(result.value(0), 2); + assert_eq!(result.value(1), 2); + assert_eq!(result.value(2), -1); + assert_eq!(result.value(3), -1); + assert_eq!(result.value(4), 0); + assert_eq!(result.value(5), 123); + assert!(result.is_null(6)); + Ok(()) } - fn name(&self) -> &str { - "ceil" + #[test] + fn test_ceil_float64_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(-1.5)))]; + let ColumnarValue::Scalar(ScalarValue::Int64(Some(result))) = spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, -1); + Ok(()) } - fn signature(&self) -> &Signature { - &self.signature + #[test] + fn test_ceil_float64_null_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Float64(None))]; + let ColumnarValue::Scalar(ScalarValue::Int64(result)) = spark_ceil(&args)? else { + unreachable!() + }; + assert_eq!(result, None); + Ok(()) } - fn return_type( - &self, - _arg_types: &[DataType], - ) -> datafusion_common::Result { - Ok(DataType::Int64) + #[test] + fn test_ceil_int8_array() -> Result<()> { + let array = Int8Array::from(vec![Some(1), Some(-1), Some(127), Some(-128), None]); + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let result = as_int64_array(&result)?; + assert_eq!(result.value(0), 1); + assert_eq!(result.value(1), -1); + assert_eq!(result.value(2), 127); + assert_eq!(result.value(3), -128); + assert!(result.is_null(4)); + Ok(()) } - fn invoke_with_args( - &self, - args: ScalarFunctionArgs, - ) -> datafusion_common::Result { - spark_ceil(&args.args) + #[test] + fn test_ceil_int16_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Int16(Some(100)))]; + let ColumnarValue::Scalar(ScalarValue::Int64(Some(result))) = spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, 100); + Ok(()) } -} -#[cfg(test)] -mod tests { - use super::*; - use arrow::array::Decimal128Array; - use datafusion_common::Result; - use datafusion_common::cast::as_decimal128_array; + #[test] + fn test_ceil_int32_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Int32(Some(-500)))]; + let ColumnarValue::Scalar(ScalarValue::Int64(Some(result))) = spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, -500); + Ok(()) + } #[test] - fn test_ceil_decimal128_array() -> Result<()> { - let array = Decimal128Array::from(vec![ - Some(12345), // 123.45 - Some(12500), // 125.00 - Some(-12999), // -129.99 + fn test_ceil_int64_array() -> Result<()> { + let array = Int64Array::from(vec![ + Some(1), + Some(-1), + Some(i64::MAX), + Some(i64::MIN), None, - ]) - .with_precision_and_scale(5, 2)?; + ]); let args = vec![ColumnarValue::Array(Arc::new(array))]; let ColumnarValue::Array(result) = spark_ceil(&args)? else { unreachable!() }; - let expected = Decimal128Array::from(vec![ - Some(12400), // 124.00 - Some(12500), // 125.00 - Some(-12900), // -129.00 - None, - ]) - .with_precision_and_scale(5, 2)?; + let result = as_int64_array(&result)?; + assert_eq!(result.value(0), 1); + assert_eq!(result.value(1), -1); + assert_eq!(result.value(2), i64::MAX); + assert_eq!(result.value(3), i64::MIN); + assert!(result.is_null(4)); + Ok(()) + } + + #[test] + fn test_ceil_decimal128_array() -> Result<()> { + let array = + Decimal128Array::from(vec![Some(12345), Some(12500), Some(-12999), None]) + .with_precision_and_scale(5, 2)?; + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let expected = + Decimal128Array::from(vec![Some(12400), Some(12500), Some(-12900), None]) + .with_precision_and_scale(5, 2)?; let actual = as_decimal128_array(&result)?; assert_eq!(actual, &expected); Ok(()) @@ -232,19 +373,18 @@ mod tests { Some(567), 3, 1, - ))]; // 56.7 + ))]; let ColumnarValue::Scalar(ScalarValue::Decimal128(Some(result), 3, 1)) = spark_ceil(&args)? else { unreachable!() }; - assert_eq!(result, 570); // 57.0 + assert_eq!(result, 570); Ok(()) } #[test] fn test_ceil_decimal128_negative_scalar() -> Result<()> { - // -56.7 should ceil to -56.0 let args = vec![ColumnarValue::Scalar(ScalarValue::Decimal128( Some(-567), 3, @@ -255,7 +395,7 @@ mod tests { else { unreachable!() }; - assert_eq!(result, -560); // -56.0 + assert_eq!(result, -560); Ok(()) } @@ -270,4 +410,35 @@ mod tests { assert_eq!(result, None); Ok(()) } + + #[test] + fn test_ceil_decimal128_scale_zero() -> Result<()> { + let array = Decimal128Array::from(vec![Some(123), Some(-456), None]) + .with_precision_and_scale(10, 0)?; + let args = vec![ColumnarValue::Array(Arc::new(array))]; + let ColumnarValue::Array(result) = spark_ceil(&args)? else { + unreachable!() + }; + let result = as_decimal128_array(&result)?; + assert_eq!(result.value(0), 123); + assert_eq!(result.value(1), -456); + assert!(result.is_null(2)); + Ok(()) + } + + #[test] + fn test_ceil_decimal128_scale_zero_scalar() -> Result<()> { + let args = vec![ColumnarValue::Scalar(ScalarValue::Decimal128( + Some(12345), + 10, + 0, + ))]; + let ColumnarValue::Scalar(ScalarValue::Decimal128(Some(result), 10, 0)) = + spark_ceil(&args)? + else { + unreachable!() + }; + assert_eq!(result, 12345); + Ok(()) + } } diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 7f1067741d4d8..7067e6246833d 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -16,7 +16,7 @@ // under the License. pub mod abs; -mod ceil; +pub mod ceil; pub mod expm1; pub mod factorial; pub mod hex; @@ -43,7 +43,7 @@ make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); make_udf_function!(trigonometry::SparkCsc, csc); make_udf_function!(trigonometry::SparkSec, sec); make_udf_function!(negative::SparkNegative, negative); -make_udf_function!(ceil::SparkCiel, ceil); +make_udf_function!(ceil::SparkCeil, ceil); pub mod expr_fn { use datafusion_functions::export_functions; diff --git a/datafusion/sqllogictest/test_files/spark/math/ceil.slt b/datafusion/sqllogictest/test_files/spark/math/ceil.slt index f6f6e1ef507f2..b02ad8bcd35c8 100644 --- a/datafusion/sqllogictest/test_files/spark/math/ceil.slt +++ b/datafusion/sqllogictest/test_files/spark/math/ceil.slt @@ -144,7 +144,7 @@ SELECT ceil(127::tinyint); 127 query I -SELECT ceil(-128::tinyint); +SELECT ceil(CAST(-128 AS tinyint)); ---- -128 @@ -170,7 +170,7 @@ SELECT ceil(32767::smallint); 32767 query I -SELECT ceil(-32768::smallint); +SELECT ceil(CAST(-32768 AS smallint)); ---- -32768 @@ -212,7 +212,7 @@ SELECT ceil(9223372036854775807::bigint); 9223372036854775807 query I -SELECT ceil(-9223372036854775808::bigint); +SELECT ceil(CAST(-9223372036854775808 AS bigint)); ---- -9223372036854775808 From 0102363a5ecb73ecb0b7e875993000bf1d08cf3c Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 4 Mar 2026 15:44:09 -0800 Subject: [PATCH 4/4] ceil_fix_tests --- datafusion/spark/src/function/math/ceil.rs | 22 ++++++++-------------- datafusion/spark/src/function/math/mod.rs | 4 ++-- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/datafusion/spark/src/function/math/ceil.rs b/datafusion/spark/src/function/math/ceil.rs index 65a5522c9c571..9683b54cb1ef0 100644 --- a/datafusion/spark/src/function/math/ceil.rs +++ b/datafusion/spark/src/function/math/ceil.rs @@ -29,15 +29,11 @@ use std::any::Any; use std::sync::Arc; /// Spark-compatible `ceil` function. -/// -/// Returns the smallest integer greater than or equal to the input. -/// Unlike standard DataFusion ceil, this returns Int64 for float/integer -/// inputs (matching Spark behavior). -/// -/// # Supported types -/// - Float32, Float64: returns Int64 -/// - Int8, Int16, Int32, Int64: returns Int64 -/// - Decimal128(p, s): returns Decimal128(p, s) (preserves precision and scale) +/// Unlike standard DataFusion ceil, this returns Int64 for float/integer (per Spark spec) +/// Supported types +/// Float32, Float64 -> Int64 +/// Int8, Int16, Int32, Int64 -> Int64 +/// Decimal128(p, s): -> Decimal128(p, s) (preserves precision and scale) #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkCeil { signature: Signature, @@ -97,7 +93,6 @@ impl ScalarUDFImpl for SparkCeil { } } -/// Computes the Spark-compatible ceiling of the input. pub fn spark_ceil(args: &[ColumnarValue]) -> Result { let value = &args[0]; match value { @@ -132,7 +127,7 @@ pub fn spark_ceil(args: &[ColumnarValue]) -> Result(); let result: Decimal128Array = unary(input, &f); let result = @@ -167,7 +162,7 @@ pub fn spark_ceil(args: &[ColumnarValue]) -> Result Result impl Fn(i128) -> i128 { +fn decimal_ceil_helper(scale: i8) -> impl Fn(i128) -> i128 { let divisor = 10_i128.pow(scale as u32); move |x: i128| { let quotient = x / divisor; diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 3e317b203b503..c9d702f8efb7f 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -16,8 +16,8 @@ // under the License. pub mod abs; -pub mod ceil; pub mod bin; +pub mod ceil; pub mod expm1; pub mod factorial; pub mod hex; @@ -44,7 +44,7 @@ make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); make_udf_function!(trigonometry::SparkCsc, csc); make_udf_function!(trigonometry::SparkSec, sec); make_udf_function!(negative::SparkNegative, negative); -make_udf_function!(bin::SparkBin, bin, ceil::SparkCeil, ceil); +make_udf_function!(bin::SparkBin, bin, ceil); pub mod expr_fn { use datafusion_functions::export_functions;