Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,12 @@
<td>Boolean</td>
<td>If true, enables the file-level pruning step for MergeInto partial column update on data-evolution tables. Set this to false when most files in the target partition are expected to be updated, so that the overhead of collecting touched file IDs outweighs the benefit of pruning untouched files.</td>
</tr>
<tr>
<td><h5>data-evolution.merge-into.source-persist</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to persist source when process merge into action on data evolution table.</td>
</tr>
<tr>
<td><h5>data-file.external-paths</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2249,6 +2249,13 @@ public InlineElement getDescription() {
+ "to be updated, so that the overhead of collecting touched file IDs "
+ "outweighs the benefit of pruning untouched files.");

public static final ConfigOption<Boolean> DATA_EVOLUTION_MERGE_INTO_SOURCE_PERSIST =
key("data-evolution.merge-into.source-persist")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to persist source when process merge into action on data evolution table.");

public static final ConfigOption<Boolean> BLOB_COMPACTION_ENABLED =
key("blob-compaction.enabled")
.booleanType()
Expand Down Expand Up @@ -3759,6 +3766,10 @@ public boolean dataEvolutionMergeIntoFilePruning() {
return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING);
}

public boolean dataEvolutionMergeIntoSourcePersist() {
return options.get(DATA_EVOLUTION_MERGE_INTO_SOURCE_PERSIST);
}

public boolean blobCompactionEnabled() {
return options.get(BLOB_COMPACTION_ENABLED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,30 +189,53 @@ case class MergeIntoPaimonDataEvolutionTable(
map.toMap
}

// step 1: find the related data splits, make it target file plan
val dataSplits: Seq[DataSplit] =
targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds)
val touchedFileTargetRelation =
createNewScanPlan(dataSplits, targetRelation)

// step 2: invoke update action
val updateCommit =
if (matchedActions.nonEmpty) {
val updateResult =
updateActionInvoke(dataSplits, sparkSession, touchedFileTargetRelation, firstRowIds)
checkUpdateResult(updateResult)
} else Nil

// step 3: invoke insert action
val insertCommit =
if (notMatchedActions.nonEmpty)
insertActionInvoke(sparkSession, touchedFileTargetRelation)
else Nil

if (plan.snapshotId() != null) {
writer.rowIdCheckConflict(plan.snapshotId())
val persistSourceDss: Option[Dataset[Row]] =
if (table.coreOptions().dataEvolutionMergeIntoSourcePersist() && matchedActions.nonEmpty) {
val dss = createDataset(sparkSession, sourceTable)
dss.persist()
Some(dss)
} else {
None
}

try {
// step 1: find the related data splits, make it target file plan
val dataSplits: Seq[DataSplit] = targetRelatedSplits(
sparkSession,
tableSplits,
firstRowIds,
firstRowIdToBlobFirstRowIds,
persistSourceDss)
val touchedFileTargetRelation =
createNewScanPlan(dataSplits, targetRelation)

// step 2: invoke update action
val updateCommit =
if (matchedActions.nonEmpty) {
val updateResult = updateActionInvoke(
dataSplits,
sparkSession,
touchedFileTargetRelation,
firstRowIds,
persistSourceDss)
checkUpdateResult(updateResult)
} else Nil

// step 3: invoke insert action
val insertCommit =
if (notMatchedActions.nonEmpty)
insertActionInvoke(sparkSession, touchedFileTargetRelation)
else Nil

if (plan.snapshotId() != null) {
writer.rowIdCheckConflict(plan.snapshotId())
}
writer.commit(updateCommit ++ insertCommit)
} finally {
if (persistSourceDss.isDefined) {
persistSourceDss.get.unpersist(blocking = false)
}
}
writer.commit(updateCommit ++ insertCommit)
}

private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): Unit = {
Expand Down Expand Up @@ -254,7 +277,8 @@ case class MergeIntoPaimonDataEvolutionTable(
sparkSession: SparkSession,
tableSplits: Seq[DataSplit],
firstRowIds: immutable.IndexedSeq[Long],
firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = {
firstRowIdToBlobFirstRowIds: Map[Long, List[Long]],
persistSourceDss: Option[Dataset[Row]]): Seq[DataSplit] = {
// Self-Merge shortcut:
// In Self-Merge mode, every row in the table may be updated, so we scan all splits.
if (isSelfMergeOnRowId) {
Expand All @@ -268,7 +292,7 @@ case class MergeIntoPaimonDataEvolutionTable(
return tableSplits
}

val sourceDss = createDataset(sparkSession, sourceTable)
val sourceDss = persistSourceDss.getOrElse(createDataset(sparkSession, sourceTable))

val firstRowIdsTouched = extractSourceRowIdMapping match {
case Some(sourceRowIdAttr) =>
Expand Down Expand Up @@ -305,7 +329,8 @@ case class MergeIntoPaimonDataEvolutionTable(
dataSplits: Seq[DataSplit],
sparkSession: SparkSession,
touchedFileTargetRelation: DataSourceV2Relation,
firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = {
firstRowIds: immutable.IndexedSeq[Long],
persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = {
val mergeFields = extractFields(matchedCondition)
val allFields = mutable.SortedSet.empty[AttributeReference](
(o1, o2) => {
Expand Down Expand Up @@ -426,7 +451,8 @@ case class MergeIntoPaimonDataEvolutionTable(

val sourceTableProjExprs =
allReadFieldsOnSource.toSeq :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
val sourceTableProj = Project(sourceTableProjExprs, sourceTable)
val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only wires the cached source into the matched/update path. For a MERGE that has both matched and not-matched clauses, insertActionInvoke still builds its left-anti join from sourceTable, so the source is scanned again after the update path. Could you pass the persisted source into the insert path too, so the new option avoids repeated source loading for the whole merge action?

val sourceTableProj = Project(sourceTableProjExprs, sourceChild)

val joinPlan =
Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE)
Expand Down
Loading
Loading