diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 9cc8d6a627140..c39f1b5c64bbd 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1806,9 +1806,9 @@ def fetch_handle_failure_context( if task and fail_fast: _stop_remaining_tasks(task_instance=ti, session=session) else: - if ti.state == TaskInstanceState.RUNNING: - # If the task instance is in the running state, it means it raised an exception and - # about to retry so we record the task instance history. For other states, the task + if ti.state != TaskInstanceState.RESTARTING: + # If the task instance is NOT in the restarting state, it means it raised an exception and + # about to retry so we record the task instance history. For the restarting state, the task # instance was cleared and already recorded in the task instance history. ti.prepare_db_for_next_try(session) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3b4dceb628d03..9181cf18f16b9 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2366,7 +2366,7 @@ def test_handle_failure_updates_queued_task_updates_state(self, dag_maker): dr = dag_maker.create_dagrun() ti = dr.get_task_instance(task.task_id) ti.state = State.QUEUED - session.merge(ti) + ti = session.merge(ti) session.flush() assert ti.state == State.QUEUED ti.handle_failure("test queued ti", test_mode=True)