Skip to content

[FLINK-39708][table] Support TO_CHANGELOG retract/upsert stream -> upsert stream with set semantics#28199

Open
gustavodemorais wants to merge 5 commits into
apache:masterfrom
confluentinc:to-changelog-upsert
Open

[FLINK-39708][table] Support TO_CHANGELOG retract/upsert stream -> upsert stream with set semantics#28199
gustavodemorais wants to merge 5 commits into
apache:masterfrom
confluentinc:to-changelog-upsert

Conversation

@gustavodemorais
Copy link
Copy Markdown
Contributor

What is the purpose of the change

Lets TO_CHANGELOG produce an upsert stream (instead of always retract) when called with PARTITION BY and an op_mapping that omits UPDATE_BEFORE and/or DELETE. The planner can then skip the stateful ChangelogNormalize operator that would otherwise materialize those rows.

Brief change log

  • Add TraitCondition.mapArgIncludesKey factory and BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY
  • Make REQUIRE_UPDATE_BEFORE and REQUIRE_FULL_DELETE conditional on op_mapping in TO_CHANGELOG
  • Reject user-supplied op_mapping entries referencing change kinds the input cannot produce
  • Add plan tests for the new conditional-trait combinations
  • Add semantic test for the new validation
  • Document the optimization under TO_CHANGELOG

Verifying this change

  • ToChangelogRestoreTest (existing)
  • ToChangelogTest (Added new)
  • ToChangelogSemanticTests (Added new)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes - TraitCondition is @PublicEvolving and gains a new factory
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (docs/content/docs/sql/reference/queries/changelog.md, JavaDoc on TraitCondition.mapArgIncludesKey and ToChangelogFunction.validateAgainstInputChangelogMode)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

2.1.117 (Claude Code) & Opus 4.7

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 19, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@raminqaf raminqaf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution left some comments!

Comment on lines +384 to +389
#### Avoiding ChangelogNormalize for upsert sources

When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:

* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, this is way to detailed for the Flink user. I think if the user wants to partition their data on a specific key they do it and if they don't they will a just avoid using parition by clause

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is advanced but I wanted to have this information somewhere and can't think of a better place yet - maybe a new session called performance optimization at the bottom? So people only get to it if necessary. I can imagine advanced users wanting to tune their jobs to not have ChangelogNormalize and this explains how

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the whole section #28199 (comment)

}

/** True when any key in {@code map}, split on comma and trimmed, equals {@code expected}. */
private static boolean mapKeysContain(final Map<?, ?> map, final String expected) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we do this?

