#coroutines

Paul Woitaschek

05/06/2019, 2:34 PM
I have a situation where collect emits in the cancelled state:
launch {
  state.collect {
    Timber.i("isActive=$isActive, job=$Job")
    render(it)
  }
}
It prints `isActive=false`. How is that possible?
@streetsofboston `Timber.i("${this@launch===this}")` prints "true"

streetsofboston

05/06/2019, 2:39 PM
Yeah… I realized that just after I answered your question… and that’s why I deleted my answer 🙂

Paul Woitaschek

05/06/2019, 2:40 PM
I had the same with a receive channel a while ago
I was thinking about it and I know what the problem is (which is not obvious, and I will be happy if you file an issue about that): `receive` from a channel (or `consumeEach` in your case) is similar to `launch(start = ATOMIC)`: if the coroutine is cancelled during a `receive` call which is ready to return an element, the coroutine is resumed normally, though it already has the status “cancelled”. You should manually check the cancellation status 😞
To answer the question “why the hell is it behaving like that”, consider the following situation: one coroutine sends element `x` to the channel, another one receives it and immediately gets cancelled. In that case the method `receive` will throw `CancellationException`, but element `x` is already consumed. Thus element `x` is lost and cannot be recovered (and what if `x` is a socket or file which should be closed by the consumer?). To avoid that, `receive` with an element always resumes normally, but the coroutine is already cancelled, so any suspension point will throw `CE`. (edited)
This is what @Vsevolod Tolstopyatov [JB] repeated, so does it apply here as well?
And does this mean that on each and every `collect` I have to call `ensureActive()` first before proceeding?
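The manual check described in the quoted explanation could look roughly like the sketch below. `Resource` and `consume` are hypothetical names introduced for illustration. The sketch only shows the pattern of checking the cancellation status right after an element has been received, while making sure a closeable element is still closed if the check fails. The quote describes the library as of 2019, so the exact cancellation behaviour of `receive` in other versions is an assumption here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.io.Closeable

// Hypothetical resource that the consumer is responsible for closing.
class Resource(val id: Int) : Closeable {
    var closed = false
        private set
    override fun close() { closed = true }
}

// Receives resources until the channel is closed. Each element is wrapped in
// use {}, so it is closed even when ensureActive() throws CancellationException.
suspend fun consume(channel: ReceiveChannel<Resource>, handled: MutableList<Int>) = coroutineScope {
    for (resource in channel) {
        resource.use {
            ensureActive()      // manual cancellation check after receiving
            handled.add(it.id)  // only reached while the coroutine is still active
        }
    }
}

fun main() = runBlocking {
    val resources = listOf(Resource(1), Resource(2), Resource(3))
    val channel = Channel<Resource>(Channel.UNLIMITED)
    resources.forEach { channel.send(it) }
    channel.close()

    val handled = mutableListOf<Int>()
    consume(channel, handled)
    println("handled=$handled, allClosed=${resources.all { it.closed }}")
}
```

Wrapping the element in `use` before the check means the resource is released on both paths, which addresses the "socket or file" concern from the quote.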

streetsofboston

05/06/2019, 2:52 PM
The `Flow` API is still quite experimental. This could be either by design or it could be an issue. 🤷‍♂️ I hope it is not necessary to do ‘active’ checks…

Paul Woitaschek

05/06/2019, 2:53 PM
@Vsevolod Tolstopyatov [JB] Could you provide some insights here?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@FlowPreview
fun main() {
  val flow = flow {
    while (true) {
      emit(Unit)
    }
  }

  val job = GlobalScope.launch {
    flow.collect {
      if (!isActive) {
        throw IllegalStateException("DOH")
      }
    }
  }

  runBlocking {
    job.cancelAndJoin()
  }
}
This throws 🙈

streetsofboston

05/06/2019, 3:03 PM
Just to try out: What happens if you do:
flow.collect {
  coroutineScope {
    if (!isActive) {
      throw IllegalStateException("DOH")
    }
  }
}

Paul Woitaschek

05/06/2019, 3:04 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@FlowPreview
fun main() {
  val flow = flow {
    while (true) {
      emit(Unit)
    }
  }

  val job = GlobalScope.launch {
    flow.collect {
      coroutineScope {
        if (!isActive) {
          throw IllegalStateException("DOH")
        }
      }
    }
  }

  runBlocking {
    job.cancelAndJoin()
  }
}
Throws
The only thing that helps is calling `ensureActive` as the first statement of `collect`
@jw Do you understand what's going on here?
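For reference, the workaround of calling `ensureActive` as the first statement of `collect` might look like the sketch below. `render`, `renderCount` and `collectUntilCancelled` are hypothetical names for illustration, and the sketch assumes a kotlinx.coroutines version where `ensureActive()` is available on `CoroutineScope`. Cancellation can still arrive between the check and `render`, so this narrows the window rather than closing it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the render(it) call from the first snippet.
val renderCount = AtomicInteger(0)
fun render(value: Unit) { renderCount.incrementAndGet() }

// Collects an endless flow, cancels the collector after the first rendered
// value, and returns whether the collecting job ended up cancelled.
fun collectUntilCancelled(): Boolean = runBlocking {
    val ticks = flow {
        while (true) emit(Unit)
    }
    val firstRender = CompletableDeferred<Unit>()

    val job = launch(Dispatchers.Default) {
        ticks.collect {
            // Throws CancellationException once this coroutine has been cancelled,
            // so render() is skipped for values delivered after cancellation.
            ensureActive()
            render(it)
            firstRender.complete(Unit)
        }
    }

    firstRender.await()   // wait until at least one value was rendered
    job.cancelAndJoin()
    job.isCancelled
}

fun main() {
    println("cancelled=${collectUntilCancelled()}, rendered=${renderCount.get()} values")
}
```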

Vsevolod Tolstopyatov [JB]

05/06/2019, 3:34 PM
`isActive` may return `false` because cancellation is cooperative. E.g. if you do something like
val job = launch {
    while (isActive) { /* do nothing */ }
    println("I was concurrently cancelled") // (1)
}

delay(100)
job.cancelAndJoin()
do you expect (1) to be printed? The flow sample doesn’t really differ from this simplified snippet
Also note that `isActive` states in its doc: "Returns true when this job is active -- it was already started and has not completed nor was cancelled yet"
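A runnable variant of the simplified snippet, as a sketch: the `runBlocking` wrapper, `Dispatchers.Default`, the `started` signal (used instead of `delay(100)` so the loop is known to be running before it is cancelled) and the `reachedAfterLoop` flag are all additions for illustration, not part of the original message. It shows that the line marked (1) does run after cancellation, because nothing between the loop exit and that line is a suspension point.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Set when the statement after the loop, marked (1) in the snippet, is reached.
val reachedAfterLoop = AtomicBoolean(false)

fun cancelBusyLoop() = runBlocking {
    val started = CompletableDeferred<Unit>()

    // Dispatchers.Default is an assumption: the busy loop needs its own thread,
    // otherwise it would occupy the single runBlocking thread and never be cancelled.
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        while (isActive) { /* do nothing */ }
        // No suspension point here, so nothing throws CancellationException.
        reachedAfterLoop.set(true)
        println("I was concurrently cancelled, isActive=$isActive") // (1)
    }

    started.await()       // the coroutine body is running from here on
    job.cancelAndJoin()
}

fun main() {
    cancelBusyLoop()
    println("reached (1): ${reachedAfterLoop.get()}")
}
```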
p

Paul Woitaschek

05/07/2019, 1:56 PM
@Vsevolod Tolstopyatov [JB] Could you respond in the ticket on the latest comments?