Paul Woitaschek
05/06/2019, 2:34 PMlaunch {
state.collect {
Timber.i("isActive=$isActive, job=$Job")
render(it)
}
}
It prints isActive=false
. How is that possible?Paul Woitaschek
05/06/2019, 2:39 PMTimber.i("${this@launch===this}")
prints "true"streetsofboston
05/06/2019, 2:39 PMPaul Woitaschek
05/06/2019, 2:40 PMPaul Woitaschek
05/06/2019, 2:40 PMI was thinking about it and I know what’s the problem (which is unobvious and I will be happy if you file an issue about that):from a channel (orreceive
in your case) is similar to `launch(start = ATOMIC)`: if coroutine is cancelled duringconsumeEach
call which is ready to return an element, coroutine is resumed normally, though it is already has status “cancelled”. You should manually check cancellation status 😞To answer the question “why the hell is it behaving like that”, consider the following situation: one coroutine sends elementreceive
to the channel, another onex
it and immediately gets cancelled. In that case methodreceive
will throwreceive
, but elementCancellationException
is already consumed. Thus elementx
is lost and cannot be recovered (and what ifx
is socket or file which should be closed by the consumer?). To avoid that,x
with an element always resumes normally, but coroutine is already cancelled, so any suspension point will throwreceive
. (edited)CE
Paul Woitaschek
05/06/2019, 2:41 PMPaul Woitaschek
05/06/2019, 2:42 PMcollect
I have to call ensureActive()
first before proceeding?streetsofboston
05/06/2019, 2:52 PMFlow
api is still quite experimental. This could be either by design or it could be an issue. 🤷♂️
I hope it is not necessary to do ‘active’ checks….Paul Woitaschek
05/06/2019, 2:53 PMPaul Woitaschek
05/06/2019, 3:00 PMkotlin
@FlowPreview
fun main() {
val flow = flow {
while (true) {
emit(Unit)
}
}
val job = GlobalScope.launch {
flow.collect {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
runBlocking {
job.cancelAndJoin()
}
}
Paul Woitaschek
05/06/2019, 3:00 PMPaul Woitaschek
05/06/2019, 3:00 PMstreetsofboston
05/06/2019, 3:03 PMflow.collect {
coroutineScope {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
Paul Woitaschek
05/06/2019, 3:04 PM@FlowPreview
fun main() {
val flow = flow {
while (true) {
emit(Unit)
}
}
val job = GlobalScope.launch {
flow.collect {
coroutineScope {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
}
runBlocking {
job.cancelAndJoin()
}
}
Paul Woitaschek
05/06/2019, 3:05 PMPaul Woitaschek
05/06/2019, 3:05 PMensureActive
as the first statement of collectPaul Woitaschek
05/06/2019, 3:09 PMPaul Woitaschek
05/06/2019, 3:13 PMVsevolod Tolstopyatov [JB]
05/06/2019, 3:34 PMisActive
may return false
because cancellation is cooperative.
E.g. if you do something like
val job = launch {
while (isActive) { /* do nothing */ }
println("I was concurrently cancelled") // (1)
}
delay(100)
job.cancelAndJoin()
do you expect (1) to be printed?
The flow sample doesn’t really differ from this simplified snippetVsevolod Tolstopyatov [JB]
05/06/2019, 3:35 PMisActive
states in its doc: Returns true when this job is active -- it was already started and has not completed nor was cancelled yet
Paul Woitaschek
05/07/2019, 1:56 PM