[SPARK-55716][SQL] Support NOT NULL constraint enforcement for V1 file source table inserts #54517
Closed
Changes from all commits (2 commits)
```diff
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.util.ArrayImplicits._

@@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         table.copy(schema = new StructType(), partitionColumnNames = Nil)
       case _ =>
+        // Merge nullability from the user-specified schema into the resolved schema.
+        // DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
```

Review thread on this hunk:

> **Contributor:** is it simpler to not do …

> **Member (Author):** …
```diff
+        // constraints. We restore nullability from the original user schema while keeping
+        // the resolved data types (which may include CharVarchar normalization, metadata, etc.)
+        val resolvedSchema = if (table.schema.nonEmpty) {
+          restoreNullability(dataSource.schema, table.schema)
+        } else {
+          dataSource.schema
+        }
         table.copy(
-          schema = dataSource.schema,
+          schema = resolvedSchema,
           partitionColumnNames = partitionColumnNames,
           // If metastore partition management for file source tables is enabled, we start off with
           // partition provider hive, but no partitions in the metastore. The user has to call

@@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     Seq.empty[Row]
   }
+
+  /**
+   * Recursively restores nullability from the original user-specified schema into
+   * the resolved schema. The resolved schema's data types are preserved (they may
+   * contain CharVarchar normalization, metadata, etc.), but nullability flags
+   * (top-level and nested) are taken from the original schema.
+   */
+  private def restoreNullability(resolved: StructType, original: StructType): StructType = {
+    val originalFields = original.fields.map(f => f.name -> f).toMap
+    StructType(resolved.fields.map { resolvedField =>
+      originalFields.get(resolvedField.name) match {
+        case Some(origField) =>
+          resolvedField.copy(
+            nullable = origField.nullable,
+            dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
+        case None => resolvedField
+      }
+    })
+  }
+
+  private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) => restoreNullability(r, o)
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }

 /**
```
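The restoration logic in the patch can be illustrated with a small, self-contained sketch. Note that the types below (`Struct`, `Arr`, `MapT`, `Field`, `IntT`) are simplified stand-ins invented for this example, not the real `org.apache.spark.sql.types` classes; only the recursion shape mirrors `restoreNullability`/`restoreDataTypeNullability` in the diff above:

```scala
// Simplified stand-ins for Spark's schema types (NOT the real
// org.apache.spark.sql.types classes); only the recursion shape
// mirrors restoreNullability/restoreDataTypeNullability in the patch.
object RestoreNullabilitySketch {
  sealed trait DType
  case object IntT extends DType
  case class Struct(fields: Seq[Field]) extends DType
  case class Arr(elem: DType, containsNull: Boolean) extends DType
  case class MapT(key: DType, value: DType, valueContainsNull: Boolean) extends DType
  case class Field(name: String, dataType: DType, nullable: Boolean)

  // Take nullability flags from `original` (the user-specified schema) while
  // keeping data types from `resolved` (whose flags were blanket-nullified,
  // as DataSource.resolveRelation's asNullable call does in Spark).
  def restore(resolved: Struct, original: Struct): Struct = {
    val byName = original.fields.map(f => f.name -> f).toMap
    Struct(resolved.fields.map { rf =>
      byName.get(rf.name) match {
        case Some(of) =>
          rf.copy(nullable = of.nullable, dataType = restoreDT(rf.dataType, of.dataType))
        case None => rf
      }
    })
  }

  def restoreDT(resolved: DType, original: DType): DType = (resolved, original) match {
    case (r: Struct, o: Struct) => restore(r, o)
    case (Arr(re, _), Arr(oe, oNull)) => Arr(restoreDT(re, oe), oNull)
    case (MapT(rk, rv, _), MapT(ok, ov, ovNull)) =>
      MapT(restoreDT(rk, ok), restoreDT(rv, ov), ovNull)
    case _ => resolved
  }

  // User schema: a NOT NULL id, and an array column whose elements are NOT NULL.
  val user = Struct(Seq(
    Field("id", IntT, nullable = false),
    Field("tags", Arr(IntT, containsNull = false), nullable = true)))

  // After resolution, every flag was forced nullable (asNullable-style).
  val resolved = Struct(Seq(
    Field("id", IntT, nullable = true),
    Field("tags", Arr(IntT, containsNull = true), nullable = true)))

  val restored = restore(resolved, user)

  def main(args: Array[String]): Unit = {
    // id is NOT NULL again, and the array's element nullability is restored.
    println(restored)
  }
}
```

The key design point the sketch captures: fields are matched by name, nested array/map/struct nullability is restored recursively, and any field present only in the resolved schema is passed through unchanged.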
Review discussion on the new configuration's namespace:

> Can we follow the existing config namespace? Currently, we have many `spark.sql.files.*` configs which are "effective only when using file-based sources".

> That doesn't make much sense to me: those configurations work on pure files, but this one actually takes no effect in the file-only mode.

> It's true, because a file-based table without a catalog cannot store constraint info, but is that worth a new config namespace? I cannot infer such info from the proposed namespace name `fileSource`. IMO, it still fits the `spark.sql.files.` scope, and we can mention such a limitation in the config docs, if necessary.