Skip to content

[FLINK-39339][table-planner] Consider immutable cols to infer sink required updated mode traits#27838

Open
xuyangzhong wants to merge 3 commits intoapache:masterfrom
xuyangzhong:sink_uk
Open

[FLINK-39339][table-planner] Consider immutable cols to infer sink required updated mode traits#27838
xuyangzhong wants to merge 3 commits intoapache:masterfrom
xuyangzhong:sink_uk

Conversation

@xuyangzhong
Copy link
Copy Markdown
Contributor

What is the purpose of the change

When deciding whether a sink can accept ONLY_UPDATE_AFTER (i.e., drop UPDATE_BEFORE), the current logic requires the input's upsert key to exactly match the sink's primary key. This is overly conservative when immutable columns are present.I f the extra columns in the sink PK (beyond the upsert key) are all immutable, then the sink PK is effectively a valid upsert key and UPDATE_BEFORE can still be safely dropped.

Brief change log

  • In inferSinkRequiredTraits, query the input's immutable columns via FlinkRelMetadataQuery.getImmutableColumns
  • Relax the upsert-key-vs-sink-PK check: instead of requiring exact equality, allow the sink PK to match if it contains a upsert key and all remaining columns are immutable
  • Add corresponding test cases in ChangelogModeInferenceTest

Verifying this change

New tests are added to verify this pr.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented?

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 27, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@lincoln-lil lincoln-lil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuyangzhong +1 for the optimization! I left some comments there.

For the tests, we should add at least one test case triggering the DELETE_BY_KEY path with immutable columns.

* <p>Notice: even if sink pk(s) contains input upsert key we cannot optimize to UA only when the
* upsert key has columns outside sink pk, this differs from batch job's unique key inference.
*/
private def areUpsertKeysWithImmutableColsDifferentFromPk(sink: StreamPhysicalSink): Boolean = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The word "Different" implies an equality comparison (the old method areUpsertKeysDifferentFromPk indeed used !exists(_.equals(sinkPks))). However, the new logic actually checks a subset/coverage relationship, so the method name should more accurately reflect the "coverage"(satisfy) semantics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed it to canUpsertKeysWithImmutableColsSatisfyPk

val immutableCols = fmq.getImmutableColumns(sink.getInput)
val effectiveImmutableCols =
if (immutableCols != null) immutableCols else ImmutableBitSet.of()
// check: uk ⊆ sinkPks AND sinkPks ⊆ (uk ∪ immutableCols)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The double negation might be readability issue here.

A minimal rewrite version(maybe slightly improvement or you can think of other writings):

val sinkPkCovered = changeLogUpsertKeys.exists { uk =>
  val ukWithinPk = sinkPks.contains(uk)
  val pkFullyCovered = uk.union(effectiveImmutableCols).contains(sinkPks)
  ukWithinPk && pkFullyCovered
}
!sinkPkCovered

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the code here and add an example in the method comment.

@xuyangzhong
Copy link
Copy Markdown
Contributor Author

@xuyangzhong +1 for the optimization! I left some comments there.

For the tests, we should add at least one test case triggering the DELETE_BY_KEY path with immutable columns.

Currently, immutable cols cannot be defined on sources that output rowkind DELETE, making it difficult to add tests. Therefore, I've decided to abandon this approach and revert the changes to SatisfyDeleteKindTraitVisitor, focusing only on modifications to SatisfyUpdateKindTraitVisitor. WDYT?

@lincoln-lil
Copy link
Copy Markdown
Contributor

@xuyangzhong +1 for the optimization! I left some comments there.
For the tests, we should add at least one test case triggering the DELETE_BY_KEY path with immutable columns.

Currently, immutable cols cannot be defined on sources that output rowkind DELETE, making it difficult to add tests. Therefore, I've decided to abandon this approach and revert the changes to SatisfyDeleteKindTraitVisitor, focusing only on modifications to SatisfyUpdateKindTraitVisitor. WDYT?

Sounds reasonable.

Copy link
Copy Markdown
Contributor

@lincoln-lil lincoln-lil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuyangzhong Thanks for the update! +1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants