Skip to content

Avoid duplicate trigger-rule upstream-count queries per scheduling pass#67672

Open
kaxil wants to merge 2 commits into
apache:mainfrom
astronomer:trigger-rule-upstream-count-memo
Open

Avoid duplicate trigger-rule upstream-count queries per scheduling pass#67672
kaxil wants to merge 2 commits into
apache:mainfrom
astronomer:trigger-rule-upstream-count-memo

Conversation

@kaxil
Copy link
Copy Markdown
Member

@kaxil kaxil commented May 29, 2026

Summary

TriggerRuleDep sizes a task's upstream set with a SELECT task_id, count(*) ... GROUP BY task_id, and it only runs that when one of the upstreams is mapped. The problem: if several downstream tasks all depend on the same mapped upstream, each of them runs the identical query during the same scheduling pass. This caches the result on the DepContext for the duration of a pass, so it runs once per distinct upstream set instead of once per downstream.

Impact

The win scales with how many downstream tasks sit behind a mapped upstream. Measured on a DAG with one mapped upstream (5 instances) feeding 60 plain downstream tasks, counting just this query during one scheduling pass:

upstream-count queries per pass
before 60
after 1

That count recurs every update_state pass, so across a run it's roughly downstreams x passes queries collapsing to passes. Each query is individually cheap (sub-millisecond), so this is about cutting database round-trips, not query time. It happens in the per-run scheduling work, not the serialized critical section, so it's a latency win for mapping-heavy DAGs rather than a throughput-ceiling change.

How it works

  • Key is (dag_id, run_id, frozenset(upstream_task_ids)), stored on DepContext, which is already built once per _get_ready_tis pass and reused for every TI in that pass (the same object that caches finished_tis).
  • It only kicks in when the task isn't inside a mapped task group. That's the branch where the predicate is plain task_id IN (upstream_ids) and comes out the same for every downstream with the same upstreams. Tasks inside a mapped task group have per map-index predicates, so they keep running their own query.
  • upstream_setup is still summed in the caller from the cached rows, so the setup count stays right per downstream.

Staleness

It follows finished_tis (a per-pass snapshot), and I clear it in _get_ready_tis on expansion, since that's when a mapped task's instance count actually changes mid-pass. The revise-map-index path can change counts too, but it only adds not-yet-finished instances, and an unfinished upstream keeps the count-based rules from going ready, so a stale value there doesn't change the outcome.

TriggerRuleDep runs a `SELECT task_id, count(*) ... GROUP BY task_id` per
downstream task to size its upstream set, but only when an upstream is mapped.
When many downstreams share the same mapped upstream, each issues an identical
query within the same scheduling pass.

Memoize the result on DepContext (one scheduling pass, same lifetime as
finished_tis), keyed by (dag_id, run_id, frozenset of direct-upstream task_ids).
Only the simple case is cached, where the predicate is exactly
`task_id IN (upstream_ids)`; downstreams inside a mapped task group keep their
own per-instance map-index query. The cache is cleared in _get_ready_tis when a
mapped task expands and changes its instance count.
@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label May 29, 2026
@kaxil kaxil added this to the Airflow 3.3.0 milestone May 29, 2026
@kaxil kaxil requested a review from uranusjr May 29, 2026 00:53
@kaxil kaxil marked this pull request as ready for review May 30, 2026 00:47
@kaxil kaxil requested review from XD-DENG and ashb as code owners May 30, 2026 00:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant