
fix: Graceful shutdown for all UDFs #337

Merged
BulkBeing merged 15 commits into main from graceful-shutdown on Mar 18, 2026

Conversation

@BulkBeing BulkBeing (Contributor) commented Mar 16, 2026

Tested all vertex types by running it in a pipeline

NOTE: multiproc map was not tested since numaflow Rust version doesn't support it. I only checked the behaviour on pod deletion.

[Two screenshots attached, Mar 17, 2026]

@codecov codecov bot commented Mar 16, 2026

Codecov Report

❌ Patch coverage is 77.00258% with 89 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.50%. Comparing base (bcdc4c3) to head (e0e2624).
⚠️ Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...flow/pynumaflow/sourcer/servicer/async_servicer.py 74.24% 6 Missing and 11 partials ⚠️
...flow/pynumaflow/reducer/servicer/async_servicer.py 75.00% 5 Missing and 2 partials ⚠️
...ow/pynumaflow/accumulator/servicer/task_manager.py 16.66% 5 Missing ⚠️
.../pynumaflow/accumulator/servicer/async_servicer.py 88.23% 0 Missing and 4 partials ⚠️
...flow/pynumaflow/mapper/_servicer/_sync_servicer.py 87.09% 2 Missing and 2 partials ⚠️
...s/pynumaflow/pynumaflow/mapper/multiproc_server.py 50.00% 4 Missing ⚠️
...w/pynumaflow/sourcetransformer/multiproc_server.py 50.00% 4 Missing ⚠️
...pynumaflow/sourcetransformer/servicer/_servicer.py 87.09% 2 Missing and 2 partials ⚠️
.../pynumaflow/pynumaflow/accumulator/async_server.py 66.66% 2 Missing and 1 partial ⚠️
.../pynumaflow/pynumaflow/batchmapper/async_server.py 66.66% 3 Missing ⚠️
... and 14 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #337      +/-   ##
==========================================
- Coverage   94.34%   92.50%   -1.84%     
==========================================
  Files          67       67              
  Lines        3182     3509     +327     
  Branches      179      229      +50     
==========================================
+ Hits         3002     3246     +244     
- Misses        145      199      +54     
- Partials       35       64      +29     


Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing BulkBeing marked this pull request as ready for review March 17, 2026 06:02
@BulkBeing BulkBeing requested a review from yhl25 March 17, 2026 06:07
# event loop explicitly here, the python process will not exit.
# It remains stuck for 5 minutes until the liveness and readiness probes
# fail enough times and k8s sends a SIGTERM.
asyncio.get_event_loop().stop()
Member

Suggested change:
- asyncio.get_event_loop().stop()
+ asyncio.get_running_loop().stop()

asyncio.get_event_loop() is deprecated when called outside a running loop. Use asyncio.get_running_loop() instead?
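The difference can be shown with a minimal, self-contained sketch (shutdown_handler is an illustrative name, not pynumaflow code): inside a coroutine a loop is always running, so get_running_loop() is the non-deprecated way to reach it, and stopping it lets run_forever() return instead of the process hanging until k8s kills the pod.

```python
import asyncio

async def shutdown_handler() -> None:
    # Inside a coroutine a loop is guaranteed to be running, so
    # get_running_loop() is safe; get_event_loop() here is the
    # deprecated spelling of the same thing.
    loop = asyncio.get_running_loop()
    loop.call_soon(loop.stop)  # lets run_forever() below return

loop = asyncio.new_event_loop()
loop.create_task(shutdown_handler())
loop.run_forever()  # exits once loop.stop() takes effect
loop.close()
```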

@BulkBeing BulkBeing requested a review from vigith March 18, 2026 00:13
@vigith vigith left a comment (Member)

I am doing a cursory review; please let me know if this is valid.

shutdown_task = asyncio.create_task(_watch_for_shutdown())
try:
await server.wait_for_termination()
except asyncio.CancelledError:
Member

if this happens, then rest of the code after except won't be invoked, correct?

Contributor Author

The asyncio.CancelledError will be raised when the event loop shuts down or the task is cancelled explicitly. This causes the block of code under except asyncio.CancelledError to execute. We want to ignore this exception.
All other exceptions will be caught in the BaseException catch blocks, which are categorized as critical and mostly indicate a UDF error, which we should propagate to numa.
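The control flow being described can be sketched with plain asyncio (serve() and the events list are illustrative stand-ins, not the actual pynumaflow code): cancelling the task raises CancelledError inside the coroutine, the except block swallows it, and the code after the except still runs.

```python
import asyncio

events = []

async def serve() -> None:
    try:
        # Stands in for `await server.wait_for_termination()`.
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        # Expected on shutdown: swallow it so the cleanup below
        # still runs instead of propagating as a UDF failure.
        events.append("cancelled")
    events.append("cleanup ran")

async def main() -> None:
    task = asyncio.create_task(serve())
    await asyncio.sleep(0)  # let serve() start waiting
    task.cancel()           # simulates SIGTERM-driven cancellation
    await task

asyncio.run(main())
```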

Member

can you add a comment why we are doing so for posterity?

Contributor Author

I checked this part of the code again. There is no BaseException catch here. The reason is added under the CancelledError exception block.

# SIGTERM received — aiorun cancels all tasks. We must stop
# the gRPC server explicitly so its __del__ doesn't try to
# schedule a coroutine on the already-closed event loop. 

I was seeing something like the following, caused by Python's GC during server shutdown:

  Exception ignored in: <function _Server.__del__ at 0x...>
  Traceback (most recent call last):
    File ".../grpc/aio/_server.py", line ..., in __del__
      self._loop.call_soon_threadsafe(...)
  RuntimeError: Event loop is closed

RuntimeError: cannot schedule new futures after shutdown

I will update the comment with more details.
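The fix can be sketched end-to-end with a stand-in server (FakeServer is illustrative; the real grpc.aio server exposes a similar wait_for_termination()/stop(grace) shape): stopping the server explicitly inside the CancelledError handler means cleanup is never left to __del__ running against an already-closed loop.

```python
import asyncio

class FakeServer:
    # Illustrative stand-in for grpc.aio's server object.
    def __init__(self) -> None:
        self.stopped = False

    async def wait_for_termination(self) -> None:
        await asyncio.Event().wait()  # blocks until cancelled

    async def stop(self, grace: float) -> None:
        self.stopped = True

async def serve(server: FakeServer) -> None:
    try:
        await server.wait_for_termination()
    except asyncio.CancelledError:
        # SIGTERM received -- aiorun cancels all tasks. Stop the
        # server explicitly so its __del__ doesn't try to schedule
        # work on the already-closed event loop.
        await server.stop(grace=0.5)

async def main() -> FakeServer:
    server = FakeServer()
    task = asyncio.create_task(serve(server))
    await asyncio.sleep(0)
    task.cancel()  # simulates shutdown-driven cancellation
    await task
    return server

server = asyncio.run(main())
```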

return

except BaseException as e:
_LOGGER.critical("Reduce Error", exc_info=True)
Member

do we need both critical logs? The one with err_msg seems to contain more info; you might want to prefix it with "Reduce Error: "

Member

in packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py I see a bigger diff with calls to .stop() while this file doesn't seem to have it. Is that ok? :)

Contributor Author

The two take different approaches.
The reducer handles exceptions directly: the servicer itself iterates requests, manages tasks, and yields responses. Errors in __invoke_reduce propagate as exceptions up to the servicer's ReduceFn, where _shutdown_event.set() is called.

In accumulator, errors from the task manager flow through the queue to the servicer, which then triggers shutdown. So AccumulateFn has 4 shutdown points: consumer loop error, consumer CancelledError, producer await error, producer CancelledError. Meanwhile the task manager also has its own error handling that puts errors on the result queue (but does not set _shutdown_event).

This approach felt easier for accumulator.
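The reducer-style flow described above — a UDF error sets a shutdown event, and a separate watcher task reacts by stopping the server — can be sketched like this (all names are illustrative, not the actual pynumaflow code):

```python
import asyncio

shutdown_event = asyncio.Event()
log = []

async def reduce_fn() -> None:
    # Stands in for the servicer's ReduceFn: a UDF error is logged
    # and signalled via the shutdown event rather than crashing.
    try:
        raise RuntimeError("UDF error")
    except RuntimeError:
        log.append("error logged")
        shutdown_event.set()

async def watch_for_shutdown() -> None:
    await shutdown_event.wait()
    # The real code would call `await server.stop(grace)` here.
    log.append("server stopped")

async def main() -> None:
    watcher = asyncio.create_task(watch_for_shutdown())
    await reduce_fn()
    await watcher

asyncio.run(main())
```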

@vigith vigith left a comment (Member)

please update with comments

@BulkBeing BulkBeing merged commit 7a908f2 into main Mar 18, 2026
10 of 11 checks passed
@BulkBeing BulkBeing deleted the graceful-shutdown branch March 18, 2026 04:52