Suggested change
private static boolean mapKeysContain(final Map<?, ?> map, final String expected) {
private static boolean mapKeysContain(final Map<String, String> map, final String expected) {

and remove the cast?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +85 to +89
if (opMapping != null) {
// Only user-supplied mappings are validated. The default mapping covers all kinds by
// design and is harmless for insert-only or upsert inputs.
validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate why we need this? Is there a way to have test for this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice validation that doesn't allow users to try to map codes which can't be produced by the source. So if you add "UPDATE_AFTER -> u" mapping but you're reading from an INSERT only source, you'll never see that and you get a validation error telling you to only add events that can happen

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this validation? IMO we can just remove it and allow the PTF to succeeded.

In your tests if the SIMPLE_SOURCE is only producing Inserts and no deletes the 'DELETE', 'D'. Insert-only or retract source + dead 'DELETE' -> 'D': kind never arrives, no match, pure no-op.

Would be happy to know if we gain any other advantages on this besides better UX.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for better UX, yes. This only happens once in the constructor and help users to wire things correctly so I see no reason not to do it

If I user tries to map updates on an insert only source, we can tell them "hey, this doesn't make sense". Any reason why we wouldn't help users if we can perform the check?

final Set<RowKind> unsupported =
mapping.keySet().stream()
.filter(kind -> !inputMode.contains(kind))
.collect(Collectors.toCollection(TreeSet::new));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a TreeSet and not a normal HashSet?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just use anyMatch here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with anyMatch we couldn't list all ops that aren't present in the input.

But a List<RowKind> would suffice as well (keySet ensures no duplicates).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to final List<RowKind> unsupported = mapping.keySet().stream().filter(kind -> !inputMode.contains(kind)).toList()

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 19, 2026
Copy link
Copy Markdown
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @gustavodemorais.

I left a few comments with questions and suggestions.

Cheers, Fabian


#### Avoiding ChangelogNormalize for upsert sources

When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
When a query includes an upsert source input (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner typically inserts a `ChangelogNormalize` operator to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive.
By wrapping the upsert source input in a properly configured `TO_CHANGELOG` function, we can avoid the `ChangelogNormalize` operator. For this, the function's table input requires a `PARTITION BY` and an `op_mapping` that does not emit the corresponding kinds:

I would rephase this a bit and make clear that this is in the context of the TO_CHANGELOG function.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this argument isn't a bit misleading. We "promise" that users can avoid ChangelogNormatlize by using TO_CHANGELOG but don't explain what it really means.
They would need to "manually" handle all changes because Flink only sees an append-only stream.
So it is certainly not a silver bullet and only applicable in a few cases that allow for manual change handling (which most users won't be able to do, IMO).

So isn't this more like an escape hatch for expert users?
If so, it should be clearly positioned as such, IMO.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have a point. In the future, when we support state, the idea is that both to and from changelog can indeed replace ChangelogNormalize in some cases. I think it's currently not the case and can be misleading and not easy for them to understand. I'll remove this whole section for now and give it a go again when we eventually add state support.

Comment on lines +81 to +89
static TraitCondition mapArgIncludesKey(final String argName, final String key) {
return new BuiltInCondition(
BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY,
List.of(argName, key),
ctx ->
ctx.getScalarArgument(argName, Map.class)
.map(map -> mapKeysContain(map, key))
.orElse(true));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to have this defined more generic like

Suggested change
static TraitCondition mapArgIncludesKey(final String argName, final String key) {
return new BuiltInCondition(
BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY,
List.of(argName, key),
ctx ->
ctx.getScalarArgument(argName, Map.class)
.map(map -> mapKeysContain(map, key))
.orElse(true));
}
static <X> TraitCondition argMatches(final String argName, final Class<X> argClass, final Predicate<X> predicate) {
return new BuiltInCondition(
BuiltInCondition.Kind.ARG_MATCHES,
List.of(argName, argClass, predicate),
ctx ->
ctx.getScalarArgument(argName, argClass)
.stream().anyMatch(predicate));
}

Copy link
Copy Markdown
Contributor Author

@gustavodemorais gustavodemorais May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked the idea! I had to spend some time into this since this is a new API we just added that will be used by other PTF users. But we were already expecting to add more generic methods to it as we go - so your suggestion fits.

For this case, I had to add or, argIsPresent and argMatches because mapArgIncludesKey had multiple checks which we want to make generic. All pieces all now reusable and like how it looks like now. I kept the mapArgIncludesKey using the argMatches under the hood for readability

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look cb8dbe8

When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:

* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it a bit unintuitive to support partial deletes by not declaring a DELETE key in the map?
The function would still emit deletes (partial, not full) although they aren't configured.
Or am I misunderstanding something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the whole section #28199 (comment)

final Set<RowKind> unsupported =
mapping.keySet().stream()
.filter(kind -> !inputMode.contains(kind))
.collect(Collectors.toCollection(TreeSet::new));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with anyMatch we couldn't list all ops that aren't present in the input.

But a List<RowKind> would suffice as well (keySet ensures no duplicates).

Copy link
Copy Markdown
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @gustavodemorais.

Just one more suggestions (plus a nit and a question.

PR looks good otherwise, Fabian

Comment on lines +238 to +240
"+I[Alice, U, 10]",
"+I[Bob, U, 20]",
"+I[Alice, U, 30]")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I understand the implications of not having a DELETE in the op_mapping.
I thought we would still forward the partial delete, but we are not (it just ensures that we don't waste state on tracking deletes).

Curious, is there a way to forward partial deletes?


/**
* True when the named scalar argument is present and its value matches {@code predicate}. False
* when the argument is absent or cannot be resolved as a literal of {@code argClass}.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
* when the argument is absent or cannot be resolved as a literal of {@code argClass}.
* when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code predicate} evaluates to `false`.

Comment on lines +109 to +122
/**
* True when the named {@code MAP<STRING, STRING>} scalar argument is present and contains
* {@code key} among its keys. False when the argument is absent or cannot be resolved as a
* literal {@link Map}.
*
* <p>Also matches compound keys: if a key contains commas (e.g. {@code "INSERT,UPDATE_AFTER"}),
* each comma-separated part is trimmed and compared against {@code key} - useful for mappings
* where one entry covers multiple kinds.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
static TraitCondition mapArgIncludesKey(final String argName, final String key) {
return argMatches(
argName, Map.class, map -> mapKeysContain((Map<String, String>) map, key));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we should add this condition method.
The hard cast to Map<String,String> and the custom key check with split() and trim() is specific for TO_CHANGELOG and not very generic, IMO.

If we would move mapKeysContain() as a helper to BuildInFunctionDefinitions (or some other (utils) class and make it public static), we also wouldn't need the or and not construct in the withConditionalTrait() method because argMatches() checks for arg presence.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants