From ba8d237a4d575db6b7c69d5312e2fcdf52f87553 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 18:38:37 -0400 Subject: [PATCH 1/7] feat: support DayTimeIntervalType --- native/core/src/execution/serde.rs | 1 + native/proto/src/proto/types.proto | 1 + .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index 5d60288f68..2ca1eb0798 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -91,6 +91,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType { } _ => unreachable!(), }, + DataTypeId::DurationMicrosecond => ArrowDataType::Duration(TimeUnit::Microsecond), DataTypeId::Timestamp => { ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())) } diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index fec972a8f0..ab36e8e8f3 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -59,6 +59,7 @@ message DataType { LIST = 14; MAP = 15; STRUCT = 16; + DURATION_MICROSECOND = 17; } DataTypeId type_id = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index dced2e4da8..eecf45d17e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -361,7 +361,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | - _: DecimalType | _: DateType | _: BooleanType | _: NullType => + _: DecimalType | _: DateType | _: DayTimeIntervalType | _: BooleanType | _: NullType => true case s: StructType if allowComplex => s.fields.nonEmpty && s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex)) @@ -397,6 +397,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { case _: ArrayType => 14 case _: MapType => 15 case _: StructType => 16 + case _: DayTimeIntervalType => 17 case dt => logWarning(s"Cannot serialize Spark data type: $dt") return None From 37032090ff1369e9500e679ffc446a1ef6fde9e6 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 20:22:46 -0400 Subject: [PATCH 2/7] feat: serde DayTimeInterval --- native/core/src/execution/jni_api.rs | 2 ++ .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 + spark/src/main/scala/org/apache/comet/serde/datetime.scala | 4 +++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index f5b04cc51d..14eeb9d229 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -49,6 +49,7 @@ use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot; use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::datetime::last_day::SparkLastDay; +use datafusion_spark::function::datetime::make_dt_interval::SparkMakeDtInterval; use datafusion_spark::function::datetime::next_day::SparkNextDay; use datafusion_spark::function::hash::crc32::SparkCrc32; use datafusion_spark::function::hash::sha1::SparkSha1; @@ -574,6 +575,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeDtInterval::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkNextDay::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index eecf45d17e..79603f46a8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -227,6 +227,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[LastDay] -> CometLastDay, classOf[Hour] -> CometHour, classOf[MakeDate] -> CometMakeDate, + classOf[MakeDTInterval] -> CometMakeDTInterval, classOf[Minute] -> CometMinute, classOf[NextDay] -> CometNextDay, classOf[Second] -> CometSecond, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index cb3be75717..e998d68789 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeDTInterval, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -368,6 +368,8 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day") object CometMakeDate extends CometScalarFunction[MakeDate]("make_date") +object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval") + object CometSecondsToTimestamp extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") { override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel = From 7db92a7853f3936a3b84e4394329b5c0bfcf955d Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 20:23:48 -0400 Subject: [PATCH 3/7] test: add make_dt_interval sql --- .../expressions/datetime/make_dt_interval.sql | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql new file mode 100644 index 0000000000..5a50bc4559 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql @@ -0,0 +1,62 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- literal arguments, all arities (omitted args default to 0) +query +SELECT make_dt_interval() + +query +SELECT make_dt_interval(1) + +query +SELECT make_dt_interval(1, 2) + +query +SELECT make_dt_interval(1, 2, 3) + +query +SELECT make_dt_interval(1, 2, 3, 4.5) + +-- microsecond precision in the seconds component +query +SELECT make_dt_interval(0, 0, 0, 4.123456) + +-- negative components +query +SELECT make_dt_interval(-1, -2, -3, -4.5) + +-- null-intolerant: any null input yields a null result +query +SELECT make_dt_interval(CAST(NULL AS INT), 2, 3, 4.5) + +query +SELECT make_dt_interval(1, 2, 3, CAST(NULL AS DECIMAL(18, 6))) + +-- column inputs (not constant-folded): exercises the per-row path +statement +CREATE TABLE test_make_dt_interval(id INT, d INT, h INT, m INT, s DECIMAL(18, 6)) USING parquet + +statement +INSERT INTO test_make_dt_interval VALUES + (1, 1, 2, 3, 4.5), + (2, 0, 0, 0, 0), + (3, -1, -2, -3, -4.123456), + (4, 100, 23, 59, 59.999999), + (5, NULL, 2, 3, 4.5) + +query +SELECT id, make_dt_interval(d, h, m, s) FROM test_make_dt_interval From a15824b9d8cc192ba527b7e2164ac04f4cfd1c2f Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 20:27:10 -0400 Subject: [PATCH 4/7] feat: support null for Duration --- native/core/src/execution/planner.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b00f140026..0f1eea28c2 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -355,6 +355,9 @@ impl PhysicalPlanner { DataType::Map(f, s) => DataType::Map(f, s).try_into()?, DataType::List(f) => DataType::List(f).try_into()?, DataType::Null => ScalarValue::Null, + DataType::Duration(TimeUnit::Microsecond) => { + ScalarValue::DurationMicrosecond(None) + } dt => { return Err(GeneralError(format!("{dt:?} is not supported in Comet"))) } From 7969297585164eb8df0e7ca620e34097d2f2bb56 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 20:29:15 -0400 Subject: [PATCH 5/7] feat: support duration microsecond from ArrowType --- .../src/main/scala/org/apache/spark/sql/comet/util/Utils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 78f2e81c7c..dd5574507b 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -108,6 +108,7 @@ object Utils extends CometTypeShim with Logging { case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType() case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType() + case d: ArrowType.Duration if d.getUnit == TimeUnit.MICROSECOND => DayTimeIntervalType() case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.toString}") } From 6cd5da5cc2d4d9c679a98117b18eb185e5fa5037 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 14 May 2026 20:53:28 -0400 Subject: [PATCH 6/7] docs: support make_dt_interval --- docs/source/contributor-guide/spark_expressions_support.md | 2 +- docs/source/user-guide/latest/expressions.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/contributor-guide/spark_expressions_support.md b/docs/source/contributor-guide/spark_expressions_support.md index cb27a439b9..38ae1daa29 100644 --- a/docs/source/contributor-guide/spark_expressions_support.md +++ b/docs/source/contributor-guide/spark_expressions_support.md @@ -243,7 +243,7 @@ - [x] last_day - [ ] localtimestamp - [x] make_date -- [ ] make_dt_interval +- [x] make_dt_interval - [ ] make_interval - [ ] make_time - [ ] make_timestamp diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 668081d257..35606b4ab4 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -112,6 +112,7 @@ of expressions that be disabled. | Hour | `hour` | | LastDay | `last_day` | | MakeDate | `make_date` | +| MakeDTInterval | `make_dt_interval` | | Minute | `minute` | | NextDay | `next_day` | | Second | `second` | From 8cf1c5b8494a97889023d3f069a5c0303925694c Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Wed, 20 May 2026 15:38:31 -0400 Subject: [PATCH 7/7] feat: mark make_dt_interval incompatible --- .../main/scala/org/apache/comet/serde/datetime.scala | 11 ++++++++++- .../expressions/datetime/make_dt_interval.sql | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 54f32639c2..7229d9814b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -434,7 +434,16 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day") object CometMakeDate extends CometScalarFunction[MakeDate]("make_date") -object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval") +object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval") { + private val overflowIncompatReason: String = + "make_dt_interval returns NULL on arithmetic overflow, whereas Spark throws " + + "INTERVAL_ARITHMETIC_OVERFLOW" + + override def getIncompatibleReasons(): Seq[String] = Seq(overflowIncompatReason) + + override def getSupportLevel(expr: MakeDTInterval): SupportLevel = + Incompatible(Some(overflowIncompatReason)) +} object CometSecondsToTimestamp extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") { diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql index 5a50bc4559..3440e14c8d 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_dt_interval.sql @@ -15,6 +15,8 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.MakeDTInterval.allowIncompatible=true + -- literal arguments, all arities (omitted args default to 0) query SELECT make_dt_interval()