Skip to content

Add probe-side runtime filters for inner joins in the multi-stage engine#18848

Open
yashmayya wants to merge 1 commit into
apache:masterfrom
yashmayya:inner-join-runtime-filter
Open

Add probe-side runtime filters for inner joins in the multi-stage engine#18848
yashmayya wants to merge 1 commit into
apache:masterfrom
yashmayya:inner-join-runtime-filter

Conversation

@yashmayya

@yashmayya yashmayya commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

What

Adds probe-side runtime filters for equi-INNER joins in the multi-stage query engine (MSE). When
the build (right) side of a hash join is small or selective, the planner builds a reducer from its
join keys and pushes it down to the probe (left) leaf scan, so the probe table drops rows
that cannot possibly match before they are shuffled across the network into the join.

This is the INNER-join counterpart of the existing SEMI-join dynamic broadcast
(PinotJoinToDynamicBroadcastRule). It is disabled by default.

Why

For the classic fact ⋈ dim shape — a large fact table joined to a small dimension table (or a heavily
filtered build side) — the MSE today hash-shuffles the entire probe (fact) side into the join stage,
even though only the rows whose join key appears on the (tiny) build side can contribute to the result.
That wastes scan, serialization, and network bandwidth proportional to the whole fact table rather than
to the matching subset.

The SEMI-join path already solves the analogous problem by replacing the join with a leaf IN filter,
but that rewrite is only legal for semi-joins (which emit left columns only). An inner join projects
columns from both sides, so the join must still run. This PR therefore makes the filter additive:
the real hash join is left completely intact, and we only add a reducer on the probe leaf.

How it works

After exchange insertion (POST_LOGICAL), PinotJoinToInnerRuntimeFilterRule rewrites an eligible
inner join:

        [ Inner Join ]   (unchanged — still hash-shuffles both sides)
        /            \
   [xChange L]    [xChange R]
       /                \
 [RuntimeFilter]    [build subtree]
    /        \
[probe leaf] [PIPELINE_BREAKER xChange]
                   |
            [ build keys: Project(rightKeys) -> Filter(IS NOT NULL) -> limit(maxBuildRows + 1) ]
  • The join and both of its HASH exchanges are kept verbatim — execution and results are identical to
    before; the filter is purely additive.
  • A new RuntimeFilterRel / RuntimeFilterNode is grafted on top of the probe leaf subtree
    (input[0] = probe pipeline, pass-through; input[1] = a PIPELINE_BREAKER mailbox carrying the
    build-side join keys). The pipeline breaker runs the build side first and ships its keys to
    the probe-leaf worker, reusing the same mechanism as the SEMI dynamic broadcast.
  • At the probe leaf, ServerPlanRequestUtils.attachRuntimeFilter ANDs a tiered, no-false-negative
    reducer onto the V1 leaf query:
    • Exact IN for small key sets (at/below a build-key-row threshold), or for multi-key /
      BIG_DECIMAL keys. This is index-accelerated and drives segment pruning.
    • Bloom filter (IN_ID_SET) above the threshold, plus a BETWEEN(min, max) range predicate for
      numeric keys to enable cheap range-based segment pruning. Bloom keeps the wire/heap footprint bounded
      for high-cardinality build sides.
  • Because the real hash join is the source of truth, the reducer can be abandoned at any point
    (empty/over-cap build, oversized bloom, unsupported subtree, mixed-version cluster) with no effect on
    results. Bloom false positives are simply re-checked and discarded by the join.

This mirrors runtime/dynamic filtering in Trino, Impala, and Spark's InjectRuntimeFilter.

When it helps

  • Large fact table joined to a small or selectively-filtered dimension/build side.
  • The probe side is a leaf scan (table scan, optionally with single-input Project/Filter), so the
    filter can be pushed all the way down to segment scan.

It is not beneficial (and is best left off) when the build side is large or non-selective, or the
probe is cheap — there is no automatic selectivity-based gate yet, so enablement is opt-in (see below).

How to use

Per-join hint (selects the reducer mode):

SELECT /*+ joinOptions(runtime_filter='auto') */ ...
FROM fact JOIN dim ON fact.key = dim.key
WHERE dim.attr = 'x'

runtime_filter accepts off | in | bloom | auto (exact IN below the threshold, else bloom).

Cluster-wide default (enable/disable only; defaults to auto when on):

pinot.broker.enable.runtime.filter.join=true

Per-query override: SET runtimeFilterJoin='on' (or off).

Thresholds & defaults

There is one user-facing switch (the enable flag / query option / hint). Every sizing threshold below is
a fixed constant in this first version — each affects only the filter's selectivity and size, never
correctness (the real hash join re-checks every surviving row), so they are intentionally not
cluster-configurable yet.

Knob Default Evaluated at Role
pinot.broker.enable.runtime.filter.join false broker config Cluster-wide enable. Overridable per query by runtimeFilterJoin and per join by the runtime_filter hint.
runtimeFilterJoin (query option) unset per query on/off enable switch; when on, defaults to the auto tier.
runtime_filter (join hint) unset per join off / in / bloom / auto — selects the reducer mode for that join.
max IN size 10000 leaf (runtime) auto emits an exact IN at/below this many build-key rows, and a bloom above it.
max build rows 1048576 (2^20) planner + leaf The build-key stage is capped at maxBuildRows + 1 rows. If the cap is hit the key set may be truncated/incomplete, so the leaf abandons the filter (results stay correct). This also bounds the pipeline-breaker memory. The planner cap and the leaf abandon read the same constant, so they cannot diverge.
bloom FPP 0.01 leaf (runtime) Target false-positive probability for the bloom tier. False positives only admit a few extra probe rows, which the join then discards.
bloom max bytes 16 MB leaf (runtime) If the serialized bloom would exceed this, the filter is abandoned (no predicate emitted).

Notes:

  • max IN size, bloom FPP, and bloom max bytes are applied on the probe-leaf server once the build
    keys are materialized; max build rows is enforced both as the planner's fetch cap on the build-key
    stage and as the leaf's truncation guard.
  • The build keys are not de-duplicated (no DISTINCT), so max IN size compares against the
    build-key row count, not the distinct-value count — the leaf IN/bloom dedups implicitly.
  • Exceeding max build rows, an oversized bloom, an empty/all-null build, or an unsupported probe shape
    all simply drop the filter — never a wrong result.

Correctness & safety

  • No false negatives. Exact IN is exact; a bloom never reports present-as-absent and its false
    positives are discarded by the real join; the BETWEEN(min, max) bounds cover every build key.
  • Null keys are excluded (they cannot match an inner equi-join) both at the planner (IS NOT NULL)
    and defensively at the leaf.
  • NaN float/double build keys keep the bloom membership but skip the range predicate (a finite
    range would wrongly drop probe NaN rows).
  • Truncation-safe. The build-key stage is capped at maxBuildRows + 1; if the cap is hit the key set
    is incomplete, so the filter is abandoned (the planner cap and the leaf abandon use the same constant).
  • Mixed-version. The only wire change is the new RuntimeFilterNode proto variant; the default-off
    flag is the guard. Enabling the flag (or using the hint) mid-rolling-upgrade can fail queries on
    not-yet-upgraded servers — documented on the config constant.
  • The reducer is built and applied entirely at the planner and the leaf-stage query, so it requires no
    changes to join execution itself.

Testing

  • PinotJoinToInnerRuntimeFilterRuleTest — rule firing/plan shape, probe-key/build-key value alignment,
    multi-key, hint/flag/query-option gating, pipeline-breaker distribution, negative cases.
  • ServerPlanRequestUtilsTest — exact-IN, bloom + range-prune, AUTO tiering, multi-key, empty/all-null
    build, null-key skip, NaN range omission, maxBytes/maxBuildRows abandon, existing-filter merge.
  • PlanNodeDeserializerTest — mixed-version graceful failure on the new proto variant.
  • RuntimeFilterJoinIntegrationTest — end-to-end cluster self-joins asserting results are identical
    with the filter on (in/bloom/auto) and off, across INT/LONG/DOUBLE/STRING/mixed-type/null/multi-key/
    empty-build cases.
  • Full pinot-query-planner (1321) and pinot-query-runtime (4431) suites pass — no regressions.

Limitations / future work

  • No automatic, statistics-driven enablement yet (opt-in via hint/flag); a future change can gate it on
    cardinality/selectivity estimates.
  • The build side is materialized a second time for the key broadcast; a shared spool would avoid this.
  • Bloom is single-key (composite-key tuple-encoding deferred); partitioned (both-sides-hash) joins use
    the broadcast path. Only the logical (HEP) planner is wired; wiring it into the v2 MSE physical
    optimizer is a follow-up.

@yashmayya yashmayya added multi-stage Related to the multi-stage query engine feature New functionality labels Jun 24, 2026
@codecov-commenter

codecov-commenter commented Jun 24, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 60.31746% with 150 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.77%. Comparing base (5696fd1) to head (aa360ee).
⚠️ Report is 16 commits behind head on master.

Files with missing lines Patch % Lines
...ry/runtime/plan/server/ServerPlanRequestUtils.java 67.21% 37 Missing and 3 partials ⚠️
.../runtime/plan/server/ServerPlanRequestVisitor.java 0.00% 18 Missing ⚠️
...he/pinot/query/planner/explain/PlanNodeMerger.java 0.00% 13 Missing ⚠️
...e/pinot/query/runtime/InStageStatsTreeBuilder.java 0.00% 12 Missing ⚠️
...e/rel/rules/PinotJoinToInnerRuntimeFilterRule.java 84.84% 5 Missing and 5 partials ⚠️
...inot/query/planner/plannode/RuntimeFilterNode.java 54.54% 6 Missing and 4 partials ⚠️
.../query/planner/logical/EquivalentStagesFinder.java 0.00% 7 Missing ⚠️
.../query/planner/logical/PlanNodeToRelConverter.java 0.00% 6 Missing ⚠️
...ry/planner/explain/PhysicalExplainPlanVisitor.java 0.00% 4 Missing ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 60.00% 3 Missing and 1 partial ⚠️
... and 13 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18848      +/-   ##
============================================
- Coverage     64.77%   64.77%   -0.01%     
- Complexity     1319     1322       +3     
============================================
  Files          3392     3396       +4     
  Lines        210971   211454     +483     
  Branches      33120    33230     +110     
============================================
+ Hits         136662   136964     +302     
- Misses        63289    63437     +148     
- Partials      11020    11053      +33     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.77% <60.31%> (-0.01%) ⬇️
temurin 64.77% <60.31%> (-0.01%) ⬇️
unittests 64.76% <60.31%> (-0.01%) ⬇️
unittests1 56.99% <60.63%> (+<0.01%) ⬆️
unittests2 37.13% <6.34%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch 2 times, most recently from aa360ee to 248d799 Compare June 24, 2026 20:53
When the build side of an equi-inner-join is small/selective, build a reducer (exact IN below a
threshold, else a bloom filter plus a min/max range predicate) from its distinct join keys and push
it down to the probe-side leaf scan via a pipeline-breaker edge, so the probe (fact) table drops
non-matching rows before they are shuffled into the join. The real hash join still runs and remains
the source of truth, so the filter is a no-false-negative optimization that can be omitted at any time.

Generalizes the existing SEMI dynamic-broadcast machinery to inner joins (the join is kept and the
filter is additive). Disabled by default; enabled via the runtime_filter join hint or the
pinot.broker.enable.runtime.filter.join cluster flag / runtimeFilterJoin query option.
@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch from 248d799 to 89fb53c Compare June 24, 2026 23:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New functionality multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants