Seth Madison
03/24/2023, 9:41 PMtransform
. I have a flow along the lines of`sharedFlow -> transformWhile -> map` , and I see the map
run even when the downstream collector has completed. Is this expected?
Example in the thread.fun testFlo() {
val coroutineScope = CoroutineScope(Dispatchers.Default)
val sharedFlo = MutableSharedFlow<Int>(0, Int.MAX_VALUE, BufferOverflow.SUSPEND)
coroutineScope.launch {
var i = 0
repeat(100) {
sharedFlo.emit(i)
i +=1
delay(10)
}
}
var mapCount = 0
val mappedFlo = sharedFlo.transformWhile {
emit(it)
true
}.map {
mapCount += 1
if (mapCount > 2) throw RuntimeException("FAIL")
it
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
runBlocking {
mappedFlo
.take(2)
.catch { println("caught error $it") }
.collect {}
}
}
ephemient
03/24/2023, 9:50 PMSeth Madison
03/24/2023, 10:27 PMephemient
03/24/2023, 10:29 PMflowOn
due to the channel buffer) the upstream emits asynchronously to the downstream collectorSeth Madison
03/24/2023, 10:34 PMhotFlow.map { if (it.someVal) throw RuntimeException() }.collect { }
Is there any way to prevent that exception from getting thrown the collect has completed?ephemient
03/24/2023, 10:36 PMSharingStarted
but it's asynchronous so you can't use it to control thisSeth Madison
03/24/2023, 10:39 PMephemient
03/24/2023, 10:40 PMSeth Madison
03/24/2023, 10:47 PMscope a = CoroutineScope()
scope b = CoroutineScope()
hotFlow = flow {
}.shareIn(a)
b.launch {
hotFlow.map { if (it.someVal) throw RuntimeException() }.collect {}
}
a
or b
?ephemient
03/24/2023, 11:19 PMSeth Madison
03/24/2023, 11:21 PM