From dd394fc2f9054687c5d47ba76a6997171e7d116a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 27 Feb 2026 01:15:07 +0800 Subject: [PATCH 1/2] [SPARK-55716][SQL] Enforce NOT NULL constraints for V1 file source table inserts V1 file-based DataSource writes (parquet/orc/json) silently accept null values into NOT NULL columns. This PR adds opt-in NOT NULL constraint enforcement by: 1. CreateDataSourceTableCommand: Preserve user-specified nullability by recursively merging nullability flags from the user schema into the resolved dataSource.schema. 2. PreprocessTableInsertion: Restore nullability flags from the catalog schema before null checks. This ensures AssertNotNull is injected when needed, gated by spark.sql.fileSource.insert.enforceNotNull. 3. Config: spark.sql.fileSource.insert.enforceNotNull (default false) - when true, enforces NOT NULL constraints for file-based tables. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../apache/spark/sql/internal/SQLConf.scala | 11 ++ .../command/createDataSourceTables.scala | 46 ++++- .../sql/execution/datasources/rules.scala | 54 +++++- .../command/v1/ShowCreateTableSuite.scala | 2 +- .../spark/sql/sources/InsertSuite.scala | 182 ++++++++++++++++++ .../SparkGetColumnsOperation.scala | 2 +- .../SparkMetadataOperationSuite.scala | 12 +- 7 files changed, 298 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b2a2b7027394f..01a01cf8ca534 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4743,6 +4743,17 @@ object SQLConf { .enumConf(StoreAssignmentPolicy) .createWithDefault(StoreAssignmentPolicy.ANSI) + val FILE_SOURCE_INSERT_ENFORCE_NOT_NULL = + buildConf("spark.sql.fileSource.insert.enforceNotNull") + .internal() + .doc("When true, Spark 
enforces NOT NULL constraints when inserting data into " + + "file-based data source tables (e.g., Parquet, ORC, JSON), consistent with the " + + "behavior for other data sources and V2 catalog tables. " + + "When false (default), null values are silently accepted into NOT NULL columns.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val ANSI_ENABLED = buildConf(SqlApiConfHelper.ANSI_ENABLED_KEY) .doc("When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. " + "For example, Spark will throw an exception at runtime instead of returning null results " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 5ef19b832f5b3..2edac5b0179bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} import org.apache.spark.util.ArrayImplicits._ /** @@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo table.copy(schema = new StructType(), partitionColumnNames = Nil) case _ => + // Merge nullability from the user-specified schema into the resolved schema. + // DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL + // constraints. We restore nullability from the original user schema while keeping + // the resolved data types (which may include CharVarchar normalization, metadata, etc.) 
+ val resolvedSchema = if (table.schema.nonEmpty) { + restoreNullability(dataSource.schema, table.schema) + } else { + dataSource.schema + } table.copy( - schema = dataSource.schema, + schema = resolvedSchema, partitionColumnNames = partitionColumnNames, // If metastore partition management for file source tables is enabled, we start off with // partition provider hive, but no partitions in the metastore. The user has to call @@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo Seq.empty[Row] } + + /** + * Recursively restores nullability from the original user-specified schema into + * the resolved schema. The resolved schema's data types are preserved (they may + * contain CharVarchar normalization, metadata, etc.), but nullability flags + * (top-level and nested) are taken from the original schema. + */ + private def restoreNullability(resolved: StructType, original: StructType): StructType = { + val originalFields = original.fields.map(f => f.name -> f).toMap + StructType(resolved.fields.map { resolvedField => + originalFields.get(resolvedField.name) match { + case Some(origField) => + resolvedField.copy( + nullable = origField.nullable, + dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType)) + case None => resolvedField + } + }) + } + + private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = { + (resolved, original) match { + case (r: StructType, o: StructType) => restoreNullability(r, o) + case (ArrayType(rElem, _), ArrayType(oElem, oNull)) => + ArrayType(restoreDataTypeNullability(rElem, oElem), oNull) + case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) => + MapType( + restoreDataTypeNullability(rKey, oKey), + restoreDataTypeNullability(rVal, oVal), + oValNull) + case _ => resolved + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index f097e1aa63799..acb113f81bacd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.InsertableRelation -import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.ArrayImplicits._ @@ -470,7 +470,29 @@ object PreprocessTableInsertion extends ResolveInsertionBase { insert.partitionSpec, partColNames, tblName, conf.resolver) val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet - val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name)) + val expectedColumns = { + val cols = insert.table.output.filterNot(a => staticPartCols.contains(a.name)) + // When NOT NULL enforcement is enabled, restore the original nullability from the + // catalog table schema. HadoopFsRelation forces dataSchema.asNullable for safe reads, + // which strips NOT NULL constraints (both top-level and nested) from the + // LogicalRelation output. We restore nullability so that AssertNotNull checks are + // properly injected. 
+ if (conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL)) { + catalogTable.map { ct => + val catalogFields = ct.schema.fields.map(f => f.name -> f).toMap + cols.map { col => + catalogFields.get(col.name) match { + case Some(field) => + col.withNullability(field.nullable) + .withDataType(restoreDataTypeNullability(col.dataType, field.dataType)) + case None => col + } + } + }.getOrElse(cols) + } else { + cols + } + } val partitionsTrackedByCatalog = catalogTable.isDefined && catalogTable.get.partitionColumnNames.nonEmpty && @@ -546,6 +568,34 @@ object PreprocessTableInsertion extends ResolveInsertionBase { case _ => i } } + + /** + * Recursively restores nullability flags from the original data type into the resolved + * data type, keeping the resolved type structure intact. + */ + private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = { + (resolved, original) match { + case (r: StructType, o: StructType) => + val origFields = o.fields.map(f => f.name -> f).toMap + StructType(r.fields.map { rf => + origFields.get(rf.name) match { + case Some(of) => + rf.copy( + nullable = of.nullable, + dataType = restoreDataTypeNullability(rf.dataType, of.dataType)) + case None => rf + } + }) + case (ArrayType(rElem, _), ArrayType(oElem, oNull)) => + ArrayType(restoreDataTypeNullability(rElem, oElem), oNull) + case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) => + MapType( + restoreDataTypeNullability(rKey, oKey), + restoreDataTypeNullability(rVal, oVal), + oValNull) + case _ => resolved + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala index e65bf1c72bb62..95b539e58ac6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala @@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends command.ShowCreateTableSuiteBase val showDDL = getShowCreateDDL(t) assert(showDDL === Array( s"CREATE TABLE $fullName (", - "a BIGINT,", + "a BIGINT NOT NULL,", "b BIGINT DEFAULT 42,", "c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 'comment')", "USING parquet", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index d92e796455714..d9aa50d1fb3a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -2851,6 +2851,188 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } } + test("SPARK-55716: V1 INSERT rejects null into NOT NULL column for file sources") { + Seq("parquet", "orc", "json").foreach { format => + withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format") + // V1 DataSource writes now enforce NOT NULL constraints via AssertNotNull + val e1 = intercept[SparkRuntimeException] { + sql("INSERT INTO t VALUES(null, 'a')") + } + assert(e1.getCondition === "NOT_NULL_ASSERT_VIOLATION") + val e2 = intercept[SparkRuntimeException] { + sql("INSERT INTO t VALUES(1, null)") + } + assert(e2.getCondition === "NOT_NULL_ASSERT_VIOLATION") + // Valid insert should succeed + sql("INSERT INTO t VALUES(1, 'a')") + checkAnswer(spark.table("t"), Seq(Row(1, "a"))) + } + } + } + } + + test("SPARK-55716: V1 INSERT NOT NULL enforcement respects storeAssignmentPolicy") { + Seq("parquet", "orc").foreach { format => + // ANSI mode (default): rejects null + withSQLConf( + SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true", + SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.ANSI.toString) { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL) USING $format") + val e = intercept[SparkRuntimeException] { + sql("INSERT INTO t VALUES(null)") + } + assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION") + } + } + // STRICT mode: also rejects null (fails at analysis with type mismatch) + withSQLConf( + SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true", + SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL) USING $format") + intercept[AnalysisException] { + sql("INSERT INTO t VALUES(null)") + } + } + } + // LEGACY mode: allows null (no AssertNotNull injected) + withSQLConf( + SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.LEGACY.toString) { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL) USING $format") + sql("INSERT INTO t VALUES(null)") + checkAnswer(spark.table("t"), Seq(Row(null))) + } + } + // Legacy config: allows null even in ANSI mode + withSQLConf( + SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "false") { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL) USING $format") + sql("INSERT INTO t VALUES(null)") + checkAnswer(spark.table("t"), Seq(Row(null))) + } + } + } + } + + test("SPARK-55716: V1 INSERT rejects null with V2 file source path") { + Seq("parquet", "orc").foreach { format => + withSQLConf( + SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "") { + withTable("t") { + sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format") + val e = intercept[SparkRuntimeException] { + sql("INSERT INTO t VALUES(null, 'a')") + } + assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION") + } + } + } + } + + test("SPARK-55716: V1 INSERT rejects null array element for NOT NULL element type") { + Seq("parquet", "orc").foreach { format => + withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") 
{ + withTable("t") { + val schema = new StructType() + .add("a", ArrayType(IntegerType, containsNull = false)) + spark.sessionState.catalog.createTable( + CatalogTable( + identifier = TableIdentifier("t"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = schema, + provider = Some(format)), + ignoreIfExists = false) + val e = intercept[SparkRuntimeException] { + sql("INSERT INTO t SELECT array(1, null, 3)") + } + assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION") + // Valid insert should succeed + sql("INSERT INTO t SELECT array(1, 2, 3)") + checkAnswer(spark.table("t"), Seq(Row(Seq(1, 2, 3)))) + } + } + } + } + + test("SPARK-55716: V1 INSERT rejects null struct field for NOT NULL field") { + Seq("parquet", "orc").foreach { format => + withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") { + withTable("t") { + val schema = new StructType() + .add("s", new StructType() + .add("x", IntegerType, nullable = false) + .add("y", StringType, nullable = false)) + spark.sessionState.catalog.createTable( + CatalogTable( + identifier = TableIdentifier("t"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = schema, + provider = Some(format)), + ignoreIfExists = false) + val e = intercept[SparkRuntimeException] { + sql("INSERT INTO t SELECT named_struct('x', null, 'y', 'hello')") + } + assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION") + // Valid insert should succeed + sql("INSERT INTO t SELECT named_struct('x', 1, 'y', 'hello')") + checkAnswer(spark.table("t"), Seq(Row(Row(1, "hello")))) + } + } + } + } + + test("SPARK-55716: V1 INSERT rejects null map value for NOT NULL value type") { + Seq("parquet", "orc").foreach { format => + withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") { + withTable("t") { + val schema = new StructType() + .add("m", MapType(StringType, IntegerType, valueContainsNull = false)) + spark.sessionState.catalog.createTable( + 
CatalogTable( + identifier = TableIdentifier("t"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = schema, + provider = Some(format)), + ignoreIfExists = false) + val e = intercept[SparkRuntimeException] { + sql("INSERT INTO t SELECT map('a', 1, 'b', null)") + } + assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION") + // Valid insert should succeed + sql("INSERT INTO t SELECT map('a', 1, 'b', 2)") + checkAnswer(spark.table("t"), Seq(Row(Map("a" -> 1, "b" -> 2)))) + } + } + } + } + + test("SPARK-55716: V1 DataFrame write ignores NOT NULL schema constraint") { + Seq("parquet", "orc").foreach { format => + withTempPath { path => + val data = Seq(Row(null, "hello", 1.0), Row(1, null, 2.0), Row(2, "world", null)) + val df = spark.createDataFrame( + spark.sparkContext.parallelize(data), + new StructType() + .add("id", IntegerType, nullable = true) + .add("name", StringType, nullable = true) + .add("value", DoubleType, nullable = true)) + // V1 DataSource writes do not enforce NOT NULL constraints + df.write.mode(SaveMode.Overwrite).format(format).save(path.getCanonicalPath) + val result = spark.read.format(format).load(path.getCanonicalPath) + checkAnswer(result, data) + } + } + } + test("UNSUPPORTED_OVERWRITE.PATH: Can't overwrite a path that is also being read from") { val tableName = "t1" withTable(tableName) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index d3eec329efc11..01b94c1e9cbac 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -224,7 +224,7 @@ private[hive] class SparkGetColumnsOperation( null, // SQL_DATETIME_SUB null, // CHAR_OCTET_LENGTH 
ordinal.asInstanceOf[AnyRef], // ORDINAL_POSITION, 1-based - "YES", // IS_NULLABLE + (if (column.nullable) "YES" else "NO"), // IS_NULLABLE null, // SCOPE_CATALOG null, // SCOPE_SCHEMA null, // SCOPE_TABLE diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index a10d2974db744..c2a5ca1023e90 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -340,10 +340,12 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase { case _ => assert(radix === 0) // nulls } - assert(rowSet.getInt("NULLABLE") === 1) + val expectedNullable = if (schema(pos).nullable) 1 else 0 + assert(rowSet.getInt("NULLABLE") === expectedNullable) assert(rowSet.getString("REMARKS") === pos.toString) assert(rowSet.getInt("ORDINAL_POSITION") === pos + 1) - assert(rowSet.getString("IS_NULLABLE") === "YES") + val expectedIsNullable = if (schema(pos).nullable) "YES" else "NO" + assert(rowSet.getString("IS_NULLABLE") === expectedIsNullable) assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO") pos += 1 } @@ -374,7 +376,7 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase { assert(rowSet.getInt("NULLABLE") === 0) assert(rowSet.getString("REMARKS") === "") assert(rowSet.getInt("ORDINAL_POSITION") === 1) - assert(rowSet.getString("IS_NULLABLE") === "YES") + assert(rowSet.getString("IS_NULLABLE") === "NO") assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO") } } @@ -402,7 +404,7 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase { assert(rowSet.getInt("NULLABLE") === 0) assert(rowSet.getString("REMARKS") === "") assert(rowSet.getInt("ORDINAL_POSITION") === 1) - 
assert(rowSet.getString("IS_NULLABLE") === "YES") + assert(rowSet.getString("IS_NULLABLE") === "NO") assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO") } } @@ -428,7 +430,7 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase { assert(rowSet.getInt("NULLABLE") === 0) assert(rowSet.getString("REMARKS") === "") assert(rowSet.getInt("ORDINAL_POSITION") === 1) - assert(rowSet.getString("IS_NULLABLE") === "YES") + assert(rowSet.getString("IS_NULLABLE") === "NO") assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO") } } From 3feda3df244aafb7bd7f05fdc4a1ac7d7e0a6140 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 3 Mar 2026 12:34:39 +0800 Subject: [PATCH 2/2] Apply suggestion from @yaooqinn --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 01a01cf8ca534..217fb6672f243 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4745,7 +4745,6 @@ object SQLConf { val FILE_SOURCE_INSERT_ENFORCE_NOT_NULL = buildConf("spark.sql.fileSource.insert.enforceNotNull") - .internal() .doc("When true, Spark enforces NOT NULL constraints when inserting data into " + "file-based data source tables (e.g., Parquet, ORC, JSON), consistent with the " + "behavior for other data sources and V2 catalog tables. " +