#coroutines

Lorenzo Testa

08/05/2019, 10:42 AM
I don't understand why this code throws `ClosedSendChannelException: Channel was closed`, can someone explain it to me?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@UseExperimental(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val scope = CoroutineScope(Job())
    val f = channelFlow {
        launch(scope.coroutineContext) {
            repeat(10) {
                delay(200L)
                send(it)
            }
        }
    }
    f.collect {
        println(it)
    }
    delay(3000L)
}

Marko Mitic

08/05/2019, 10:45 AM
I think `launch` ruins your day here

Lorenzo Testa

08/05/2019, 10:45 AM
Yes, but the channelFlow documentation has an example that uses launch to change the dispatcher.
I've also tried changing the context back with `withContext` only when calling `send`, to use the scope given by `channelFlow`, but then it never emits anything.

Marko Mitic

08/05/2019, 10:49 AM
Your scopes are probably mixing in unintended ways: channelFlow won't wait for launch to finish because it has a different scope/context.
Although I haven't used Flow, so I might be talking bs.

Lorenzo Testa

08/05/2019, 10:55 AM
Oh, yes, thank you! I've added a join after the launch and now it works.

gildor

08/05/2019, 11:01 AM
Do not pass the scope's context to launch, it has its own Job! You are completely breaking structured concurrency by replacing the job of launch! Just pass the required dispatcher instead.
You don't need any scope for this code.
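A minimal sketch of the "just pass the required dispatcher" suggestion, assuming a recent kotlinx.coroutines version. The helper name `collectOnDefaultDispatcher` and the choice of `Dispatchers.Default` are illustrative assumptions, not part of the original code. Only the dispatcher is overridden, so the launched coroutine should still inherit its Job from `channelFlow` and the channel should stay open until it completes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects the whole flow into a list so the result is easy to inspect.
fun collectOnDefaultDispatcher(): List<Int> = runBlocking {
    val f = channelFlow {
        // Only the dispatcher is replaced here. The Job is still inherited from
        // channelFlow's scope, so the flow waits for this child before closing.
        launch(Dispatchers.Default) {
            repeat(10) {
                delay(200L)
                send(it)
            }
        }
    }
    f.toList()
}

fun main() {
    println(collectOnDefaultDispatcher())
}
```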

Marko Mitic

08/05/2019, 11:03 AM
Yes, `channelFlow` will wait for all the coroutines launched from its scope to finish; you just overrode the scope with your coroutineContext.

Alexjok

08/05/2019, 1:26 PM
And also, you don't need join for launch.
fun main() = runBlocking {
    val f = channelFlow {
        launch {
            repeat(10) {
                delay(200L)
                send(it)
            }
        }
    }
    f.collect {
        println(it)
    }
    delay(3000L)
}

Lorenzo Testa

08/05/2019, 1:29 PM
Thank you all, I think what I actually wanted in my code was:
launch(scope.coroutineContext) {
    f.collect {
        println(it)
    }
}

gildor

08/05/2019, 1:31 PM
`launch(scope.coroutineContext)`
This is wrong for 99% of cases (and I know of 0 cases for the remaining 1%).
fun main() = runBlocking {
    val f = channelFlow {
        launch {
            repeat(10) {
                delay(200L)
                send(it)
            }
        }
    }
    f.collect {
        println(it)
    }
}
You can even replace `fun main() = runBlocking` with `suspend fun main()`. You don't need any scope for this, since you don't have any background job.

Lorenzo Testa

08/05/2019, 1:33 PM
Hmm, why is it wrong? I want it to stop sending when the scope is cancelled.

gildor

08/05/2019, 1:34 PM
Yes, it's wrong. It is controlled by the scope where you call `collect`; that's how cancellation is propagated.
There is no need to control cancellation of `channelFlow`; you should control cancellation of `collect` instead.

Lorenzo Testa

08/05/2019, 1:36 PM
I understood that, so in the last code I showed I wrapped `collect` in launch instead of putting launch inside `channelFlow`. Maybe it wasn't clear, or I'm not understanding what you're explaining.

gildor

08/05/2019, 1:38 PM
Anyway, `launch(scope.coroutineContext)` is wrong; `scope.launch { }` is what you need.
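A rough sketch of what `scope.launch { }` around `collect` could look like, with cancellation driven from the collecting side. The helper `collectUntilCancelled`, its `cancelAfterMs` parameter, and the `received` list are hypothetical names added for illustration. Cancelling `scope` cancels the collector, which in turn should stop the producer inside `channelFlow`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects in `scope`, cancels the scope after `cancelAfterMs`,
// and returns whatever was received up to that point.
fun collectUntilCancelled(cancelAfterMs: Long): List<Int> = runBlocking {
    val scope = CoroutineScope(Job())
    val received = mutableListOf<Int>()
    val f = channelFlow {
        // Plain launch: the producer belongs to channelFlow's own scope.
        launch {
            repeat(10) {
                delay(200L)
                send(it)
            }
        }
    }
    // The collector is a child of `scope`, so cancelling `scope` cancels the collection.
    val collector = scope.launch {
        f.collect { received.add(it) }
    }
    delay(cancelAfterMs)
    scope.cancel()
    collector.join() // wait until the collector has actually stopped
    received.toList()
}

fun main() {
    // Exact contents depend on timing, so no specific output is claimed here.
    println(collectUntilCancelled(500L))
}
```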

Lorenzo Testa

08/05/2019, 1:39 PM
I thought they were the same, what's the difference?

Marko Mitic

08/05/2019, 1:39 PM
Convention 🙂

gildor

08/05/2019, 1:40 PM
Because you already launch on another scope, and then replace the job!

Lorenzo Testa

08/05/2019, 1:40 PM
So the result is the same?

gildor

08/05/2019, 1:40 PM
For your example, in practice there is no difference,
but in general it's a huge difference: you essentially replace the coroutine's scope, so it no longer belongs to the receiver scope.
And this is exactly the issue that caused the bug in your original message.
This is an example of cancellation of a flow: https://pl.kotl.in/-fth17qy-
Just want to be clear about my message:
"because you already launch on another scope, and then replace job!"
`runBlocking` already provides a scope and you run `launch` there, but then you replace its Job with another one, so you break structured concurrency. The launched coroutine is no longer part of the parent scope, so the parent scope may finish before your background coroutine (this is what happens in your first message), and the coroutine will instead finish when the new scope's job is cancelled. It's not only confusing and error-prone, but I also see no reason to do it; you should use the parent scope in most cases.
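One way to observe the Job replacement described above is to compare which Job lists each coroutine among its `children`. This is a small sketch; the helper name `parentage` and the variable names are hypothetical. The first coroutine is started from `runBlocking`'s scope but with `scope.coroutineContext`, the second with a plain `launch`.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper returning three checks:
// [detached is a child of runBlocking, attached is a child of runBlocking, detached is a child of scope]
fun parentage(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Job())
    val runBlockingJob = this.coroutineContext[Job]!!
    val scopeJob = scope.coroutineContext[Job]!!

    // Called on runBlocking's scope, but the Job comes from `scope`.
    val detached = launch(scope.coroutineContext) { delay(100L) }
    // Plain launch: the Job is inherited from runBlocking.
    val attached = launch { delay(100L) }

    val result = listOf(
        detached in runBlockingJob.children,
        attached in runBlockingJob.children,
        detached in scopeJob.children
    )
    scope.cancel() // runBlocking does not wait for `detached`, so stop it explicitly
    result
}

fun main() {
    println(parentage()) // expected: [false, true, true]
}
```

If the checks come out as expected, `runBlocking` only waits for `attached`; the `detached` coroutine lives and dies with `scope`.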

Lorenzo Testa

08/05/2019, 1:54 PM
Now it's clearer, but I'm not sure that there is no reason to do that.
Anyway thank you very much for the help and for your time!

gildor

08/05/2019, 2:02 PM
"but I'm not sure that there is no reason to do that"
I really don't think there is any valid use case for this. All coroutine builders are designed to be launched only on a particular scope: `launch` is an extension function of CoroutineScope, so you cannot call `launch` without a scope as the receiver. Of course, you can try to inherit the context of one scope and replace some parts of it, but I would be much more explicit in that case and pass the particular elements of the scope. But I also don't have any valid use cases for this.