Marc Knaup
08/07/2021, 10:16 PMsuspend fun main() {
flow {
flowOf(1).collectLatest { emit(it) }
}.collect { println(it) }
}
IllegalStateException: Flow invariant is violated
ephemient
08/08/2021, 3:01 AMjava.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
suspend fun main() {
channelFlow {
flowOf(1).collectLatest { send(it) }
}.collect { println(it) }
}
works as expectedMarc Knaup
08/08/2021, 10:22 PMcollectLatest
.
It’s not clear that collectLatest
launches multiple coroutines. And it doesn’t do anything concurrently. The explanation at the function that throws the exception only mentions launch
as an example.ephemient
08/09/2021, 1:29 AMFlow.*Latest()
methods, but it is effectively equivalent to
suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
coroutineScope {
fold(null) { previous: Job?, value ->
previous?.cancelAndJoin()
launch(start = CoroutineStart.UNDISPATCHED) {
action(value)
}
}
}
}
Marc Knaup
08/09/2021, 4:28 AM