# coroutines
g
Hello, when processing this, I realized that a message is skipped due to the timeout. It's important for my application to avoid skipping messages. How could I avoid that? Any hint appreciated :)
fun main() = runBlocking {
  val channel = Channel<String>()
  var i = 0
  launch {
    repeat(1000) {
      channel.send("i=$i")
      i++
    }
  }

  withTimeoutOrNull(20L) {
    while (true) {
      val message = channel.receive()
      println(message)
    }
  }

  println("TIMEOUT")

  while (true) {
    val message = channel.receive()
    println(message)
  }
}
I've found this:
withTimeoutOrNull(20L) {
    while (isActive) {
      withContext(NonCancellable) {
        val message = channel.receive()
        println(message)
      }
    }
  }
m
The problem is that withTimeoutOrNull is asynchronous, so the cancellation can happen in between the receive and the println(). See here for details.
Ah, you beat me to it 🙂
g
Actually, my solution above is wrong, because the code could stay stuck at channel.receive(), and the message "TIMEOUT" would never be displayed. I can see it here:
fun main() = runBlocking {
  val channel = Channel<String>()
  var i = 0
  // A coroutine that sends messages into the channel
  launch {
    repeat(200) {
      channel.send("i=$i")
      i++
    }
  }

  withTimeoutOrNull(20L) {
    while (isActive) {
      withContext(NonCancellable) {
        val message = channel.receive()
        println(message)
      }
    }
  }

  println("TIMEOUT")

  while (true) {
    val message = channel.receive()
    println(message)
  }
}
Look at section: Prompt cancellation guarantee
👍 1
And next: Undelivered elements
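To see how those two sections fit together, here is a minimal sketch, not a definitive implementation. The function name `receiveWithTimeout` and the two lists are hypothetical and only there for illustration. It assumes the documented behaviour that an element taken from the channel by a receive operation that is then cancelled gets handed to `onUndeliveredElement`, so every sent element should end up in exactly one of the two lists.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical harness: returns (received, undelivered) so both lists can be inspected.
fun receiveWithTimeout(total: Int, timeoutMillis: Long): Pair<List<Int>, List<Int>> = runBlocking {
    val received = mutableListOf<Int>()
    val undelivered = mutableListOf<Int>()

    // Elements that were sent but never handed to a receiver are collected here.
    val channel = Channel<Int>(onUndeliveredElement = { undelivered.add(it) })

    launch {
        repeat(total) { channel.send(it) }
        channel.close() // lets the draining loop below terminate
    }

    // The timeout may cancel a receive that had already taken an element.
    withTimeoutOrNull(timeoutMillis) {
        for (value in channel) received.add(value)
    }

    // Drain whatever is left after the timeout.
    for (value in channel) received.add(value)

    received to undelivered
}

fun main() {
    val (received, undelivered) = receiveWithTimeout(total = 1000, timeoutMillis = 20L)
    println("received=${received.size}, undelivered=$undelivered")
}
```

Whether `undelivered` is empty depends on timing, so the output varies from run to run. The point is only that a message skipped by the timeout is not silently dropped when the handler is installed.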
g
Thx, it seems a right solution would be:
fun main() = runBlocking {

  val execute = { message: String -> println(message) }

  val channel = Channel(onUndeliveredElement = execute)

  var i = 0
  // A coroutine that sends messages into the channel
  launch {
    repeat(1000) {
      channel.send("i=$i")
      i++
    }
  }


  withTimeoutOrNull(20L) {
    while (true) {
      execute(channel.receive())
    }
  }

  println("TIMEOUT")

  while (true) {
    execute(channel.receive())
  }
}
I wonder if there is a more elegant/obvious one
OK, ChatGPT helped me: the issue is that the receive() function is within withTimeoutOrNull. I can do it differently:
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <S, T> Channel<S>.onReceiveTimeout(timeout: Long, action: (S) -> T) {
  val endTime = System.currentTimeMillis() + timeout
  var isOpen = true

  while (isOpen) {
    // Use select to handle both timeout and receive without message loss
    select {
      onTimeout(endTime - System.currentTimeMillis()) {
        isOpen = false
      }

      onReceive {
        action(it)
      }
    }
  }
}

fun main() = runBlocking {

  val execute = { message: String -> println(message) }

  val channel = Channel(onUndeliveredElement = execute)

  var i = 0
  // A coroutine that sends messages into the channel
  launch {
    repeat(1000) {
      channel.send("i=$i")
      i++
    }
  }

  channel.onReceiveTimeout(30L) {
    println(it)
  }

  println("TIMEOUT")

  while (true) {
    val message = channel.receive()
    println(message)
  }
}
By separating the timeout from the channel.receive() function, I do not lose any messages anymore.
p
What if you change 'Channel(onUndeliveredElement = execute)' to 'Channel()'? If you still lose messages after this change, then this variant is the same as the previous one.
g
Sorry yes you can remove onUndeliveredElement, it works
👍 1
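One way to check the select-based variant without relying on `onUndeliveredElement` is to number the messages and compare what arrives against what was sent. The following is a rough sketch under a few assumptions: the function name `receiveWithSelect` is hypothetical, the channel is closed by the sender so the final loop can end, and `onReceiveCatching` is used instead of `onReceive` so that a channel closed before the deadline does not throw.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical harness: receives via select until the deadline, then drains the rest.
@OptIn(ExperimentalCoroutinesApi::class)
fun receiveWithSelect(total: Int, timeoutMillis: Long): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val channel = Channel<Int>() // no onUndeliveredElement handler

    launch {
        repeat(total) { channel.send(it) }
        channel.close()
    }

    val deadline = System.currentTimeMillis() + timeoutMillis
    var running = true
    while (running) {
        select<Unit> {
            // Listed first so it wins once the deadline has passed
            // (a zero or negative timeout is selected immediately).
            onTimeout(deadline - System.currentTimeMillis()) { running = false }
            channel.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value != null) received.add(value) else running = false // null: channel closed
            }
        }
    }

    // After the timeout, keep receiving normally until the channel is closed.
    for (value in channel) received.add(value)

    received
}

fun main() {
    val received = receiveWithSelect(total = 1000, timeoutMillis = 30L)
    // Fails if any message was lost or reordered.
    check(received == List(1000) { it }) { "lost or reordered messages" }
    println("received ${received.size} messages")
}
```

The design choice here is that `select` picks exactly one clause per iteration, so either the timeout fires and nothing is taken from the channel, or an element is taken and passed to the receive block.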