Skip to content

Commit 3bf4dbd

Browse files
Address review feedback: harden worker-health-check command
- Replace os.EX_UNAVAILABLE with SystemExit(message) for portability - Add timeout=5.0 to inspect() calls for defense in depth - Add try/except around inspect calls for clean broker error handling - Add tests for broker unreachable scenarios (ping and active_queues) - Move import socket to module-level Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ef99184 commit 3bf4dbd

2 files changed

Lines changed: 70 additions & 22 deletions

File tree

providers/celery/src/airflow/providers/celery/cli/celery_command.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from __future__ import annotations
2121

2222
import logging
23+
import socket
2324
import sys
2425
import time
2526
from contextlib import contextmanager, suppress
@@ -487,42 +488,53 @@ def worker_health_check(args):
487488
488489
See: https://github.com/apache/airflow/issues/63580
489490
"""
490-
import os
491-
import socket
492-
493491
# This needs to be imported locally to not trigger Providers Manager initialization
494492
from airflow.providers.celery.executors.celery_executor import app as celery_app
495493

496494
hostname = args.celery_hostname or f"celery@{socket.gethostname()}"
497495

498-
inspect = celery_app.control.inspect(destination=[hostname])
496+
inspect = celery_app.control.inspect(destination=[hostname], timeout=5.0)
497+
498+
try:
499+
# Step 1: Ping check — worker must be reachable
500+
log.info("Checking worker liveness via ping: %s", hostname)
501+
ping_result = inspect.ping()
502+
except Exception:
503+
log.exception("Worker health check failed: unable to reach broker while pinging worker %s", hostname)
504+
raise SystemExit("Worker health check failed: broker connection error during ping")
499505

500-
# Step 1: Ping check — worker must be reachable
501-
log.info("Checking worker liveness via ping: %s", hostname)
502-
ping_result = inspect.ping()
503506
if not ping_result or hostname not in ping_result:
504507
log.error(
505508
"Worker health check failed: worker %s did not respond to ping. The worker process may be down.",
506509
hostname,
507510
)
508-
raise SystemExit(os.EX_UNAVAILABLE)
511+
raise SystemExit(f"Worker health check failed: worker {hostname} did not respond to ping")
509512

510513
log.info("Ping check passed for worker: %s", hostname)
511514

512-
# Step 2: Active queues check — worker must have registered consumers.
513-
# A worker in catatonic state (e.g. after Redis broker restart) responds to
514-
# ping but has silently lost its queue consumer registrations, so
515-
# active_queues() returns None or an empty list for that worker.
516-
log.info("Checking active queue consumers for worker: %s", hostname)
517-
active_queues_result = inspect.active_queues()
515+
try:
516+
# Step 2: Active queues check — worker must have registered consumers.
517+
# A worker in catatonic state (e.g. after Redis broker restart) responds to
518+
# ping but has silently lost its queue consumer registrations, so
519+
# active_queues() returns None or an empty list for that worker.
520+
log.info("Checking active queue consumers for worker: %s", hostname)
521+
active_queues_result = inspect.active_queues()
522+
except Exception:
523+
log.exception(
524+
"Worker health check failed: unable to reach broker while checking queues for %s", hostname
525+
)
526+
raise SystemExit("Worker health check failed: broker connection error during active_queues check")
527+
518528
if not active_queues_result or hostname not in active_queues_result:
519529
log.error(
520530
"Worker health check failed: worker %s is alive but has no registered queue consumers. "
521531
"This may indicate a catatonic state caused by a broker restart (e.g. Redis). "
522532
"The worker will not pick up new tasks until restarted.",
523533
hostname,
524534
)
525-
raise SystemExit(os.EX_UNAVAILABLE)
535+
raise SystemExit(
536+
f"Worker health check failed: worker {hostname} has no active queue consumers (catatonic state)"
537+
)
526538

527539
worker_queues = active_queues_result[hostname]
528540
if not worker_queues:
@@ -532,7 +544,9 @@ def worker_health_check(args):
532544
"The worker will not pick up new tasks until restarted.",
533545
hostname,
534546
)
535-
raise SystemExit(os.EX_UNAVAILABLE)
547+
raise SystemExit(
548+
f"Worker health check failed: worker {hostname} has empty queue consumer list (catatonic state)"
549+
)
536550

537551
queue_names = [queue["name"] for queue in worker_queues if "name" in queue]
538552
log.info(

providers/celery/tests/unit/celery/cli/test_celery_command.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ def test_health_check_fails_when_ping_returns_none(self, mock_inspect):
688688
with pytest.raises(SystemExit) as exc_info:
689689
celery_command.worker_health_check(args)
690690

691-
assert exc_info.value.code != 0
691+
assert "did not respond to ping" in str(exc_info.value)
692692
# active_queues must NOT be called if ping already failed
693693
mock_instance.active_queues.assert_not_called()
694694

@@ -707,9 +707,43 @@ def test_health_check_fails_when_worker_not_in_ping_result(self, mock_inspect):
707707
with pytest.raises(SystemExit) as exc_info:
708708
celery_command.worker_health_check(args)
709709

710-
assert exc_info.value.code != 0
710+
assert "did not respond to ping" in str(exc_info.value)
711711
mock_instance.active_queues.assert_not_called()
712712

713+
@pytest.mark.db_test
714+
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
715+
def test_health_check_fails_when_broker_unreachable(self, mock_inspect):
716+
"""Broker is down — inspect.ping() raises an exception. Health check must fail cleanly."""
717+
hostname = "celery@disconnected-host"
718+
args = self.parser.parse_args(["celery", "worker-health-check", "-H", hostname])
719+
720+
mock_instance = MagicMock()
721+
mock_instance.ping.side_effect = ConnectionError("Connection refused")
722+
mock_inspect.return_value = mock_instance
723+
724+
with pytest.raises(SystemExit) as exc_info:
725+
celery_command.worker_health_check(args)
726+
727+
assert "broker connection error" in str(exc_info.value)
728+
mock_instance.active_queues.assert_not_called()
729+
730+
@pytest.mark.db_test
731+
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
732+
def test_health_check_fails_when_active_queues_raises(self, mock_inspect):
733+
"""Broker fails during active_queues check — health check must fail cleanly."""
734+
hostname = "celery@flaky-host"
735+
args = self.parser.parse_args(["celery", "worker-health-check", "-H", hostname])
736+
737+
mock_instance = MagicMock()
738+
mock_instance.ping.return_value = {hostname: {"ok": "pong"}}
739+
mock_instance.active_queues.side_effect = ConnectionError("Connection lost")
740+
mock_inspect.return_value = mock_instance
741+
742+
with pytest.raises(SystemExit) as exc_info:
743+
celery_command.worker_health_check(args)
744+
745+
assert "broker connection error" in str(exc_info.value)
746+
713747
@pytest.mark.db_test
714748
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
715749
def test_health_check_fails_when_active_queues_returns_none(self, mock_inspect):
@@ -735,7 +769,7 @@ def test_health_check_fails_when_active_queues_returns_none(self, mock_inspect):
735769
with pytest.raises(SystemExit) as exc_info:
736770
celery_command.worker_health_check(args)
737771

738-
assert exc_info.value.code != 0
772+
assert "catatonic state" in str(exc_info.value)
739773
mock_instance.ping.assert_called_once()
740774
mock_instance.active_queues.assert_called_once()
741775

@@ -760,7 +794,7 @@ def test_health_check_fails_when_worker_not_in_active_queues(self, mock_inspect)
760794
with pytest.raises(SystemExit) as exc_info:
761795
celery_command.worker_health_check(args)
762796

763-
assert exc_info.value.code != 0
797+
assert "catatonic state" in str(exc_info.value)
764798

765799
@pytest.mark.db_test
766800
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
@@ -783,7 +817,7 @@ def test_health_check_fails_when_active_queues_list_is_empty(self, mock_inspect)
783817
with pytest.raises(SystemExit) as exc_info:
784818
celery_command.worker_health_check(args)
785819

786-
assert exc_info.value.code != 0
820+
assert "catatonic state" in str(exc_info.value)
787821

788822
@pytest.mark.db_test
789823
@mock.patch("socket.gethostname")
@@ -805,7 +839,7 @@ def test_health_check_defaults_hostname_from_socket(self, mock_inspect, mock_get
805839
celery_command.worker_health_check(args)
806840

807841
# Verify inspect was targeted at the auto-resolved hostname
808-
mock_inspect.assert_called_once_with(destination=[expected_hostname])
842+
mock_inspect.assert_called_once_with(destination=[expected_hostname], timeout=5.0)
809843

810844

811845
class TestLoggerSetupHandler:

0 commit comments

Comments (0)