[FLINK-39577][table] Reuse Calcite's COALESCE to apply simplifications from RexSimplify#28066
[FLINK-39577][table] Reuse Calcite's COALESCE to apply simplifications from RexSimplify#28066snuyanzin wants to merge 5 commits into
COALESCE to apply simplifications from RexSimplify#28066Conversation
|
|
||
| // ~ Methods ---------------------------------------------------------------- | ||
|
|
||
| // ----- FLINK MODIFICATION BEGIN ----- |
There was a problem hiding this comment.
In calcite's properties there is a flag to turn it off
the problem with that flag is that it turns off for all functions, not only for coalesce
There was a problem hiding this comment.
Not sure I can follow. What does the flag do?
There was a problem hiding this comment.
Can you say more about what this modification does?
There was a problem hiding this comment.
it's a standard way of marking places in classes copied from Calcite what exactly area is changed in this classes
There was a problem hiding this comment.
Sorry, I do understand that we are overriding / changing something from Calcite.
I am trying to understand what we are changing about the Calcite class.
There was a problem hiding this comment.
by default Calcite rewrites COALESCE with CASE ... WHEN and we don't want that
| */ | ||
| @Internal | ||
| @Value.Enclosing | ||
| public class RemoveUnreachableCoalesceArgumentsRule |
There was a problem hiding this comment.
completely no need for this as we rely on Calcite's RexSimplify
| CoerceInputsRule.Config.DEFAULT | ||
| .withCoerceNames(false) | ||
| .withConsumerRelClass(classOf[LogicalUnion]) | ||
| .toRule, | ||
| // ensure intersect set operator have the same row type | ||
| new CoerceInputsRule(classOf[LogicalIntersect], false), | ||
| CoerceInputsRule.Config.DEFAULT | ||
| .withCoerceNames(false) | ||
| .withConsumerRelClass(classOf[LogicalIntersect]) | ||
| .toRule, | ||
| // ensure except set operator have the same row type | ||
| new CoerceInputsRule(classOf[LogicalMinus], false), | ||
| CoerceInputsRule.Config.DEFAULT | ||
| .withCoerceNames(false) | ||
| .withConsumerRelClass(classOf[LogicalMinus]) | ||
| .toRule, |
There was a problem hiding this comment.
replace deprecated calls
| <Resource name="ast"> | ||
| <![CDATA[ | ||
| LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')]) | ||
| LogicalProject(EXPR$0=[$1]) |
There was a problem hiding this comment.
Calcite was able to detect that the arg is not null
| <Resource name="ast"> | ||
| <![CDATA[ | ||
| LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)]) | ||
| LogicalProject(field2=[$1], transactionId=[CAST($0.data.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) |
There was a problem hiding this comment.
there is only one arg, no need for coalesce
| } | ||
| } | ||
|
|
||
| def generateCoalesce( |
There was a problem hiding this comment.
this will also fix performance drop for coalesce
mentioned at FLINK-38468
| if (hasNulls && nonNullNodes.size() == operands.size()) { | ||
| return SqlLiteral.createNull(pos); | ||
| } |
There was a problem hiding this comment.
I am having trouble understanding this condition.
For each operand, we either set hasNulls or add to the size of the nonNullNodes, right?
Given that, I don't understand how we this could be true. When hasNulls is true, then there's at least one operand which is not added to nonNullNodes, right?
There was a problem hiding this comment.
i rewrote this part in proper way
| <Resource name="ast"> | ||
| <![CDATA[ | ||
| LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')]) | ||
| LogicalProject(EXPR$0=[$1]) |
There was a problem hiding this comment.
Since RemoveUnreachableCoalesceArgumentsRule.java has been deleted, should this file be renamed? (non-blocking nit)
There was a problem hiding this comment.
I'm fin with current name it still checks for unreachable args
do you have a better name to propose?
| SqlParserPos pos = call.getParserPosition(); | ||
| List<SqlNode> nodes = new ArrayList<>(); | ||
| for (SqlNode operand : operands) { | ||
| nodes.add(Objects.requireNonNullElseGet(operand, () -> SqlLiteral.createNull(pos))); |
There was a problem hiding this comment.
The Calcite version doesn't seem to be checking if an operand is null.
Is some path creating COALESCE(...,null,...) with a Java-null operand?
There was a problem hiding this comment.
it does, however it does it for CaseCall
| SqlParserPos pos = call.getParserPosition(); | ||
| List<SqlNode> nodes = new ArrayList<>(); | ||
| for (SqlNode operand : operands) { | ||
| if (!SqlUtil.isNullLiteral(operand, false)) { |
There was a problem hiding this comment.
Is it intentional that we are not checking for casts?
Could something interesting happen if someone trying to cast a wider type to null?
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| /** The class copied from Calcite in order to turn off COALESCE rewrite with CASE ... WHEN ... */ |
There was a problem hiding this comment.
Do we want to add a note that we are removing the NULLs?
fdc5635 to
36c0c5d
Compare
|
|
||
| // ~ Methods ---------------------------------------------------------------- | ||
|
|
||
| // ----- FLINK MODIFICATION BEGIN ----- |
There was a problem hiding this comment.
Not sure I can follow. What does the flag do?
| // → declared DECIMAL(10, 4) (Calcite widening rule: | ||
| // d = max(p1-s1, p2-s2) = max(3, 6) = 6, scale = max(2,4) = 4, precision = 10). |
There was a problem hiding this comment.
We shouldn't care about Calcite widening rules, but Flink ones. Defined in FlinkTypeSystem and LogicalTypeMerging.
| return specs.stream(); | ||
| } | ||
|
|
||
| private static List<TestSetSpec> allTypesBasic() { |
There was a problem hiding this comment.
Please also add a restore test as a final test whether 2.3 plans (e.g. containing coalesce instead of COALESCE) work correctly.
There was a problem hiding this comment.
Thanks for the change, @snuyanzin. This drops additional code and Calcite's algorithm is actually smarter, so +1.
The PR lgtm in general. Almost all test updates make sense. I just found one issue, take a look
| .onFieldsWithData(null, 100) | ||
| .andDataTypes(INT().nullable(), INT().notNull()) | ||
| .withFunction(ThrowingFunction.class) | ||
| .testTableApiRuntimeError( |
There was a problem hiding this comment.
Could you add the same negative control for sql with testSqlRuntimeError?
| <Resource name="ast"> | ||
| <![CDATA[ | ||
| LogicalProject(order_id_str=[CAST(COALESCE($3.order_id, $0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL]) | ||
| LogicalProject(order_id_str=[CAST($3.order_id):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL]) |
There was a problem hiding this comment.
I think this is incorrect: on a LEFT JOIN with no match, $3 (= order_details_row.r) is NULL, so $3.order_id is NULL too - but the result type with the CAST still claims NOT NULL. With the original COALESCE we'd fall back to a.order_id exactly in that case
There was a problem hiding this comment.
ok, finally managed to find a solution for this
submitted a separate PR #28181
There was a problem hiding this comment.
I rebased on that PR, now this problem is gone
… to relational algebra failed to preserve datatypes
What is the purpose of the change
Based on top of [FLINK-39695][table] Outer join on nested field fails with conversion to relational algebra failed to preserve datatypes
There are multiple simplification in Calcite around
COALESCEthe problem is that they are not applicable in Flink because it's custom implementation
Luckily we are in a phase where replacement is not painful
The PR is making that
Brief change log
Use Calcite's
COALESCEVerifying this change
Existing
COALESCEtestsDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation
Was generative AI tooling used to co-author this PR?