[SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO WITH SCHEMA EVOLUTION#55427
[SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO WITH SCHEMA EVOLUTION#55427szehon-ho wants to merge 10 commits into
Conversation
…SERT INTO WITH SCHEMA EVOLUTION Add support for INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target, mirroring existing MERGE INTO behavior. Changes: - Add spark.sql.insertNestedTypeCoercion.enabled config flag (default false) - Refactor TableOutputResolver.resolveOutputColumns to accept DefaultValueFillMode enum directly instead of two overlapping boolean parameters - Enable RECURSE mode for V2 inserts when both schema evolution and the config flag are active - Add comprehensive tests for all scenarios
johanl-db
left a comment
There was a problem hiding this comment.
The change makes sense, the most important points are:
- It only applies when schema evolution is enabled. Without schema evolution, Spark should fallback to schema enforcement: fail when schemas don't match
- It's disabled by default for now. That'll allow tuning the behavior if we find any case that deviates from the intended behavior.
| TableOutputResolver.suitableForByNameCheck(v2Write.isByName, | ||
| expected = v2Write.table.output, queryOutput = v2Write.query.output) | ||
| val defaultValueFillMode = | ||
| if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE |
There was a problem hiding this comment.
Worth calling out in a comment: without schema evolution, spark will enforce that there are no missing columns in the data being written
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Tests for source with fewer columns/fields than target |
There was a problem hiding this comment.
Another interesting test case I can think of:
insert by name + schema evolution with an extra column and a missing column: column count is the same between source and target, but not the same column names
Same for nested struct
cloud-fan
left a comment
There was a problem hiding this comment.
Summary
Prior state and problem. INSERT INTO ... WITH SCHEMA EVOLUTION required source columns and nested fields to match the target exactly; sources with fewer struct fields (or fewer trailing top-level columns, under by-position) produced arity / struct-missing-field errors. MERGE INTO already has the equivalent coercion via spark.sql.mergeNestedTypeCoercion.enabled (SPARK-53482), which created an asymmetry for schema-evolution workflows where older sources trail the target.
Design approach. Mirrors the MERGE pattern with a parallel internal flag spark.sql.insertNestedTypeCoercion.enabled (off by default, experimental, v4.2.0). Leverages the existing DefaultValueFillMode enum (NONE / FILL / RECURSE) already threaded through TableOutputResolver and replaces the supportColDefaultValue: Boolean parameter on resolveOutputColumns with it. RECURSE is selected in ResolveOutputRelation only when both conf.coerceInsertNestedTypes and v2Write.schemaEvolutionEnabled are true; otherwise FILL is selected (preserving prior top-level default-fill behavior). RECURSE then (a) relaxes the by-position "not enough columns" gate and fills trailing target columns with defaults / null, and (b) propagates the fill flag through nested struct resolution.
Key design decisions made by this PR.
- Gating: config ×
schemaEvolutionEnabled. Off by default and explicitly scoped to the WITH SCHEMA EVOLUTION path, so plain INSERT INTO behavior is untouched. - V2-only scope:
PreprocessTableInsertion(V1) keeps FILL — V1 doesn't support schema evolution, so this is coherent. - Enum replacement of the boolean: cleaner signature, and makes the three possible modes explicit at every call site.
Implementation sketch.
Analyzer.scalaResolveOutputRelationpicks the mode.TableOutputResolver.resolveOutputColumnstakes the new parameter, derivesfillDefaultValue = defaultValueFillMode == RECURSE, and loosens the by-position gate.resolveColumnsByPositiongainsfillDefaultValue; when true, appends defaults for target columns trailing past the input length.resolveStructTypeforwardsfillDefaultValueinto recursive by-position resolution.- 17 new tests in
InsertIntoSchemaEvolutionTestscover by-name / by-position, explicit DEFAULT, nested structs (including inside arrays and map values — by-name only), deeply nested, null structs, and negative cases.
Main open points (see inline comments).
resolveArrayType/resolveMapTypeby-position recursion doesn't propagatefillDefaultValue, inconsistent withresolveStructType. Concretely, by-position INSERT intoarray<struct<...>>/map<_, struct<...>>with missing nested fields still errors even under RECURSE, while the by-name counterpart succeeds. Tests only cover by-name for array / map, so this gap isn't exercised.- By-position trailing default fill skips
applyColumnMetadata, inconsistent with the by-name path — likely to break char / varchar write-side handling for trailing filled columns. - A few doc / comment / test-fidelity issues.
General note on the pre-existing DefaultValueFillMode enum doc (TableOutputResolver.scala:42-48, not in this diff): it was written for the MERGE use case and currently says RECURSE "fill[s] missing top-level columns and also recurse[s] into nested struct fields". After this PR, RECURSE also enables by-position top-level trailing fill. Worth updating the scaladoc as part of this PR so the enum semantics don't drift further.
| } else { | ||
| resolveColumnsByPosition( | ||
| tableName, fields, toAttributes(expectedType), conf, addError, colPath) | ||
| tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) |
There was a problem hiding this comment.
fillDefaultValue is correctly propagated here, but the sibling paths in resolveArrayType (line 522) and resolveMapType (lines 559, 571) still call resolveColumnsByPosition without the flag. Consequence: INSERT BY POSITION ... WITH SCHEMA EVOLUTION into a column typed array<struct<...>> or map<_, struct<...>> whose nested struct has missing fields still errors — while the BY NAME counterpart succeeds. The PR's test matrix only covers by-name for array / map, so this gap isn't exercised.
Either propagate fillDefaultValue in those two methods (consistent with resolveStructType), or narrow the enum doc / PR description to say array / map nested fill is by-name only — and skip the fillDefaultValue plumbing into those methods entirely. If the former, please add by-position test coverage for array-of-struct and map-of-struct symmetric to the existing by-name tests.
There was a problem hiding this comment.
Done in the latest commit: resolveArrayType and resolveMapType now pass fillDefaultValue into resolveColumnsByPosition for the by-position branches (matching resolveStructType). Added Insert schema evolution: source missing field in struct nested in array/map value by position tests in InsertIntoTests.
| val defaults = if (fillDefaultValue) { | ||
| actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => | ||
| getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) | ||
| .map(expr => Alias(expr, expectedCol.name)()) |
There was a problem hiding this comment.
The by-name path at line 327 routes the default-value expression through applyColumnMetadata(defaultExpr.get, expectedCol), which strips source metadata and pins the target column's required metadata — including CharVarcharUtils.cleanMetadata and the write-side metadata guarantees documented on applyColumnMetadata. This trailing-fill branch just does Alias(expr, expectedCol.name)(), so for a by-position insert where the trailing target column is char / varchar, the filled column won't carry the target-column metadata the way the by-name filled column does. Suggest aligning with the by-name path:
| .map(expr => Alias(expr, expectedCol.name)()) | |
| getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) | |
| .map(expr => applyColumnMetadata(expr, expectedCol)) |
There was a problem hiding this comment.
Done: the trailing-fill branch now uses applyColumnMetadata(expr, expectedCol) like the by-name path.
| // Negative tests: missing columns/fields should fail WITHOUT schema evolution | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| test("Insert without evolution: source missing top-level column by name fails") { |
There was a problem hiding this comment.
Test name says "by name" but the call uses doInsert (by-position). The asserted error INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS is only emitted on the by-position path — the by-name path would instead hit incompatibleDataToTableCannotFindDataError (see the nested-field counterpart a few tests below that uses doInsertByName). For symmetry with the positive test that uses byName = true, please switch to doInsertByName and update the expected error accordingly (or rename this test to "… by position fails", but then it duplicates the existing by-position test at line 1791).
There was a problem hiding this comment.
Done: switched to doInsertByName and assert INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA for missing salary. Wrapped in withSQLConf(USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES -> false) so FILL mode does not silently insert null under the test session defaults (otherwise the insert succeeds and no exception is thrown).
| // With schema evolution, allow the source to have fewer columns/fields than the target | ||
| // and fill missing ones with default values or nulls (RECURSE mode). Without schema | ||
| // evolution, only top-level default column values are filled (FILL mode) and any | ||
| // missing columns will cause a schema enforcement error. |
There was a problem hiding this comment.
The phrase "only top-level default column values are filled (FILL mode) and any missing columns will cause a schema enforcement error" reads as self-contradictory: FILL mode does fill missing top-level columns (with explicit DEFAULT values, or null when spark.sql.defaultColumn.useNullsForMissingDefaultValues is true). The intended contrast is with nested fields, not top-level. Suggest:
| // missing columns will cause a schema enforcement error. | |
| // With schema evolution + coercion flag, missing top-level columns AND missing nested | |
| // struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing | |
| // top-level columns are filled via FILL mode; missing nested struct fields still cause | |
| // schema enforcement errors. |
There was a problem hiding this comment.
Done: updated the comment to your suggested wording (RECURSE vs FILL and nested vs top-level).
| val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED = | ||
| buildConf("spark.sql.insertNestedTypeCoercion.enabled") | ||
| .internal() | ||
| .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + |
There was a problem hiding this comment.
The doc describes only the nested-struct-field effect, but enabling this flag also loosens by-position trailing top-level fill (see the "Missing top-level column (by position)" row in the PR description, which moves from "error" to "fill trailing"). Users skimming the config doc might reasonably expect the flag's scope to be limited to nested types. Suggest extending, e.g.:
| .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + | |
| .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + | |
| "struct fields with null when the source has fewer nested fields than the target " + | |
| "table. Also relaxes by-position column-count enforcement so trailing missing " + | |
| "top-level columns are filled with their default value (or null). This is " + | |
| "experimental and the semantics may change.") |
There was a problem hiding this comment.
Done: extended the config doc to mention by-position trailing top-level fill as well.
…on coercion Propagate fillDefaultValue through resolveArrayType and resolveMapType by-position paths; use applyColumnMetadata for trailing default fills; clarify Analyzer and SQLConf docs; extend DefaultValueFillMode scaladoc; fix by-name negative test (with USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES disabled) and add by-position array/map nested struct tests.
|
@cloud-fan thanks for the detailed review. I addressed the inline points in a single follow-up commit (pushed to this branch):
Replies are threaded on each of your line comments. |
cloud-fan
left a comment
There was a problem hiding this comment.
5 addressed, 0 remaining, 1 new (1 late catch).
- Addressed:
fillDefaultValuepropagation throughresolveArrayType/resolveMapTypeplus by-position array/map tests; trailing default fill now goes throughapplyColumnMetadata; by-name negative test usesdoInsertByNamewithUSE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES=false;ResolveOutputRelationcomment clarified;INSERT_INTO_NESTED_TYPE_COERCION_ENABLEDdoc covers by-position top-level fill;DefaultValueFillModescaladoc updated for RECURSE. - New (late catch): by-position trailing fill silently drops a trailing target column/field when no default is available — see the inline comment. This existed in
0852d145dbfand I missed it last round; flagging now because the by-name path errors cleanly in the same case and the asymmetry is a real correctness gap with the flag enabled.
| val defaults = if (fillDefaultValue) { | ||
| actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => | ||
| getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) | ||
| .map(expr => applyColumnMetadata(expr, expectedCol)) | ||
| } | ||
| } else { | ||
| Nil | ||
| } |
There was a problem hiding this comment.
When a trailing target column has no usable default — non-nullable with no DEFAULT, or useNullsForMissingDefaultColumnValues=false with no explicit DEFAULT — getDefaultValueExprOrNullLit returns None and the surrounding .flatMap { _.map(...) } silently drops that column. The result matched ++ defaults is shorter than expected. Two failure modes:
- Top-level:
resolveOutputColumnsbuilds an undersizedProjectand the V2 plan never becomesoutputResolved(V2WriteCommand.areCompatiblerequiresinAttrs.size == outAttrs.size) — producing an obscure analyzer error instead of the cleanINCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATAthe by-name path emits. - Nested:
resolveStructTypeline 490 returnsNoneon length mismatch, propagating a cascading silent drop of the enclosing struct field.
The by-name path at lines 317-328 throws incompatibleDataToTableCannotFindDataError in the same situation; suggest mirroring:
| val defaults = if (fillDefaultValue) { | |
| actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => | |
| getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) | |
| .map(expr => applyColumnMetadata(expr, expectedCol)) | |
| } | |
| } else { | |
| Nil | |
| } | |
| val defaults = if (fillDefaultValue) { | |
| actualExpectedCols.drop(inputCols.size).map { expectedCol => | |
| val defaultExpr = getDefaultValueExprOrNullLit( | |
| expectedCol, conf.useNullsForMissingDefaultColumnValues) | |
| if (defaultExpr.isEmpty) { | |
| throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( | |
| tableName, (colPath :+ expectedCol.name).quoted) | |
| } | |
| applyColumnMetadata(defaultExpr.get, expectedCol) | |
| } | |
| } else { | |
| Nil | |
| } |
Repro (mirroring the existing by-name negative test at line 1839): same V2 table (id int, salary int, dep string), set INSERT_INTO_NESTED_TYPE_COERCION_ENABLED=true AND USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES=false, then doInsertWithSchemaEvolution(t, Seq((1, 200)).toDF("id", "salary")) (by-position, source missing dep). Today this silently produces a 2-column Project; with the suggested fix it errors symmetrically with the by-name negative test. Worth adding a by-position counterpart there.
There was a problem hiding this comment.
Addressed in two follow-up commits on the branch:
-
Trailing default fill (this thread): Replaced the
flatMapwith an explicit check: ifgetDefaultValueExprOrNullLitis empty we now throwINCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATAwith the same path quoting as the by-name path. Added regression tests (top-level and nested by-position) withUSE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES=false. -
Further hardening (same file):
resolveColumnsByPositionnow assertsmatched ++ defaultshas full arity;reorderColumnsByNameand nestedresolveStructType/resolveArrayType/resolveMapTypetakeenforceFullOutputso INSERT output resolution throws on any incomplete nested resolution instead of returningNil/NonefromflatMap. MERGEresolveUpdatekeepsenforceFullOutput=falsesogetOrElsefallback semantics are unchanged.
Ran DataSourceV2SQLSuiteV1Filter (Insert schema evolution / source tests) and MergeIntoDataFrameSuite (nested struct merge tests).
…s no default When insert nested coercion + schema evolution fills by-position trailing columns/fields, getDefaultValueExprOrNullLit can return None (e.g. nullable column with useNullsForMissingDefaultColumnValues=false and no explicit DEFAULT). The previous flatMap silently dropped those targets; mirror the by-name path by throwing INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA. Add regression tests for top-level and nested struct by-position cases.
…umn drops After fixing trailing by-position default fill (throw when no default), add: - Post-check on resolveColumnsByPosition: result length must match expected arity - enforceFullOutput on reorderColumnsByName and nested struct/array/map resolvers: INSERT (resolveOutputColumns) throws on incomplete resolution; MERGE resolveUpdate keeps enforceFullOutput=false so getOrElse fallback semantics are unchanged. Scalastyle: argcount off for the three nested resolver methods.
|
@cloud-fan Thanks for the catch on the trailing `flatMap` silently dropping columns when no default is available — fixed in `380a3ac` (throw + regression tests). I also pushed an additional hardening commit (`0be4cc8`) so INSERT and MERGE output resolution cannot end up with a too-short projection |
Align MERGE assignment resolution with INSERT: resolveStructType/Array/Map now pass enforceFullOutput=true so incomplete nested resolution throws instead of falling back via Option. MergeInto schema evolution suites (824 tests across Group/Delta SQL+Scala) and MergeIntoDataFrameSuite nested struct tests pass.
4831d54 to
2c56ade
Compare
cloud-fan
left a comment
There was a problem hiding this comment.
1 addressed, 0 remaining, 1 new (newly introduced in commit 2c56adef).
- Addressed: by-position trailing fill silently dropping trailing target columns when no default is available — fixed across
380a3ac98and0be4cc8d951. - New (newly introduced in
2c56adef, "Use enforceFullOutput for MERGE resolveUpdate as well"): MERGE/UPDATE error-reporting regression from removing theenforceFullOutputparameter. See the inline comment.
Smaller, no inline: the new cannotFindOutputData(tableName, colPath, quotedFallback: String) overload silently ignores quotedFallback when colPath is non-empty. The resolveArrayType / resolveMapType callers pass toSQLId(expected.name) as quotedFallback, but at those call sites colPath is always non-empty — so the third arg is dead. Either drop it or document the precedence.
| } | ||
| } else { | ||
| Nil | ||
| cannotFindOutputData(tableName, colPath, expectedCols.map(_.name)) |
There was a problem hiding this comment.
The latest commit (2c56adef, "Use enforceFullOutput for MERGE resolveUpdate as well") removed the enforceFullOutput parameter and made this throw — plus the matching throws in resolveStructType (line 504), resolveArrayType (line 545), resolveMapType (line 608) — unconditional. The previous commit 0be4cc8d951 had enforceFullOutput=true for INSERT (via resolveOutputColumns) and false for MERGE (via resolveUpdate), so MERGE could return Nil here, the recursive resolveStructType could return None, resolveUpdate's .getOrElse(value) could fall back, and AssignmentUtils.alignUpdateAssignments would throw DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS with the specific addError message (e.g. "Cannot safely cast string to int").
Now the throw fires for both callers and the addError-collected cast-failure detail is replaced by a generic INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA for MERGE/UPDATE. Concrete tests that assert the prior behavior:
AlignMergeAssignmentsSuitelines 738-744 (ANSI mode,UPDATE SET s.n_s = named_struct('dn_i', 'string-value', 'dn_l', 1L)on targetstruct<dn_i:int, dn_l:bigint>) —assertAnalysisException(…, "Cannot safely cast"). Same pattern at lines 920-925 (strict mode).AlignUpdateAssignmentsSuitelines 546-548 and 599-601, same pattern.
Repro path: applyAssignments → resolveUpdate (struct value, struct col) → resolveStructType → reorderColumnsByName with NONE mode. For field dn_i: matched but types (string, int) ⇒ checkField's canWrite fails, addError("Cannot safely cast string to int"), returns None. The flatMap drops it, reordered.length=1 != expectedCols.length=2, and HEAD's else branch on line 372 throws — bypassing AssignmentUtils.alignUpdateAssignments's errors.nonEmpty check. assertAnalysisException does substring matching, so it fails on "Cannot find data" vs the expected "Cannot safely cast".
The commit message says MergeInto schema evolution suites and MergeIntoDataFrameSuite nested struct tests pass; those don't exercise the type-mismatch path that goes through reorderColumnsByName's addError accumulation.
Side effect: resolveStructType / resolveArrayType / resolveMapType still declare Option[NamedExpression] but None is now unreachable. resolveUpdate's .getOrElse(value) is dead code and no longer protects MERGE error reporting.
Two reasonable resolutions:
- Restore
enforceFullOutputas in0be4cc8d951— surgical, MERGE behavior preserved. - Commit to the new MERGE behavior — but then it belongs in the PR description (this is scope beyond "INSERT WITH SCHEMA EVOLUTION"), the AlignMerge/AlignUpdate assertions need to be updated, and ideally the thrown
Cannot find dataerror should still carry theaddErrorcast-failure detail so MERGE users don't lose actionable error information.
There was a problem hiding this comment.
Done in ca22b54: reverted the unconditional throws from 2c56adef and restored enforceFullOutput as in 0be4cc8 — true for INSERT (resolveOutputColumns) and false for MERGE (resolveUpdate), so addError cast-failure detail still surfaces via AssignmentUtils.alignUpdateAssignments. Verified AlignMergeAssignmentsSuite and AlignUpdateAssignmentsSuite (including the ANSI named_struct('dn_i', 'string-value', ...) cases).
| val ex = intercept[AnalysisException] { | ||
| doInsertByName(t1, sourceData) | ||
| } | ||
| assert(ex.getMessage.contains("Cannot find data")) |
There was a problem hiding this comment.
The other negative tests added in this patch (e.g. lines 1879-1892, 1905-1918) use checkError(condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map(...)), which pins tableName and colName and would catch a future change that drops the column path. Suggest aligning here (and at line 1989-1992 in the next test):
| val ex = intercept[AnalysisException] { | |
| doInsertByName(t1, sourceData) | |
| } | |
| assert(ex.getMessage.contains("Cannot find data")) | |
| checkError( | |
| exception = intercept[AnalysisException] { | |
| doInsertByName(t1, sourceData) | |
| }, | |
| condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", | |
| parameters = Map( | |
| "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), | |
| "colName" -> "`s`.`c3`") | |
| ) |
There was a problem hiding this comment.
Done in ca22b54: both tests now use checkError with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA and pin tableName / `s`.`c3`.
Revert the 2c56ade change that made incomplete nested resolution throw for MERGE/UPDATE, restoring addError-based cast failure messages in AlignMergeAssignmentsSuite and AlignUpdateAssignmentsSuite. Align two INSERT negative tests with checkError for CANNOT_FIND_DATA.
|
@cloud-fan Thanks for the May 15 follow-up — addressed in ca22b54:
Tests run locally: |
|
thanks, merging to master/4.x! |
… INTO WITH SCHEMA EVOLUTION ### What changes were proposed in this pull request? Add support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target table. This mirrors the existing `MERGE INTO` behavior gated by `spark.sql.mergeNestedTypeCoercion.enabled`. **Specific changes:** 1. **New config flag**: `spark.sql.insertNestedTypeCoercion.enabled` (internal, default `false`) — mirrors the existing `spark.sql.mergeNestedTypeCoercion.enabled` for MERGE. 2. **Refactored `TableOutputResolver.resolveOutputColumns`**: Replaced two overlapping boolean parameters (`supportColDefaultValue`, `fillNestedDefaults`) with a single `DefaultValueFillMode` enum (`NONE`, `FILL`, `RECURSE`), making the API cleaner and the intent explicit at each call site. 3. **RECURSE mode for V2 inserts**: When both schema evolution and the coercion flag are enabled, `RECURSE` mode fills missing nested struct fields with null, relaxes the by-position arity check, and recurses into structs nested within arrays and maps. **Supported scenarios** (source has fewer columns/fields than target, with schema evolution + coercion flag): | Scenario | Before | After (+ coercion flag) | |---|---|---| | Missing top-level column (by name) | fill with default/null | same (unchanged) | | Missing top-level column with DEFAULT (by name) | fill with default value | same (unchanged) | | Missing top-level column (by position) | error | fill trailing with default/null | | Missing top-level column with DEFAULT (by position) | error | fill trailing with default value | | Missing nested struct field (by name) | error | fill with null | | Missing nested struct field (by position) | error | fill with null | | Missing field in struct inside array (by name) | error | fill with null | | Missing field in struct inside map value (by name) | error | fill with null | | Missing deeply nested struct field (by name) | error | fill with null | ### Why are the changes needed? `MERGE INTO` already supports coercing nested types when the source has fewer struct fields than the target (via `spark.sql.mergeNestedTypeCoercion.enabled`). `INSERT INTO WITH SCHEMA EVOLUTION` lacked this capability, causing errors for legitimate use cases where the source schema is a subset of the target schema at the nested level. This is important for schema evolution workflows where tables accumulate new nested fields over time, but older data sources may not have all fields populated. ### Does this PR introduce _any_ user-facing change? Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true` (default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` will no longer fail when the source has fewer nested struct fields than the target. Instead, missing fields are filled with null. This is gated behind an internal, experimental config flag. ### How was this patch tested? Added 17 new test cases in `InsertIntoSchemaEvolutionTests`: **Positive tests** (with schema evolution + coercion flag): - Missing top-level column by name / by position - Missing top-level column with DEFAULT value by name / by position - Missing nested struct field by name / by position - Missing field in struct nested in array / map value - Missing deeply nested struct field - Null struct with missing field by name / by position - Mixed null and non-null structs with missing field - Null deeply nested struct with missing field - Null struct in array with missing field **Negative tests** (verifying errors when coercion is disabled): - Missing top-level column by name / by position (without evolution) - Missing nested struct field by name / by position (without evolution) - Missing nested struct field with evolution but without coercion flag All 64 matched tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor (Claude Opus 4) Closes #55427 from szehon-ho/insert-schema-evolution-missing-fields. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 558ea1c) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
|
Thank you, @szehon-ho and @cloud-fan . cc @peter-toth |
|
Backport to |
…NSERT INTO WITH SCHEMA EVOLUTION ## Summary Backport of #55427 to `branch-4.2`. Adds support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target table, mirroring existing `MERGE INTO` behavior gated by `spark.sql.mergeNestedTypeCoercion.enabled`. Key changes: - New config: `spark.sql.insertNestedTypeCoercion.enabled` (internal, default `false`) - Refactor `TableOutputResolver.resolveOutputColumns` to use `DefaultValueFillMode` (`NONE`, `FILL`, `RECURSE`) - Enable `RECURSE` mode for V2 inserts when schema evolution and the coercion flag are both enabled - 17 new tests in `InsertIntoSchemaEvolutionTests` (via `InsertIntoTests.scala`) ## Why are the changes needed? `MERGE INTO` already supports nested type coercion when the source has fewer struct fields than the target. `INSERT INTO WITH SCHEMA EVOLUTION` lacked this capability, causing errors for legitimate schema-evolution workflows where older sources omit newer nested fields. ## Does this PR introduce _any_ user-facing change? Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true` (default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` fills missing nested struct fields with null instead of failing. ## How was this patch tested? Cherry-picked from #55427 onto current `branch-4.2` (`bd8872a0cc7`) with clean cherry-picks (no conflicts). Original PR test plan: - Added comprehensive positive/negative tests in `InsertIntoTests.scala` - All matched tests passed on master ## Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor (Claude Opus 4) Closes #56002 from szehon-ho/insert-schema-evolution-missing-fields-4.2. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Add support for
INSERT INTO ... WITH SCHEMA EVOLUTIONto fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target table. This mirrors the existingMERGE INTObehavior gated byspark.sql.mergeNestedTypeCoercion.enabled.Specific changes:
New config flag:
spark.sql.insertNestedTypeCoercion.enabled(internal, defaultfalse) — mirrors the existingspark.sql.mergeNestedTypeCoercion.enabledfor MERGE.Refactored
TableOutputResolver.resolveOutputColumns: Replaced two overlapping boolean parameters (supportColDefaultValue,fillNestedDefaults) with a singleDefaultValueFillModeenum (NONE,FILL,RECURSE), making the API cleaner and the intent explicit at each call site.RECURSE mode for V2 inserts: When both schema evolution and the coercion flag are enabled,
RECURSEmode fills missing nested struct fields with null, relaxes the by-position arity check, and recurses into structs nested within arrays and maps.Supported scenarios (source has fewer columns/fields than target, with schema evolution + coercion flag):
Why are the changes needed?
MERGE INTOalready supports coercing nested types when the source has fewer struct fields than the target (viaspark.sql.mergeNestedTypeCoercion.enabled).INSERT INTO WITH SCHEMA EVOLUTIONlacked this capability, causing errors for legitimate use cases where the source schema is a subset of the target schema at the nested level.This is important for schema evolution workflows where tables accumulate new nested fields over time, but older data sources may not have all fields populated.
Does this PR introduce any user-facing change?
Yes. When
spark.sql.insertNestedTypeCoercion.enabledis set totrue(defaultfalse),INSERT INTO ... WITH SCHEMA EVOLUTIONwill no longer fail when the source has fewer nested struct fields than the target. Instead, missing fields are filled with null. This is gated behind an internal, experimental config flag.How was this patch tested?
Added 17 new test cases in
InsertIntoSchemaEvolutionTests:Positive tests (with schema evolution + coercion flag):
Negative tests (verifying errors when coercion is disabled):
All 64 matched tests pass.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)