Lukas K-G
10/22/2025, 1:35 PM
internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
    trySendBlocking(adItemStatus)
    val statusListener = AdItemStatusListener { _, status -> trySendBlocking(status) }
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}
We figured that this is likely due to the trySendBlocking which does a runBlocking.
We then pivoted to the following:
internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
    send(adItemStatus)
    val statusListener = AdItemStatusListener { _, status -> launch { send(status) } }
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}
We then realized that this might not guarantee the order of events if run on a coroutine dispatcher with multiple threads.
What would be the recommended way to implement a cancellation-cooperative callback flow with guaranteed event order?
Thanks in advance!
kevin.cianfarini
10/22/2025, 1:38 PM
Channel.send is fair, meaning that the first coroutine to invoke send will be the first to actually send the element, regardless of whether it is suspended or not. https://kotlinlang.org/docs/channels.html#channels-are-fair
kevin.cianfarini
10/22/2025, 1:38 PM
Lukas K-G
10/22/2025, 1:40 PM
launch itself? Wouldn't this suspend and potentially not be fair on, say, the default coroutine dispatcher?
Lukas K-G
10/22/2025, 1:41 PM
kevin.cianfarini
10/22/2025, 1:42 PM
launch(start=UNDISPATCHED) to ensure the same fairness guarantees as the channel offers.
Lukas K-G
10/22/2025, 1:45 PM
kevin.cianfarini
10/22/2025, 1:48 PM
ephemient
10/22/2025, 1:54 PM
trySendBlocking in a launch anyway? If you're in a suspend scope, just use trySend or send itself
ephemient
10/22/2025, 1:55 PM
launch depends on the dispatcher and doesn't guarantee fairness in general
Lukas K-G
10/22/2025, 1:55 PM
trySendBlocking is not in a launch, only the send is.
Lukas K-G
10/22/2025, 1:56 PM
kevin.cianfarini
10/22/2025, 1:56 PM
AdItemStatusListener is synchronous
kevin.cianfarini
10/22/2025, 1:56 PM
Lukas K-G
10/22/2025, 1:56 PM
AdItemStatusListener is synchronous, sorry for the missing information.
ephemient
10/22/2025, 1:56 PM
ephemient
10/22/2025, 1:58 PM
Undispatched dispatcher does start running the coroutine in the caller, so it should do the trick for this case
ephemient
10/22/2025, 2:00 PM
Lukas K-G
10/22/2025, 2:03 PM
launch would no longer suspend but the send still would, so should be good?
kevin.cianfarini
10/22/2025, 2:04 PM
Lukas K-G
10/22/2025, 2:05 PM
Vilgot Fredenberg
10/22/2025, 2:09 PM
> the Undispatched dispatcher does start running the coroutine in the caller, so it should do the trick for this case
> however it also means the coroutine will resume in the context of wherever resumed it, which could be some random thread
But start = CoroutineStart.UNDISPATCHED should work without that caveat?
ephemient
10/22/2025, 2:10 PM
ephemient
10/22/2025, 2:11 PM
ephemient
10/22/2025, 2:12 PM
start= in the reply and then forgot the tiny difference in name
ephemient
10/22/2025, 2:12 PM
Vilgot Fredenberg
10/22/2025, 2:12 PM
franztesca
10/22/2025, 4:33 PM
trySend (not blocking!) to send the event to the callbackFlow and make the underlying channel buffered (UNLIMITED!) with the .buffer operator after callbackFlow
Lukas K-G
10/23/2025, 5:34 AM
yield after collection of the flow to ensure we don't process any values in case the coroutine was cancelled.
Lukas K-G
10/23/2025, 5:49 AM
UNLIMITED though, because we know that the number of callbacks is << 64, which is the default buffer size)
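For reference, the two approaches suggested in this thread can be sketched roughly as follows. This is only a minimal sketch under assumptions: the real ad SDK types are not shown in the thread, so `AdItemStatus`, the `onStatusChanged` method name, `update`, `listenerCount`, and the function name `adItemStatusAsFlowUndispatched` are hypothetical stand-ins added so the example is self-contained. It also assumes, as stated above, that the listener is invoked synchronously.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins for the SDK types; only here so the sketch compiles.
enum class AdItemStatus { SCHEDULED, PLAYING, FINISHED }

fun interface AdItemStatusListener {
    fun onStatusChanged(item: ScheduledAdItem, status: AdItemStatus)
}

class ScheduledAdItem {
    private val listeners = mutableListOf<AdItemStatusListener>()
    var adItemStatus = AdItemStatus.SCHEDULED
        private set
    val listenerCount: Int get() = listeners.size

    fun addStatusListener(listener: AdItemStatusListener) { listeners += listener }
    fun removeStatusListener(listener: AdItemStatusListener) { listeners -= listener }

    // Notifies listeners synchronously on the caller's thread (assumption from the thread).
    fun update(status: AdItemStatus) {
        adItemStatus = status
        listeners.toList().forEach { it.onStatusChanged(this, status) }
    }
}

// Approach 1: send() is first invoked inside the callback itself (UNDISPATCHED start),
// so the order of send() calls matches the order of callbacks.
fun ScheduledAdItem.adItemStatusAsFlowUndispatched(): Flow<AdItemStatus> = callbackFlow {
    send(adItemStatus)
    val statusListener = AdItemStatusListener { _, status ->
        launch(start = CoroutineStart.UNDISPATCHED) { send(status) }
    }
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}

// Approach 2: non-blocking trySend() into a buffered channel; no extra coroutine per event.
fun ScheduledAdItem.adItemStatusAsFlow(): Flow<AdItemStatus> = callbackFlow {
    trySend(adItemStatus)
    val statusListener = AdItemStatusListener { _, status -> trySend(status) }
    addStatusListener(statusListener)
    awaitClose { removeStatusListener(statusListener) }
}.buffer(Channel.UNLIMITED)

fun main() = runBlocking {
    val item = ScheduledAdItem()
    val collected = async { item.adItemStatusAsFlow().take(3).toList() }
    while (item.listenerCount == 0) yield() // wait until the flow has registered its listener
    item.update(AdItemStatus.PLAYING)
    item.update(AdItemStatus.FINISHED)
    println(collected.await())
}
```

Dropping the `.buffer(Channel.UNLIMITED)` call keeps the default buffer, which may be enough when the number of callbacks is known to be small, as noted above. With a bounded buffer, `trySend` can fail once the buffer is full, so its result is worth checking in that case.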