# flow
As part of practicing coroutines and flow, I tried an implementation of `amb/race` where I don't allow myself to use channels directly. Or, to put it differently, I use `SharedFlow` instead of channels. It runs fine, except that the final flow collection suspends indefinitely, even after the jobs used to share the losing flows have completed and the winning flow has emitted all its items. Why? I spent a couple of days trying to figure out what might be going on, without success.

Here's a gist of my work, including the code I'm using to test it and the output generated. You'll see from the output that the flow correctly emits the exception thrown by the first flow, cancels the third flow because it emits last (so it loses the race), and emits all the items in the winning flow.

This is a learning exercise, so I'm interested in the shortest path to getting this implementation to work; I'm not looking for a completely different implementation that works. I'm also happy to create a project on GitHub with dependencies (I use Arrow's Either data type) if anyone's interested in running the code directly. Let me know. Thank you!
I got some help on Stack Overflow. It looks like this behavior is a result of how `SharedFlow` works: a `SharedFlow` never completes, so collecting one suspends until the coroutine doing the collection is cancelled. Cancelling the job of the scope used to share a flow (i.e. the scope passed as a parameter to `shareIn`) only terminates the upstream flow; it has no effect on the suspended `SharedFlow` collection. This is why collection of the flow produced by my `amb/race` implementation never terminates. This is the case even if `SharingStarted.WhileSubscribed` is used, because as long as the collection is suspended there is still a subscriber, so the `WhileSubscribed` condition still holds.
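For anyone who wants to see this in isolation, here is a minimal sketch of the behavior described above, plus one possible workaround. It is not the code from my gist: `Signal`, `Item`, `Done`, `shareWithDone` and `untilDone` are hypothetical names I made up for illustration, and the only dependency assumed is `kotlinx-coroutines-core`. The idea of the workaround is to turn upstream completion into an ordinary value, so the collector can stop itself with `takeWhile` instead of waiting for a completion that a `SharedFlow` never delivers.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper types: they make upstream completion visible as a value.
sealed interface Signal<out T>
data class Item<T>(val value: T) : Signal<T>
object Done : Signal<Nothing>

// Shares the flow and appends a Done marker when the upstream completes normally.
fun <T> Flow<T>.shareWithDone(scope: CoroutineScope): SharedFlow<Signal<T>> =
    map<T, Signal<T>> { Item(it) }
        .onCompletion { cause -> if (cause == null) emit(Done) }
        .shareIn(scope, SharingStarted.Lazily)

// Unwraps items and ends the collection at the Done marker.
fun <T> Flow<Signal<T>>.untilDone(): Flow<T> =
    takeWhile { it != Done }.map { (it as Item<T>).value }

fun main() = runBlocking {
    // Separate scope for sharing, so it can be cancelled independently.
    val sharingScope = CoroutineScope(Dispatchers.Default + Job())

    // 1. Plain collection: every item arrives, but collection never completes,
    //    so the timeout is what ends it.
    val plain = flowOf(1, 2, 3).shareIn(sharingScope, SharingStarted.Lazily)
    val seen = mutableListOf<Int>()
    val finished = withTimeoutOrNull(500) { plain.onEach { seen += it }.collect() } != null
    println("plain: seen=$seen finished=$finished")

    // 2. With the Done marker, the collector ends itself once the upstream is done.
    val values = flowOf(1, 2, 3).shareWithDone(sharingScope).untilDone().toList()
    println("untilDone: $values")

    sharingScope.cancel()
}
```

In the first case I would expect all three items to be seen and `finished` to be `false`, since only the timeout ends the collection. In the second case `toList()` returns normally because `takeWhile` stops collecting at the marker. Note that the marker is only emitted on normal completion here; errors and cancellation of the upstream would need their own handling.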