Make async functions mappable #493
Conversation
```
@gen_test()
def test_map_async_tornado():
    @gen.coroutine
    def add_tor(x=0, y=0):
        return x + y

    async def add_native(x=0, y=0):
        return x + y

    source = Stream(asynchronous=True)
    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()

    yield source.emit(0)

    yield gen.moment  # Must yield to the event loop to ensure it finished
    assert L == [3]


@pytest.mark.asyncio
async def test_map_async():
```
I'm not a Tornado user, so I still do not really understand the implications of using it vs native asyncio; I wrote the test twice to show that either harness will run coroutines from the other.
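Based on the visible fragment above, the asyncio twin presumably continues along these lines (a sketch, assuming `pytest-asyncio` is available; whether a single `sleep(0)` is enough to flush both nodes may differ in the real test):

```
import asyncio

import pytest
from streamz import Stream


@pytest.mark.asyncio
async def test_map_async():
    async def add_native(x=0, y=0):
        return x + y

    source = Stream(asynchronous=True)
    L = source.map_async(add_native, y=1).map_async(add_native, y=2).sink_to_list()

    await source.emit(0)
    await asyncio.sleep(0)  # yield so the worker callbacks can drain their queues
    assert L == [3]
```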
```
    assert_eq(pd.concat(L), expected)


@flaky(max_runs=3, min_passes=1)
```
I noticed this one leaving an upstream around on occasion. It might be related to GC changes in 3.13+.
I believe there are other places in the code where `iscoroutinefunction` (or `isawaitable`?) is used to decide what to do. It could be argued, from the current state of the package, that all nodes should be asynchronous...
Things I learned about Tornado and asyncio:
I ended up figuring out a way to mimic Akka Streams's `parallelism` parameter for `mapAsync`, which warrants keeping. I have definitely run into problems in stream processing when the mapping function is heavy-weight and the system was creating a strong happens-before relationship between mapping successive elements, so I wanted to give an option to make that a weaker condition than forcing a total ordering on each await.
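For illustration, usage might look like the following sketch, assuming the node exposes the parallelism factor discussed below as a `parallelism` keyword; the `enrich` coroutine is hypothetical:

```
import asyncio
from streamz import Stream

async def enrich(record):
    # hypothetical heavy-weight mapping function, e.g. a network call
    await asyncio.sleep(0.1)
    return {**record, "enriched": True}

source = Stream(asynchronous=True)
# up to `parallelism` enrichments may be in flight at once, while results
# are still emitted downstream in arrival order
source.map_async(enrich, parallelism=4).sink(print)
```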
You mean the use of `gen.coroutine`? I am wondering whether we should be adding any further tornado-specific functionality at this point. If people make their async functions with `async def` ...
In Akka Streams, the API is designed such that `mapAsync` is a separate operator from `map`. In a prior job, we had a problem where one of our enrichment functions used ...
In this case, I was able to support Tornado coroutines without making them a primary design decision. I made the docstring examples for `map_async` ...
I tried map with a coroutine and it failed spectacularly:
```
@gen_test()
def test_map_async_tornado():
@gen.coroutine
def add_tor(x=0, y=0):
return x + y
source = Stream(asynchronous=True)
L = source.map(add_tor, y=1).map(add_tor, y=2).sink_to_list()
yield source.emit(0)
yield gen.moment # yield to the event loop to ensure it finished
> assert L == [3]
E assert [<Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>] == [3]
E
E At index 0 diff: <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")> != 3
E
E Full diff:
E [
E - 3,
E + <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>,
E ]
```
So I made a new `map_async` that uses native asyncio plumbing to await
the coroutine before feeding it downstream.
The background task can't return, obviously, if we want the stream to continue operating.
Use an `asyncio.Queue` of the tasks to ensure that the arrival and departure order of elements match. Assert back pressure when a new value arrives via `update` but the work queue is full. Because `asyncio.Queue` cannot peek, the parallelism factor is not precise: the worker callback can have either zero or one task in hand, but it must free up a slot in the queue to do so. Under pressure, the effective parallelism will generally be `(parallelism + 1)` instead of the `parallelism` given in `__init__`, as one Future will be awaited in the worker callback while the queue fills up from `update` calls.
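Roughly, the design described above amounts to this simplified sketch (not the actual streamz node; the names here are invented for illustration):

```
import asyncio

class MapAsyncSketch:
    """Simplified sketch of the queue-plus-worker design described above;
    not the real node. Must be created inside a running event loop."""

    def __init__(self, func, parallelism=1):
        self.func = func
        # the bounded queue's capacity is the parallelism factor
        self.work_queue = asyncio.Queue(maxsize=parallelism)
        # worker drains the queue in FIFO order, matching arrival order
        self.worker = asyncio.create_task(self._work())

    async def _wait_for_work_slot(self):
        # back pressure: yield the loop until the worker frees a slot
        while self.work_queue.full():
            await asyncio.sleep(0)

    async def update(self, x):
        await self._wait_for_work_slot()
        task = asyncio.create_task(self.func(x))  # starts running immediately
        await self.work_queue.put(task)

    async def _work(self):
        while True:
            task = await self.work_queue.get()
            # the worker holds one task outside the queue, hence the
            # effective (parallelism + 1) under pressure
            result = await task
            print("downstream:", result)  # stand-in for emitting downstream
```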
martindurant left a comment
After (too much) thinking about this, I don't see an obvious way to simplify things. It ends up more complicated than I would have thought, because of passing the queue around - but the user doesn't need to know anything about that.
```
try:
    result = await task
except Exception as e:
    logger.exception(e)
    raise
```
This is the only way to exit the loop. There should probably be a stop() method, no?
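For instance, a sentinel-based `stop()` might look like this hypothetical sketch built on the same queue design (not code from this PR):

```
import asyncio

_STOP = object()  # sentinel; hypothetical, not part of this PR

class WorkerSketch:
    """Hypothetical stop() built on the queue-plus-worker design."""

    def __init__(self):
        self.work_queue = asyncio.Queue()
        self._worker = None

    def start(self):
        # must be called inside a running event loop
        self._worker = asyncio.create_task(self._work())

    async def stop(self):
        # enqueue the sentinel so the worker drains pending work, then exits
        await self.work_queue.put(_STOP)
        await self._worker

    async def _work(self):
        while True:
            item = await self.work_queue.get()
            if item is _STOP:
                break  # the one clean exit from the loop
            ...  # await the task and emit downstream
```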
`timed_window`, `timed_window_unique`, `delay`, `buffer`, and `latest` all use the same `while True: ...` construct for their work callback.
Fun Fact: the event loop itself only holds a weak reference to any task so when the enclosing node is GCed, the underlying task can be swept away as long as it is not currently running. Once the queue starves it will get stuck waiting on an item that will never come and never schedule back in.
https://docs.python.org/3.14/library/asyncio-task.html#asyncio.create_task
I'm not actually sure that raising the exception is correct, as it will kill the worker task and clog the stream. `map` raises the exception from `update`, which should blow up the entire stream directly, right?
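To illustrate both points, here is a minimal, hypothetical node showing the shared `while True:` worker construct and why storing the task on `self` matters (the strong reference keeps the GC from sweeping a worker that is parked on `get()`):

```
import asyncio

class NodeSketch:
    """Hypothetical node; must be created inside a running event loop."""

    def __init__(self):
        self.queue = asyncio.Queue()
        # storing the task on self keeps a strong reference; the event
        # loop alone only holds a weak one, so an otherwise-unreferenced
        # task can be garbage collected while it waits on queue.get()
        self._worker = asyncio.create_task(self._work())

    async def _work(self):
        while True:  # same construct as timed_window, delay, buffer, ...
            item = await self.queue.get()
            ...  # process item and emit downstream
```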
```
        self._release_refs(metadata)

    async def _wait_for_work_slot(self):
        while self.work_queue.full():
```
I was worried this would end up a busy loop eating the CPU - but if the queue is full, there must be coroutines waiting, so the sleep below will always yield the loop to something else, right? I think, then, that this is fine.
Yes, `await asyncio.sleep(0)` is the defined way to yield the loop:
https://docs.python.org/3.14/library/asyncio-task.html#asyncio.sleep
Sleep always yields, so any other tasks on the loop have priority for the next schedule slot, not just the ones held in the work queue here, but we are guaranteed to have at least one of those since the queue is full and we have the work callback. If the work items are all long-running and blocked (say, on IO) and the queue is full, then back pressure will propagate upstream via the Task enclosing `_insert_job`, which cannot progress until this loop exits. Eventually the only task on the loop that can make progress is technically this one, but it will always immediately yield the loop, so as soon as any other task can make progress, that task will take the loop.
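A tiny self-contained demonstration of that yielding behavior (illustrative only, not from this PR):

```
import asyncio

async def spinner(flag):
    # mimics _wait_for_work_slot: loops until the condition clears,
    # yielding the loop on every iteration instead of busy-waiting
    while not flag["done"]:
        await asyncio.sleep(0)

async def main():
    flag = {"done": False}
    spin = asyncio.create_task(spinner(flag))

    async def other_work():
        # still gets scheduled while the spinner is "looping"
        flag["done"] = True

    await asyncio.create_task(other_work())
    await spin  # completes because sleep(0) kept handing control back

asyncio.run(main())
```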
```
await self._wait_for_work_slot()
coro = self.func(x, *self.args, **self.kwargs)
task = self._create_task(coro)
await self.work_queue.put((task, metadata))
```
Is a race possible with the await here?
Not within the semantics of the traditional interpreter. In free-threaded mode, maybe? The asyncio `Queue` is not thread safe, but within an event loop (which must run entirely within a single thread) the get/put pair will not yield the event loop until the internal state of the `Queue` is consistent and they have achieved the requested action. If they could not complete the action in the current state, they block themselves on a Future that can only complete once the complementary action resolves entirely, and once that Future comes back with a result, they do not yield the loop until they are done modifying the internal deque.
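This can be demonstrated directly; the sketch below (illustrative, not from this PR) shows a blocked `put()` parking on an internal Future and only completing after the complementary `get()`:

```
import asyncio

async def main():
    q = asyncio.Queue(maxsize=1)
    await q.put("a")          # fills the queue

    async def producer():
        await q.put("b")      # parks on an internal Future until a slot opens
        return "put done"

    prod = asyncio.create_task(producer())
    await asyncio.sleep(0)       # let the producer run up to its blocked put()
    assert q.qsize() == 1        # "b" is not in the queue yet; no torn state
    assert await q.get() == "a"  # get() wakes the parked put()
    assert await prod == "put done"
    assert await q.get() == "b"

asyncio.run(main())
```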
Creates a new `map_async` API on `Stream` and updates the documentation around async work to note it.
The distinguishing of `map` and `map_async` was inspired by prior work with `mapAsync` from Akka Streams, as the use of an `asyncio.Queue` and a callback running on the loop to drain it seems like a decent amount of overhead to avoid when we do not need it.
I'm still trying to figure out if `inspect` would work to adapt dynamically given either a native `async def` or a Tornado `gen.coroutine`. If so, diverging the API would not be needed.
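For what it's worth, a sketch of that dynamic detection might combine `inspect` with Tornado's own predicate (an untested idea, not part of this PR):

```
import inspect

from tornado import gen

def is_async_callable(func):
    """Sketch of the dynamic detection floated above; hypothetical helper."""
    if inspect.iscoroutinefunction(func):    # native `async def`
        return True
    return gen.is_coroutine_function(func)   # Tornado `@gen.coroutine`
```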