# coroutines
l
Hi, my colleagues and I had a discussion around a callback flow that was not cancellation-cooperative:
internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
    trySendBlocking(adItemStatus)
    val statusListener = AdItemStatusListener { _, status -> trySendBlocking(status) }
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}
We figured that this is likely due to trySendBlocking, which does a runBlocking. We then pivoted to the following:
internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
    send(adItemStatus)
    val statusListener = AdItemStatusListener { _, status -> launch { send(status) }}
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}
We then figured that this might not guarantee the order of events when run on a coroutine dispatcher with multiple threads. What would be the recommended way to implement a cancellation-cooperative callback flow with guaranteed event order? Thanks in advance!
k
Channel.send is fair, meaning that the first coroutine to invoke send will be the first to actually send the element, regardless of whether it suspends or not. https://kotlinlang.org/docs/channels.html#channels-are-fair
So, it should preserve order.
l
thanks for the fast response. 🙇 but what about the launch itself? wouldn't this suspend and potentially not be fair on, say, the default coroutine dispatcher?
such as in this example?
k
It's possible I suppose. You could use launch(start = CoroutineStart.UNDISPATCHED) to ensure the same fairness guarantees as the channel offers.
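To make the difference between the start modes concrete, here is a minimal sketch. The helper name startOrder is hypothetical, and runBlocking is used only to keep everything on a single thread. It records when the launched body begins relative to the point where launch returns. With UNDISPATCHED the body runs in the caller up to its first suspension point, so a send inside it would be invoked in callback order.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which the launched body and the caller run.
fun startOrder(start: CoroutineStart): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch(start = start) {
        log += "body started"
        yield() // first suspension point; stands in for a suspending send
        log += "body resumed"
    }
    log += "launch returned"
    job.join()
    log
}

fun main() {
    // DEFAULT: the body is dispatched, so launch returns before the body starts.
    println(startOrder(CoroutineStart.DEFAULT))
    // UNDISPATCHED: the body starts in the caller before launch returns.
    println(startOrder(CoroutineStart.UNDISPATCHED))
}
```

Anything after the first suspension point still resumes through the coroutine's dispatcher, so this only pins down the order in which the bodies start.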
l
Ah, interesting, I didn't know about that. Would this be the recommended way to do it?
k
It depends on your use case. I have very rarely used undispatched, but it comes in handy sometimes.
e
why trySendBlocking in a launch anyway? if you're in a suspend scope, just use trySend or send itself
👍 1
but yeah, launch depends on the dispatcher and doesn't guarantee fairness in general
l
The trySendBlocking is not in a launch, only the send is.
(or am I missing something? 😅)
k
If I had to guess, the callback for AdItemStatusListener is synchronous
So a launch is necessary.
l
AdItemStatusListener is synchronous, sorry for the missing information.
👍 1
e
oh I mixed up the two versions in the original post then
👍 1
the Undispatched dispatcher does start running the coroutine in the caller, so it should do the trick for this case
however it also means the coroutine will resume in the context of whatever resumed it, which could be some random thread
l
Any thoughts on cancellation in this case? The launch would no longer suspend but the send would still, so should be good?
👍 1
k
I think generally this is made more complicated by running it on a multithreaded dispatcher. If you can avoid that, things would be simpler.
l
Hmm, yeah, we would most likely run it on the main dispatcher anyway, but it doesn't feel good to define this in a way that only works if called from the correct context.
v
the Undispatched dispatcher does start running the coroutine in the caller, so it should do the trick for this case
however it also means the coroutine will resume in the context of whatever resumed it, which could be some random thread
But start = CoroutineStart.UNDISPATCHED should work without that caveat?
e
oh oops
yes, I meant the Unconfined dispatcher. the Undispatched start mode does not do that
somehow I missed the start= in the reply and then forgot the tiny difference in name
sorry, maybe I need more coffee first
☕ 1
v
It's a really unfortunate naming collision
f
The correct solution IMHO, if you want to retain all the events from the callback, is to use trySend (not the blocking variant!) to send the event to the callbackFlow and make the underlying channel buffered (UNLIMITED!) with the .buffer operator after callbackFlow
nod 1
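A minimal sketch of that suggestion, under stated assumptions: FakeAdItem and StatusListener are hypothetical stand-ins for the SDK types in the original post, and the listener is assumed to fire synchronously on a single thread. Because trySend never suspends, no launch is needed inside the callback, and events enter the channel in the order the callback delivers them.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins for the ad SDK types; not the real API.
fun interface StatusListener { fun onStatus(status: Int) }

class FakeAdItem(var status: Int = 0) {
    private val listeners = mutableListOf<StatusListener>()
    val listenerCount: Int get() = listeners.size
    fun addStatusListener(l: StatusListener) { listeners += l }
    fun removeStatusListener(l: StatusListener) { listeners -= l }
    // Updates the status and notifies listeners synchronously, like the callback in the thread.
    fun emit(newStatus: Int) {
        status = newStatus
        listeners.toList().forEach { it.onStatus(newStatus) }
    }
}

fun FakeAdItem.statusAsFlow(): Flow<Int> = callbackFlow {
    trySend(status) // current value first
    val listener = StatusListener { s -> trySend(s) } // non-suspending, keeps callback order
    addStatusListener(listener)
    awaitClose { removeStatusListener(listener) }
}.buffer(Channel.UNLIMITED) // fuses with callbackFlow, so trySend cannot fail on a full buffer

// Collects the initial status plus two callback events.
fun demo(): List<Int> = runBlocking {
    val item = FakeAdItem(status = 0)
    val collected = async { item.statusAsFlow().take(3).toList() }
    while (item.listenerCount == 0) yield() // wait until the flow has registered its listener
    item.emit(1)
    item.emit(2)
    collected.await()
}

fun main() {
    println(demo())
}
```

Without the buffer operator, callbackFlow uses the default buffered capacity, and trySend returns a failed result once that buffer is full, which silently drops the event unless the result is checked.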
l
Thanks for all the input, this is what we settled on in the end. It seems this is not cooperative with cancellation either, so we added a yield after collection of the flow to ensure we don't process any values if the coroutine was cancelled.
(we didn't set the buffer to UNLIMITED though, because we know that the number of callbacks is << 64, which is the default buffer size)
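A sketch of the cancellation check described above, with two assumptions: a plain range flow stands in for the callback flow, and ensureActive is swapped in for yield. Both throw CancellationException when the coroutine is cancelled, but ensureActive does not reschedule. The helper name collectUntilCancelled is hypothetical.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values processed before cancellation was noticed.
fun collectUntilCancelled(): List<Int> {
    val seen = mutableListOf<Int>()
    try {
        runBlocking {
            // A range flow does not check for cancellation on its own.
            (1..5).asFlow().collect { value ->
                ensureActive() // stop before processing if the scope was cancelled
                seen += value
                if (value == 3) cancel() // simulate cancellation arriving mid-collection
            }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own scope
    }
    return seen
}

fun main() {
    println(collectUntilCancelled())
}
```

The Flow.cancellable() operator performs the same check on every emission and may read more cleanly than a manual call inside the collector.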