SrSouza
05/12/2020, 12:49 AM
BroadcastChannel.openSubscription().consumeAsFlow() and I call launchIn(Scope) on the flow, which returns a Job, but I think that when I cancel this job, the Channel is not being canceled. When I use something like Flow.first, the channel is cancelled. How can I cancel a Flow so that canceling it cancels the Channel too?
octylFractal
05/12/2020, 12:51 AM
octylFractal
05/12/2020, 12:52 AM
octylFractal
05/12/2020, 12:53 AM
BroadcastChannel.asFlow() is probably what you should be using if you want those semantics
octylFractal
05/12/2020, 12:54 AM
SrSouza
05/12/2020, 12:58 AM
val channel = ConflatedBroadcastChannel<Int>()
GlobalScope.launch {
    for (i in 1..20) {
        channel.send(i)
        delay(500)
    }
}
val job = channel.asFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)
delay(5000)
job.cancel(CancellationException())
delay(5000)
octylFractal
05/12/2020, 12:58 AM
channel itself
SrSouza
05/12/2020, 1:01 AM
c channel was not getting closed for receiving, but when I use something like first{}, it gets closed.
val c = channel.openSubscription()
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)
delay(5000)
println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)
delay(5000)
octylFractal
05/12/2020, 1:01 AM
SrSouza
05/12/2020, 1:02 AM
job.cancel
SrSouza
05/12/2020, 1:05 AM
BroadcastChannel.asFlow() cancels the subscription when using job.cancel(), and openSubscription().consumeAsFlow() doesn't?
octylFractal
05/12/2020, 1:06 AM
octylFractal
05/12/2020, 1:08 AM
octylFractal
05/12/2020, 1:08 AM
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)
delay(5000)
println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)
delay(1000)
println(c.isClosedForReceive)
then the last one prints true
octylFractal
05/12/2020, 1:09 AM
job.cancel doesn't wait for the other end to receive the message
octylFractal
05/12/2020, 1:11 AM
SrSouza
05/12/2020, 1:14 AM
SrSouza
05/12/2020, 1:15 AM
val c = channel.openSubscription()
val job = c.consumeAsFlow()
    .onEach { println(it) }
    .launchIn(GlobalScope)
delay(5000)
println(c.isClosedForReceive)
job.cancel(CancellationException())
println(c.isClosedForReceive)
delay(2000)
println(c.isClosedForReceive)
delay(2000)
c.consumeAsFlow().onEach { println(it) }.launchIn(GlobalScope)
delay(1000)
println(c.isClosedForReceive)
SrSouza
05/12/2020, 1:15 AM
1
2
3
4
5
6
7
8
9
10
false
false
false
false
11
true
octylFractal
05/12/2020, 1:24 AM
octylFractal
05/12/2020, 1:25 AM
octylFractal
05/12/2020, 1:27 AM
octylFractal
05/12/2020, 1:27 AM
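
A minimal sketch of the point made at 1:09 AM (job.cancel doesn't wait for the other end), not taken from the thread itself. It assumes a plain rendezvous Channel in place of the ConflatedBroadcastChannel subscription, and the function name closedAfterCancelAndJoin is hypothetical. Instead of calling cancel() and then delaying, it uses cancelAndJoin(), which suspends until the collecting coroutine has finished, so the channel should already be closed for receive when it is checked:

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// isClosedForReceive is marked experimental or delicate depending on the library version.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun closedAfterCancelAndJoin(): Boolean = runBlocking {
    // Assumption: a plain rendezvous channel stands in for channel.openSubscription().
    val c = Channel<Int>()

    val job = c.consumeAsFlow()
        .onEach { println(it) }
        .launchIn(this)

    // A rendezvous send returns only after the collector has taken the element,
    // so the flow is known to be collecting before it gets cancelled.
    c.send(1)

    // cancel() alone only requests cancellation; cancelAndJoin() also waits
    // for the collector to finish, including the cleanup that cancels the channel.
    job.cancelAndJoin()

    c.isClosedForReceive
}

fun main() {
    println(closedAfterCancelAndJoin())
}
```

With this shape there is no need to guess how long to delay after cancelling; the check runs only once the job has completed.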