fix: Avoid UDF call in map streaming from starving eventloop #349
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@            Coverage Diff             @@
##             main     #349      +/-   ##
==========================================
- Coverage   92.68%   92.65%   -0.03%
==========================================
  Files          67       67
  Lines        3513     3514       +1
  Branches      229      229
==========================================
  Hits         3256     3256
  Misses        193      193
- Partials       64       65       +1
# The starvation can happen if the UDF code yields messages using regular
# for-loop (non async). See the sample code in https://github.com/numaproj/numaflow-python/issues/342
So this is a bug in how the user is writing the for loop, right? They should be using async for, but they are using a plain for. Is my understanding correct?
If they make a blocking call in the middle, the event loop would get blocked as well.
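To make the starvation concrete, here is a minimal, self-contained sketch. It is not the pynumaflow servicer code: `blocking_source`, `stream`, `ticker`, and `run` are hypothetical names made up for illustration. It shows an async generator that drains a blocking iterator with a regular for-loop, with and without an explicit `await asyncio.sleep(0)` after each yielded item.

```python
import asyncio
import time


def blocking_source(n, delay):
    """Hypothetical stand-in for UDF code that blocks between messages."""
    for i in range(n):
        time.sleep(delay)  # blocking call: the event loop cannot run meanwhile
        yield i


async def stream(n, delay, cooperative):
    """Async generator that drains a sync iterator with a regular for-loop."""
    for item in blocking_source(n, delay):
        yield item
        if cooperative:
            # Explicit suspension point so other tasks get a turn.
            await asyncio.sleep(0)


async def ticker(events, stop):
    """Another task on the same loop; records a tick whenever it gets to run."""
    while not stop.is_set():
        events.append("tick")
        await asyncio.sleep(0)


async def run(cooperative):
    events = []
    stop = asyncio.Event()
    tick_task = asyncio.create_task(ticker(events, stop))
    async for item in stream(3, 0.01, cooperative):
        events.append(f"msg-{item}")
    stop.set()
    await tick_task
    return events


if __name__ == "__main__":
    print("regular for-loop only:", asyncio.run(run(cooperative=False)))
    print("with asyncio.sleep(0):", asyncio.run(run(cooperative=True)))
```

Without the `asyncio.sleep(0)`, the consuming task never suspends, so the ticker task does not run until the whole stream has been drained. With it, the two tasks interleave.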
Though @BulkBeing, when we were using the generator pattern in the previous version, we didn't see this behavior because we were yielding right away, instead of storing and yielding in different methods?
Okay, we were not doing parallel invocations at that point, so only one message was getting processed regardless.
@BulkBeing can we try to put the user code in an async loop and test once?
Yea, in the previous version (0.9.0), where it was working for the user, we were not using an async queue.
Python also logs something like this if the task is stuck without yielding (the code I used yields 1 value every second for 20 seconds):
2026-04-03 10:49:50,256 asyncio WARNING Executing <Task finished name='Task-75' coro=<AsyncMapStreamServicer._invoke_map_stream() done, defined at /app/.venv/lib/python3.13/site-packages/pynumaflow/mapstreamer/servicer/async_servicer.py:118> result=None created at /usr/local/lib/python3.13/asyncio/tasks.py:410> took 20.143 seconds
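For reference, a warning of this shape comes from asyncio's debug mode, which reports any single step that runs longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below reproduces it in isolation; `ListHandler`, `hog`, and `run_with_debug` are hypothetical helper names, and the 0.2 second block is an arbitrary value chosen only to cross the default threshold.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages so they can be inspected afterwards."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def hog(seconds):
    """Coroutine that blocks the event loop instead of awaiting."""
    time.sleep(seconds)


def run_with_debug(seconds=0.2):
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True enables slow-callback detection on the event loop.
        asyncio.run(hog(seconds), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)
```

Debug mode can also be enabled without code changes by setting the `PYTHONASYNCIODEBUG=1` environment variable.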
Let me try converting it to an async for loop.
It works fine (without asyncio.sleep(0)) if the UDF uses an async generator (async for with yield):

    async for msg in processor_monitor.get_messages():
        yielded += 1
        log.info("Yielding message %d", yielded)
        yield msg

Note that it results in more changes, like changing get_messages to async, using an asyncio.Queue to communicate between the background thread and the main thread, passing the event loop to the background thread, and using the queue like this: self.loop.call_soon_threadsafe(self.queue.put_nowait, msg) (asyncio.Queue isn't thread safe).
In short, with the async get_messages(), there are real await points like msg = await self.queue.get().
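A rough sketch of that thread-to-loop handoff, under stated assumptions: `ProcessorMonitor`, its `start` method, and the `_DONE` sentinel are hypothetical stand-ins for the user's code in the linked issue, not its actual implementation. Only `asyncio.Queue`, `loop.call_soon_threadsafe`, and `queue.put_nowait` are real asyncio APIs.

```python
import asyncio
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


class ProcessorMonitor:
    """Hypothetical producer that hands messages from a thread to the event loop."""

    def __init__(self, loop):
        self.loop = loop
        self.queue = asyncio.Queue()

    def _produce(self, count):
        # Runs in the background thread. asyncio.Queue is not thread safe,
        # so every put is scheduled onto the loop's own thread.
        for i in range(count):
            self.loop.call_soon_threadsafe(self.queue.put_nowait, f"msg-{i}")
        self.loop.call_soon_threadsafe(self.queue.put_nowait, _DONE)

    def start(self, count):
        thread = threading.Thread(target=self._produce, args=(count,), daemon=True)
        thread.start()
        return thread

    async def get_messages(self):
        while True:
            msg = await self.queue.get()  # real await point: the loop can run other tasks
            if msg is _DONE:
                return
            yield msg


async def collect(count):
    monitor = ProcessorMonitor(asyncio.get_running_loop())
    thread = monitor.start(count)
    received = [msg async for msg in monitor.get_messages()]
    thread.join()  # the producer has already finished once the sentinel arrived
    return received


if __name__ == "__main__":
    print(asyncio.run(collect(3)))
```

Because `get_messages()` suspends on `await self.queue.get()` whenever the queue is empty, the consuming task gives control back to the event loop between messages without needing an extra `asyncio.sleep(0)`.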
Fixes #342