From c548b6c80394dd6c6904ba2b282685c3efe130a6 Mon Sep 17 00:00:00 2001 From: dengyh Date: Wed, 24 Jun 2026 15:30:06 +0800 Subject: [PATCH] fix: schedule task retries on transient db connection error When the database endpoint is briefly unreachable (e.g. MySQL connect timeout, errno 110), `Schedule.objects.get` raised an OperationalError that was swallowed by a broad `except` and immediately marked the schedule as FAIL, killing in-flight plugin polls on a momentary DB blip. Transient DB connection errors (OperationalError / InterfaceError) now trigger a Celery retry with exponential backoff, and only fall back to FAIL once retries are exhausted. Non-connection errors keep the original fail-fast behavior. Add unit tests for both the retry and give-up paths. Co-authored-by: Cursor --- .../runtime/schedule/celery/tasks.py | 23 ++++++++++- .../runtime/schedule/celery/test_tasks.py | 40 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/bk-plugin-framework/bk_plugin_framework/runtime/schedule/celery/tasks.py b/bk-plugin-framework/bk_plugin_framework/runtime/schedule/celery/tasks.py index f579023..2992b3b 100644 --- a/bk-plugin-framework/bk_plugin_framework/runtime/schedule/celery/tasks.py +++ b/bk-plugin-framework/bk_plugin_framework/runtime/schedule/celery/tasks.py @@ -12,6 +12,7 @@ import logging from celery import shared_task +from django.db import InterfaceError, OperationalError from bk_plugin_framework.envs import settings from bk_plugin_framework.hub import VersionHub @@ -22,6 +23,9 @@ logger = logging.getLogger("bk_plugin") +# 瞬时数据库连接类错误:DB 抖动可通过重试自愈,不应直接把调度判定为失败 +TRANSIENT_DB_EXC = (OperationalError, InterfaceError) + def _set_schedule_state(trace_id: str, state: State): try: @@ -30,12 +34,27 @@ def _set_schedule_state(trace_id: str, state: State): logger.exception("[execute] set schedule state error") -@shared_task(ignore_result=True) -def schedule(trace_id: str): +@shared_task(bind=True, ignore_result=True, max_retries=6, default_retry_delay=5) +def schedule(self, trace_id: str): local.set_trace_id(trace_id) try: schedule = Schedule.objects.get(trace_id=trace_id) + except TRANSIENT_DB_EXC as exc: + # DB 瞬时不可达:重试本次轮询而非判失败,避免误杀运行中的插件;重试用尽才兜底置失败 + if self.request.retries >= self.max_retries: + logger.error( + "[schedule_task] db unreachable, give up fetching schedule obj %s after %s retries" + % (trace_id, self.request.retries) + ) + _set_schedule_state(trace_id=trace_id, state=State.FAIL) + return + countdown = min(2 ** self.request.retries * 5, 60) + logger.warning( + "[schedule_task] transient db error when fetching schedule obj %s (retry=%s), retry in %ss: %s" + % (trace_id, self.request.retries, countdown, exc) + ) + raise self.retry(exc=exc, countdown=countdown) except Exception: logger.exception("[schedule_task] fetch schedule obj %s failed" % trace_id) _set_schedule_state(trace_id=trace_id, state=State.FAIL) diff --git a/bk-plugin-framework/tests/runtime/schedule/celery/test_tasks.py b/bk-plugin-framework/tests/runtime/schedule/celery/test_tasks.py index b4cf471..d4d1e6c 100644 --- a/bk-plugin-framework/tests/runtime/schedule/celery/test_tasks.py +++ b/bk-plugin-framework/tests/runtime/schedule/celery/test_tasks.py @@ -13,6 +13,7 @@ from unittest.mock import MagicMock, patch import pytest +from django.db import OperationalError from bk_plugin_framework.kit import State from bk_plugin_framework.runtime.schedule.celery import tasks @@ -108,3 +109,42 @@ def test_schedule__execute_success(self, trace_id, schedule_id): plugin_cls=VersionHub.all_plugins().get(schedule_obj.plugin_version), schedule=schedule_obj ) Schedule.objects.filter.assert_not_called() + + def test_schedule__transient_db_error_will_retry(self, trace_id, schedule_id): + Schedule = MagicMock() + db_err = OperationalError("(2003, \"Can't connect to MySQL server (110)\")") + Schedule.objects.get = MagicMock(side_effect=db_err) + + class _Retry(Exception): + pass + + with patch("bk_plugin_framework.runtime.schedule.celery.tasks.Schedule", Schedule): + with patch.object(tasks.schedule, "retry", side_effect=_Retry) as mock_retry: + with pytest.raises(_Retry): + tasks.schedule(trace_id) + + assert local.get_trace_id() == trace_id + + Schedule.objects.get.assert_called_once_with(trace_id=trace_id) + # 瞬时数据库连接错误应触发重试,而不是把调度直接置为失败 + mock_retry.assert_called_once() + assert mock_retry.call_args[1]["exc"] is db_err + assert mock_retry.call_args[1]["countdown"] == 5 + Schedule.objects.filter.assert_not_called() + + def test_schedule__transient_db_error_set_fail_when_retries_exhausted(self, trace_id, schedule_id): + Schedule = MagicMock() + Schedule.objects.get = MagicMock(side_effect=OperationalError()) + + with patch("bk_plugin_framework.runtime.schedule.celery.tasks.Schedule", Schedule): + with patch.object(tasks.schedule, "retry") as mock_retry: + with patch.object(tasks.schedule, "max_retries", 0): + tasks.schedule(trace_id) + + assert local.get_trace_id() == trace_id + + Schedule.objects.get.assert_called_once_with(trace_id=trace_id) + # 重试已耗尽:不再重试,兜底置为失败 + mock_retry.assert_not_called() + Schedule.objects.filter.assert_called_once_with(trace_id=trace_id) + Schedule.objects.filter(trace_id=trace_id).update.assert_called_once_with(state=State.FAIL.value)