fix: Avoid UDF call in map streaming from starving eventloop #349
Merged · +154 −1 · 2 commits
packages/pynumaflow/tests/mapstream/test_async_map_stream_streaming.py (129 additions, 0 deletions)
```python
"""
Test that MapStreamAsyncServer streams messages incrementally even when the
user handler yields via a regular for-loop (no await between yields).

Regression test for https://github.com/numaproj/numaflow-python/issues/342

Root cause: asyncio.Queue.put() on an unbounded queue never suspends, so the
MapFn consumer task was starved and couldn't stream responses to gRPC until
the handler completed. Fix: asyncio.sleep(0) after each put in the servicer.
"""

import logging
import threading
import time
from collections import deque
from collections.abc import AsyncIterable

import grpc
import pytest

from pynumaflow import setup_logging
from pynumaflow.mapstreamer import Datum, MapStreamAsyncServer, Message
from pynumaflow.proto.mapper import map_pb2_grpc
from tests.conftest import create_async_loop, start_async_server, teardown_async_server
from tests.mapstream.utils import request_generator

LOGGER = setup_logging(__name__)

pytestmark = pytest.mark.integration

SOCK_PATH = "unix:///tmp/async_map_stream_streaming.sock"

NUM_MESSAGES = 5
PRODUCE_INTERVAL_SECS = 0.2


async def slow_streaming_handler(keys: list[str], datum: Datum) -> AsyncIterable[Message]:
    """
    Handler that produces messages from a background thread with a delay
    between each, and yields them via a tight for-loop with NO await.
    This is the pattern from issue #342.
    """
    messages: deque[Message] = deque()

    def _produce():
        for i in range(NUM_MESSAGES):
            messages.append(Message(f"msg-{i}".encode(), keys=keys))
            time.sleep(PRODUCE_INTERVAL_SECS)

    thread = threading.Thread(target=_produce)
    thread.start()

    while thread.is_alive():
        # Tight loop: regular for, no await (the pattern that triggers #342)
        while messages:
            yield messages.popleft()

    thread.join()
    while messages:
        yield messages.popleft()


async def _start_server(udfs):
    server = grpc.aio.server()
    map_pb2_grpc.add_MapServicer_to_server(udfs, server)
    server.add_insecure_port(SOCK_PATH)
    logging.info("Starting server on %s", SOCK_PATH)
    await server.start()
    return server, SOCK_PATH


@pytest.fixture(scope="module")
def streaming_server():
    loop = create_async_loop()
    server_obj = MapStreamAsyncServer(map_stream_instance=slow_streaming_handler)
    udfs = server_obj.servicer
    server = start_async_server(loop, _start_server(udfs))
    yield loop
    teardown_async_server(loop, server)


@pytest.fixture()
def streaming_stub(streaming_server):
    return map_pb2_grpc.MapStub(grpc.insecure_channel(SOCK_PATH))


def test_messages_stream_incrementally(streaming_stub):
    """
    Verify that messages are streamed to the client as they are produced,
    not batched until the handler completes.

    The handler produces NUM_MESSAGES messages with PRODUCE_INTERVAL_SECS between
    each. If streaming works, the first message should arrive well before the
    last one is produced (total production time = NUM_MESSAGES * PRODUCE_INTERVAL_SECS).
    """
    generator_response = streaming_stub.MapFn(
        request_iterator=request_generator(count=1, session=1)
    )

    # Consume handshake
    handshake = next(generator_response)
    assert handshake.handshake.sot

    # Collect messages with their arrival timestamps
    arrival_times = []
    result_count = 0
    for msg in generator_response:
        if hasattr(msg, "status") and msg.status.eot:
            continue
        arrival_times.append(time.monotonic())
        result_count += 1

    assert result_count == NUM_MESSAGES, f"Expected {NUM_MESSAGES} messages, got {result_count}"

    # If messages streamed incrementally, the time span between the first and
    # last arrival should be a significant portion of the total production time.
    # If they were batched, they'd all arrive within a few milliseconds of each other.
    total_production_time = NUM_MESSAGES * PRODUCE_INTERVAL_SECS
    first_to_last = arrival_times[-1] - arrival_times[0]

    # The spread should be at least 40% of production time if streaming works.
    # If batched, the spread would be near zero (~1-5ms).
    min_expected_spread = total_production_time * 0.4
    assert first_to_last >= min_expected_spread, (
        f"Messages arrived too close together ({first_to_last:.3f}s spread), "
        f"expected at least {min_expected_spread:.3f}s. "
        f"This indicates messages were batched instead of streamed. "
        f"Arrival gaps: {[f'{b - a:.3f}s' for a, b in zip(arrival_times, arrival_times[1:])]}"
    )
```
So this is a bug in how the user is writing the for-loop, right? They should use `async for` but are using `for`. Is my understanding correct?
If they make a blocking call in the middle, the event loop should get blocked out as well.
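That point can be demonstrated with a small sketch (illustrative names, not pynumaflow code): a coroutine that makes a blocking call such as `time.sleep()` freezes every other task on the loop, because control never returns to the event loop until the call finishes:

```python
import asyncio
import time


async def heartbeat(log: list[float]) -> None:
    # Records a timestamp every 50 ms; on a healthy loop,
    # consecutive entries are ~0.05 s apart.
    for _ in range(3):
        log.append(time.monotonic())
        await asyncio.sleep(0.05)


async def blocking_handler() -> None:
    time.sleep(0.3)  # blocking call: the whole event loop stalls here


async def main() -> list[float]:
    log: list[float] = []
    await asyncio.gather(heartbeat(log), blocking_handler())
    return log


timestamps = asyncio.run(main())
gaps = [b - a for a, b in zip(timestamps, timestamps[1:])]
# one gap is ~0.3 s instead of ~0.05 s: the blocking call starved the heartbeat
```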
Though @BulkBeing, when we were using the generator pattern in the previous version, we didn't see this behavior because we were yielding right away, instead of storing and yielding in different methods?
Okay, we were not doing parallel invocations at that point, so only one message was getting processed regardless.
@BulkBeing can we try to put the user code in an async loop and test once?
Yea, in the previous version (`0.9.0`), where it was working for the user, we were not using an async queue. Python logs something like this as well if the task was stuck without yielding (the code I used yields 1 value every second for 20 seconds):
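The log referred to here is presumably asyncio's debug-mode warning (`Executing <Task ...> took N seconds`), which fires when one task step holds the loop longer than `loop.slow_callback_duration`. A minimal sketch to trigger and capture that warning (all names here are illustrative):

```python
import asyncio
import logging
import time

captured: list[str] = []


class _Capture(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        captured.append(record.getMessage())


# asyncio emits its debug warnings on the "asyncio" logger at WARNING level
logging.getLogger("asyncio").addHandler(_Capture())


async def stuck() -> None:
    time.sleep(0.2)  # holds the loop without ever yielding


async def main() -> None:
    # lower the threshold so the 0.2 s stall is reported
    asyncio.get_running_loop().slow_callback_duration = 0.05
    await stuck()


asyncio.run(main(), debug=True)
# captured now contains a message like "Executing <Task ...> took 0.2 seconds"
```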
Let me try converting it to an `async for` loop.
It works fine (without `asyncio.sleep(0)`) if the UDF uses an async generator (`async for` with `yield`):

Note that it results in more changes, like changing `get_messages` to async, using an `asyncio.Queue` to communicate between the background thread and the main thread, passing the event loop to the background thread, and using the queue like this: `self.loop.call_soon_threadsafe(self.queue.put_nowait, msg)` (`asyncio.Queue` isn't thread-safe).

In short, with `get_messages()`, there are real await points like `msg = await self.queue.get()`.
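The pattern described above can be sketched end-to-end (hypothetical helper names, not the pynumaflow servicer): the background thread hands items to the loop through `loop.call_soon_threadsafe`, since `asyncio.Queue` is not thread-safe, and the async generator's `await queue.get()` is a genuine suspension point, so no `asyncio.sleep(0)` workaround is needed:

```python
import asyncio
import threading

_SENTINEL = object()  # signals end-of-stream


async def stream_from_thread(n: int):
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def produce() -> None:
        # Runs on a plain thread; asyncio.Queue is NOT thread-safe, so all
        # queue access is marshalled onto the event-loop thread.
        for i in range(n):
            loop.call_soon_threadsafe(queue.put_nowait, f"msg-{i}")
        loop.call_soon_threadsafe(queue.put_nowait, _SENTINEL)

    threading.Thread(target=produce).start()

    while True:
        msg = await queue.get()  # real await point: the loop stays responsive
        if msg is _SENTINEL:
            return
        yield msg


async def main() -> list[str]:
    return [m async for m in stream_from_thread(3)]


print(asyncio.run(main()))  # ['msg-0', 'msg-1', 'msg-2']
```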