I'm attempting to create a small wrapper for Share...
# coroutines
m
I'm attempting to create a small wrapper for SharedFlows to add some priority functionality and some better dependency injection usability. While doing this I'm observing some extremely odd behavior that I can't seem to figure out. Using this test class which mirrors my unit tests as much as possible you can see the behavior. Basically, I'm acquiring a lock to wait on the flow to be called in order to test the outcome, but the lock is never released because the subscriber is never called despite having clearly subscribed. The extreme weirdness comes in when I add certain specific
println
calls that reference variables in which case the problem disappears while that statement is present. The pasted test class will print the following when ran (and never complete):
Copy code
Registering
Registered
Emitting
Emitting here
Subscribers
0
Done
Suspending
Note that is says there are zero subscribers and finished emitting even though it was registered. However, when uncommenting lines 59 or 63, it still says there are zero subscribers, but the object is emitted to my registered collector and the program finishes. I am relatively new to Flows and have avoided them mostly because of these kinds of things, but I would love to make them work since they are exactly what I need. I feel like I must be doing something wrong. I appreciate any help!
d
Seems like there's a race. The scope on line 36 uses
Dispatchers.Default
, as no other dispatcher was specified. So,
launchIn
starts the collection procedure in parallel and may not do it in time. The lines
59
and
63
probably add enough of a delay for the
launchIn
to consistently be able to start collecting.
m
I thought of that as well and adding
delay(1)
in place of the
println
I believe also allows it to succeed. Adding a different dispatcher wouldn't necessarily guarantee there wouldn't still be a race condition. What would you suggest for ensuring the
collect
finishes before calling an
emit
?
d
You could do something like
.apply { scope.launch(start = CoroutineStart.UNDISPATCHED) { collect() } }
m
Interesting! I was previously unaware of those parameters. Gave that a try and it doesn't seem to be working immediately, but I'm moving things around to see
So that works great actually unless I use
tryEmit
rather than suspending
emit
. For some reason
collect
must be suspending at least once before completion and then again as a subscriber allowing the tryEmit to execute beforehand. If you have any ideas on addressing that specifically I'd appreciate it. However, your previous suggestion is the main problem solver and I appreciate the help!
d
Sorry, I don't follow. It would be easier if you rephrased it in simpler terms. For
MutableSharedFlow
not to just ignore the values, it must have subscribers, which is why the
collect
needs to suspend before the flow can receive values.
tryEmit
can fail if the internal buffer is full, then it returns
false
, but
emit
waits for when it can pass the value.
emit
actually first tries to do
tryEmit
and only suspends if it can't do so.
m
Here's an updated test that does some ongoing calls (more like a real world use case). For some reason the subscriber is only called 2 times out of the 10 calls. If I change to
tryEmit
then it does not call the subscriber at all unless I add
extraBufferCapacity
which it then again calls 2 times. Can you spot an obvious mistake I'm making here?
Looks that was something obvious with my test. I was calling
mutex.unlock()
when it wasn't locked and ignoring the exception. I appreciate your help with this! Looks like it's all good now