[FLINK-39708][table] Support TO_CHANGELOG retract/upsert stream -> upsert stream with set semantics #28199
gustavodemorais wants to merge 5 commits into
…sert stream with set semantics
…ources in TO_CHANGELOG
raminqaf
left a comment
Thanks for the contribution! I left some comments.
| #### Avoiding ChangelogNormalize for upsert sources | ||
| When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: | ||
| * Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization. | ||
| * If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`. |
There was a problem hiding this comment.
I think this is way too detailed for the Flink user. If users want to partition their data on a specific key, they will do it, and if they don't, they will just avoid using the PARTITION BY clause.
There was a problem hiding this comment.
I agree this is advanced, but I wanted to have this information somewhere and can't think of a better place yet. Maybe a new section called "Performance optimization" at the bottom, so people only get to it if necessary? I can imagine advanced users wanting to tune their jobs to avoid ChangelogNormalize, and this explains how.
There was a problem hiding this comment.
I'll remove the whole section #28199 (comment)
| } | ||
| /** True when any key in {@code map}, split on comma and trimmed, equals {@code expected}. */ | ||
| private static boolean mapKeysContain(final Map<?, ?> map, final String expected) { |
There was a problem hiding this comment.
How about we do this?
| private static boolean mapKeysContain(final Map<?, ?> map, final String expected) { | |
| private static boolean mapKeysContain(final Map<String, String> map, final String expected) { |
and remove the cast?
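For reference, a minimal sketch of what the typed helper could look like, based only on the Javadoc quoted above (keys split on comma, trimmed, compared to `expected`). The enclosing class name `OpMappingKeys` is hypothetical, and the body is an assumption rather than the PR's actual implementation:

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical holder class; only the method signature comes from the review suggestion.
final class OpMappingKeys {
    private OpMappingKeys() {}

    /** True when any key in {@code map}, split on comma and trimmed, equals {@code expected}. */
    static boolean mapKeysContain(final Map<String, String> map, final String expected) {
        return map.keySet().stream()
                // A compound key such as "INSERT, UPDATE_AFTER" covers several kinds.
                .flatMap(key -> Arrays.stream(key.split(",")))
                .map(String::trim)
                .anyMatch(expected::equals);
    }
}
```

With `Map<String, String>` in the signature, the lambda needs no cast and the keys can be split directly.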
| if (opMapping != null) { | ||
| // Only user-supplied mappings are validated. The default mapping covers all kinds by | ||
| // design and is harmless for insert-only or upsert inputs. | ||
| validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics); | ||
| } |
There was a problem hiding this comment.
Could you elaborate why we need this? Is there a way to have test for this?
There was a problem hiding this comment.
This is a nice validation that prevents users from mapping codes that the source can't produce. So if you add an "UPDATE_AFTER -> u" mapping but you're reading from an insert-only source, you'll never see that code, and you get a validation error telling you to only add events that can happen.
There was a problem hiding this comment.
I did add a test for that efd5e4a#diff-6c5fc06e852e1c187a10f95345d9168afe4d2683eae2e5f217a81c05af45afe0R555
There was a problem hiding this comment.
Do we really need this validation? IMO we can just remove it and allow the PTF to succeed.
In your tests, if SIMPLE_SOURCE only produces inserts and no deletes, the 'DELETE' -> 'D' entry is simply dead. For an insert-only or retract source with a dead 'DELETE' -> 'D' entry, the kind never arrives, nothing matches, and the entry is a pure no-op.
I would be happy to know if we gain any advantages from this besides better UX.
There was a problem hiding this comment.
It's for better UX, yes. This only happens once in the constructor and helps users wire things correctly, so I see no reason not to do it.
If a user tries to map updates on an insert-only source, we can tell them "hey, this doesn't make sense". Is there any reason why we wouldn't help users if we can perform the check?
| final Set<RowKind> unsupported = | ||
| mapping.keySet().stream() | ||
| .filter(kind -> !inputMode.contains(kind)) | ||
| .collect(Collectors.toCollection(TreeSet::new)); |
There was a problem hiding this comment.
Why a TreeSet and not a normal HashSet?
There was a problem hiding this comment.
Can't we just use anyMatch here?
There was a problem hiding this comment.
with anyMatch we couldn't list all ops that aren't present in the input.
But a List<RowKind> would suffice as well (keySet ensures no duplicates).
There was a problem hiding this comment.
Changed to `final List<RowKind> unsupported = mapping.keySet().stream().filter(kind -> !inputMode.contains(kind)).toList()`
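To make the trade-off in this thread concrete, here is a small self-contained sketch of the check. The `Kind` enum is a stand-in for Flink's `RowKind` so the example compiles without Flink, and the class name, method names, and error message are assumptions, not the PR's code:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of validating an op mapping against the input changelog mode.
final class OpMappingValidation {
    private OpMappingValidation() {}

    /** Stand-in for Flink's RowKind, declared locally to keep the sketch self-contained. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Collects every mapped kind that the input can never produce. */
    static List<Kind> unsupportedKinds(final Map<Kind, String> mapping, final Set<Kind> inputKinds) {
        return mapping.keySet().stream()
                .filter(kind -> !inputKinds.contains(kind))
                .collect(Collectors.toList());
    }

    /** Fails once, up front, and names all offending kinds rather than only the first. */
    static void validate(final Map<Kind, String> mapping, final Set<Kind> inputKinds) {
        final List<Kind> unsupported = unsupportedKinds(mapping, inputKinds);
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException(
                    "op_mapping contains kinds the input never produces: " + unsupported);
        }
    }
}
```

`anyMatch` would only report that some kind is unsupported, whereas collecting lets the message name all of them. A `List` lists them in the map's key iteration order; a `TreeSet` would additionally sort them.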
fhueske
left a comment
Thanks for the PR @gustavodemorais.
I left a few comments with questions and suggestions.
Cheers, Fabian
| #### Avoiding ChangelogNormalize for upsert sources | ||
| When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: |
There was a problem hiding this comment.
| When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: | |
| When a query includes an upsert source input (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner typically inserts a `ChangelogNormalize` operator to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. | |
| By wrapping the upsert source input in a properly configured `TO_CHANGELOG` function, we can avoid the `ChangelogNormalize` operator. For this, the function's table input requires a `PARTITION BY` and an `op_mapping` that does not emit the corresponding kinds: |
I would rephrase this a bit and make clear that this is in the context of the TO_CHANGELOG function.
There was a problem hiding this comment.
I wonder if this argument isn't a bit misleading. We "promise" that users can avoid ChangelogNormalize by using TO_CHANGELOG but don't explain what that really means.
They would need to "manually" handle all changes because Flink only sees an append-only stream.
So it is certainly not a silver bullet and only applicable in a few cases that allow for manual change handling (which most users won't be able to do, IMO).
So isn't this more like an escape hatch for expert users?
If so, it should be clearly positioned as such, IMO.
There was a problem hiding this comment.
I think you have a point. In the future, when we support state, the idea is that both to and from changelog can indeed replace ChangelogNormalize in some cases. That is currently not the case, so the section can be misleading and hard for users to understand. I'll remove this whole section for now and give it another go when we eventually add state support.
| static TraitCondition mapArgIncludesKey(final String argName, final String key) { | ||
| return new BuiltInCondition( | ||
| BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY, | ||
| List.of(argName, key), | ||
| ctx -> | ||
| ctx.getScalarArgument(argName, Map.class) | ||
| .map(map -> mapKeysContain(map, key)) | ||
| .orElse(true)); | ||
| } |
There was a problem hiding this comment.
Would it make sense to define this more generically, like this?
| static TraitCondition mapArgIncludesKey(final String argName, final String key) { | |
| return new BuiltInCondition( | |
| BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY, | |
| List.of(argName, key), | |
| ctx -> | |
| ctx.getScalarArgument(argName, Map.class) | |
| .map(map -> mapKeysContain(map, key)) | |
| .orElse(true)); | |
| } | |
| static <X> TraitCondition argMatches(final String argName, final Class<X> argClass, final Predicate<X> predicate) { | |
| return new BuiltInCondition( | |
| BuiltInCondition.Kind.ARG_MATCHES, | |
| List.of(argName, argClass, predicate), | |
| ctx -> | |
| ctx.getScalarArgument(argName, argClass) | |
| .stream().anyMatch(predicate)); | |
| } |
There was a problem hiding this comment.
I like the idea! I had to spend some time on this since this is a new API we just added that will be used by other PTF users. But we were already expecting to add more generic methods to it as we go, so your suggestion fits.
For this case, I had to add or, argIsPresent and argMatches because mapArgIncludesKey had multiple checks which we want to make generic. All pieces are now reusable and I like how it looks now. I kept mapArgIncludesKey, which uses argMatches under the hood, for readability.
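As a rough illustration of how such pieces might compose, here is a self-contained sketch. `Ctx` and `Condition` are hypothetical stand-ins for the real context and `TraitCondition` types, and the method bodies are assumptions. Only the names `or`, `not`, `argIsPresent` and `argMatches`, plus the "true when the argument is absent" behaviour of the original `orElse(true)`, are taken from this thread:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch; not the actual TraitCondition API.
final class ConditionSketch {
    private ConditionSketch() {}

    /** Stand-in for the context that resolves literal scalar arguments. */
    interface Ctx {
        <T> Optional<T> getScalarArgument(String name, Class<T> type);
    }

    /** Stand-in for TraitCondition. */
    interface Condition {
        boolean test(Ctx ctx);

        default Condition or(final Condition other) {
            return ctx -> test(ctx) || other.test(ctx);
        }
    }

    static Condition not(final Condition condition) {
        return ctx -> !condition.test(ctx);
    }

    static Condition argIsPresent(final String argName) {
        return ctx -> ctx.getScalarArgument(argName, Object.class).isPresent();
    }

    /** False when the argument is absent, has another type, or fails the predicate. */
    static <X> Condition argMatches(
            final String argName, final Class<X> argClass, final Predicate<X> predicate) {
        return ctx -> ctx.getScalarArgument(argName, argClass).filter(predicate).isPresent();
    }

    /** Test helper: a context backed by a plain map of argument values. */
    static Ctx ctxOf(final Map<String, Object> args) {
        return new Ctx() {
            @Override
            public <T> Optional<T> getScalarArgument(final String name, final Class<T> type) {
                return Optional.ofNullable(args.get(name)).filter(type::isInstance).map(type::cast);
            }
        };
    }
}
```

Under these assumptions, `not(argIsPresent("op_mapping")).or(argMatches("op_mapping", Map.class, m -> m.containsKey("DELETE")))` holds when the argument is absent or when the map has a `DELETE` key, which mirrors the original `orElse(true)` default.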
| When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: | ||
| * Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization. | ||
| * If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`. |
There was a problem hiding this comment.
Isn't it a bit unintuitive to support partial deletes by not declaring a DELETE key in the map?
The function would still emit deletes (partial, not full) although they aren't configured.
Or am I misunderstanding something?
There was a problem hiding this comment.
I'll remove the whole section #28199 (comment)
fhueske
left a comment
Thanks for the update @gustavodemorais.
Just one more suggestion (plus a nit and a question).
PR looks good otherwise, Fabian
| "+I[Alice, U, 10]", | ||
| "+I[Bob, U, 20]", | ||
| "+I[Alice, U, 30]") |
There was a problem hiding this comment.
Ah, now I understand the implications of not having a DELETE in the op_mapping.
I thought we would still forward the partial delete, but we are not (it just ensures that we don't waste state on tracking deletes).
Curious, is there a way to forward partial deletes?
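The behaviour described here (kinds missing from `op_mapping` are not forwarded at all) can be pictured with a tiny stateless sketch. Everything in it is illustrative: the `Event` class, the made-up input events, and the single-kind mapping keys are assumptions, and the real function operates on Flink rows rather than strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of "unmapped kinds are dropped"; not the PR's implementation.
final class OpMappingProjection {
    private OpMappingProjection() {}

    /** Made-up changelog event: a row kind plus a (name, value) payload. */
    static final class Event {
        final String kind;
        final String name;
        final Integer value; // null models a partial DELETE that carries only the key

        Event(final String kind, final String name, final Integer value) {
            this.kind = kind;
            this.name = name;
            this.value = value;
        }
    }

    /** Emits one insert-only output row per event whose kind has an op code. */
    static List<String> toChangelog(final List<Event> input, final Map<String, String> opMapping) {
        final List<String> out = new ArrayList<>();
        for (final Event event : input) {
            final String code = opMapping.get(event.kind);
            if (code != null) { // kinds without a mapping produce no output row
                out.add("+I[" + event.name + ", " + code + ", " + event.value + "]");
            }
        }
        return out;
    }
}
```

In this model, a mapping of `INSERT` and `UPDATE_AFTER` to `U` emits rows for inserts and updates, while a partial `DELETE` for the same key produces nothing.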
| /** | ||
| * True when the named scalar argument is present and its value matches {@code predicate}. False | ||
| * when the argument is absent or cannot be resolved as a literal of {@code argClass}. |
There was a problem hiding this comment.
nit
| * when the argument is absent or cannot be resolved as a literal of {@code argClass}. | |
| * when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code predicate} evaluates to {@code false}. |
| /** | ||
| * True when the named {@code MAP<STRING, STRING>} scalar argument is present and contains | ||
| * {@code key} among its keys. False when the argument is absent or cannot be resolved as a | ||
| * literal {@link Map}. | ||
| * | ||
| * <p>Also matches compound keys: if a key contains commas (e.g. {@code "INSERT,UPDATE_AFTER"}), | ||
| * each comma-separated part is trimmed and compared against {@code key} - useful for mappings | ||
| * where one entry covers multiple kinds. | ||
| */ | ||
| @SuppressWarnings({"rawtypes", "unchecked"}) | ||
| static TraitCondition mapArgIncludesKey(final String argName, final String key) { | ||
| return argMatches( | ||
| argName, Map.class, map -> mapKeysContain((Map<String, String>) map, key)); | ||
| } |
There was a problem hiding this comment.
Not sure if we should add this condition method.
The hard cast to Map<String,String> and the custom key check with split() and trim() are specific to TO_CHANGELOG and not very generic, IMO.
If we moved mapKeysContain() as a helper to BuiltInFunctionDefinitions (or some other utils class) and made it public static, we also wouldn't need the or and not construct in the withConditionalTrait() method, because argMatches() checks for arg presence.
What is the purpose of the change
Lets TO_CHANGELOG produce an upsert stream (instead of always retract) when called with PARTITION BY and an op_mapping that omits UPDATE_BEFORE and/or DELETE. The planner can then skip the stateful ChangelogNormalize operator that would otherwise materialize those rows.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes - TraitCondition is @PublicEvolving and gains a new factory
Documentation
Was generative AI tooling used to co-author this PR?
2.1.117 (Claude Code) & Opus 4.7