# coroutines
d
If I create a new coroutine context and use `withContext` to `launch` a bunch of `Job`s in that context, does that mean I don't have to explicitly `join` all those jobs at the end of the block?
d
Yes, `withContext(...) { ... }` gives you a `CoroutineScope`, and it won't return until all of its children are done.
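A minimal sketch of that behavior, assuming kotlinx.coroutines is on the classpath. The helper name `finishedAfterWithContext` and the delays are made up for illustration; the point is only that no explicit `join` is needed before `withContext` returns.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: launches `jobs` children inside withContext and reports
// how many of them had finished by the time withContext returned.
suspend fun finishedAfterWithContext(jobs: Int): Int {
    val finished = AtomicInteger(0)
    withContext(Dispatchers.Default) {
        repeat(jobs) { i ->
            launch {
                delay(10L * (i + 1))
                finished.incrementAndGet()
            }
        }
        // No join() here: withContext suspends until every child has completed.
    }
    return finished.get()
}

fun main() = runBlocking {
    println(finishedAfterWithContext(3)) // prints 3
}
```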
d
All right. This is starting to coalesce for me here. I think I was leaning too heavily on the `async`/`await` style of invocation in JS land.
Is this a good tree to be barking up, or does this functionality already exist within kotlinx.coroutines?
b
I'm kind of confused by 1) `GlobalScope.launch` when you're already inside of a `CoroutineScope`, and 2) `.launch(context)` and then immediately doing `withContext` using the same exact context.
`launch` already gives you a `CoroutineScope`. No need to immediately open a nested one.
d
I'm a bit confused by coroutines in general... 😓
So that's the result of a lot of trial and error, some stuff is there from previous attempts
`GlobalScope.launch` makes it run as a daemon, right? So if a rogue consumer doesn't finish consuming, it won't block the runtime from exiting.
The `CoroutineScope` receiver was part of an earlier attempt that I haven't gotten around to cleaning up.
And then `withContext`... I think I was originally thinking I needed to wait around for all those `launch`es to finish so I could do some cleanup.
b
```kotlin
launch {
    launch { delay(100) }
    launch { delay(200) }
    launch { delay(300) }
}
```
That top-level job doesn't complete until all of its child jobs complete too.
`GlobalScope` just removes it from being properly tracked by the structured hierarchy. If the JVM runtime exits, all `GlobalScope` jobs stop with it too.
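A rough sketch of the difference, assuming kotlinx.coroutines 1.5 or newer (for `DelicateCoroutinesApi`). The helper `observeJobs` and the delay values are hypothetical. It joins the parent and then checks which of the two jobs the parent actually waited for.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns (trackedDone, untrackedDone) as observed right
// after the parent job has been joined.
@OptIn(DelicateCoroutinesApi::class)
suspend fun observeJobs(): Pair<Boolean, Boolean> = coroutineScope {
    lateinit var tracked: Job
    lateinit var untracked: Job
    val parent = launch {
        // A child of `parent`: the parent cannot complete before it does.
        tracked = launch { delay(50) }
        // Not a child of `parent`: GlobalScope sits outside the hierarchy.
        untracked = GlobalScope.launch { delay(5_000) }
    }
    parent.join()
    val observed = tracked.isCompleted to untracked.isCompleted
    untracked.cancel() // nothing else will clean this job up for us
    observed
}

fun main() = runBlocking {
    println(observeJobs()) // prints (true, false)
}
```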
d
So something like...
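```kotlin
import java.io.Closeable
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

interface TriplexChannel<I, out O> : SendChannel<I>, ReceiveChannel<O>, Closeable {
    data class Error<I>(val input: I, val error: Throwable)

    val errors: ReceiveChannel<Error<I>>
}

@ExperimentalCoroutinesApi
@OptIn(ExperimentalTypeInference::class)
fun <I, O> CoroutineScope.triplex(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.RENDEZVOUS,
    capacityOut: Int = capacity,
    capacityErrors: Int = capacity,
    @BuilderInference block: suspend SendChannel<O>.(I) -> Unit
): TriplexChannel<I, O> {
    val channelIn = Channel<I>(capacity)
    val channelOut = Channel<O>(capacityOut)
    val channelErrors = Channel<TriplexChannel.Error<I>>(capacityErrors)

    launch(context) {
        for (value in channelIn) {
            launch {
                try {
                    channelOut.block(value)
                } catch (e: Throwable) {
                    when (e) {
                        // Let cancellation and JVM errors propagate.
                        // `Error` here is kotlin.Error, not TriplexChannel.Error.
                        is CancellationException, is Error -> throw e
                        else -> channelErrors.send(TriplexChannel.Error(value, e))
                    }
                }
            }
        }
    }.invokeOnCompletion { e ->
        // close() does not suspend, so it can be called directly from the handler.
        if (e != null) channelIn.close(e)
        channelOut.close(e)
        channelErrors.close(e)
    }

    return object : TriplexChannel<I, O>,
        SendChannel<I> by channelIn,
        ReceiveChannel<O> by channelOut {
        override val errors: ReceiveChannel<TriplexChannel.Error<I>> = channelErrors

        // Closeable.close(): closing the input ends the worker loop above.
        override fun close() {
            channelIn.close()
        }
    }
}
```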
That still relies on the caller to close the channel, but at least Kotlin provides `use`.
`GlobalScope.triplex` works too, now that I think about it.
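For reference, a small sketch of how `use` takes care of the close. `Pipe` and `sendThenDrain` are hypothetical stand-ins rather than the `TriplexChannel` above. The only assumption is a `Closeable` whose `close()` closes an underlying channel.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: a Closeable wrapper around a buffered channel.
class Pipe<T> : Closeable {
    val channel = Channel<T>(Channel.UNLIMITED)
    override fun close() {
        channel.close()
    }
}

// Sends every value inside `use`, then drains the channel afterwards.
suspend fun sendThenDrain(values: List<Int>): List<Int> {
    val pipe = Pipe<Int>()
    // `use` is inline, so suspending calls are allowed inside the block,
    // and close() runs when the block exits, even if it throws.
    pipe.use { p ->
        for (v in values) p.channel.send(v)
    }
    // The channel is closed by now, so this loop ends once the buffer is empty.
    val received = mutableListOf<Int>()
    for (v in pipe.channel) received += v
    return received
}

fun main() = runBlocking {
    println(sendThenDrain(listOf(1, 2, 3))) // prints [1, 2, 3]
}
```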