#coroutines

Thomas

06/07/2019, 12:01 PM
Could anyone explain to me why this code throws an exception (on Android)? The `collect` and `cancel` functions are both called on the main thread. I tried this with both version 1.2.1 and 1.3.0-M1.
```kotlin
var cancelled = false
val job = GlobalScope.launch(Dispatchers.Main) {
    val flow = flow {
        while (true) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect {
            if (cancelled) { // main thread
                throw IllegalStateException()
            }
        }
}

GlobalScope.launch(Dispatchers.Main) {
    delay(1000)
    job.cancel() // main thread
    cancelled = true
}
```
Here’s a sample project if you want to try it out

Allan Wang

06/07/2019, 12:12 PM
Doesn’t throw an error when using default dispatcher. What exception are you getting on Android?

Thomas

06/07/2019, 12:19 PM
@Allan Wang I’m getting the IllegalStateException I put in the sample code.

Allan Wang

06/07/2019, 12:21 PM
I haven’t used flow, but I’d be wary of using your own booleans to check for cancellation. I also don’t think a cancellation request takes effect immediately. It could be that your collect action executes before the job is cancelled but after you’ve toggled your boolean.

Thomas

06/07/2019, 12:21 PM
@Allan Wang You are right that it works correctly with the default dispatcher. It crashes if you use the main dispatcher or a single thread context. Could you add the following code and replace the default dispatcher with `singleThread` in your test?
```kotlin
val singleThread = newSingleThreadContext("single thread")
```

Allan Wang

06/07/2019, 12:24 PM
What about this?
```kotlin
val singleThread = newSingleThreadContext("single thread")
var cancelled = false
val job = GlobalScope.launch(singleThread) {
    val flow = flow {
        while (true) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect { if (!isActive) throw java.lang.IllegalStateException() }

}

GlobalScope.launch(singleThread) {
    delay(1000)
    job.cancel() // main thread
    cancelled = true
}
```
Using `singleThread` in unit tests still doesn’t crash for me. (The change is `!isActive`.)

Thomas

06/07/2019, 12:26 PM
Still crashes for me when running the app on a real device.
> It could be that your collect action executes before the job is cancelled but after you’ve toggled your boolean
Then how should I cancel it in the onDestroy method and be 100% sure it does not execute anymore? I am getting null pointer exceptions in my project because the view is set to null when the job is cancelled. If you replace the `boolean` with a `View` object, how should I solve this then?
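
One possible way to handle a view that becomes null, sketched under assumptions rather than taken from the thread: `FakeView` and `Screen` are hypothetical stand-ins for an Android view and its owner, and the emitter includes a `delay` so the example terminates reliably. The collector takes a local snapshot of the nullable reference and skips the value when the view is already gone.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for an Android View, so the sketch runs without the framework.
class FakeView {
    var renderCount = 0
    fun render() { renderCount++ }
}

// Hypothetical owner that mirrors the onDestroy pattern from the question.
class Screen(private val scope: CoroutineScope) {
    var view: FakeView? = FakeView()
    private var job: Job? = null

    fun onCreate(values: Flow<Unit>) {
        job = scope.launch {
            values.collect {
                // Snapshot the nullable reference once; drop the value if the view is gone.
                val current = view ?: return@collect
                current.render()
            }
        }
    }

    fun onDestroy() {
        job?.cancel() // request cancellation
        view = null   // values delivered after this point are skipped, not dereferenced
    }
}

fun main() = runBlocking {
    val screen = Screen(this)
    val values = flow {
        while (true) {
            emit(Unit)
            delay(1) // suspension point, so the emitter reacts to cancellation
        }
    }.flowOn(Dispatchers.IO)
    screen.onCreate(values)
    delay(50)
    screen.onDestroy()
    println("view after destroy: ${screen.view}")
}
```

This does not explain why a value can still arrive after `cancel()`; it only makes the collector tolerate it.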

Allan Wang

06/07/2019, 12:26 PM
Can you add some message to the exception and ensure it’s actually from flow?

Thomas

06/07/2019, 12:29 PM
@Allan Wang Just did that and I got the exception with the message in the logcat. (https://gist.github.com/Thomas-Vos/cd63c738a66da9cdec3e34045e1ab830)

Allan Wang

06/07/2019, 12:33 PM
What if you only emit while `isActive` (instead of always emitting), and avoid the check in your collector?

Thomas

06/07/2019, 12:37 PM
@Allan Wang Do you mean like this:
```kotlin
val job = GlobalScope.launch(Dispatchers.Main) {
    val flow = flow {
        while (isActive) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect {
            if (!isActive) {
                throw IllegalStateException("This exception is thrown from the sample code.")
            }
        }
}

GlobalScope.launch(Dispatchers.Main) {
    delay(1000)
    job.cancel() // main thread
}
```
because this code still crashes when running on my device.

Allan Wang

06/07/2019, 12:42 PM
At this point I don’t think `if (!isActive)` is needed. I guess there could still be a case where you emit before cancellation and collect afterwards. I wonder if it’s best to just ignore values at that point (instead of throwing an exception).
There’s also `flowWith`, which affects context downstream. Wouldn’t passing the main scope context (containing a job) allow it to cancel once you cancel the job?
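
A minimal sketch of ignoring late values instead of throwing, under a few assumptions: `runGuardedCollector` is a hypothetical helper, `runBlocking` stands in for the single-threaded collector context, and the emitter includes a `delay` so the example always terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects for roughly `cancelAfterMs` milliseconds and returns
// how many values were handled. Values arriving after cancellation are dropped.
fun runGuardedCollector(cancelAfterMs: Long): Int = runBlocking {
    var handled = 0
    val job = launch {
        flow {
            while (true) {
                emit(Unit)
                delay(1) // suspension point, so the emitter reacts to cancellation
            }
        }
            .flowOn(Dispatchers.IO)
            .collect {
                // Guard: a buffered value may be delivered after the job was cancelled.
                if (!isActive) return@collect
                handled++
            }
    }
    delay(cancelAfterMs)
    job.cancelAndJoin()
    handled
}

fun main() {
    println("handled ${runGuardedCollector(100)} values before cancellation")
}
```

The guard uses the `isActive` property of the launching scope, so nothing counted in `handled` was processed after the job was cancelled.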

Thomas

06/07/2019, 12:54 PM
> I wonder if it’s best to just ignore values at that point
@Allan Wang That could be a way to work around this, but I still don’t get why this is even possible. This is different behaviour than RxJava disposables. I would think that cancelling a job in a single thread (main) should make sure that items cannot be collected anymore on that same thread.

Allan Wang

06/07/2019, 1:16 PM
Just curious, but are you getting the same crashes from unit tests? My initial code was wrong:
```kotlin
@Test
fun a() {
    val singleThread = newSingleThreadContext("single thread")
    val job = GlobalScope.launch(singleThread) {
        val flow = flow {
            while (true) {
                println("Emit ${Thread.currentThread().name}")
                emit(Unit)
            }
        }
        flow.flowOn(Dispatchers.IO)
            .collect {
                println(Thread.currentThread().name)
                if (!isActive) throw java.lang.IllegalStateException()
            }

    }
    runBlocking {
        GlobalScope.launch(singleThread) {
            delay(100)
            job.cancel() // main thread
        }.join()
    }
}
```
Your comment makes sense to me. Cancelling from the single thread should stop subsequent runnables on the same thread

Thomas

06/07/2019, 1:26 PM
I just tried your unit test and it does not crash in the test. However, if I put that code in an Android app it does crash.
@Allan Wang What about this? Try this unit test:
```kotlin
@Test
fun a() {
    // repeat code two times
    repeat(2) {
        val singleThread = newSingleThreadContext("single thread: $it")
        val job = GlobalScope.launch(singleThread) {
            val flow = flow {
                while (true) {
                    println("Emit ${Thread.currentThread().name}")
                    emit(Unit)
                }
            }
            flow.flowOn(Dispatchers.IO)
                .collect {
                    println(Thread.currentThread().name)
                    if (!isActive) throw java.lang.IllegalStateException()
                }

        }
        runBlocking {
            GlobalScope.launch(singleThread) {
                delay(100)
                job.cancel() // main thread
            }.join()
        }
    }
}
```
It does not crash. Now comment out the two `println` calls and it does crash. Do you know why this happens?

Allan Wang

06/07/2019, 1:36 PM
It doesn’t seem related to `println`. For me it crashed 2/10 times, one with `println` and one without. I guess the safest option is to just check for an active coroutine and ignore the value otherwise.
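
A variant of that check, sketched with `ensureActive()` from kotlinx.coroutines, which throws a `CancellationException` when the job is no longer active. Instead of silently skipping late values, this stops the collector at the first one. `collectUntilCancelled` is a hypothetical helper, and the `delay` in the emitter is an assumption added so the example terminates reliably.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects until the job is cancelled, then reports
// whether the job ended in the cancelled state.
fun collectUntilCancelled(cancelAfterMs: Long): Boolean = runBlocking {
    val job = launch {
        flow {
            while (true) {
                emit(Unit)
                delay(1) // suspension point, so the emitter reacts to cancellation
            }
        }
            .flowOn(Dispatchers.IO)
            .collect {
                // Throws CancellationException if the launching job was cancelled,
                // so no work below this line runs for a late value.
                ensureActive()
                // ... handle the value here ...
            }
    }
    delay(cancelAfterMs)
    job.cancelAndJoin()
    job.isCancelled
}

fun main() {
    println("cancelled = ${collectUntilCancelled(100)}") // prints "cancelled = true"
}
```

A `CancellationException` thrown inside an already cancelled job is treated as normal cancellation, so it does not surface as a crash the way the `IllegalStateException` in the sample does.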

Thomas

06/07/2019, 2:36 PM
@Allan Wang that’s probably what I am going to do. Do you think this behaviour is a bug? I don’t think it should be needed to add this check if all happens on the same thread.

Allan Wang

06/07/2019, 4:11 PM
@gildor? Someone else probably has more knowledge on this than I do.

Thomas

06/10/2019, 8:23 AM
@Allan Wang I created an issue on GitHub so someone with more knowledge can tell what is going on: https://github.com/Kotlin/kotlinx.coroutines/issues/1265