From 7cbf0ef7cf25885838156e4fc9da23baec5e844a Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Sat, 30 May 2026 00:09:36 +0900 Subject: [PATCH 1/2] Speed up Dags list and dashboard queries on large DagRun tables --- .../api_fastapi/core_api/routes/ui/dags.py | 68 ++++++------------- .../core_api/routes/ui/dashboard.py | 48 ++++++------- .../core_api/routes/ui/test_dashboard.py | 10 ++- 3 files changed, 49 insertions(+), 77 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index dfd3a71d815f5..7ca74d375dbb0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -20,7 +20,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import and_, func, select +from sqlalchemy import select, union_all from sqlalchemy.orm import defaultload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity @@ -171,53 +171,29 @@ def get_dags( ) favorite_dag_ids = set(session.scalars(favorites_select)) - # Populate the last 'dag_runs_limit' DagRuns for each Dag - recent_runs_subquery = ( - select( - DagRun.dag_id, - DagRun.run_after, - func.rank() - .over( - partition_by=DagRun.dag_id, - order_by=DagRun.run_after.desc(), + recent_dag_runs: list = [] + if dags: + recent_runs_branches = [ + select( + DagRun.id, + DagRun.dag_id, + DagRun.run_id, + DagRun.end_date, + DagRun.logical_date, + DagRun.run_after, + DagRun.start_date, + DagRun.state, ) - .label("rank"), + .where(DagRun.dag_id == dag.dag_id) + .order_by(DagRun.run_after.desc()) + .limit(dag_runs_limit) + .subquery() + for dag in dags + ] + recent_runs_union = union_all(*(select(branch) for branch in recent_runs_branches)).subquery() + recent_dag_runs = session.execute( + select(recent_runs_union).order_by(recent_runs_union.c.run_after.desc()) ) - .where(DagRun.dag_id.in_([dag.dag_id for dag in dags])) - .order_by(DagRun.run_after.desc()) - .subquery() - ) - - recent_dag_runs_select = ( - select( - recent_runs_subquery.c.run_after, - DagRun.id, - DagRun.dag_id, - DagRun.run_id, - DagRun.end_date, - DagRun.logical_date, - DagRun.run_after, - DagRun.start_date, - DagRun.state, - DagRun.duration.expression, # type: ignore[attr-defined] - ) - .join( - DagRun, - and_( - DagRun.dag_id == recent_runs_subquery.c.dag_id, - DagRun.run_after == recent_runs_subquery.c.run_after, - ), - ) - .where(recent_runs_subquery.c.rank <= dag_runs_limit) - .group_by( - recent_runs_subquery.c.run_after, - DagRun.run_after, - DagRun.id, - ) - .order_by(recent_runs_subquery.c.run_after.desc()) - ) - - recent_dag_runs = session.execute(recent_dag_runs_select) # Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags} diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py index 923bd3b55d17b..612d5c7c35c0e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py @@ -63,9 +63,10 @@ def historical_metrics( current_time = timezone.utcnow() permitted_dag_ids = cast("set[str]", readable_dags_filter.value) + end_bound = end_date if end_date is not None else current_time dag_run_filters = [ - func.coalesce(DagRun.start_date, current_time) >= start_date, - func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time), + DagRun.run_after >= start_date, + DagRun.run_after <= end_bound, DagRun.dag_id.in_(permitted_dag_ids), ] @@ -127,13 +128,6 @@ def dag_stats( ) -> DashboardDagStatsResponse: """Return basic Dag stats with counts of Dags in various states.""" permitted_dag_ids = cast("set[str]", readable_dags_filter.value) - latest_dates_subq = ( - select(DagRun.dag_id, func.max(DagRun.logical_date).label("max_logical_date")) - .where(DagRun.logical_date.is_not(None)) - .where(DagRun.dag_id.in_(permitted_dag_ids)) - .group_by(DagRun.dag_id) - .subquery() - ) # Active Dags need another query from DagModel, as a Dag may not have any runs but still be active active_count_query = ( @@ -145,30 +139,28 @@ def dag_stats( ) active_count = session.execute(active_count_query).scalar_one() - # Other metrics are based on latest DagRun states - latest_runs_cte = ( - select( - DagModel.dag_id, - DagModel.is_paused, - DagRun.state, - ) - .join(DagModel, DagRun.dag_id == DagModel.dag_id) - .join( - latest_dates_subq, - (DagRun.dag_id == latest_dates_subq.c.dag_id) - & (DagRun.logical_date == latest_dates_subq.c.max_logical_date), - ) + latest_state = ( + select(DagRun.state) + .where(DagRun.dag_id == DagModel.dag_id, DagRun.logical_date.is_not(None)) + .order_by(DagRun.logical_date.desc()) + .limit(1) + .correlate(DagModel) + .scalar_subquery() + ) + latest_runs_subq = ( + select(latest_state.label("state")) + .select_from(DagModel) .where(DagModel.is_stale == false()) - .where(DagRun.dag_id.in_(permitted_dag_ids)) - .cte() + .where(DagModel.dag_id.in_(permitted_dag_ids)) + .subquery() ) combined_runs_query = select( - func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.FAILED, 1))), 0).label("failed"), - func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.RUNNING, 1))), 0).label( + func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.FAILED, 1))), 0).label("failed"), + func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.RUNNING, 1))), 0).label( "running" ), - func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.QUEUED, 1))), 0).label("queued"), - ).select_from(latest_runs_cte) + func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.QUEUED, 1))), 0).label("queued"), + ).select_from(latest_runs_subq) counts = session.execute(combined_runs_query).one() diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py index ccfc2b26f7844..12d424b4f2ea6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py @@ -71,6 +71,7 @@ def make_dag_runs(dag_maker, session, time_machine): run_type=DagRunType.SCHEDULED, logical_date=date, start_date=date, + run_after=date, ) run2 = dag_maker.create_dagrun( @@ -79,6 +80,7 @@ def make_dag_runs(dag_maker, session, time_machine): run_type=DagRunType.ASSET_TRIGGERED, logical_date=date + timedelta(days=1), start_date=date + timedelta(days=1), + run_after=date + timedelta(days=1), ) run3 = dag_maker.create_dagrun( @@ -87,6 +89,7 @@ def make_dag_runs(dag_maker, session, time_machine): run_type=DagRunType.SCHEDULED, logical_date=date + timedelta(days=2), start_date=date + timedelta(days=2), + run_after=date + timedelta(days=2), ) run3.end_date = None @@ -96,6 +99,7 @@ def make_dag_runs(dag_maker, session, time_machine): state=DagRunState.QUEUED, run_type=DagRunType.SCHEDULED, logical_date=date + timedelta(days=3), + run_after=date + timedelta(days=3), ) for ti in run1.task_instances: @@ -264,13 +268,13 @@ class TestHistoricalMetricsDataEndpoint: }, ), ( - {"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"}, + {"start_date": "2023-02-02T00:00", "end_date": "2023-02-03T12:00"}, { - "dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0}, + "dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 0}, "task_instance_states": { "deferred": 0, "failed": 2, - "no_status": 0, + "no_status": 2, "queued": 0, "removed": 0, "restarting": 0, From 526683788883520e75badf16067a95531891e16a Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Sat, 30 May 2026 06:58:05 +0900 Subject: [PATCH 2/2] Fix mypy type error in get_dags recent runs query --- .../src/airflow/api_fastapi/core_api/routes/ui/dags.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index 7ca74d375dbb0..88043b9c4e1a5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -191,8 +191,8 @@ def get_dags( for dag in dags ] recent_runs_union = union_all(*(select(branch) for branch in recent_runs_branches)).subquery() - recent_dag_runs = session.execute( - select(recent_runs_union).order_by(recent_runs_union.c.run_after.desc()) + recent_dag_runs = list( + session.execute(select(recent_runs_union).order_by(recent_runs_union.c.run_after.desc())) ) # Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions