Nino
05/05/2022, 10:24 AM
[...] yield() but I'm lost. I'd expect this code to print [1, 2, 3] \n [1, 2, 3, 4] \n End, but it doesn't... Why?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val mutableStateFlow = MutableStateFlow(emptyList<Int>())

    val collectJob = launch {
        combine(
            flowOf(listOf(1, 2, 3)),
            mutableStateFlow
        ) { list1, list2 ->
            list1 + list2
        }.collect { list ->
            println(list)
        }
    }

    yield()
    mutableStateFlow.value = listOf(4)
    yield()
    collectJob.cancel()
    println("End")
}
Playground: https://pl.kotl.in/EpSQVK30I
PS: with delay(100) instead of yield(), it works. With lower values, I get random results.
Nick Allen
05/05/2022, 4:54 PM
combine has to collect from those two Flows concurrently, which means it's launching coroutines inside, and those coroutines are also contending for the thread. yield gives up the thread, which allows other coroutines to run. There's no rule about which other coroutines actually get to run, nor any rule that all other coroutines will be scheduled to make progress before the one that called yield resumes.
It appears here that the first call to yield gives up the thread, allowing the explicitly launched coroutine to run until collect suspends waiting on values to be emitted from the result of combine. Then the top-level coroutine resumes and the second yield gives up the thread, allowing the internal implementation of combine to receive values from its arguments and put those into a Channel. Then the Job is cancelled before your explicit collect call is able to read anything from the Channel.
To figure out what was happening, I updated collectJob as follows and played with adding/removing yield calls:
val collectJob = launch {
    println("inner job")
    combine(
        flowOf(listOf(1, 2, 3)).onEach { println("first flow") },
        mutableStateFlow.onEach { println("second flow") }
    ) { list1, list2 ->
        list1 + list2
    }.collect { list ->
        println(list)
    }
}
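For intuition about why the values pass through a Channel before your collect sees them, here is a much-simplified sketch of a two-flow combine. It is not the kotlinx.coroutines implementation: sketchCombine, updates, unset and the other local names are hypothetical, and the real code handles fairness, more than two flows and cancellation far more carefully. It only shows the shape: one launched coroutine per upstream flow feeds a channel, and the transform runs after a value is read back out of that channel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical, simplified model of combine for exactly two flows.
fun <A, B, R> sketchCombine(
    first: Flow<A>,
    second: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flow {
    coroutineScope {
        // Each update is tagged with the index of the flow it came from.
        val updates = Channel<Pair<Int, Any?>>()

        // One coroutine per upstream flow; these compete for the same thread
        // as every other coroutine on the dispatcher.
        val producers = listOf(
            launch { first.collect { updates.send(0 to it) } },
            launch { second.collect { updates.send(1 to it) } }
        )
        // Close the channel once both upstream flows have completed.
        launch {
            producers.joinAll()
            updates.close()
        }

        val unset = Any() // marker for "no value received yet"
        var latestA: Any? = unset
        var latestB: Any? = unset

        // The transform only runs here, after a value is read from the channel.
        for ((index, value) in updates) {
            if (index == 0) latestA = value else latestB = value
            if (latestA !== unset && latestB !== unset) {
                @Suppress("UNCHECKED_CAST")
                emit(transform(latestA as A, latestB as B))
            }
        }
    }
}

fun main() = runBlocking {
    sketchCombine(flowOf(1, 2), flowOf("a")) { n, s -> "$n$s" }
        .collect { println(it) }
}
```

In this model, reading from updates is a suspension point of the collecting coroutine, so a value can already have been sent by a producer while the collector has not yet been resumed to receive it.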
julian
05/05/2022, 5:08 PM
[...] println in the lambda argument to combine and it never ran, [...] combine and saw that, as @Nick Allen says, the emissions are transformed after they're pulled from the channel, not before they're put on the channel, as I'd thought.
julian
05/05/2022, 5:19 PM
[...] yield after the second one. Now the result is as expected:
[1, 2, 3, 4]
End
[...] yield is the one that actually allows collect to become unsuspended, whereas the second yield un-suspends machinery within combine.
julian
05/05/2022, 6:53 PM
[...] yield allows these coroutines that collect the two flows to run. And the third yield allows the outer coroutine within which these are launched to proceed past the suspension point here, where the channel is being read. This reading was suspended because, at the time it was executed, the two flow collection coroutines hadn't run yet, so the channel was empty, with nothing to read.
Nino
05/07/2022, 9:58 AM
[...] yield() is useless with Flows because, depending on the flows used (or their implementation, which might change in future versions), it might or might not work the same?
julian
05/07/2022, 4:21 PM
[...] combine, the kotlinx.coroutines library uses yield to good effect, allowing combined flows to take turns emitting, i.e. to try to be fair.
Nick Allen
05/07/2022, 5:49 PM
yield is a false solution in general for affecting the order in which code runs. This isn't unique to Flow. Any code relying on yield is at the mercy of the implementation details of the CoroutineDispatcher being used. Methods like Job.join, Deferred.await, Channel.receive allow you to explicitly and robustly affect the ordering of coroutine code.
julian
05/07/2022, 6:44 PM
Any code relying on yield is at the mercy of the implementation details of the CoroutineDispatcher being used.
Nick Allen
05/07/2022, 7:10 PM
yield basically just hands control of the thread back to the CoroutineDispatcher, at which point it's up to the CoroutineDispatcher to choose what to run next. There's no guarantee that the CoroutineDispatcher won't sometimes just immediately reschedule the coroutine that yielded.
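As a minimal sketch of ordering the original snippet explicitly instead of with yield, the collector below reports each list it has processed through a Channel, and the top-level coroutine waits on Channel.receive before moving on. The seen channel is a hypothetical addition for illustration; everything else is taken from the original code, and this is one possible arrangement rather than the only way to do it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val mutableStateFlow = MutableStateFlow(emptyList<Int>())
    // Hypothetical hand-off channel: the collector reports every list it has printed.
    val seen = Channel<List<Int>>(Channel.UNLIMITED)

    val collectJob = launch {
        combine(
            flowOf(listOf(1, 2, 3)),
            mutableStateFlow
        ) { list1, list2 ->
            list1 + list2
        }.collect { list ->
            println(list)
            seen.send(list)
        }
    }

    // Suspend until the collector has actually processed the first combined value.
    seen.receive()
    mutableStateFlow.value = listOf(4)
    // Suspend until the update has been observed as well.
    seen.receive()

    // Cancel the collector and wait for it to finish before printing.
    collectJob.cancelAndJoin()
    println("End")
}
// Prints:
// [1, 2, 3]
// [1, 2, 3, 4]
// End
```

Each receive call suspends until the collector has really handled a value, so the ordering no longer depends on which coroutine the dispatcher happens to pick after a yield.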