From 0d2f5794e680f92abccb2d932e7134e2bb9cf9e5 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Thu, 19 Mar 2026 22:34:49 +0800 Subject: [PATCH 1/5] feat: Support Spark expression window time --- .../apache/comet/serde/QueryPlanSerde.scala | 3 ++- .../org/apache/comet/serde/datetime.scala | 27 ++++++++++++++++++- .../apache/comet/CometExpressionSuite.scala | 23 ++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 8c39ba779d..18bb4666a1 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -215,7 +215,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[WeekDay] -> CometWeekDay, classOf[DayOfYear] -> CometDayOfYear, classOf[WeekOfYear] -> CometWeekOfYear, - classOf[Quarter] -> CometQuarter) + classOf[Quarter] -> CometQuarter, + classOf[PreciseTimestampConversion] -> CometPreciseTimestampConversion) private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 0720f785dd..60cc3414ef 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, PreciseTimestampConversion, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -586,3 +586,28 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } } + +object CometPreciseTimestampConversion extends CometExpressionSerde[PreciseTimestampConversion] { + override def convert( + expr: PreciseTimestampConversion, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + // PreciseTimestampConversion reinterprets between LongType and TimestampType + // without changing the underlying microsecond value, so a simple cast suffices. + for { + childExpr <- exprToProtoInternal(expr.child, inputs, binding) + dt <- serializeDataType(expr.toType) + } yield { + ExprOuterClass.Expr + .newBuilder() + .setCast( + ExprOuterClass.Cast + .newBuilder() + .setChild(childExpr) + .setDatatype(dt) + .setEvalMode(ExprOuterClass.EvalMode.LEGACY) + .setAllowIncompat(false)) + .build() + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 68c1a82f14..e7d602b2ff 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2992,4 +2992,27 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("window_time") { + withTable("t1") { + sql("create table t1(time timestamp, value int) using parquet") + sql( + "insert into t1 values" + + "(cast('2023-01-01 12:00:00' as timestamp), 1)," + + "(cast('2023-01-01 12:05:00' as timestamp), 2)," + + "(cast('2023-01-01 12:15:00' as timestamp), 3)") + + // basic window_time with aggregation + checkSparkAnswer( + "select max(window_time(window)), sum(value) " + + "from (select window(time, '10 minutes') as window, value from t1) " + + "group by window") + + // window_time with sliding window + checkSparkAnswer( + "select max(window_time(window)), count(value) " + + "from (select window(time, '10 minutes', '5 minutes') as window, value from t1) " + + "group by window") + } + } + } From 9046b80abb70685ad7d4fc0a81e5afa54d708e95 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Fri, 20 Mar 2026 13:54:54 +0800 Subject: [PATCH 2/5] add SQL test and fix error --- .../org/apache/comet/serde/datetime.scala | 30 ++++++++--------- .../expressions/datetime/window_time.sql | 32 +++++++++++++++++++ .../apache/comet/CometExpressionSuite.scala | 23 ------------- 3 files changed, 45 insertions(+), 40 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 60cc3414ef..539e1cdd52 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -22,7 +22,7 @@ package org.apache.comet.serde import java.util.Locale import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, PreciseTimestampConversion, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} -import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -588,26 +588,22 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } object CometPreciseTimestampConversion extends CometExpressionSerde[PreciseTimestampConversion] { + override def getSupportLevel(expr: PreciseTimestampConversion): SupportLevel = { + (expr.fromType, expr.toType) match { + case (TimestampType, LongType) | (LongType, TimestampType) => + Compatible() + case _ => + Unsupported(Some(s"PreciseTimestampConversion from ${expr.fromType} to ${expr.toType}")) + } + } + override def convert( expr: PreciseTimestampConversion, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { // PreciseTimestampConversion reinterprets between LongType and TimestampType - // without changing the underlying microsecond value, so a simple cast suffices. - for { - childExpr <- exprToProtoInternal(expr.child, inputs, binding) - dt <- serializeDataType(expr.toType) - } yield { - ExprOuterClass.Expr - .newBuilder() - .setCast( - ExprOuterClass.Cast - .newBuilder() - .setChild(childExpr) - .setDatatype(dt) - .setEvalMode(ExprOuterClass.EvalMode.LEGACY) - .setAllowIncompat(false)) - .build() - } + // without changing the underlying microsecond value. In Arrow, both types + // share the same i64 representation, so we simply return the child expression. + exprToProtoInternal(expr.child, inputs, binding) } } diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql new file mode 100644 index 0000000000..d0df1e8dd4 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql @@ -0,0 +1,32 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +statement +CREATE TABLE test_window_time(time timestamp, value int) USING parquet + +statement +INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3) + +-- basic window_time with tumbling window +query spark_answer_only +SELECT max(window_time(window)), sum(value) FROM (SELECT window(time, '10 minutes') AS window, value FROM test_window_time) GROUP BY window + +-- window_time with sliding window +query spark_answer_only +SELECT max(window_time(window)), count(value) FROM (SELECT window(time, '10 minutes', '5 minutes') AS window, value FROM test_window_time) GROUP BY window diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index e7d602b2ff..68c1a82f14 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2992,27 +2992,4 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("window_time") { - withTable("t1") { - sql("create table t1(time timestamp, value int) using parquet") - sql( - "insert into t1 values" + - "(cast('2023-01-01 12:00:00' as timestamp), 1)," + - "(cast('2023-01-01 12:05:00' as timestamp), 2)," + - "(cast('2023-01-01 12:15:00' as timestamp), 3)") - - // basic window_time with aggregation - checkSparkAnswer( - "select max(window_time(window)), sum(value) " + - "from (select window(time, '10 minutes') as window, value from t1) " + - "group by window") - - // window_time with sliding window - checkSparkAnswer( - "select max(window_time(window)), count(value) " + - "from (select window(time, '10 minutes', '5 minutes') as window, value from t1) " + - "group by window") - } - } - } From 1e4201afdf397d6d032c85bef2f2a7612ecbceef Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Fri, 20 Mar 2026 13:58:01 +0800 Subject: [PATCH 3/5] Simplify annotations --- spark/src/main/scala/org/apache/comet/serde/datetime.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 539e1cdd52..ef20892575 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -601,9 +601,7 @@ object CometPreciseTimestampConversion extends CometExpressionSerde[PreciseTimes expr: PreciseTimestampConversion, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - // PreciseTimestampConversion reinterprets between LongType and TimestampType - // without changing the underlying microsecond value. In Arrow, both types - // share the same i64 representation, so we simply return the child expression. + // Both types are i64 micros in Arrow, so no conversion needed — return child directly. exprToProtoInternal(expr.child, inputs, binding) } } From 83f83abedd2ad0846ae2ba04e196a2728126c8fd Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Sat, 21 Mar 2026 00:12:16 +0800 Subject: [PATCH 4/5] Apply suggestion from @andygrove Co-authored-by: Andy Grove --- .../resources/sql-tests/expressions/datetime/window_time.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql index d0df1e8dd4..58083d4784 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql @@ -15,7 +15,6 @@ -- specific language governing permissions and limitations -- under the License. --- ConfigMatrix: parquet.enable.dictionary=false,true statement CREATE TABLE test_window_time(time timestamp, value int) USING parquet From 4bd328b2c2d4513463a2bea2a5779ad50f03b693 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Sat, 21 Mar 2026 00:46:59 +0800 Subject: [PATCH 5/5] add list of supported expressions to expressions.md and add test --- docs/source/user-guide/latest/expressions.md | 1 + .../resources/sql-tests/expressions/datetime/window_time.sql | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 57b7a3455a..bb748f77e5 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -117,6 +117,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and | DayOfYear | `dayofyear` | Yes | | | WeekOfYear | `weekofyear` | Yes | | | Quarter | `quarter` | Yes | | +| PreciseTimestampConversion | `window_time` | Yes | Only supports conversions between TimestampType and LongType | ## Math Expressions diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql index 58083d4784..79c7be40c9 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql @@ -20,7 +20,9 @@ statement CREATE TABLE test_window_time(time timestamp, value int) USING parquet statement -INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3) +INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3), (NULL, 4) + +-- spark_answer_only: window() uses unsupported CreateNamedStruct and KnownNullable -- basic window_time with tumbling window query spark_answer_only