
feat(providers/celery): add worker-health-check command to detect catatonic state after broker restart#63583

Open
antonio-mello-ai wants to merge 3 commits into apache:main from antonio-mello-ai:fix/celery-worker-health-check-active-queues

Conversation

@antonio-mello-ai
Contributor

Summary

  • Add new airflow celery worker-health-check CLI command that detects the "catatonic worker" state where a Celery worker is alive but has silently lost its queue consumer registration after a Redis broker restart
  • The existing health check (celery inspect ping) does not detect this state — the worker responds to ping but consumes no tasks
  • The new command performs a two-stage check: (1) inspect.ping() to verify the worker is alive, then (2) inspect.active_queues() to verify it has registered queue consumers
  • Update the official docker-compose health check to use the new command

Root Cause

This addresses a known upstream Celery bug (celery/celery#8030, celery/celery#9054, celery/celery#8990) where after a Redis broker restart, the worker reconnects at the transport level but fails to re-register its consumer on the queue. The worker process stays alive, celery inspect ping returns OK, but inspect.active_queues() returns None. Tasks pile up in the Redis queue and are never consumed.

The partial fix in celery/celery#8796 did not fully resolve the issue, and it persists across Celery 5.2.x through 5.5.x with Redis broker.

This PR takes a defensive approach on Airflow's side — instead of waiting for an upstream fix, we improve the health check to detect and recover from the catatonic state by triggering a container restart.

Changes

  • celery_command.py: New worker_health_check() function with two-stage verification (ping + active_queues)
  • definition.py: Register worker-health-check CLI command with optional -H hostname argument
  • docker-compose.yaml: Update worker health check from celery inspect ping to airflow celery worker-health-check
  • test_celery_command.py: 7 new tests covering all scenarios (healthy, ping failure, catatonic states, auto-hostname)
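The docker-compose change amounts to swapping the healthcheck test command. A sketch of what the updated service block might look like (only the `test` line reflects the change described by this PR; the timing values are placeholder assumptions, not taken from the diff):

```yaml
# Hypothetical excerpt of docker-compose.yaml; interval/timeout/retries
# are illustrative placeholders.
airflow-worker:
  healthcheck:
    test: ["CMD-SHELL", "airflow celery worker-health-check"]
    interval: 30s
    timeout: 10s
    retries: 5
```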

Test Plan

  • Worker healthy (ping + queues OK) → exits 0
  • Ping returns None → exits non-zero
  • Worker absent from ping result → exits non-zero
  • Catatonic: ping OK but active_queues returns None → exits non-zero
  • Catatonic: worker absent from active_queues → exits non-zero
  • Catatonic: empty queue list → exits non-zero
  • Auto-resolves hostname via socket.gethostname() when -H not provided
  • All 7 new tests passing
  • All pre-commit hooks pass
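One of the catatonic scenarios above might be exercised roughly like this; the helper is a simplified stand-in for the command logic (the PR's actual test file is not shown here), using `unittest.mock`:

```python
from unittest import mock


def worker_health_check(inspector, hostname):
    # Simplified stand-in for the PR's two-stage check.
    ping = inspector.ping()
    if not ping or hostname not in ping:
        raise SystemExit(f"Worker {hostname} did not respond to ping")
    queues = inspector.active_queues()
    if not queues or not queues.get(hostname):
        raise SystemExit(f"Worker {hostname} has no active queues")


def test_catatonic_ping_ok_but_active_queues_none():
    inspector = mock.Mock()
    inspector.ping.return_value = {"celery@w1": {"ok": "pong"}}
    inspector.active_queues.return_value = None  # the catatonic state
    try:
        worker_health_check(inspector, "celery@w1")
    except SystemExit as exc:
        assert "no active queues" in str(exc)
    else:
        raise AssertionError("expected a non-zero exit")


test_catatonic_ping_ok_but_active_queues_none()
```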

Closes #63580

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@potiuk
Member

potiuk commented Mar 14, 2026

@antonio-mello-ai This PR has been converted to draft because it does not yet meet our Pull Request quality criteria.

Issues found:

  • Provider tests: Failing: provider distributions tests / Compat 2.11.1:P3.10:, provider distributions tests / Compat 3.0.6:P3.10:, provider distributions tests / Compat 3.1.8:P3.10:, Non-DB tests: providers / Non-DB-prov::3.10:celery...cncf.kubernet, Low dep tests: providers / All-prov:LowestDeps:14:3.10:celery...common.compat. Run provider tests with breeze run pytest <provider-test-path> -xvs. See Provider tests docs.

Note: Your branch is 10 commits behind main. Some check failures may be caused by changes in the base branch rather than by your PR. Please rebase your branch and push again to get up-to-date CI results.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • Maintainers will then proceed with a normal review.

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.

antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 14, 2026
…alth-check command

- Update count assertion from 9 to 10 in test_celery_commands_count
- Add "worker-health-check" to parametrize list in test_celery_subcommands_defined

Closes apache#63583

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@antonio-mello-ai antonio-mello-ai marked this pull request as ready for review March 14, 2026 16:32
@Vamsi-klu
Contributor

Hey @antonio-mello-ai, the catatonic worker problem after broker restarts is a real pain — we've seen similar issues in production, so this is a welcome addition. The two-stage check (ping then active_queues) gives good diagnostic clarity. A few things I noticed:

os.EX_UNAVAILABLE is Unix-only. It doesn't exist on Windows, and every other SystemExit in this same file uses a descriptive string message (which gives exit code 1). Switching to something like raise SystemExit("Worker health check failed: no active queues") would be more consistent and portable. That also removes the need for the import os inside the function.

No timeout on the inspect calls. If the broker is unreachable or slow, inspect.ping() and inspect.active_queues() can hang indefinitely. Docker's container-level timeout provides some protection, but for a health check specifically, adding timeout=5.0 to the inspect() constructor would be good defense in depth. Something like:

inspector = celery_app.control.inspect(destination=[hostname], timeout=5.0)

No exception handling around the inspect calls. If the broker is fully down (not just restarted), these calls could raise ConnectionError or similar. Right now that would produce an unhandled traceback instead of a clean health check failure. A try/except that logs a clear message and exits non-zero would make the output much cleaner for orchestrators parsing health check logs.
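Combining the three hardening points (portable `SystemExit`, a timeout, exception handling), the command body might look roughly like this. `celery_app` stands for Airflow's configured Celery app, and the exact messages are illustrative, not quoted from the PR:

```python
import logging

log = logging.getLogger(__name__)


def worker_health_check(celery_app, hostname: str) -> None:
    # timeout=5.0 bounds each broadcast reply wait, as defense in depth
    # on top of the container-level healthcheck timeout.
    inspector = celery_app.control.inspect(destination=[hostname], timeout=5.0)
    try:
        ping = inspector.ping()
        queues = inspector.active_queues()
    except Exception:
        # Broker fully down: emit a clean failure instead of a raw traceback.
        log.exception("Worker health check could not reach the broker")
        raise SystemExit("Worker health check failed: broker unreachable")
    if not ping or hostname not in ping:
        raise SystemExit(f"Worker health check failed: {hostname} did not answer ping")
    if not queues or not queues.get(hostname):
        # A string message gives exit code 1: portable (unlike the Unix-only
        # os.EX_UNAVAILABLE) and consistent with other SystemExit uses here.
        raise SystemExit(f"Worker health check failed: no active queues for {hostname}")
```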

Missing test for broker connection errors. The 7 tests cover the happy path and the various catatonic states well, but there's no test for what happens when inspect.ping() itself raises an exception (e.g., broker unreachable). That's probably the most common real-world failure scenario for a health check.

Minor: socket is a stdlib module with no initialization side effects, so the "avoid triggering Providers Manager" rationale for local imports doesn't really apply to it; import socket could live at module level.

Overall the approach is solid — detecting catatonic workers through active_queues() is the right way to catch this specific upstream Celery bug. Just needs a bit of hardening around edge cases.

@eladkal
Contributor

eladkal commented Mar 15, 2026

Root Cause
This addresses a known upstream Celery bug (celery/celery#8030, celery/celery#9054, celery/celery#8990) where after a Redis broker restart, the worker reconnects at the transport level but fails to re-register its consumer on the queue. The worker process stays alive, celery inspect ping returns OK, but inspect.active_queues() returns None. Tasks pile up in the Redis queue and are never consumed.

The partial fix in celery/celery#8796 did not fully resolve the issue, and it persists across Celery 5.2.x through 5.5.x with Redis broker.

All referenced issues are closed ones. Do we have any open issue, or a confirmation from Celery, that this is still an open bug?

@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 8c0b4de to 3bf4dbd Compare March 15, 2026 13:01
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 15, 2026
@antonio-mello-ai
Contributor Author

Rebased on latest main and addressed all feedback. Here's the summary:

Addressing @eladkal's question: is this still an open bug?

The referenced issues are closed, but the bug persists in recent Celery versions:

  • Discussion #9095: Multiple users reporting the same behavior with Celery 5.5.3 through Nov 2025 — workers reconnect but stop consuming tasks. Maintainer @auvipy acknowledged in May 2025: "help us to reproduce this on celery 5.5.x+ please"
  • celery/celery#9631 (open, Mar 2025): Celery consumers losing connection behind Redis HAProxy — same root mechanic
  • No Celery 5.6.x changelog mentions a fix for this reconnection behavior

Addressing @Vamsi-klu's feedback

All five points addressed in this push:

  1. os.EX_UNAVAILABLE is Unix-only → replaced with raise SystemExit("descriptive message"), consistent with the rest of this file
  2. No timeout on inspect calls → added timeout=5.0 to the inspect() constructor
  3. No exception handling for broker errors → added try/except around both ping() and active_queues() with log.exception() and a clean exit
  4. Missing test for broker connection errors → added two new tests: test_health_check_fails_when_broker_unreachable and test_health_check_fails_when_active_queues_raises
  5. import socket can be module-level → moved to top-level imports
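The broker-unreachable test (item 4) might look like this sketch; since only the test names appear above, the check function here is a simplified stand-in for the command logic:

```python
from unittest import mock


def worker_health_check(inspector, hostname):
    # Simplified stand-in: any broker error becomes a clean non-zero exit.
    try:
        ping = inspector.ping()
        queues = inspector.active_queues()
    except Exception:
        raise SystemExit("Worker health check failed: broker unreachable")
    if not ping or hostname not in ping or not queues or not queues.get(hostname):
        raise SystemExit("Worker health check failed")


def test_health_check_fails_when_broker_unreachable():
    inspector = mock.Mock()
    inspector.ping.side_effect = ConnectionError("Connection refused")
    try:
        worker_health_check(inspector, "celery@w1")
    except SystemExit as exc:
        assert "broker unreachable" in str(exc)
    else:
        raise AssertionError("expected a non-zero exit")


test_health_check_fails_when_broker_unreachable()
```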

All 9 tests passing, ruff + pre-commit hooks clean.

@eladkal
Contributor

eladkal commented Mar 15, 2026

I don't see a reason for doing any kind of workaround on Airflow's side when the last response on the quoted Celery issue is from March 2025. Maybe consider submitting a fix to the upstream library?

@antonio-mello-ai
Contributor Author

@eladkal Fair point — submitting upstream fixes instead.

I've identified the root cause and submitted two companion PRs:

  1. Kombu (celery/kombu#2492): _on_disconnect in the Redis transport removes the on_poll_start callback from the event loop on disconnect. During reconnection, a stale channel's _on_disconnect can fire after the new channel has re-registered, removing the new callback. The worker stays alive but never calls _register_BRPOP; tasks pile up and are never consumed. Fix: stop removing on_poll_start (it's idempotent).

  2. Celery (celery/celery#10204): synloop (the gevent/eventlet path) lacks the hub.reset() cleanup that asynloop already has. Stale state persists across reconnection, preventing consumer re-registration. Fix: add the same hub.reset() pattern.

These two fixes together address the "catatonic worker" problem at the root. If they're accepted upstream, the health check in this PR becomes unnecessary. Happy to close this PR and track progress on the upstream fixes instead, or keep it open as a defense-in-depth measure until the upstream fixes land in a release — your call.

@eladkal
Contributor

eladkal commented Mar 15, 2026

Cool. So when both are accepted and released upstream, we can modify this PR to just update the minimum versions of the libraries.


@eladkal eladkal left a comment


Temporary block to avoid accidental merge

@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 5174feb to ff8842f Compare March 20, 2026 14:36
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 20, 2026
@jscheffl
Contributor

+1 to @eladkal - Smells like it should be fixed in Celery.

The rationale is also that if we checked the active queues, the queue attachment might have been administratively removed for the worker, e.g. via Flower. So just checking the registration for a queue is not a health signal on its own.

antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 21, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from ff8842f to 5d1516f Compare March 21, 2026 19:09
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 21, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 5d1516f to 39ddff8 Compare March 21, 2026 21:04
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 21, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch 2 times, most recently from fe12b19 to 363ea12 Compare March 22, 2026 11:04
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 22, 2026
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 22, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 363ea12 to 2989e70 Compare March 22, 2026 15:03
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 22, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 2989e70 to 091e679 Compare March 22, 2026 17:05
antonio-mello-ai added a commit to antonio-mello-ai/airflow that referenced this pull request Mar 22, 2026
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from 091e679 to e54e291 Compare March 22, 2026 23:03
antonio-mello-ai and others added 3 commits March 23, 2026 01:03
…te after broker restart

After a Redis broker restart, a Celery worker can enter a catatonic state
where it reconnects at transport level (celery inspect ping returns OK) but
silently loses its queue consumer registrations (active_queues returns None).
Tasks pile up in the queue and are never picked up.

The existing health check in docker-compose and Kubernetes only uses
`celery inspect ping`, which does not detect this state.

This change adds a new `airflow celery worker-health-check` CLI command that
performs a two-stage check:
1. Ping check: verifies the worker is alive and reachable.
2. Active queues check: verifies the worker has registered queue consumers.

If either check fails, the command exits with a non-zero status so that
container orchestrators (Docker, Kubernetes) can restart the worker.

The docker-compose.yaml health check for airflow-worker is updated to use
the new command instead of the raw `celery inspect ping`.

Closes apache#63580

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…alth-check command

- Update count assertion from 9 to 10 in test_celery_commands_count
- Add "worker-health-check" to parametrize list in test_celery_subcommands_defined

Closes apache#63583

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace os.EX_UNAVAILABLE with SystemExit(message) for portability
- Add timeout=5.0 to inspect() calls for defense in depth
- Add try/except around inspect calls for clean broker error handling
- Add tests for broker unreachable scenarios (ping and active_queues)
- Move import socket to module-level

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@antonio-mello-ai antonio-mello-ai force-pushed the fix/celery-worker-health-check-active-queues branch from e54e291 to 7424543 Compare March 23, 2026 01:03


Successfully merging this pull request may close these issues.

CeleryExecutor worker health check does not detect catatonic state after Redis broker restart
