Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
# event loop explicitly here, the python process will not exit.
# It remains stuck for 5 minutes until the liveness and readiness
# probes fail enough times and k8s sends a SIGTERM
asyncio.get_event_loop().stop()
- asyncio.get_event_loop().stop()
+ asyncio.get_running_loop().stop()
asyncio.get_event_loop() is deprecated when called outside a running loop. Use asyncio.get_running_loop() instead?
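For context, a small stdlib-only sketch of the difference (the helper name is illustrative, not from the PR): asyncio.get_running_loop() only succeeds inside a running loop and fails loudly otherwise, whereas calling asyncio.get_event_loop() outside a loop would implicitly create one and is deprecated on newer Python versions.

```python
import asyncio

def current_loop_or_none():
    """Return the running loop if one exists, else None."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: get_running_loop() raises instead of
        # silently creating a loop (old get_event_loop() behaviour).
        return None

async def inside():
    # Inside a coroutine there is always a running loop.
    return current_loop_or_none() is not None

# Outside any coroutine there is no running loop.
outside_has_loop = current_loop_or_none() is not None  # False
inside_has_loop = asyncio.run(inside())                # True
```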
vigith left a comment
I am doing a cursory review, please let me know if this is valid.
shutdown_task = asyncio.create_task(_watch_for_shutdown())
try:
    await server.wait_for_termination()
except asyncio.CancelledError:
if this happens, then the rest of the code after the except won't be invoked, correct?
The asyncio.CancelledError will be raised when the event loop shuts down or the task is cancelled explicitly. That causes the block of code under except asyncio.CancelledError to execute; we want to ignore this exception.
All other exceptions are caught by the BaseException blocks, which are categorized as critical and mostly indicate a UDF error that we should propagate to numa.
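A minimal, self-contained illustration of that behaviour (not the actual servicer code): cancelling a task injects CancelledError at its current await point, the except asyncio.CancelledError block runs, and the handler can swallow the exception so the task finishes cleanly.

```python
import asyncio

events = []

async def wait_forever():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Runs when the task is cancelled (e.g. during shutdown);
        # we record it and swallow the exception, mirroring the
        # servicer's "ignore on shutdown" intent.
        events.append("cancelled-handler-ran")

async def main():
    task = asyncio.create_task(wait_forever())
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel()
    await task              # returns normally: the cancel was swallowed
    events.append("after-await")

asyncio.run(main())
```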
can you add a comment on why we are doing this, for posterity?
I checked this part of the code again. There is no BaseException catch here. The reasoning has been added under the CancelledError exception block.
# SIGTERM received — aiorun cancels all tasks. We must stop
# the gRPC server explicitly so its __del__ doesn't try to
# schedule a coroutine on the already-closed event loop.

I was seeing something like the below, caused by Python's GC during shutdown of the server:

Exception ignored in: <function _Server.__del__ at 0x...>
Traceback (most recent call last):
  File ".../grpc/aio/_server.py", line ..., in __del__
    self._loop.call_soon_threadsafe(...)
RuntimeError: Event loop is closed
RuntimeError: cannot schedule new futures after shutdown

I will update the comment with more details.
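The failure mode in that traceback is easy to reproduce in isolation, without grpc: once an event loop is closed, scheduling anything on it raises RuntimeError, which is exactly what a finalizer running after loop shutdown hits.

```python
import asyncio

# Create a loop and close it, simulating the state the process is
# in once aiorun has torn the loop down during shutdown.
loop = asyncio.new_event_loop()
loop.close()

# Any late call into the closed loop (as grpc.aio's _Server.__del__
# does via call_soon_threadsafe) now raises RuntimeError.
try:
    loop.call_soon_threadsafe(lambda: None)
    error = None
except RuntimeError as e:
    error = str(e)
```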
    return

except BaseException as e:
    _LOGGER.critical("Reduce Error", exc_info=True)
do we need both of the critical logs? the one with err_msg seems to contain more info; you might want to prefix it with "Reduce Error: "
in packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py I see a bigger diff with calls to .stop(), while this file doesn't seem to have it. Is that ok? :)
Both use different approaches.
Reducer handles exceptions directly - the servicer itself directly iterates requests, manages tasks, and yields responses. Errors in __invoke_reduce propagate as exceptions up to the servicer's ReduceFn where _shutdown_event.set() is called.
In accumulator, errors from the task manager flow through the queue to the servicer, which then triggers shutdown. So AccumulateFn has 4 shutdown points: consumer loop error, consumer CancelledError, producer await error, producer CancelledError. Meanwhile the task manager also has its own error handling that puts errors on the result queue (but does not set _shutdown_event).
This approach felt easier for accumulator.
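A toy sketch of the reducer-style flow described above (all names hypothetical, not the pynumaflow code): the handler catches the error itself, sets a shared shutdown event, and a separate watcher task reacts to it, which in the real servicer is what stops the gRPC server.

```python
import asyncio

shutdown_event = asyncio.Event()
log = []

async def servicer_handler():
    # Stand-in for the servicer's ReduceFn: the error surfaces here
    # directly, so this is where the shutdown event gets set.
    try:
        raise ValueError("simulated UDF error")
    except BaseException:
        log.append("handler-caught")
        shutdown_event.set()

async def watch_for_shutdown():
    # Stand-in for _watch_for_shutdown(): wakes when the event is
    # set and would then stop the server / event loop.
    await shutdown_event.wait()
    log.append("shutdown-triggered")

async def main():
    watcher = asyncio.create_task(watch_for_shutdown())
    await servicer_handler()
    await watcher

asyncio.run(main())
```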
Tested all vertex types by running them in a pipeline, covering pod deletion (kubectl delete pod) as well as UDF exceptions.

NOTE: multiproc map was not tested since the numaflow Rust version doesn't support it; I only checked its behaviour on pod deletion.