Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/contributor-guide/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
- [x] last_day
- [x] localtimestamp
- [x] make_date
- [ ] make_dt_interval
- [x] make_dt_interval
- [ ] make_interval
- [ ] make_time
- [ ] make_timestamp
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ of expressions that be disabled.
| LastDay | `last_day` |
| LocalTimestamp | `localtimestamp` |
| MakeDate | `make_date` |
| MakeDTInterval | `make_dt_interval` |
| Minute | `minute` |
| NextDay | `next_day` |
| Second | `second` |
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd;
use datafusion_spark::function::datetime::date_sub::SparkDateSub;
use datafusion_spark::function::datetime::from_utc_timestamp::SparkFromUtcTimestamp;
use datafusion_spark::function::datetime::last_day::SparkLastDay;
use datafusion_spark::function::datetime::make_dt_interval::SparkMakeDtInterval;
use datafusion_spark::function::datetime::next_day::SparkNextDay;
use datafusion_spark::function::datetime::to_utc_timestamp::SparkToUtcTimestamp;
use datafusion_spark::function::hash::crc32::SparkCrc32;
Expand Down Expand Up @@ -578,6 +579,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFromUtcTimestamp::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeDtInterval::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkNextDay::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkToUtcTimestamp::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
Expand Down
3 changes: 3 additions & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ impl PhysicalPlanner {
DataType::Map(f, s) => DataType::Map(f, s).try_into()?,
DataType::List(f) => DataType::List(f).try_into()?,
DataType::Null => ScalarValue::Null,
DataType::Duration(TimeUnit::Microsecond) => {
ScalarValue::DurationMicrosecond(None)
}
dt => {
return Err(GeneralError(format!("{dt:?} is not supported in Comet")))
}
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType {
}
_ => unreachable!(),
},
DataTypeId::DurationMicrosecond => ArrowDataType::Duration(TimeUnit::Microsecond),
DataTypeId::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into()))
}
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ message DataType {
LIST = 14;
MAP = 15;
STRUCT = 16;
DURATION_MICROSECOND = 17;
}
DataTypeId type_id = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[LastDay] -> CometLastDay,
classOf[Hour] -> CometHour,
classOf[MakeDate] -> CometMakeDate,
classOf[MakeDTInterval] -> CometMakeDTInterval,
classOf[Minute] -> CometMinute,
classOf[NextDay] -> CometNextDay,
classOf[Second] -> CometSecond,
Expand Down Expand Up @@ -364,7 +365,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
_: DecimalType | _: DateType | _: BooleanType | _: NullType =>
_: DecimalType | _: DateType | _: DayTimeIntervalType | _: BooleanType | _: NullType =>
true
case s: StructType if allowComplex =>
s.fields.nonEmpty && s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
Expand Down Expand Up @@ -400,6 +401,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
case _: ArrayType => 14
case _: MapType => 15
case _: StructType => 16
case _: DayTimeIntervalType => 17
case dt =>
logWarning(s"Cannot serialize Spark data type: $dt")
return None
Expand Down
4 changes: 3 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeDTInterval, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -413,6 +413,8 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day")

object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you implement getSupportLevel and mark this as incompatible. Also, we need getIncompatReasons so we generate documentation.

My AI review helper tells me that Spark's IntervalUtils.makeDayTimeInterval uses Math.addExact / Math.multiplyExact and always throws on overflow:

  catch {
    case _: ArithmeticException =>
      throw QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(context)
  }

The upstream SparkMakeDtInterval kernel uses checked_mul / checked_add and silently returns NULL on overflow. Inputs that throw INTERVAL_ARITHMETIC_OVERFLOW in Spark will
produce a NULL row in Comet.

Could you verify whether this is true or not? If it is then we can just mark it incompatible for now and file an upstream issue against the datafusion repo.


object CometSecondsToTimestamp
extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") {
override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ object Utils extends CometTypeShim with Logging {
case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH =>
YearMonthIntervalType()
case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType()
case d: ArrowType.Duration if d.getUnit == TimeUnit.MICROSECOND => DayTimeIntervalType()
case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.toString}")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- literal arguments, all arities (omitted args default to 0)
query
SELECT make_dt_interval()

query
SELECT make_dt_interval(1)

query
SELECT make_dt_interval(1, 2)

query
SELECT make_dt_interval(1, 2, 3)

query
SELECT make_dt_interval(1, 2, 3, 4.5)

-- microsecond precision in the seconds component
query
SELECT make_dt_interval(0, 0, 0, 4.123456)

-- negative components
query
SELECT make_dt_interval(-1, -2, -3, -4.5)

-- null-intolerant: any null input yields a null result
query
SELECT make_dt_interval(CAST(NULL AS INT), 2, 3, 4.5)

query
SELECT make_dt_interval(1, 2, 3, CAST(NULL AS DECIMAL(18, 6)))

-- column inputs (not constant-folded): exercises the per-row path
statement
CREATE TABLE test_make_dt_interval(id INT, d INT, h INT, m INT, s DECIMAL(18, 6)) USING parquet

statement
INSERT INTO test_make_dt_interval VALUES
(1, 1, 2, 3, 4.5),
(2, 0, 0, 0, 0),
(3, -1, -2, -3, -4.123456),
(4, 100, 23, 59, 59.999999),
(5, NULL, 2, 3, 4.5)

query
SELECT id, make_dt_interval(d, h, m, s) FROM test_make_dt_interval
Loading