Avoid duplicate trigger-rule upstream-count queries per scheduling pass #67672
Open
kaxil wants to merge 2 commits into
TriggerRuleDep runs a `SELECT task_id, count(*) ... GROUP BY task_id` per downstream task to size its upstream set, but only when an upstream is mapped. When many downstreams share the same mapped upstream, each issues an identical query within the same scheduling pass. Memoize the result on DepContext (one scheduling pass, same lifetime as finished_tis), keyed by (dag_id, run_id, frozenset of direct-upstream task_ids). Only the simple case is cached, where the predicate is exactly `task_id IN (upstream_ids)`; downstreams inside a mapped task group keep their own per-instance map-index query. The cache is cleared in _get_ready_tis when a mapped task expands and changes its instance count.
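The memoization described above can be sketched in a few lines. This is a minimal illustration, not the code in this PR: `PassContext`, `cached_upstream_counts`, and `fake_query` are hypothetical names, and the 60 downstreams and 5 instances are illustrative inputs only.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in for the per-pass context; not Airflow's real DepContext.
@dataclass
class PassContext:
    upstream_count_cache: dict = field(default_factory=dict)


def cached_upstream_counts(ctx, dag_id, run_id, upstream_task_ids, run_query):
    """Return {task_id: count}, calling run_query at most once per key per pass."""
    # frozenset makes the key hashable and independent of upstream ordering.
    key = (dag_id, run_id, frozenset(upstream_task_ids))
    if key not in ctx.upstream_count_cache:
        ctx.upstream_count_cache[key] = run_query(key[2])
    return ctx.upstream_count_cache[key]


calls = []


def fake_query(task_ids):
    # Stand-in for the SELECT ... GROUP BY task_id round-trip (assumption).
    calls.append(task_ids)
    return {task_id: 5 for task_id in task_ids}


ctx = PassContext()
# 60 downstream tasks that all share the same single mapped upstream.
for _ in range(60):
    counts = cached_upstream_counts(ctx, "example_dag", "run_1", ["mapped_up"], fake_query)
```

In this sketch the stand-in query runs once even though 60 downstreams ask for the counts, because they all produce the same key.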
Summary
`TriggerRuleDep` sizes a task's upstream set with a `SELECT task_id, count(*) ... GROUP BY task_id`, and it only runs that query when one of the upstreams is mapped. The problem: if several downstream tasks all depend on the same mapped upstream, each of them runs the identical query during the same scheduling pass. This change caches the result on the `DepContext` for the duration of a pass, so the query runs once per distinct upstream set instead of once per downstream.

Impact
The win scales with how many downstream tasks sit behind a mapped upstream. Measured on a DAG with one mapped upstream (5 instances) feeding 60 plain downstream tasks, counting just this query during one scheduling pass:
That count recurs every `update_state` pass, so across a run it's roughly `downstreams x passes` queries collapsing to `passes`. Each query is individually cheap (sub-millisecond), so this is about cutting database round-trips, not query time. It happens in the per-run scheduling work, not the serialized critical section, so it's a latency win for mapping-heavy DAGs rather than a throughput-ceiling change.

How it works
- The cache key is `(dag_id, run_id, frozenset(upstream_task_ids))`, stored on `DepContext`, which is already built once per `_get_ready_tis` pass and reused for every TI in that pass (the same object that caches `finished_tis`).
- Only the simple case is cached, where the predicate is exactly `task_id IN (upstream_ids)` and comes out the same for every downstream with the same upstreams. Tasks inside a mapped task group have per-map-index predicates, so they keep running their own query.
- `upstream_setup` is still summed in the caller from the cached rows, so the setup count stays right per downstream.

Staleness
The cache has the same lifetime as `finished_tis` (a per-pass snapshot), and I clear it in `_get_ready_tis` on expansion, since that's when a mapped task's instance count actually changes mid-pass. The revise-map-index path can change counts too, but it only adds not-yet-finished instances, and an unfinished upstream keeps the count-based rules from going ready, so a stale value there doesn't change the outcome.
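To make the invalidation concrete, here is a small sketch of clearing a per-pass cache when a mapped task expands. `PassCache`, `count_query`, and the `instances` dict are hypothetical; treating an unexpanded mapped task as a single placeholder instance is an assumption made only for this example.

```python
class PassCache:
    """Hypothetical per-pass cache of upstream instance counts."""

    def __init__(self):
        self._counts = {}
        self.queries = 0  # number of times the underlying query actually ran

    def get(self, key, run_query):
        if key not in self._counts:
            self.queries += 1
            self._counts[key] = run_query()
        return self._counts[key]

    def clear(self):
        # Called when a mapped task expands and its instance count changes.
        self._counts.clear()


# Assumption: the unexpanded mapped task shows up as one placeholder instance.
instances = {"mapped_up": 1}


def count_query():
    # Stand-in for the grouped count query; returns a snapshot of the counts.
    return dict(instances)


cache = PassCache()
key = ("example_dag", "run_1", frozenset({"mapped_up"}))

before = cache.get(key, count_query)
instances["mapped_up"] = 5  # the mapped task expands mid-pass
cache.clear()               # without this, `after` would still report 1
after = cache.get(key, count_query)
```

Clearing the whole cache instead of evicting individual keys keeps the invalidation simple, at the cost of one extra query per affected upstream set in that pass.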