Devan Lai
03/15/2022, 10:50 PMimport kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread
var readCount = 0
fun blockingSerialRead(): String {
val message = "MSG$readCount"
readCount += 1
// Pretend to take some time, blocking.
Thread.sleep(500L)
return message
}
fun main(args: Array<String>) {
runBlocking {
val messages = Channel<String>()
thread(start = true) {
while (true) {
val message = blockingSerialRead()
println("Read message $message")
launch {
messages.send(message)
}
}
}
// Start a coroutine that does work on each received message
launch {
var takeExtraTime = false
messages.consumeEach {
// Do some suspendable work with the message
// For example, maybe we'll send a message back
// and wait for the response.
if (takeExtraTime) {
// Pretend to do some expensive suspendable work
delay(1000L)
takeExtraTime = false
} else {
takeExtraTime = true
}
println("Processed message $it")
}
}
}
}
trySend()
until it succeeds or the channel is closed. Kind of ugly that I would have to either accept that it might burn a bunch of cycles trying to send repeatedly or add a backoff compared to if there were a blocking equivalent for calling from traditional threads, but that seems like a workable solution.yschimke
03/16/2022, 6:47 AMDevan Lai
03/16/2022, 3:52 PMyschimke
03/16/2022, 6:24 PMDevan Lai
03/16/2022, 6:25 PMDALDEI
06/05/2022, 10:33 AM