#coroutines

Robert von Massow

09/07/2020, 9:34 AM
So we're experiencing some problems with flows and coroutines. General setup: we have a gRPC stream that provides messages as a flow. Inside the collect we launch a new coroutine to do some heavy lifting. This is of course done in a different coroutine context (using the default dispatcher; we used a custom dispatcher before and had the same problem, so we went back to the default). What we see now is that after some time the inner launch is just not scheduled/executed. It's not blocking, though: the launch actually returns, and thus the flow is also consumed properly. We struggle hard to reproduce or even just monitor this issue. Here is some rough example code (our input arrives a lot slower, though). At some point we still see logs like "received" but no "processing" anymore. Probably we are doing something really wrong. Has anyone seen this before?
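import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Both scopes use an empty context, so they fall back to Dispatchers.Default
    val coroutineScope = CoroutineScope(EmptyCoroutineContext)
    val flowScope = CoroutineScope(EmptyCoroutineContext)
    flowScope.launch {
        val flow = flowOf(1)
        // Re-collect in a loop to simulate a continuous stream of messages
        while (isActive) {
            flow.collect { i ->
                println("received $i")
                coroutineScope.launch {
                    // Offload some hard work
                    println("processing $i")
                    Thread.sleep(1000)
                    println("done processing $i")
                }
            }
        }
    }

    // just added so it doesn't terminate right away
    delay(10000)
}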

Zach Klippenstein (he/him) [MOD]

09/07/2020, 7:45 PM
Is the inner scope getting cancelled somehow, which would make the launches noops?
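To illustrate the question above, here is a minimal sketch of what "launches become no-ops" looks like. It assumes a plain scope on `Dispatchers.Default`; the helper `bodyRuns` is a hypothetical name used only for this illustration and is not part of the original code.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*

// Hypothetical helper: reports whether the body of a coroutine
// launched in `scope` actually executed.
fun bodyRuns(scope: CoroutineScope): Boolean = runBlocking {
    val ran = AtomicBoolean(false)
    val job = scope.launch { ran.set(true) }
    job.join() // returns normally even if the job was cancelled
    ran.get()
}

fun main() {
    val scope = CoroutineScope(Dispatchers.Default)
    println(bodyRuns(scope))   // true: the scope is active
    scope.cancel()
    println(scope.isActive)    // false
    println(bodyRuns(scope))   // false: launch returns, but the body never runs
}
```

Note that `launch` on the cancelled scope does not throw and does not block; it simply returns an already cancelled `Job`, which would match the "received but no processing" symptom.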

Robert von Massow

09/08/2020, 7:09 AM
Only on shutdown, so essentially no. What we did now is move the inner scope to the unconfined dispatcher and add a delay at the beginning to allow for rescheduling. It looks good so far, but we lose one ms every time with this workaround, and the code is also not really understandable. Frankly, we already thought about migrating the inner coroutine to threads 😕
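The workaround described above might look roughly like the following sketch. The actual code was not posted, so everything here is an assumption: the function name `processAll`, the `workerScope` variable, the 1 ms delay value, and the use of a list as a stand-in for the gRPC flow.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical reconstruction: collect items and hand each one to a
// coroutine launched on Dispatchers.Unconfined that suspends immediately.
fun processAll(items: List<Int>): List<Int> = runBlocking {
    val workerScope = CoroutineScope(Dispatchers.Unconfined)
    val processed = ConcurrentLinkedQueue<Int>()
    val jobs = mutableListOf<Job>()
    items.asFlow().collect { i ->
        println("received $i")
        jobs += workerScope.launch {
            // Unconfined starts in the collector's thread; suspending right
            // away lets collect continue with the next item.
            delay(1)
            println("processing $i on ${Thread.currentThread().name}")
            processed += i
        }
    }
    jobs.joinAll() // wait for the offloaded work before returning
    processed.toList()
}

fun main() {
    println(processAll(listOf(1, 2, 3)).sorted())
}
```

One thing to keep in mind with this design: after `delay`, an unconfined coroutine resumes on whatever thread the delay implementation uses, so heavy work started this way may end up sharing that thread instead of being spread over a worker pool.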

Zach Klippenstein (he/him) [MOD]

09/08/2020, 8:02 AM
Does the code you posted actually repro the issue?

Robert von Massow

09/08/2020, 8:03 AM
Unfortunately not, we were not able to reproduce the actual issue. This is more of a structural example, sorry.

Zach Klippenstein (he/him) [MOD]

09/08/2020, 8:25 AM
Hmm, nothing comes to mind without seeing the actual code.

Robert von Massow

09/08/2020, 9:11 AM
Thanks for taking a look anyway. Unfortunately the code is a lot more complicated and possibly half of it is under NDA or something 😞 I have now added a metric for the coroutine scope's isActive. We are not explicitly doing anything there, but I don't trust myself anymore 😉 Uncaught exceptions could lead to that, too.
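The uncaught-exception theory can be checked in isolation. The sketch below is not the poster's code: the helper `survivors` and the scope names are made up for illustration. It compares a scope with the default `Job` against one with a `SupervisorJob`, and prints `isActive` the way such a metric would observe it.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical helper: launches one failing task in `scope`, then two normal
// ones, and returns how many of the normal bodies actually ran.
fun survivors(scope: CoroutineScope): Int = runBlocking {
    val ran = AtomicInteger(0)
    scope.launch { throw IllegalStateException("boom") }.join()
    repeat(2) { scope.launch { ran.incrementAndGet() }.join() }
    ran.get()
}

fun main() {
    // Log uncaught exceptions instead of relying on the thread's default handler
    val handler = CoroutineExceptionHandler { _, e -> println("uncaught: ${e.message}") }

    // CoroutineScope(...) adds a regular Job when the context has none
    val plain = CoroutineScope(Dispatchers.Default + handler)
    println(survivors(plain))      // 0: the failed child cancelled the whole scope
    println(plain.isActive)        // false

    val supervised = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)
    println(survivors(supervised)) // 2: sibling launches are unaffected
    println(supervised.isActive)   // true
}
```

If the `isActive` metric ever flips to false outside of shutdown, a failed child in a scope without `SupervisorJob` would be one plausible explanation.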