# coroutines
d
So I'm trying to wrap my head around interop between traditional threads and coroutines. I have a pair of reader/writer threads performing blocking operations on a serial port (using a Java library that, naturally, has no concept of coroutines). I would like to execute a suspendable callback for each message received on the serial port, with some ordering guarantees (e.g., the callback for message N+1 must not execute before the callback for message N has finished). Below is a toy example that demonstrates roughly what I'm doing. I have a channel that serializes the messages to be processed and a coroutine that consumes the messages in order. To actually put messages into the channel, though, I launch a new coroutine just to send each message. That seems kind of weird and has me wondering if I'm overlooking something obvious.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

var readCount = 0
fun blockingSerialRead(): String {
    val message = "MSG$readCount"
    readCount += 1
    // Pretend to take some time, blocking.
    Thread.sleep(500L)
    return message
}

fun main(args: Array<String>) {
    runBlocking {
        val messages = Channel<String>()
        thread(start = true) {
            while (true) {
                val message = blockingSerialRead()
                println("Read message $message")
                launch {
                    messages.send(message)
                }
            }
        }

        // Start a coroutine that does work on each received message
        launch {
            var takeExtraTime = false
            messages.consumeEach {
                // Do some suspendable work with the message
                // For example, maybe we'll send a message back
                // and wait for the response.
                if (takeExtraTime) {
                    // Pretend to do some expensive suspendable work
                    delay(1000L)
                    takeExtraTime = false
                } else {
                    takeExtraTime = true
                }
                println("Processed message $it")
            }
        }
    }
}
Hmm, actually, I guess for channels specifically, I could replace launching the messages.send coroutine with calling trySend() until it succeeds or the channel is closed. It's kind of ugly: without a blocking equivalent to call from traditional threads, I would have to either accept that it might burn a bunch of cycles retrying the send or add a backoff. Still, that seems like a workable solution.
Oh, I take that back. I see that there is a trySendBlocking extension function that seems like it should do exactly what I want.
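For future readers, here is a minimal sketch of that approach, under stated assumptions: `fakeBlockingRead` and `readWithTrySendBlocking` are hypothetical names standing in for the serial read and the surrounding setup, and the reader thread stops after a fixed number of messages so the example terminates. `trySendBlocking` comes from `kotlinx.coroutines.channels` and is meant to be called from plain threads, not from inside a coroutine.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical stand-in for the blocking serial read.
fun fakeBlockingRead(n: Int): String {
    Thread.sleep(50L) // pretend to block on the port
    return "MSG$n"
}

// Reads `count` messages on a plain thread and hands them to a coroutine consumer.
fun readWithTrySendBlocking(count: Int): List<String> = runBlocking {
    val messages = Channel<String>() // rendezvous channel, as in the toy example

    thread {
        for (n in 0 until count) {
            // Blocks this thread until the element is received or the channel is closed.
            val result = messages.trySendBlocking(fakeBlockingRead(n))
            if (result.isFailure) break // channel closed: stop reading
        }
        messages.close() // lets the consumer loop below finish
    }

    // Single consumer, so message N+1 is not handled before message N is done.
    val processed = mutableListOf<String>()
    for (message in messages) {
        processed += message
    }
    processed
}

fun main() {
    println(readWithTrySendBlocking(3))
}
```

No coroutine is launched per message here; the reader thread simply blocks until the consumer is ready for the next element.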
y
I haven't reviewed it fully, but I wouldn't mix traditional threading and coroutines. Consider launching a coroutine and using withContext(Dispatchers.IO) for any code that does (only) blocking IO. You still get other benefits like structured concurrency, cancellation, etc.
d
That does seem a lot simpler. Should I worry that the reader coroutine would be spending most of its time waiting on blocking read calls, as opposed to being suspended? I see that Dispatchers.IO is elastic, so maybe it will just add another thread to the pool to compensate and it won't be any worse than having the dedicated reader thread?
y
Exactly, it's designed for that. But it shouldn't be used for anything contributing substantial CPU load.
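A rough sketch of what this suggestion could look like, assuming a hypothetical `fakeBlockingRead` in place of the real serial read and a fixed message count so the example finishes. The reader is an ordinary coroutine, and only the blocking call itself is moved onto `Dispatchers.IO`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the blocking serial read.
fun fakeBlockingRead(n: Int): String {
    Thread.sleep(50L) // pretend to block on the port
    return "MSG$n"
}

// Reads `count` messages in a reader coroutine and processes them in order.
fun readOnIoDispatcher(count: Int): List<String> = runBlocking {
    val messages = Channel<String>()

    // Reader coroutine: replaces the dedicated reader thread.
    launch {
        for (n in 0 until count) {
            // The blocking call runs on an IO thread; this coroutine suspends meanwhile.
            val message = withContext(Dispatchers.IO) { fakeBlockingRead(n) }
            messages.send(message) // plain suspending send, no extra launch needed
        }
        messages.close()
    }

    // Consumer: handles one message at a time, preserving order.
    val processed = mutableListOf<String>()
    for (message in messages) {
        processed += message
    }
    processed
}

fun main() {
    println(readOnIoDispatcher(3))
}
```

One caveat worth checking for a real serial library: cancelling the coroutine does not by itself interrupt a blocking call that is already in progress. If the library responds to thread interruption, `runInterruptible(Dispatchers.IO) { ... }` may be a better fit than `withContext`.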
d
I see, that simplifies things quite a bit. Thanks!
d
Two notes for future readers:
A) Starting a coroutine from ordinary thread-based code is the only way to call suspending functions; any call that appears to avoid that is really doing it internally. The coroutine libraries are written to be non-blocking, so any function that may need to wait for some event to occur, such as a full buffer being consumed enough to make room for the next item, needs to suspend, and for that you need a coroutine.
B) Creating coroutines is not expensive. Try it; you will be surprised at how lightweight it is compared to almost anything. Most 'normal' functions that do very little have more overhead than creating a coroutine.
C) I can't count, it's three: to reduce even the 'minimal' overhead of coroutine creation, you can reuse a coroutine by creating one at the 'top', provided everything is in scope as in your sample function. If, instead of a thread, you create one coroutine outside the while loop, as @yschimke suggests, then you don't need one inside it. But see note B: if the only reason you do so is to avoid the overhead of coroutine creation, take another look. Often, by the time you have rearranged the code to share one coroutine, you have added complexity and code that more than negate the imagined savings. Look at the implementations of light functions you would otherwise not think twice about using, like delay(). You will either be shocked and stop using them, or realize you should worry about more important things than the overhead of creating a coroutine.
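To try note B yourself, something like the following sketch launches a large number of trivial coroutines and waits for all of them. `launchMany` is a hypothetical helper name, and the count of 100,000 is arbitrary; the sketch makes no timing claims, so measure on your own machine.

```kotlin
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Launches `count` trivial coroutines and returns how many of them ran.
fun launchMany(count: Int): Int = runBlocking {
    val completed = AtomicInteger(0)
    // Each launch creates a new coroutine; all of them run on the runBlocking thread.
    val jobs = List(count) {
        launch { completed.incrementAndGet() }
    }
    jobs.joinAll() // wait for every coroutine to finish
    completed.get()
}

fun main() {
    println(launchMany(100_000))
}
```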