I think I am misunderstanding the semantics of `tr...
# coroutines
s
I think I am misunderstanding the semantics of
transform
. I have a flow along the lines of`sharedFlow -> transformWhile -> map` , and I see the
map
run even when the downstream collector has completed. Is this expected? Example in the thread.
Copy code
fun testFlo() {
    val coroutineScope = CoroutineScope(Dispatchers.Default)
    val sharedFlo = MutableSharedFlow<Int>(0, Int.MAX_VALUE, BufferOverflow.SUSPEND)

    coroutineScope.launch {
        var i = 0
        repeat(100) {
            sharedFlo.emit(i)
            i +=1
            delay(10)
        }
    }
	
    var mapCount = 0
    val mappedFlo = sharedFlo.transformWhile { 
        emit(it)
        true
    }.map {
        mapCount += 1
        if (mapCount > 2) throw RuntimeException("FAIL")
        it
    }.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)

    runBlocking {
        mappedFlo
            .take(2)
            .catch { println("caught error $it") }
            .collect {}
    }
}
In this example, the catch will get triggered every so often. Worse yet, I think there are cases where the exception is thrown and the catch doesn’t kick in.
e
there is a difference in behavior between cold streams (values produced when demanded) and hot streams (values pushed to consumers)
this simplified example will consistently trigger "hot" but not "cold": https://pl.kotl.in/BmqYuv-VK
s
Right, I guess I just would’ve assumed that even for a hot flow, the value would never end up making it to the map. Like, I would assume that once the subscribing flow is cancelled, everything upstream from it UP TO the hot flow would get cancelled atomically.
e
in the hot case (and also to a limited extent in
flowOn
due to the channel buffer) the upstream emits asynchronously to the downstream collector
s
Got it. And I suppose there’s no way to know if the downstream is still subscribed? To take a simple example:
Copy code
hotFlow.map { if (it.someVal) throw RuntimeException() }.collect { }
Is there any way to prevent that exception from getting thrown the collect has completed?
Because right now, short of a CoroutineExceptionHandler on the scope of the hotFlow I can’t see a way to prevent that from crashing my app.
Right?
e
you can get some feedback with
SharingStarted
but it's asynchronous so you can't use it to control this
s
The CEH would need to be on the scope of the collect, right? Or would it be on the scope of the hotflow? Intuitively I would think it would be on the collect, since that’s what is running the map, but since the collect already completed, I’m confused.
e
CEH doesn't do anything in most scopes because they propagate cancellation
GlobalScope and SupervisorScope don't propagate cancellation so you could add the handler to them
(in which case your app will not crash regardless of whether you install a handler or not)
s
Got it. Makes sense.
erm, actually..
I guess the thing I’m confused about is which scope the crash ends up propagating to.
Like:
Copy code
scope a = CoroutineScope()
scope b = CoroutineScope()

hotFlow = flow {
}.shareIn(a)

b.launch {
  hotFlow.map { if (it.someVal) throw RuntimeException() }.collect {}
}
In that example, if the exception triggers after the collect completes, does the exception go to
a
or
b
?
e
b
s
I guess that makes sense