Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 22 additions & 46 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, status
from sqlalchemy import and_, func, select
from sqlalchemy import select, union_all
from sqlalchemy.orm import defaultload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
Expand Down Expand Up @@ -171,53 +171,29 @@ def get_dags(
)
favorite_dag_ids = set(session.scalars(favorites_select))

# Populate the last 'dag_runs_limit' DagRuns for each Dag
recent_runs_subquery = (
select(
DagRun.dag_id,
DagRun.run_after,
func.rank()
.over(
partition_by=DagRun.dag_id,
order_by=DagRun.run_after.desc(),
recent_dag_runs: list = []
if dags:
recent_runs_branches = [
select(
DagRun.id,
DagRun.dag_id,
DagRun.run_id,
DagRun.end_date,
DagRun.logical_date,
DagRun.run_after,
DagRun.start_date,
DagRun.state,
)
.label("rank"),
.where(DagRun.dag_id == dag.dag_id)
.order_by(DagRun.run_after.desc())
.limit(dag_runs_limit)
.subquery()
for dag in dags
]
recent_runs_union = union_all(*(select(branch) for branch in recent_runs_branches)).subquery()
recent_dag_runs = list(
session.execute(select(recent_runs_union).order_by(recent_runs_union.c.run_after.desc()))
)
.where(DagRun.dag_id.in_([dag.dag_id for dag in dags]))
.order_by(DagRun.run_after.desc())
.subquery()
)

recent_dag_runs_select = (
select(
recent_runs_subquery.c.run_after,
DagRun.id,
DagRun.dag_id,
DagRun.run_id,
DagRun.end_date,
DagRun.logical_date,
DagRun.run_after,
DagRun.start_date,
DagRun.state,
DagRun.duration.expression, # type: ignore[attr-defined]
)
.join(
DagRun,
and_(
DagRun.dag_id == recent_runs_subquery.c.dag_id,
DagRun.run_after == recent_runs_subquery.c.run_after,
),
)
.where(recent_runs_subquery.c.rank <= dag_runs_limit)
.group_by(
recent_runs_subquery.c.run_after,
DagRun.run_after,
DagRun.id,
)
.order_by(recent_runs_subquery.c.run_after.desc())
)

recent_dag_runs = session.execute(recent_dag_runs_select)

# Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions
pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ def historical_metrics(
current_time = timezone.utcnow()
permitted_dag_ids = cast("set[str]", readable_dags_filter.value)

end_bound = end_date if end_date is not None else current_time
dag_run_filters = [
func.coalesce(DagRun.start_date, current_time) >= start_date,
func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time),
DagRun.run_after >= start_date,
DagRun.run_after <= end_bound,
DagRun.dag_id.in_(permitted_dag_ids),
]

Expand Down Expand Up @@ -127,13 +128,6 @@ def dag_stats(
) -> DashboardDagStatsResponse:
"""Return basic Dag stats with counts of Dags in various states."""
permitted_dag_ids = cast("set[str]", readable_dags_filter.value)
latest_dates_subq = (
select(DagRun.dag_id, func.max(DagRun.logical_date).label("max_logical_date"))
.where(DagRun.logical_date.is_not(None))
.where(DagRun.dag_id.in_(permitted_dag_ids))
.group_by(DagRun.dag_id)
.subquery()
)

# Active Dags need another query from DagModel, as a Dag may not have any runs but still be active
active_count_query = (
Expand All @@ -145,30 +139,28 @@ def dag_stats(
)
active_count = session.execute(active_count_query).scalar_one()

# Other metrics are based on latest DagRun states
latest_runs_cte = (
select(
DagModel.dag_id,
DagModel.is_paused,
DagRun.state,
)
.join(DagModel, DagRun.dag_id == DagModel.dag_id)
.join(
latest_dates_subq,
(DagRun.dag_id == latest_dates_subq.c.dag_id)
& (DagRun.logical_date == latest_dates_subq.c.max_logical_date),
)
latest_state = (
select(DagRun.state)
.where(DagRun.dag_id == DagModel.dag_id, DagRun.logical_date.is_not(None))
.order_by(DagRun.logical_date.desc())
.limit(1)
.correlate(DagModel)
.scalar_subquery()
)
latest_runs_subq = (
select(latest_state.label("state"))
.select_from(DagModel)
.where(DagModel.is_stale == false())
.where(DagRun.dag_id.in_(permitted_dag_ids))
.cte()
.where(DagModel.dag_id.in_(permitted_dag_ids))
.subquery()
)
combined_runs_query = select(
func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.FAILED, 1))), 0).label("failed"),
func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.RUNNING, 1))), 0).label(
func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.FAILED, 1))), 0).label("failed"),
func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.RUNNING, 1))), 0).label(
"running"
),
func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.QUEUED, 1))), 0).label("queued"),
).select_from(latest_runs_cte)
func.coalesce(func.sum(case((latest_runs_subq.c.state == DagRunState.QUEUED, 1))), 0).label("queued"),
).select_from(latest_runs_subq)

counts = session.execute(combined_runs_query).one()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def make_dag_runs(dag_maker, session, time_machine):
run_type=DagRunType.SCHEDULED,
logical_date=date,
start_date=date,
run_after=date,
)

run2 = dag_maker.create_dagrun(
Expand All @@ -79,6 +80,7 @@ def make_dag_runs(dag_maker, session, time_machine):
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=date + timedelta(days=1),
start_date=date + timedelta(days=1),
run_after=date + timedelta(days=1),
)

run3 = dag_maker.create_dagrun(
Expand All @@ -87,6 +89,7 @@ def make_dag_runs(dag_maker, session, time_machine):
run_type=DagRunType.SCHEDULED,
logical_date=date + timedelta(days=2),
start_date=date + timedelta(days=2),
run_after=date + timedelta(days=2),
)

run3.end_date = None
Expand All @@ -96,6 +99,7 @@ def make_dag_runs(dag_maker, session, time_machine):
state=DagRunState.QUEUED,
run_type=DagRunType.SCHEDULED,
logical_date=date + timedelta(days=3),
run_after=date + timedelta(days=3),
)

for ti in run1.task_instances:
Expand Down Expand Up @@ -264,13 +268,13 @@ class TestHistoricalMetricsDataEndpoint:
},
),
(
{"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"},
{"start_date": "2023-02-02T00:00", "end_date": "2023-02-03T12:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
Expand Down
Loading