diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md
index 57b7a3455a..bb748f77e5 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -117,6 +117,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
 | DayOfYear | `dayofyear` | Yes | |
 | WeekOfYear | `weekofyear` | Yes | |
 | Quarter | `quarter` | Yes | |
+| PreciseTimestampConversion | `window_time` | Yes | Only supports conversions between TimestampType and LongType |
 
 ## Math Expressions
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 8c39ba779d..18bb4666a1 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -215,7 +215,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[WeekDay] -> CometWeekDay,
     classOf[DayOfYear] -> CometDayOfYear,
     classOf[WeekOfYear] -> CometWeekOfYear,
-    classOf[Quarter] -> CometQuarter)
+    classOf[Quarter] -> CometQuarter,
+    classOf[PreciseTimestampConversion] -> CometPreciseTimestampConversion)
 
   private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Cast] -> CometCast)
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index 0720f785dd..ef20892575 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,8 +21,8 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
-import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, PreciseTimestampConversion, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -586,3 +586,22 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
     }
   }
 }
+
+object CometPreciseTimestampConversion extends CometExpressionSerde[PreciseTimestampConversion] {
+  override def getSupportLevel(expr: PreciseTimestampConversion): SupportLevel = {
+    (expr.fromType, expr.toType) match {
+      case (TimestampType, LongType) | (LongType, TimestampType) =>
+        Compatible()
+      case _ =>
+        Unsupported(Some(s"PreciseTimestampConversion from ${expr.fromType} to ${expr.toType}"))
+    }
+  }
+
+  override def convert(
+      expr: PreciseTimestampConversion,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    // Both types are i64 micros in Arrow, so no conversion needed — return child directly.
+    exprToProtoInternal(expr.child, inputs, binding)
+  }
+}
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql
new file mode 100644
index 0000000000..79c7be40c9
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql
@@ -0,0 +1,33 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+
+statement
+CREATE TABLE test_window_time(time timestamp, value int) USING parquet
+
+statement
+INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3), (NULL, 4)
+
+-- spark_answer_only: window() uses unsupported CreateNamedStruct and KnownNullable
+
+-- basic window_time with tumbling window
+query spark_answer_only
+SELECT max(window_time(window)), sum(value) FROM (SELECT window(time, '10 minutes') AS window, value FROM test_window_time) GROUP BY window
+
+-- window_time with sliding window
+query spark_answer_only
+SELECT max(window_time(window)), count(value) FROM (SELECT window(time, '10 minutes', '5 minutes') AS window, value FROM test_window_time) GROUP BY window