feat(providers/celery): add worker-health-check command to detect catatonic state after broker restart #63583
Conversation
@antonio-mello-ai This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.
…alth-check command

- Update count assertion from 9 to 10 in test_celery_commands_count
- Add "worker-health-check" to parametrize list in test_celery_subcommands_defined

Closes apache#63583
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Hey @antonio-mello-ai, the catatonic worker problem after broker restarts is a real pain — we've seen similar issues in production, so this is a welcome addition. The two-stage check (ping then active_queues) gives good diagnostic clarity. A few things I noticed:
1. No timeout on the inspect calls. If the broker is unreachable or slow, the command can hang indefinitely. Consider `inspector = celery_app.control.inspect(destination=[hostname], timeout=5.0)`.
2. No exception handling around the inspect calls. If the broker is fully down (not just restarted), these calls could raise instead of failing cleanly.
3. Missing test for broker connection errors. The 7 tests cover the happy path and the various catatonic states well, but there's no test for what happens when the broker is unreachable.
4. Portability: `os.EX_UNAVAILABLE` is POSIX-only; prefer `SystemExit(message)`.
5. Minor: consider moving `import socket` to module level.

Overall the approach is solid — detecting catatonic workers through `active_queues` is the right signal.
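The timeout and error-handling suggestions above could be combined roughly as follows. This is a hedged sketch, not the PR's actual code: `check_worker_health` and the injected `inspector` are illustrative names. The inspector object (in practice the result of `celery_app.control.inspect(destination=[hostname], timeout=5.0)`) is passed in, so the broker interaction stays outside and the logic is unit-testable.

```python
# Sketch of a two-stage worker health check with clean failure modes.
# Any object exposing ping() / active_queues() can serve as inspector.

def check_worker_health(inspector, hostname: str) -> None:
    """Raise SystemExit with a message when the worker is unhealthy."""
    # Stage 1: the worker must respond to a ping at all.
    try:
        ping_reply = inspector.ping()
    except Exception as exc:  # broker fully down, not just restarted
        raise SystemExit(f"Broker unreachable while pinging {hostname}: {exc}")
    if not ping_reply or hostname not in ping_reply:
        raise SystemExit(f"Worker {hostname} did not respond to ping")

    # Stage 2: the worker must still have registered queue consumers.
    try:
        queues = inspector.active_queues()
    except Exception as exc:
        raise SystemExit(f"Broker unreachable while listing queues: {exc}")
    # Catatonic state: ping OK, but no queue consumer registrations.
    if not queues or not queues.get(hostname):
        raise SystemExit(f"Worker {hostname} has no active queue consumers")
```

Raising `SystemExit` with a string exits with status 1 on every platform, which is what lets a container orchestrator treat the failure as unhealthy.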
All referenced issues are closed ones. Do we have any open issue or a confirmation from Celery that this is an open bug?
force-pushed from 8c0b4de to 3bf4dbd
Rebased on latest main.

Addressing @eladkal's question: is this still an open bug?

The referenced issues are closed, but the bug persists in recent Celery versions: the partial fix in celery/celery#8796 did not fully resolve it, and it still reproduces across Celery 5.2.x through 5.5.x with a Redis broker.
Addressing @Vamsi-klu's feedback

All five points addressed in this push:

- Replaced os.EX_UNAVAILABLE with SystemExit(message) for portability
- Added timeout=5.0 to the inspect() calls
- Added try/except around the inspect calls for clean broker error handling
- Added tests for broker-unreachable scenarios (ping and active_queues)
- Moved import socket to module level
All 9 tests passing, ruff + pre-commit hooks clean.
I don't see a reason for doing any kind of workaround in Airflow when the last response on the quoted Celery issue is from March 2025. Maybe consider submitting a fix to the upstream library?
@eladkal Fair point — submitting upstream fixes instead. I've identified the root cause and submitted two companion PRs:
These two fixes together address the "catatonic worker" problem at the root. If they're accepted upstream, the health check in this PR becomes unnecessary. Happy to close this PR and track progress on the upstream fixes instead, or keep it open as a defense-in-depth measure until the upstream fixes land in a release — your call. |
Cool. So when both are accepted and released upstream, we can modify this PR to just update the minimum versions of the libraries.
eladkal left a comment:
Temporary block to avoid accidental merge
force-pushed from 5174feb to ff8842f
+1 to @eladkal - smells like this should be fixed in Celery. Another rationale: even if we check the active queues, it might be that the queue attachment was administratively removed for the worker, e.g. via Flower. So just checking the registration for a queue is not a health signal on its own.
force-pushed from ff8842f to 5d1516f
force-pushed from 5d1516f to 39ddff8
force-pushed from fe12b19 to 363ea12
force-pushed from 363ea12 to 2989e70
force-pushed from 2989e70 to 091e679
force-pushed from 091e679 to e54e291
…te after broker restart

After a Redis broker restart, a Celery worker can enter a catatonic state where it reconnects at the transport level (`celery inspect ping` returns OK) but silently loses its queue consumer registrations (`active_queues` returns None). Tasks pile up in the queue and are never picked up. The existing health check in docker-compose and Kubernetes only uses `celery inspect ping`, which does not detect this state.

This change adds a new `airflow celery worker-health-check` CLI command that performs a two-stage check:

1. Ping check: verifies the worker is alive and reachable.
2. Active queues check: verifies the worker has registered queue consumers.

If either check fails, the command exits with a non-zero status so that container orchestrators (Docker, Kubernetes) can restart the worker. The docker-compose.yaml health check for airflow-worker is updated to use the new command instead of the raw `celery inspect ping`.

Closes apache#63580
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
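The docker-compose change described in the commit could take roughly the following shape. This is an illustrative sketch: the service name matches the commit message, but the interval/timeout/retries values are assumptions, not the PR's actual settings.

```yaml
services:
  airflow-worker:
    healthcheck:
      # Two-stage check: exits non-zero when the worker is unreachable
      # or has lost its queue consumer registrations, so the
      # orchestrator restarts the container.
      test: ["CMD-SHELL", "airflow celery worker-health-check"]
      interval: 30s
      timeout: 30s
      retries: 5
```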
- Replace os.EX_UNAVAILABLE with SystemExit(message) for portability
- Add timeout=5.0 to inspect() calls for defense in depth
- Add try/except around inspect calls for clean broker error handling
- Add tests for broker unreachable scenarios (ping and active_queues)
- Move import socket to module level

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
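The portability point in the first bullet can be seen in a small self-contained demo (illustrative, not the provider's code): the `os.EX_*` constants exist only on POSIX systems, while raising `SystemExit` with a string portably prints the message to stderr and exits with status 1.

```python
# Why SystemExit(message) instead of sys.exit(os.EX_UNAVAILABLE):
# os.EX_* constants are POSIX-only (absent on Windows), whereas
# SystemExit("msg") prints the message to stderr and exits with
# code 1 on every platform.
import subprocess
import sys

snippet = 'raise SystemExit("worker unhealthy: no active queue consumers")'
proc = subprocess.run(
    [sys.executable, "-c", snippet],
    capture_output=True,
    text=True,
)
print(proc.returncode)        # 1
print(proc.stderr.strip())    # worker unhealthy: no active queue consumers
```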
force-pushed from e54e291 to 7424543
Summary
- Adds a new `airflow celery worker-health-check` CLI command that detects the "catatonic worker" state where a Celery worker is alive but has silently lost its queue consumer registration after a Redis broker restart
- The existing health check (`celery inspect ping`) does not detect this state — the worker responds to ping but consumes no tasks
- The command performs a two-stage check: (1) `inspect.ping()` to verify the worker is alive, then (2) `inspect.active_queues()` to verify it has registered queue consumers

Root Cause
This addresses a known upstream Celery bug (celery/celery#8030, celery/celery#9054, celery/celery#8990) where, after a Redis broker restart, the worker reconnects at the transport level but fails to re-register its consumer on the queue. The worker process stays alive, `celery inspect ping` returns OK, but `inspect.active_queues()` returns `None`. Tasks pile up in the Redis queue and are never consumed. The partial fix in celery/celery#8796 did not fully resolve the issue, and it persists across Celery 5.2.x through 5.5.x with a Redis broker.
This PR takes a defensive approach on Airflow's side — instead of waiting for an upstream fix, we improve the health check to detect and recover from the catatonic state by triggering a container restart.
Changes
- `celery_command.py`: new `worker_health_check()` function with two-stage verification (ping + active_queues)
- `definition.py`: register `worker-health-check` CLI command with optional `-H` hostname argument
- `docker-compose.yaml`: update worker health check from `celery inspect ping` to `airflow celery worker-health-check`
- `test_celery_command.py`: 7 new tests covering all scenarios (healthy, ping failure, catatonic states, auto-hostname)

Test Plan
- Hostname defaults to `socket.gethostname()` when `-H` is not provided

Closes #63580
🤖 Generated with Claude Code
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>