Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -942,43 +942,48 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}

test("size with array input") {
withTempDir { dir =>
withTempView("t1") {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 100)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
withTempDir { dir =>
withTempView("t1") {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 100)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")

// Test size function with arrays built from columns (ensures native execution)
checkSparkAnswerAndOperator(
sql("SELECT size(array(_2, _3, _4)) from t1 where _2 is not null order by _2, _3, _4"))
checkSparkAnswerAndOperator(
sql("SELECT size(array(_1)) from t1 where _1 is not null order by _1"))
checkSparkAnswerAndOperator(
sql("SELECT size(array(_2, _3)) from t1 where _2 is null order by _2, _3"))
// Test size function with arrays built from columns (ensures native execution)
checkSparkAnswerAndOperator(
sql(
"SELECT size(array(_2, _3, _4)) from t1 where _2 is not null order by _2, _3, _4"))
checkSparkAnswerAndOperator(
sql("SELECT size(array(_1)) from t1 where _1 is not null order by _1"))
checkSparkAnswerAndOperator(
sql("SELECT size(array(_2, _3)) from t1 where _2 is null order by _2, _3"))

// Test with conditional arrays (forces runtime evaluation)
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 > 0 then array(_2, _3, _4) else array(_2) end) from t1 order by _2, _3, _4"))
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _1 then array(_8, _9) else array(_8, _9, _10) end) from t1 order by _1, _8, _9, _10"))
// Test with conditional arrays (forces runtime evaluation)
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 > 0 then array(_2, _3, _4) else array(_2) end) from t1 order by _2, _3, _4"))
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _1 then array(_8, _9) else array(_8, _9, _10) end) from t1 order by _1, _8, _9, _10"))

// Test empty arrays using conditional logic to avoid constant folding
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 < 0 then array(_2, _3) else array() end) from t1 order by _2, _3"))
// Test empty arrays using conditional logic to avoid constant folding
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 < 0 then array(_2, _3) else array() end) from t1 order by _2, _3"))

// Test null arrays using conditional logic
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 is null then cast(null as array<int>) else array(_2) end) from t1 order by _2"))
// Test null arrays using conditional logic
checkSparkAnswerAndOperator(sql(
"SELECT size(case when _2 is null then cast(null as array<int>) else array(_2) end) from t1 order by _2"))

// Test with different data types using column references
checkSparkAnswerAndOperator(
sql("SELECT size(array(_8, _9, _10)) from t1 where _8 is not null order by _8, _9, _10")
) // string arrays
checkSparkAnswerAndOperator(
sql(
"SELECT size(array(_2, _3, _4, _5, _6)) from t1 where _2 is not null order by _2, _3, _4, _5, _6"
)
) // int arrays
// Test with different data types using column references
checkSparkAnswerAndOperator(
sql(
"SELECT size(array(_8, _9, _10)) from t1 where _8 is not null order by _8, _9, _10"
)
) // string arrays
checkSparkAnswerAndOperator(
sql(
"SELECT size(array(_2, _3, _4, _5, _6)) from t1 where _2 is not null order by _2, _3, _4, _5, _6"
)
) // int arrays
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
Expand All @@ -41,6 +42,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

import testImplicits._

// This suite mostly exercises non-ANSI cast semantics (silent overflow, null on invalid
// input), so the suite's base SparkConf turns ANSI mode off. Tests that target ANSI
// behavior enable it explicitly through withSQLConf.
override protected def sparkConf: SparkConf = {
  val baseConf = super.sparkConf
  baseConf.set(SQLConf.ANSI_ENABLED.key, "false")
}

