Hi folks, I have a Flow that was created using `Br...
# coroutines
s
Hi folks, I have a Flow that was created using `BroadcastChannel.openSubscription().consumeAsFlow()`, and on the flow I call `launchIn(scope)`, which returns a Job. I think that when I cancel this job, the channel is not cancelled, but when I use something like `Flow.first`, the channel is cancelled. How can I cancel a Flow so that cancelling it cancels the channel too?
o
you'll have to manually glue that together, because by default I think it cancels the subscription, which is reasonable
ah, no
`BroadcastChannel.asFlow()` is probably what you should be using if you want those semantics
hmm, no, the docs for that also say "3) If the flow consumer fails with an exception, subscription is cancelled."
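For reference, the "manual glue" mentioned above could be sketched roughly as follows. This is a minimal sketch under assumptions, not the library's recommended pattern: a plain `Channel` collected through `receiveAsFlow()` stands in for the broadcast channel, and `closedViaCompletionHandler` is a hypothetical helper name. The only idea it illustrates is that `Job.invokeOnCompletion` can be used to cancel a channel whenever the collecting job finishes or is cancelled.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns whether the channel is closed for receive
// after the collecting job has been cancelled.
suspend fun closedViaCompletionHandler(): Boolean = coroutineScope {
    // Assumption: a plain rendezvous Channel stands in for the broadcast channel.
    val channel = Channel<Int>()

    // receiveAsFlow() leaves the channel open when the collector is cancelled,
    // so the cleanup has to be wired up by hand.
    val job = channel.receiveAsFlow()
        .onEach { println(it) }
        .launchIn(this)

    // The "glue": cancel the channel when the job completes for any reason.
    job.invokeOnCompletion { channel.cancel() }

    // A rendezvous send suspends until the collector has received the value,
    // so the collection is running before the job is cancelled.
    channel.send(1)

    job.cancelAndJoin()
    channel.isClosedForReceive
}

fun main() = runBlocking {
    println(closedViaCompletionHandler())
}
```

Whether the whole channel should be cancelled, rather than just one subscription, depends on whether other consumers still need it.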
s
So, something like this should do the unsubscription, right?
```kotlin
val channel = ConflatedBroadcastChannel<Int>()

// Producer: sends 1..20, one value every 500 ms
GlobalScope.launch {
    for (i in 1..20) {
        channel.send(i)

        delay(500)
    }
}

// Collector: prints every value until the job is cancelled
val job = channel.asFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)

delay(5000)

job.cancel(CancellationException())

delay(5000)
```
o
it will cancel the subscription, but not the `channel` itself
s
I was testing something like this, and the `c` channel was not getting closed for receiving, but when I use something like `first{}`, it gets closed.
```kotlin
val c = channel.openSubscription()
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)

delay(5000)

println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)

delay(5000)
```
o
you would need to check for cancellation in the flow -- it does not check for it itself
s
I'm asking because I don't want a channel leak here. I tested it, and the Flow does not run after the `job.cancel`.
About the snippet above: with `job.cancel()`, does `BroadcastChannel.asFlow()` cancel the subscription while `openSubscription().consumeAsFlow()` doesn't?
o
sorry to be misleading, I think they're nearly the same
yea, okay, I see what's happening -- you're checking it way too fast
if I do:
```kotlin
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)
delay(5000)
println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)
delay(1000)
println(c.isClosedForReceive)
```
then the last one prints `true`
`job.cancel` doesn't wait for the other end to receive the message
it seems to be a coinflip on when it becomes visible, probably due to memory semantics of the JVM
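One way to avoid guessing at a delay, since `job.cancel` returns without waiting, is `cancelAndJoin()`, which suspends until the job has actually completed. The sketch below is hedged: it uses a plain `Channel` as a stand-in for the subscription `c` (an assumption, not the setup from the thread), and `closedAfterCancelAndJoin` is a hypothetical helper name. It only shows that the check happens after the collector's cleanup has run, and says nothing about how `ConflatedBroadcastChannel` subscriptions handle values still buffered at that point.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports whether the channel is closed for receive
// once the collecting job has been cancelled AND joined.
suspend fun closedAfterCancelAndJoin(): Boolean = coroutineScope {
    // Assumption: a plain rendezvous Channel stands in for the subscription `c`.
    val c = Channel<Int>()
    val job = c.consumeAsFlow()
        .onEach { println(it) }
        .launchIn(this)

    // A rendezvous send suspends until the collector has received the value,
    // so the collection is running before the job is cancelled.
    c.send(1)

    // Unlike cancel(), cancelAndJoin() suspends until the job has completed,
    // including the cleanup in which consumeAsFlow cancels the channel.
    job.cancelAndJoin()

    c.isClosedForReceive
}

fun main() = runBlocking {
    println(closedAfterCancelAndJoin())
}
```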
s
I think it is unsubscribing, but the channel is not closed yet because the Flow gets cancelled before it consumes the last value from the channel
With this snippet
```kotlin
val c = channel.openSubscription()
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)

delay(5000)

println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)

delay(2000)
println(c.isClosedForReceive)

delay(2000)
c.consumeAsFlow().onEach { println(it) }.launchIn(GlobalScope)

delay(1000)
println(c.isClosedForReceive)
```
I got this result
```
1
2
3
4
5
6
7
8
9
10
false
false
false
false
11
true
```
o
yes, that seems right
kind of weird that canceling the receive end doesn't drop all the items
seems like this might be a bug? I'm honestly not sure