From 819c4900d8da4fe58bff1aea154d7ddfd34ffb98 Mon Sep 17 00:00:00 2001 From: kalluripradeep Date: Sat, 23 May 2026 11:05:03 +0100 Subject: [PATCH 1/2] Fix missing task instance history for failed tasks outside of RUNNING state Closes #67238 Closes #65366 When a task fails from states like QUEUED or DEFERRED, or is failed by the executor or trigger, its state is not RUNNING. The history recording in fetch_handle_failure_context incorrectly assumed that only RUNNING tasks needed their history recorded, which led to missing tries in task_instance_history when CeleryExecutor or KubernetesExecutor failed tasks or triggers failed them. Now, we record history for all states except RESTARTING (since RESTARTING tasks were already cleared and had their history recorded). --- airflow-core/src/airflow/models/taskinstance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 9cc8d6a627140..c39f1b5c64bbd 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1806,9 +1806,9 @@ def fetch_handle_failure_context( if task and fail_fast: _stop_remaining_tasks(task_instance=ti, session=session) else: - if ti.state == TaskInstanceState.RUNNING: - # If the task instance is in the running state, it means it raised an exception and - # about to retry so we record the task instance history. For other states, the task + if ti.state != TaskInstanceState.RESTARTING: + # If the task instance is NOT in the restarting state, it means it raised an exception and + # about to retry so we record the task instance history. For the restarting state, the task # instance was cleared and already recorded in the task instance history. ti.prepare_db_for_next_try(session) From 67fa256a4e4bdafa74e1350ac262ba2a721b2529 Mon Sep 17 00:00:00 2001 From: kalluripradeep Date: Sat, 23 May 2026 11:48:49 +0100 Subject: [PATCH 2/2] Fix detached session object in test_taskinstance.py --- airflow-core/tests/unit/models/test_taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3b4dceb628d03..9181cf18f16b9 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2366,7 +2366,7 @@ def test_handle_failure_updates_queued_task_updates_state(self, dag_maker): dr = dag_maker.create_dagrun() ti = dr.get_task_instance(task.task_id) ti.state = State.QUEUED - session.merge(ti) + ti = session.merge(ti) session.flush() assert ti.state == State.QUEUED ti.handle_failure("test queued ti", test_mode=True)