Skip to content

[FLINK-39577][table] Reuse Calcite's COALESCE to apply simplifications from RexSimplify#28066

Open
snuyanzin wants to merge 5 commits into
apache:masterfrom
snuyanzin:flink39577
Open

[FLINK-39577][table] Reuse Calcite's COALESCE to apply simplifications from RexSimplify#28066
snuyanzin wants to merge 5 commits into
apache:masterfrom
snuyanzin:flink39577

Conversation

@snuyanzin
Copy link
Copy Markdown
Contributor

@snuyanzin snuyanzin commented Apr 29, 2026

What is the purpose of the change

Based on top of [FLINK-39695][table] Outer join on nested field fails with conversion to relational algebra failed to preserve datatypes

There are multiple simplification in Calcite around COALESCE
the problem is that they are not applicable in Flink because it's custom implementation
Luckily we are in a phase where replacement is not painful
The PR is making that

Brief change log

Use Calcite's COALESCE

Verifying this change

Existing COALESCE tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): ( no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no )
  • The runtime per-record code paths (performance sensitive): (no )
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
  • The S3 file system connector: (no )

Documentation

  • Does this pull request introduce a new feature? ( no)
  • If yes, how is the feature documented? (not applicable)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


// ~ Methods ----------------------------------------------------------------

// ----- FLINK MODIFICATION BEGIN -----
Copy link
Copy Markdown
Contributor Author

@snuyanzin snuyanzin Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In calcite's properties there is a flag to turn it off
the problem with that flag is that it turns off for all functions, not only for coalesce

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I can follow. What does the flag do?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you say more about what this modification does?

Copy link
Copy Markdown
Contributor Author

@snuyanzin snuyanzin Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a standard way of marking places in classes copied from Calcite what exactly area is changed in this classes

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do understand that we are overriding / changing something from Calcite.

I am trying to understand what we are changing about the Calcite class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*/
@Internal
@Value.Enclosing
public class RemoveUnreachableCoalesceArgumentsRule
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

completely no need for this as we rely on Calcite's RexSimplify

Comment on lines +117 to +130
CoerceInputsRule.Config.DEFAULT
.withCoerceNames(false)
.withConsumerRelClass(classOf[LogicalUnion])
.toRule,
// ensure intersect set operator have the same row type
new CoerceInputsRule(classOf[LogicalIntersect], false),
CoerceInputsRule.Config.DEFAULT
.withCoerceNames(false)
.withConsumerRelClass(classOf[LogicalIntersect])
.toRule,
// ensure except set operator have the same row type
new CoerceInputsRule(classOf[LogicalMinus], false),
CoerceInputsRule.Config.DEFAULT
.withCoerceNames(false)
.withConsumerRelClass(classOf[LogicalMinus])
.toRule,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace deprecated calls

<Resource name="ast">
<![CDATA[
LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')])
LogicalProject(EXPR$0=[$1])
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calcite was able to detect that the arg is not null

<Resource name="ast">
<![CDATA[
LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
LogicalProject(field2=[$1], transactionId=[CAST($0.data.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
Copy link
Copy Markdown
Contributor Author

@snuyanzin snuyanzin Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is only one arg, no need for coalesce

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 29, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

}
}

def generateCoalesce(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will also fix performance drop for coalesce
mentioned at FLINK-38468

Comment on lines +84 to +86
if (hasNulls && nonNullNodes.size() == operands.size()) {
return SqlLiteral.createNull(pos);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am having trouble understanding this condition.

For each operand, we either set hasNulls or add to the size of the nonNullNodes, right?

Given that, I don't understand how we this could be true. When hasNulls is true, then there's at least one operand which is not added to nonNullNodes, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i rewrote this part in proper way

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Apr 29, 2026
<Resource name="ast">
<![CDATA[
LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')])
LogicalProject(EXPR$0=[$1])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since RemoveUnreachableCoalesceArgumentsRule.java has been deleted, should this file be renamed? (non-blocking nit)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fin with current name it still checks for unreachable args
do you have a better name to propose?

SqlParserPos pos = call.getParserPosition();
List<SqlNode> nodes = new ArrayList<>();
for (SqlNode operand : operands) {
nodes.add(Objects.requireNonNullElseGet(operand, () -> SqlLiteral.createNull(pos)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Calcite version doesn't seem to be checking if an operand is null.

Is some path creating COALESCE(...,null,...) with a Java-null operand?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does, however it does it for CaseCall

SqlParserPos pos = call.getParserPosition();
List<SqlNode> nodes = new ArrayList<>();
for (SqlNode operand : operands) {
if (!SqlUtil.isNullLiteral(operand, false)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that we are not checking for casts?

Could something interesting happen if someone trying to cast a wider type to null?


import static java.util.Objects.requireNonNull;

/** The class copied from Calcite in order to turn off COALESCE rewrite with CASE ... WHEN ... */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a note that we are removing the NULLs?

@snuyanzin snuyanzin force-pushed the flink39577 branch 6 times, most recently from fdc5635 to 36c0c5d Compare May 4, 2026 07:40

// ~ Methods ----------------------------------------------------------------

// ----- FLINK MODIFICATION BEGIN -----
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I can follow. What does the flag do?

Comment on lines +227 to +228
// → declared DECIMAL(10, 4) (Calcite widening rule:
// d = max(p1-s1, p2-s2) = max(3, 6) = 6, scale = max(2,4) = 4, precision = 10).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't care about Calcite widening rules, but Flink ones. Defined in FlinkTypeSystem and LogicalTypeMerging.

return specs.stream();
}

private static List<TestSetSpec> allTypesBasic() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a restore test as a final test whether 2.3 plans (e.g. containing coalesce instead of COALESCE) work correctly.

Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change, @snuyanzin. This drops additional code and Calcite's algorithm is actually smarter, so +1.

The PR lgtm in general. Almost all test updates make sense. I just found one issue, take a look

.onFieldsWithData(null, 100)
.andDataTypes(INT().nullable(), INT().notNull())
.withFunction(ThrowingFunction.class)
.testTableApiRuntimeError(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the same negative control for sql with testSqlRuntimeError?

<Resource name="ast">
<![CDATA[
LogicalProject(order_id_str=[CAST(COALESCE($3.order_id, $0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL])
LogicalProject(order_id_str=[CAST($3.order_id):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is incorrect: on a LEFT JOIN with no match, $3 (= order_details_row.r) is NULL, so $3.order_id is NULL too - but the result type with the CAST still claims NOT NULL. With the original COALESCE we'd fall back to a.order_id exactly in that case

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, finally managed to find a solution for this
submitted a separate PR #28181

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rebased on that PR, now this problem is gone

… to relational algebra failed to preserve datatypes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants