Add probe-side runtime filters for inner joins in the multi-stage engine#18848
Open
yashmayya wants to merge 1 commit into
Open
Add probe-side runtime filters for inner joins in the multi-stage engine#18848yashmayya wants to merge 1 commit into
yashmayya wants to merge 1 commit into
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18848 +/- ##
============================================
- Coverage 64.77% 64.77% -0.01%
- Complexity 1319 1322 +3
============================================
Files 3392 3396 +4
Lines 210971 211454 +483
Branches 33120 33230 +110
============================================
+ Hits 136662 136964 +302
- Misses 63289 63437 +148
- Partials 11020 11053 +33
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
aa360ee to
248d799
Compare
When the build side of an equi-inner-join is small/selective, build a reducer (exact IN below a threshold, else a bloom filter plus a min/max range predicate) from its distinct join keys and push it down to the probe-side leaf scan via a pipeline-breaker edge, so the probe (fact) table drops non-matching rows before they are shuffled into the join. The real hash join still runs and remains the source of truth, so the filter is a no-false-negative optimization that can be omitted at any time. Generalizes the existing SEMI dynamic-broadcast machinery to inner joins (the join is kept and the filter is additive). Disabled by default; enabled via the runtime_filter join hint or the pinot.broker.enable.runtime.filter.join cluster flag / runtimeFilterJoin query option.
248d799 to
89fb53c
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Adds probe-side runtime filters for equi-
INNERjoins in the multi-stage query engine (MSE). Whenthe build (right) side of a hash join is small or selective, the planner builds a reducer from its
join keys and pushes it down to the probe (left) leaf scan, so the probe table drops rows
that cannot possibly match before they are shuffled across the network into the join.
This is the
INNER-join counterpart of the existingSEMI-join dynamic broadcast(
PinotJoinToDynamicBroadcastRule). It is disabled by default.Why
For the classic fact ⋈ dim shape — a large fact table joined to a small dimension table (or a heavily
filtered build side) — the MSE today hash-shuffles the entire probe (fact) side into the join stage,
even though only the rows whose join key appears on the (tiny) build side can contribute to the result.
That wastes scan, serialization, and network bandwidth proportional to the whole fact table rather than
to the matching subset.
The
SEMI-join path already solves the analogous problem by replacing the join with a leafINfilter,but that rewrite is only legal for semi-joins (which emit left columns only). An inner join projects
columns from both sides, so the join must still run. This PR therefore makes the filter additive:
the real hash join is left completely intact, and we only add a reducer on the probe leaf.
How it works
After exchange insertion (
POST_LOGICAL),PinotJoinToInnerRuntimeFilterRulerewrites an eligibleinner join:
before; the filter is purely additive.
RuntimeFilterRel/RuntimeFilterNodeis grafted on top of the probe leaf subtree(
input[0]= probe pipeline, pass-through;input[1]= aPIPELINE_BREAKERmailbox carrying thebuild-side join keys). The pipeline breaker runs the build side first and ships its keys to
the probe-leaf worker, reusing the same mechanism as the SEMI dynamic broadcast.
ServerPlanRequestUtils.attachRuntimeFilterANDs a tiered, no-false-negativereducer onto the V1 leaf query:
INfor small key sets (at/below a build-key-row threshold), or for multi-key /BIG_DECIMALkeys. This is index-accelerated and drives segment pruning.IN_ID_SET) above the threshold, plus aBETWEEN(min, max)range predicate fornumeric keys to enable cheap range-based segment pruning. Bloom keeps the wire/heap footprint bounded
for high-cardinality build sides.
(empty/over-cap build, oversized bloom, unsupported subtree, mixed-version cluster) with no effect on
results. Bloom false positives are simply re-checked and discarded by the join.
This mirrors runtime/dynamic filtering in Trino, Impala, and Spark's
InjectRuntimeFilter.When it helps
Project/Filter), so thefilter can be pushed all the way down to segment scan.
It is not beneficial (and is best left off) when the build side is large or non-selective, or the
probe is cheap — there is no automatic selectivity-based gate yet, so enablement is opt-in (see below).
How to use
Per-join hint (selects the reducer mode):
runtime_filteracceptsoff|in|bloom|auto(exactINbelow the threshold, else bloom).Cluster-wide default (enable/disable only; defaults to
autowhen on):Per-query override:
SET runtimeFilterJoin='on'(oroff).Thresholds & defaults
There is one user-facing switch (the enable flag / query option / hint). Every sizing threshold below is
a fixed constant in this first version — each affects only the filter's selectivity and size, never
correctness (the real hash join re-checks every surviving row), so they are intentionally not
cluster-configurable yet.
pinot.broker.enable.runtime.filter.joinfalseruntimeFilterJoinand per join by theruntime_filterhint.runtimeFilterJoin(query option)on/offenable switch; whenon, defaults to theautotier.runtime_filter(join hint)off/in/bloom/auto— selects the reducer mode for that join.10000autoemits an exactINat/below this many build-key rows, and a bloom above it.1048576(2^20)maxBuildRows + 1rows. If the cap is hit the key set may be truncated/incomplete, so the leaf abandons the filter (results stay correct). This also bounds the pipeline-breaker memory. The planner cap and the leaf abandon read the same constant, so they cannot diverge.0.0116 MBNotes:
max IN size,bloom FPP, andbloom max bytesare applied on the probe-leaf server once the buildkeys are materialized;
max build rowsis enforced both as the planner's fetch cap on the build-keystage and as the leaf's truncation guard.
DISTINCT), somax IN sizecompares against thebuild-key row count, not the distinct-value count — the leaf
IN/bloom dedups implicitly.max build rows, an oversized bloom, an empty/all-null build, or an unsupported probe shapeall simply drop the filter — never a wrong result.
Correctness & safety
INis exact; a bloom never reports present-as-absent and its falsepositives are discarded by the real join; the
BETWEEN(min, max)bounds cover every build key.IS NOT NULL)and defensively at the leaf.
NaNfloat/double build keys keep the bloom membership but skip the range predicate (a finiterange would wrongly drop probe
NaNrows).maxBuildRows + 1; if the cap is hit the key setis incomplete, so the filter is abandoned (the planner cap and the leaf abandon use the same constant).
RuntimeFilterNodeproto variant; the default-offflag is the guard. Enabling the flag (or using the hint) mid-rolling-upgrade can fail queries on
not-yet-upgraded servers — documented on the config constant.
changes to join execution itself.
Testing
PinotJoinToInnerRuntimeFilterRuleTest— rule firing/plan shape, probe-key/build-key value alignment,multi-key, hint/flag/query-option gating, pipeline-breaker distribution, negative cases.
ServerPlanRequestUtilsTest— exact-IN, bloom + range-prune, AUTO tiering, multi-key, empty/all-nullbuild, null-key skip,
NaNrange omission,maxBytes/maxBuildRowsabandon, existing-filter merge.PlanNodeDeserializerTest— mixed-version graceful failure on the new proto variant.RuntimeFilterJoinIntegrationTest— end-to-end cluster self-joins asserting results are identicalwith the filter on (in/bloom/auto) and off, across INT/LONG/DOUBLE/STRING/mixed-type/null/multi-key/
empty-build cases.
pinot-query-planner(1321) andpinot-query-runtime(4431) suites pass — no regressions.Limitations / future work
cardinality/selectivity estimates.
the broadcast path. Only the logical (HEP) planner is wired; wiring it into the v2 MSE physical
optimizer is a follow-up.