Paul Woitaschek
05/06/2019, 2:34 PM
launch {
      state.collect {
        Timber.i("isActive=$isActive, job=${coroutineContext[Job]}")
        render(it)
      }
    }
`isActive=false`
Paul Woitaschek
05/06/2019, 2:39 PM
Timber.i("${this@launch===this}")
streetsofboston
05/06/2019, 2:39 PM
Paul Woitaschek
05/06/2019, 2:40 PM
Paul Woitaschek
05/06/2019, 2:40 PM
I was thinking about it and I know what’s the problem (which is unobvious and I will be happy if you file an issue about that): `receive` from a channel (or `consumeEach` in your case) is similar to `launch(start = ATOMIC)`: if the coroutine is cancelled during a `receive` call which is ready to return an element, the coroutine is resumed normally, though it already has the status “cancelled”. You should manually check the cancellation status 😞
To answer the question “why the hell is it behaving like that”, consider the following situation: one coroutine sends element `x` to the channel, another one `receive`s it and immediately gets cancelled. In that case the method `receive` will throw `CancellationException`, but element `x` is already consumed. Thus element `x` is lost and cannot be recovered (and what if `x` is a socket or file which should be closed by the consumer?). To avoid that, `receive` with an element always resumes normally, but the coroutine is already cancelled, so any suspension point will throw `CE`.
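The manual check mentioned in the quote could look roughly like the sketch below. It assumes a kotlinx.coroutines version that provides `ensureActive()`; the names `consumeChecked`, `handle`, and `runDemo` are hypothetical and only serve the illustration. Whether `receive` really resumes normally in an already cancelled coroutine depends on the library version, so the check is written to be harmless either way.

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: re-checks cancellation after every received element,
// so a cancelled consumer never goes on to handle an element.
suspend fun <T> consumeChecked(channel: Channel<T>, handle: (T) -> Unit) {
    for (element in channel) {
        // Throws CancellationException if the calling coroutine is cancelled,
        // even if the receive itself resumed normally with an element.
        coroutineContext.ensureActive()
        handle(element)
    }
}

// Hypothetical demo: returns the elements the consumer handled before cancellation.
fun runDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val handled = mutableListOf<Int>()
    val consumer = launch { consumeChecked(channel) { handled += it } }
    channel.send(1)
    yield()                   // let the consumer pick up the first element
    consumer.cancelAndJoin()  // cancel while the consumer waits for more
    channel.send(2)           // stays in the buffer, nobody handles it
    handled.toList()
}

fun main() {
    println(runDemo())
}
```

The check sits directly after the suspension point, which is the place where, according to the quote, a cancelled coroutine may still be handed an element.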
Paul Woitaschek
05/06/2019, 2:41 PM
Paul Woitaschek
05/06/2019, 2:42 PM
`collect` … `ensureActive()`
streetsofboston
05/06/2019, 2:52 PM
`Flow`
Paul Woitaschek
05/06/2019, 2:53 PM
Paul Woitaschek
05/06/2019, 3:00 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@FlowPreview
fun main() {
  val flow = flow {
    while (true) {
      emit(Unit)
    }
  }
  val job = GlobalScope.launch {
    flow.collect {
      if (!isActive) {
        throw IllegalStateException("DOH")
      }
    }
  }
  runBlocking {
    job.cancelAndJoin()
  }
}
Paul Woitaschek
05/06/2019, 3:00 PM
Paul Woitaschek
05/06/2019, 3:00 PM
streetsofboston
05/06/2019, 3:03 PM
flow.collect {
      coroutineScope {
        if (!isActive) {
          throw IllegalStateException("DOH")
        }
      }
    }
Paul Woitaschek
05/06/2019, 3:04 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@FlowPreview
fun main() {
  val flow = flow {
    while (true) {
      emit(Unit)
    }
  }
  val job = GlobalScope.launch {
    flow.collect {
      coroutineScope {
        if (!isActive) {
          throw IllegalStateException("DOH")
        }
      }
    }
  }
  runBlocking {
    job.cancelAndJoin()
  }
}
Paul Woitaschek
05/06/2019, 3:05 PM
Paul Woitaschek
05/06/2019, 3:05 PM
`ensureActive`
Paul Woitaschek
05/06/2019, 3:09 PM
Paul Woitaschek
05/06/2019, 3:13 PM
Vsevolod Tolstopyatov [JB]
05/06/2019, 3:34 PM
`isActive` … `false`
val job = launch {
    while (isActive) { /* do nothing */ }
    println("I was concurrently cancelled") // (1)
}
delay(100)
job.cancelAndJoin()
Vsevolod Tolstopyatov [JB]
05/06/2019, 3:35 PM
`isActive`: Returns true when this job is active -- it was already started and has not completed nor was cancelled yet.
Paul Woitaschek
05/07/2019, 1:56 PM
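For reference, the `ensureActive()` approach discussed above could be sketched as follows. This is a minimal illustration, not a statement about how `collect` behaves in any particular library version: the name `runDemo`, the `Dispatchers.Default` dispatcher, and the 50 ms delay are assumptions made for the example. Instead of throwing `IllegalStateException` when `isActive` is false, the collector calls `ensureActive()`, which throws `CancellationException` and ends the collection cooperatively.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: collects an endless flow and relies on an explicit
// cancellation check inside the collector. Returns whether the job ended up cancelled.
fun runDemo(): Boolean = runBlocking {
    val endless = flow {
        while (true) {
            emit(Unit)   // same shape as the repro above: emits without pause
        }
    }
    val job = launch(Dispatchers.Default) {
        endless.collect {
            // `this` is the CoroutineScope of `launch`; throws CancellationException
            // once that scope's job has been cancelled.
            ensureActive()
        }
    }
    delay(50)            // arbitrary pause so the collector is running
    job.cancelAndJoin()  // returns only after the collector has stopped
    job.isCancelled
}

fun main() {
    println(runDemo())
}
```

Because the cancellation surfaces as a `CancellationException`, the job completes as cancelled rather than failed, which is the difference from the `IllegalStateException("DOH")` variant.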