Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3260,9 +3260,11 @@ class Analyzer(
// The star will be expanded differently if we insert `Generate` under `Project` too early.
case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) =>
val (resolvedGenerator, newProjectList) = projectList
.map(trimNonTopLevelAliases)
.foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) =>
e match {
// SPARK-48091: Only trim aliases on the generator expression itself. Trimming
// non-generator expressions strips aliases inside lambda functions (e.g.,
// struct(x.as("data"))) before they can be resolved into struct field names.
trimNonTopLevelAliases(e) match {
// If there are more than one generator, we only rewrite the first one and wait for
// the next analyzer iteration to rewrite the next one.
case AliasedGenerator(generator, names, outer) if res._1.isEmpty &&
Expand All @@ -3275,8 +3277,8 @@ class Analyzer(
generatorOutput = GeneratorResolution.makeGeneratorOutput(generator, names),
child)
(Some(g), res._2 ++ g.nullableOutput)
case other =>
(res._1, res._2 :+ other)
case _ =>
(res._1, res._2 :+ e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,37 @@ class GeneratorFunctionSuite extends SharedSparkSession {
Seq(Row(0, 10, 0, 10), Row(1, 20, 1, 20))
)
}

test("SPARK-48091: explode with transform should preserve struct field aliases") {
  // Names of the struct fields held by the elements of the array-typed "my_struct" column.
  def myStructFieldNames(schema: StructType): Seq[String] = {
    val elementType = schema("my_struct").dataType.asInstanceOf[types.ArrayType].elementType
    elementType.asInstanceOf[StructType].fieldNames.toSeq
  }
  // Generator column used by the selects that also contain an explode.
  def exploded = explode(col("my_array")).as("exploded")
  // A `def`, so every select below builds its own fresh column expression.
  def singleAliasStruct = transform(col("my_array2"), x => struct(x.as("data"))).as("my_struct")

  val input = spark
    .createDataFrame(Seq((1, Array(1, 2, 3), Array(4, 5, 6))))
    .toDF("id", "my_array", "my_array2")

  // Baseline: with no generator in the select list, the alias becomes the field name.
  val withoutExplode = input.select(singleAliasStruct)
  assert(myStructFieldNames(withoutExplode.schema) === Seq("data"))

  // Same projection next to an explode: the alias must still name the struct field.
  val withExplode = input.select(exploded, singleAliasStruct)
  assert(myStructFieldNames(withExplode.schema) === Seq("data"))

  // Two aliases inside the struct, one on the lambda variable and one on an outer column.
  val withTwoAliases = input.select(
    exploded,
    transform(
      col("my_array2"),
      x => struct(x.as("value"), col("id").as("key"))
    ).as("my_struct")
  )
  assert(myStructFieldNames(withTwoAliases.schema) === Seq("value", "key"))
}
}

case class EmptyGenerator() extends Generator with LeafLike[Expression] {
Expand Down