ReceiveChannel#consumeAsFlow() never completes? I...
# flow
a
ReceiveChannel#consumeAsFlow() never completes? I wrote a short program to test this
fun main() = runBlocking<Unit> {
    val test = Test()

    launch(Dispatchers.IO) {
        println("Receive 1")
        test.getResponseChannel()
            .collect { println("Collect 1 $it") }
    }

    launch(Dispatchers.IO) {
        println("Receive 2")
        test.getResponseChannel()
            .collect {
                println("Collect 2 $it")
            }
    }

    launch(Dispatchers.IO) {
        println("Receive 3")
        test.getResponseChannel()
            .collect {
                println("Collect 3 $it")
            }
    }

    println("Sending A")
    test.responseChannel.send("A")
    println("Sending B")
    test.responseChannel.send("B")
    println("Sending C")
    test.responseChannel.send("C")
}

class Test {
    val responseChannel = BroadcastChannel<String>(BUFFERED)

    fun getResponseChannel(): Flow<String> {
        val channel = responseChannel.openSubscription()
        return channel
            .consumeAsFlow()
            .onCompletion {
                println("Cancelled")
                channel.cancel()
            }
    }
}
Here you will see that `onCompletion()` is never invoked, and hence "Cancelled" never prints. My requirement is to be able to add and detach observers to a broadcast channel. Is that achievable?
The child channels/flows only complete when upstream closes or the channel closes. Who will close the channel/flow?
a
The broadcast channel is meant to live throughout the process lifecycle in my use case. I just want to subscribe and unsubscribe on the fly.
Is that even possible?
d
Yes, but then you have to close the flow or the underlying (child) channel.
You only get `onCompletion` once the underlying channel is closed (or the collection is cancelled). The flow stays "active" until this happens.
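A minimal sketch of that behaviour, assuming kotlinx.coroutines is on the classpath. A plain `Channel` stands in here for the subscription returned by `openSubscription()`, and `closeDemo` is a hypothetical name used only for illustration. Once the channel is closed, the buffered element is still delivered and then `onCompletion` runs with a `null` cause:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what the collector sees, in order.
suspend fun closeDemo(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    // Assumption: a plain Channel stands in for openSubscription().
    val channel = Channel<String>(Channel.BUFFERED)

    val collector = launch {
        channel.consumeAsFlow()
            .onCompletion { cause -> events += "completed cause=$cause" }
            .collect { events += "collect $it" }
    }

    channel.send("A")
    channel.close()   // closing the channel is what lets the flow complete
    collector.join()  // collect returns once the closed channel is drained
    events
}

fun main() = runBlocking<Unit> {
    closeDemo().forEach(::println)
}
```

Without the `close()` call, `join()` would suspend forever, which matches what the test program above shows.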
a
Right, the only way that I see now is to send the `ReceiveChannel` to my upper layers, all the way to the ViewModels, and call `consumeAsFlow` there. Save this channel and call `cancel` on `collect`.
Otherwise I am not sure how and when to close the flow without `onCompletion`.
d
What's the lifetime scope of this consumer/flow? If you have a coroutine scope that gets cancelled, then it will close the flow as well (if you collect from this scope), right?
a
It's defined by `viewModelScope`, if you are familiar with Android AAC.
d
I'm not, but if this is a coroutine scope then just make sure to call the flow's `collect` from this scope. Then structured concurrency takes care of it.
a
This scope doesn't get destroyed, actually... but I see what you mean. I can probably create a child scope and collect the flow there.
d
Yes. You need something that knows the lifetime of this flow
a
Yeah, cool. I will try that and let you know. To be honest, my basics around coroutines and Flow are not that strong as of now. Sorry for the trouble!
Okay, so creating a child coroutine and cancelling it explicitly in `collect` does the trick for me. It's a bit non-obvious and convoluted right now, but it works until I get something better. Thanks for going deep into my problem and for all your help.
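For anyone finding this thread later, here is a rough sketch of the child-coroutine approach, not the exact code from the project. It assumes kotlinx.coroutines is available. A plain `Channel` stands in for `responseChannel.openSubscription()`, and `detachDemo` is a hypothetical name. The child `Job` is the thing that knows the lifetime of the collection, and cancelling it is what makes `onCompletion` run:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the events seen by one observer.
suspend fun detachDemo(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    // Assumption: stands in for openSubscription(); the producer never closes it.
    val subscription = Channel<String>(Channel.BUFFERED)

    // Collect in a child coroutine so that its Job can be cancelled to detach.
    val observer = launch {
        subscription.consumeAsFlow()
            .onCompletion { events += "completed" }
            .collect { events += "collect $it" }
    }

    subscription.send("A")
    subscription.send("B")
    delay(100)                // give the observer time to receive both values

    observer.cancelAndJoin()  // detach: the cancellation reaches onCompletion
    events
}

fun main() = runBlocking<Unit> {
    detachDemo().forEach(::println)
}
```

Since `consumeAsFlow()` cancels its channel when the collection ends, the extra `channel.cancel()` inside `onCompletion` in the original program should not be needed.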