Paul Woitaschek
05/06/2019, 2:34 PMlaunch {
state.collect {
Timber.i("isActive=$isActive, job=$Job")
render(it)
}
}
It prints isActive=false. How is that possible?Paul Woitaschek
05/06/2019, 2:39 PMTimber.i("${this@launch===this}") prints "true"streetsofboston
05/06/2019, 2:39 PMPaul Woitaschek
05/06/2019, 2:40 PMPaul Woitaschek
05/06/2019, 2:40 PMI was thinking about it and I know what’s the problem (which is unobvious and I will be happy if you file an issue about that):from a channel (orreceivein your case) is similar to `launch(start = ATOMIC)`: if coroutine is cancelled duringconsumeEachcall which is ready to return an element, coroutine is resumed normally, though it is already has status “cancelled”. You should manually check cancellation status 😞To answer the question “why the hell is it behaving like that”, consider the following situation: one coroutine sends elementreceiveto the channel, another onexit and immediately gets cancelled. In that case methodreceivewill throwreceive, but elementCancellationExceptionis already consumed. Thus elementxis lost and cannot be recovered (and what ifxis socket or file which should be closed by the consumer?). To avoid that,xwith an element always resumes normally, but coroutine is already cancelled, so any suspension point will throwreceive. (edited)CE
Paul Woitaschek
05/06/2019, 2:41 PMPaul Woitaschek
05/06/2019, 2:42 PMcollect I have to call ensureActive() first before proceeding?streetsofboston
05/06/2019, 2:52 PMFlow api is still quite experimental. This could be either by design or it could be an issue. 🤷♂️
I hope it is not necessary to do ‘active’ checks….Paul Woitaschek
05/06/2019, 2:53 PMPaul Woitaschek
05/06/2019, 3:00 PMkotlin
@FlowPreview
fun main() {
val flow = flow {
while (true) {
emit(Unit)
}
}
val job = GlobalScope.launch {
flow.collect {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
runBlocking {
job.cancelAndJoin()
}
}Paul Woitaschek
05/06/2019, 3:00 PMPaul Woitaschek
05/06/2019, 3:00 PMstreetsofboston
05/06/2019, 3:03 PMflow.collect {
coroutineScope {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}Paul Woitaschek
05/06/2019, 3:04 PM@FlowPreview
fun main() {
val flow = flow {
while (true) {
emit(Unit)
}
}
val job = GlobalScope.launch {
flow.collect {
coroutineScope {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
}
runBlocking {
job.cancelAndJoin()
}
}Paul Woitaschek
05/06/2019, 3:05 PMPaul Woitaschek
05/06/2019, 3:05 PMensureActive as the first statement of collectPaul Woitaschek
05/06/2019, 3:09 PMPaul Woitaschek
05/06/2019, 3:13 PMVsevolod Tolstopyatov [JB]
05/06/2019, 3:34 PMisActive may return false because cancellation is cooperative.
E.g. if you do something like
val job = launch {
while (isActive) { /* do nothing */ }
println("I was concurrently cancelled") // (1)
}
delay(100)
job.cancelAndJoin()
do you expect (1) to be printed?
The flow sample doesn’t really differ from this simplified snippetVsevolod Tolstopyatov [JB]
05/06/2019, 3:35 PMisActive states in its doc: Returns true when this job is active -- it was already started and has not completed nor was cancelled yetPaul Woitaschek
05/07/2019, 1:56 PM