Is it possible to use a Channel A, and remove elem...
# coroutines
j
Is it possible to use a Channel A, and remove elements sent to it, while consuming and consume element? If not, whats the best way of doing it thats concurrent safe and suspending in same way consume blocking work inside each consumed element from Channel? Just one additional problem is that ALSO need to be able to cancel ongoing consuming event in Channel, not only pending ones.
b
I have this same issue and I think I came up with a solution. Taken from my gist. I’m still playing around with it a little bit but it does work as I expect it would. I needed the additional functionality of being able to determine when no elements are in the queue which it does by adding a timeout of 1. A timeout of 0 will return immediately so there’s a short time between the next suspending element getting into the queue that was causing a small amount of unexpected behavior in certain scenarios. It’s a rendezvous channel so if you try and add an element to it, it will suspend until the current element is done being processed. If you cancel the element currently being worked on, then the first element to suspend will be added to the channel.
Copy code
class DeferredQueue<R>(private val coroutineScope: CoroutineScope) {
  private val queue = Channel<Deferred<R>>(Channel.RENDEZVOUS)
  private var onIdle: (suspend () -> Unit) = {}

  init {
    coroutineScope.launch {
      while (true) {
        select {
          queue.onReceive { element ->
            element.join()
          }

          // When there are no items in the queue, invoke the onIdle callback if set then wait for an item in the queue
          // to continue
          onTimeout(1) {
            onIdle()
            queue.receive().join()
          }
        }
      }
    }
  }

  fun onIdle(block: suspend () -> Unit): DeferredQueue<R> = also { onIdle = block }

  suspend fun add(block: suspend CoroutineScope.() -> R): R {
    val deferred = coroutineScope.async(start = CoroutineStart.LAZY) {
      block()
    }

    queue.send(deferred)
    return deferred.await()
  }
}