diff --git a/docs/source/contributor-guide/spark_expressions_support.md b/docs/source/contributor-guide/spark_expressions_support.md index 9c19926a7c..dbaf611e41 100644 --- a/docs/source/contributor-guide/spark_expressions_support.md +++ b/docs/source/contributor-guide/spark_expressions_support.md @@ -247,7 +247,7 @@ - [x] last_day - [x] localtimestamp - [x] make_date -- [ ] make_dt_interval +- [x] make_dt_interval - [ ] make_interval - [ ] make_time - [ ] make_timestamp diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 37440980ef..09e9fe57e4 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -116,6 +116,7 @@ of expressions that be disabled. | LastDay | `last_day` | | LocalTimestamp | `localtimestamp` | | MakeDate | `make_date` | +| MakeDTInterval | `make_dt_interval` | | MakeTime | `make_time` | | Minute | `minute` | | NextDay | `next_day` | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a19d1ee368..1c9727489c 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -50,6 +50,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::datetime::from_utc_timestamp::SparkFromUtcTimestamp; use datafusion_spark::function::datetime::last_day::SparkLastDay; +use datafusion_spark::function::datetime::make_dt_interval::SparkMakeDtInterval; use datafusion_spark::function::datetime::next_day::SparkNextDay; use datafusion_spark::function::datetime::to_utc_timestamp::SparkToUtcTimestamp; use datafusion_spark::function::hash::crc32::SparkCrc32; @@ -580,6 +581,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFromUtcTimestamp::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeDtInterval::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkNextDay::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkToUtcTimestamp::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 41f0d37195..9ccda5ab67 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -356,6 +356,9 @@ impl PhysicalPlanner { DataType::Map(f, s) => DataType::Map(f, s).try_into()?, DataType::List(f) => DataType::List(f).try_into()?, DataType::Null => ScalarValue::Null, + DataType::Duration(TimeUnit::Microsecond) => { + ScalarValue::DurationMicrosecond(None) + } DataType::Time64(TimeUnit::Nanosecond) => { ScalarValue::Time64Nanosecond(None) } diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index d6ec6be132..a67be01a5b 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -91,6 +91,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType { } _ => unreachable!(), }, + DataTypeId::DurationMicrosecond => ArrowDataType::Duration(TimeUnit::Microsecond), DataTypeId::Timestamp => { ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())) } diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index df0c0c5553..897627ecf5 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -60,6 +60,7 @@ message DataType { MAP = 15; STRUCT = 16; TIME = 17; + DURATION_MICROSECOND = 18; } DataTypeId type_id = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9c80f33d39..56a4114f9a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -233,6 +233,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[LastDay] -> CometLastDay, classOf[Hour] -> CometHour, classOf[MakeDate] -> CometMakeDate, + classOf[MakeDTInterval] -> CometMakeDTInterval, classOf[Minute] -> CometMinute, classOf[NextDay] -> CometNextDay, classOf[Second] -> CometSecond, @@ -367,7 +368,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | - _: DecimalType | _: DateType | _: BooleanType | _: NullType => + _: DecimalType | _: DateType | _: DayTimeIntervalType | _: BooleanType | _: NullType => true case dt if isTimeType(dt) => true @@ -406,6 +407,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { case _: MapType => 15 case _: StructType => 16 case dt if isTimeType(dt) => 17 + case _: DayTimeIntervalType => 18 case dt => logWarning(s"Cannot serialize Spark data type: $dt") return None diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index b57b1e4e56..7229d9814b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeDTInterval, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -434,6 +434,17 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day") object CometMakeDate extends CometScalarFunction[MakeDate]("make_date") +object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval") { + private val overflowIncompatReason: String = + "make_dt_interval returns NULL on arithmetic overflow, whereas Spark throws " + + "INTERVAL_ARITHMETIC_OVERFLOW" + + override def getIncompatibleReasons(): Seq[String] = Seq(overflowIncompatReason) + + override def getSupportLevel(expr: MakeDTInterval): SupportLevel = + Incompatible(Some(overflowIncompatReason)) +} + object CometSecondsToTimestamp extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") { override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel = diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 783367c054..30017acd54 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -108,6 +108,7 @@ object Utils extends CometTypeShim with Logging { case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType() case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType() + case d: ArrowType.Duration if d.getUnit == TimeUnit.MICROSECOND => DayTimeIntervalType() case t: ArrowType.Time if t.getUnit == TimeUnit.NANOSECOND && t.getBitWidth == 64 => // scalastyle:off classforname val clazz = Class.forName("org.apache.spark.sql.types.TimeType$") diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql new file mode 100644 index 0000000000..3440e14c8d --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql @@ -0,0 +1,64 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Config: spark.comet.expression.MakeDTInterval.allowIncompatible=true + +-- literal arguments, all arities (omitted args default to 0) +query +SELECT make_dt_interval() + +query +SELECT make_dt_interval(1) + +query +SELECT make_dt_interval(1, 2) + +query +SELECT make_dt_interval(1, 2, 3) + +query +SELECT make_dt_interval(1, 2, 3, 4.5) + +-- microsecond precision in the seconds component +query +SELECT make_dt_interval(0, 0, 0, 4.123456) + +-- negative components +query +SELECT make_dt_interval(-1, -2, -3, -4.5) + +-- null-intolerant: any null input yields a null result +query +SELECT make_dt_interval(CAST(NULL AS INT), 2, 3, 4.5) + +query +SELECT make_dt_interval(1, 2, 3, CAST(NULL AS DECIMAL(18, 6))) + +-- column inputs (not constant-folded): exercises the per-row path +statement +CREATE TABLE test_make_dt_interval(id INT, d INT, h INT, m INT, s DECIMAL(18, 6)) USING parquet + +statement +INSERT INTO test_make_dt_interval VALUES + (1, 1, 2, 3, 4.5), + (2, 0, 0, 0, 0), + (3, -1, -2, -3, -4.123456), + (4, 100, 23, 59, 59.999999), + (5, NULL, 2, 3, 4.5) + +query +SELECT id, make_dt_interval(d, h, m, s) FROM test_make_dt_interval diff --git a/spark/src/test/resources/sql-tests/expressions/misc/schema_of_json.sql b/spark/src/test/resources/sql-tests/expressions/misc/schema_of_json.sql new file mode 100644 index 0000000000..919248b55b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/misc/schema_of_json.sql @@ -0,0 +1,40 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- schema_of_json requires a foldable (literal) JSON argument; its result is a +-- constant schema string. In production, ConstantFolding collapses it to a +-- string literal that runs natively in Comet. This test suite disables +-- ConstantFolding, so the expression survives to the physical plan and Comet +-- falls back. The exact fallback reason differs by Spark version: +-- * Spark 3.4 / 3.5: "schema_of_json is not supported" (plain expression) +-- * Spark 4.0+: "invoke is not supported" (SchemaOfJson is now +-- RuntimeReplaceable -> Invoke on SchemaOfJsonEvaluator) +-- so we assert on the common substring "is not supported". These assertions +-- document the current fallback behavior while still verifying Comet's result +-- matches Spark. If Comet later runs this natively, flip to plain `query`. + +query expect_fallback(is not supported) +SELECT schema_of_json('{"name":"John", "age":30}') + +query expect_fallback(is not supported) +SELECT schema_of_json('{"users":[{"name":"John","scores":[95,87]}]}') + +query expect_fallback(is not supported) +SELECT schema_of_json('[1, 2, 3]') + +query expect_fallback(is not supported) +SELECT schema_of_json('{}')