/** Create a data generator using a fixed seed so that tests are reproducible */
private val gen = DataGenerator.DEFAULT

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class CometDateTimeUtilsSuite extends CometTestBase {
test("string to timestamp - invalid formats return null") {
// All of these should produce null (not throw) in non-ANSI mode.
for (tz <- Seq("UTC", "America/Los_Angeles")) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz, SQLConf.ANSI_ENABLED.key -> "false") {
checkCastToTimestamp(
Seq(
"238",
Expand Down Expand Up @@ -210,7 +210,9 @@ class CometDateTimeUtilsSuite extends CometTestBase {

// "SPARK-35780: support full range of timestamp string"
test("SPARK-35780: full range of timestamp string") {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
SQLConf.ANSI_ENABLED.key -> "false") {
// Normal-range cases: collect() as TimestampType directly.
checkCastToTimestamp(
Seq(
Expand Down Expand Up @@ -262,16 +264,21 @@ class CometDateTimeUtilsSuite extends CometTestBase {
}

test("SPARK-15379: invalid calendar dates in string to date cast") {
// Feb 29 on a non-leap year and Apr 31 must produce null for both DATE and TIMESTAMP.
checkCastToDate(Seq("2015-02-29 00:00:00", "2015-04-31 00:00:00", "2015-02-29", "2015-04-31"))
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
// Feb 29 on a non-leap year and Apr 31 must produce null for both DATE and TIMESTAMP.
checkCastToDate(
Seq("2015-02-29 00:00:00", "2015-04-31 00:00:00", "2015-02-29", "2015-04-31"))

checkCastToTimestamp(
Seq("2015-02-29 00:00:00", "2015-04-31 00:00:00", "2015-02-29", "2015-04-31"))
checkCastToTimestamp(
Seq("2015-02-29 00:00:00", "2015-04-31 00:00:00", "2015-02-29", "2015-04-31"))
}
}

test("trailing characters while converting string to timestamp") {
// Garbage after a valid ISO timestamp must make the whole value null.
checkCastToTimestamp(Seq("2019-10-31T10:59:23Z:::"))
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
checkCastToTimestamp(Seq("2019-10-31T10:59:23Z:::"))
}
}

test("DST spring-forward gap and fall-back overlap") {
Expand Down
85 changes: 50 additions & 35 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("compare true/false to negative zero") {
Seq(false, true).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
"parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
withTable(table) {
sql(s"create table $table(col1 boolean, col2 float) using parquet")
Expand All @@ -153,6 +155,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("decimals divide by zero") {
Seq(true, false).foreach { dictionary =>
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false",
"parquet.enable.dictionary" -> dictionary.toString) {
withTempPath { dir =>
Expand All @@ -172,12 +175,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("Integral Division Overflow Handling Matches Spark Behavior") {
withTable("t1") {
val value = Long.MinValue
sql("create table t1(c1 long, c2 short) using parquet")
sql(s"insert into t1 values($value, -1)")
val res = sql("select c1 div c2 from t1 order by c1")
checkSparkAnswerAndOperator(res)
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
withTable("t1") {
val value = Long.MinValue
sql("create table t1(c1 long, c2 short) using parquet")
sql(s"insert into t1 values($value, -1)")
val res = sql("select c1 div c2 from t1 order by c1")
checkSparkAnswerAndOperator(res)
}
}
}

Expand Down Expand Up @@ -402,19 +407,21 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("try_divide") {
val data = Seq((15121991, 0))
withParquetTable(data, "tbl") {
checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
checkSparkAnswerAndOperator("""
|SELECT
| try_divide(10, 0),
| try_divide(NULL, 5),
| try_divide(5, NULL),
| try_divide(-2147483648, -1),
| try_divide(-9223372036854775808, -1),
| try_divide(DECIMAL('9999999999999999999999999999'), 0.1)
| from tbl
|""".stripMargin)
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
val data = Seq((15121991, 0))
withParquetTable(data, "tbl") {
checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
checkSparkAnswerAndOperator("""
|SELECT
| try_divide(10, 0),
| try_divide(NULL, 5),
| try_divide(5, NULL),
| try_divide(-2147483648, -1),
| try_divide(-9223372036854775808, -1),
| try_divide(DECIMAL('9999999999999999999999999999'), 0.1)
| from tbl
|""".stripMargin)
}
}
}

Expand Down Expand Up @@ -1695,6 +1702,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("from_unixtime") {
Seq(false, true).foreach { dictionary =>
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
"parquet.enable.dictionary" -> dictionary.toString,
CometConf.getExprAllowIncompatConfigKey(classOf[FromUnixTime]) -> "true") {
val table = "test"
Expand Down Expand Up @@ -1734,6 +1742,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("Decimal binary ops multiply is aligned to Spark") {
Seq(true, false).foreach { allowPrecisionLoss =>
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
"spark.sql.decimalOperations.allowPrecisionLoss" -> allowPrecisionLoss.toString) {

testSingleLineQuery(
Expand Down Expand Up @@ -1828,6 +1837,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
sql(s"insert into $table values $values")
Seq(true, false).foreach { allowPrecisionLoss =>
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
"spark.sql.decimalOperations.allowPrecisionLoss" -> allowPrecisionLoss.toString) {
val a = makeNum(p1, s1)
val b = makeNum(p2, s2)
Expand Down Expand Up @@ -1857,15 +1867,17 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

// Supported boolean values as true by both Arrow and Spark
testCastedColumn(inputValues = Seq("t", "true", "y", "yes", "1", "T", "TrUe", "Y", "YES"))
// Supported boolean values as false by both Arrow and Spark
testCastedColumn(inputValues = Seq("f", "false", "n", "no", "0", "F", "FaLSe", "N", "No"))
// Supported boolean values by Arrow but not Spark
testCastedColumn(inputValues =
Seq("TR", "FA", "tr", "tru", "ye", "on", "fa", "fal", "fals", "of", "off"))
// Invalid boolean casting values for Arrow and Spark
testCastedColumn(inputValues = Seq("car", "Truck"))
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
// Supported boolean values as true by both Arrow and Spark
testCastedColumn(inputValues = Seq("t", "true", "y", "yes", "1", "T", "TrUe", "Y", "YES"))
// Supported boolean values as false by both Arrow and Spark
testCastedColumn(inputValues = Seq("f", "false", "n", "no", "0", "F", "FaLSe", "N", "No"))
// Supported boolean values by Arrow but not Spark
testCastedColumn(inputValues =
Seq("TR", "FA", "tr", "tru", "ye", "on", "fa", "fal", "fals", "of", "off"))
// Invalid boolean casting values for Arrow and Spark
testCastedColumn(inputValues = Seq("car", "Truck"))
}
}

test("explain comet") {
Expand Down Expand Up @@ -3027,12 +3039,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("test length function") {
withTable("t1") {
sql(
"create table t1 using parquet as select cast(id as string) as c1, cast(id as binary) as c2 from range(10)")
// FIXME: Change checkSparkAnswer to checkSparkAnswerAndOperator after resolving
// https://github.com/apache/datafusion-comet/issues/2348
checkSparkAnswer("select length(c1), length(c2) AS x FROM t1 ORDER BY c1")
// cast(id as binary) is rejected by the Spark 4 analyzer when ANSI mode is enabled
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
withTable("t1") {
sql(
"create table t1 using parquet as select cast(id as string) as c1, cast(id as binary) as c2 from range(10)")
// FIXME: Change checkSparkAnswer to checkSparkAnswerAndOperator after resolving
// https://github.com/apache/datafusion-comet/issues/2348
checkSparkAnswer("select length(c1), length(c2) AS x FROM t1 ORDER BY c1")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@

package org.apache.comet

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType}

class CometFuzzMathSuite extends CometFuzzTestBase {

// The integer-math fuzz tests deliberately include overflowing inputs. Under Spark 4's
// default ANSI mode those inputs would throw instead of yielding a result that Spark and
// Comet can be compared on, so ANSI is pinned off to let the parity comparison run.
override protected def sparkConf: SparkConf = {
  val inherited = super.sparkConf
  inherited.set(SQLConf.ANSI_ENABLED.key, "false")
}

for (op <- Seq("+", "-", "*", "/", "div")) {
test(s"integer math: $op") {
val df = spark.read.parquet(filename)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
class CometMathExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("abs") {
val df = createTestData(generateNegativeZero = false)
df.createOrReplaceTempView("tbl")
for (field <- df.schema.fields) {
val col = field.name
checkSparkAnswerAndOperator(s"SELECT $col, abs($col) FROM tbl ORDER BY $col")
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
val df = createTestData(generateNegativeZero = false)
df.createOrReplaceTempView("tbl")
for (field <- df.schema.fields) {
val col = field.name
checkSparkAnswerAndOperator(s"SELECT $col, abs($col) FROM tbl ORDER BY $col")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.io.File

import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf

class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {

Expand Down Expand Up @@ -65,8 +66,13 @@ class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
"spark.sql.optimizer.excludedRules" ->
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding")

// Most SQL fixtures here predate Spark 4's ANSI-by-default behavior and expect non-ANSI
// semantics (silent overflow, null on bad input). An individual file can still opt in
// through its own --CONFIG line, which appears later in the pair list and wins.
private val ansiDisabled: Seq[(String, String)] =
  Seq((SQLConf.ANSI_ENABLED.key, "false"))

private def runTestFile(relativePath: String, file: SqlTestFile): Unit = {
val allConfigs = file.configs ++ constantFoldingExcluded
val allConfigs = ansiDisabled ++ file.configs ++ constantFoldingExcluded
withSQLConf(allConfigs: _*) {
withTable(file.tables: _*) {
file.records.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package org.apache.comet.exec
import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
Expand All @@ -42,6 +43,11 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGener
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._

// Several aggregate tests rely on overflow wrapping around silently, so the suite's base
// SparkConf disables ANSI mode. The ANSI-mode variants opt in explicitly via withSQLConf.
override protected def sparkConf: SparkConf = {
  val suiteConf = super.sparkConf
  suiteConf.set(SQLConf.ANSI_ENABLED.key, "false")
}

test("min/max floating point with negative zero") {
val r = new Random(42)
val schema = StructType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3594,6 +3594,8 @@ class CometExecSuite extends CometTestBase {
"struct(id)").foreach { valueType =>
{
withSQLConf(
// cast(id as tinyint) overflows for id >= 128, which throws under ANSI
SQLConf.ANSI_ENABLED.key -> "false",
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true",
Expand Down
Loading
Loading