Hi! I know `Channel` does this, but I'm trying to ...
# coroutines
j
Hi! I know
Channel
does this, but I'm trying to manually write a
Queue
that suspends when enqueing an element and the queue is full, and also suspends when dequeing and the queue is empty
Copy code
class Queue<T>(private val limit: Int = 5) {

  private val elements: MutableList<T> = mutableListOf()

  suspend fun enqueue(t: T): Unit = suspendCancellableCoroutine { cont ->
    var enqueued = false
    while (!enqueued) {
      if (elements.size < limit) {
        elements.add(t)
        enqueued = true
        cont.resume(Unit)
      }
    }
  }

  suspend fun dequeue(): T = suspendCancellableCoroutine { cont ->
    var dequeued = false
    while (!dequeued) {
      if (elements.isNotEmpty()) {
        val element = elements.first()
        elements.remove(element)
        dequeued = true
        cont.resume(element)
      }
    }
  }
}
Adding a question about how to test this in a thread.
Wrote this test, but seems to deadlock. Any clues on why? 👇
Copy code
@Test
  fun `enqueuing over limit suspends until a slot is available`() {
    runBlocking {
      val queue = Queue<Int>(limit = 2)
      queue.enqueue(1)
      println("Enqueue 1")
      queue.enqueue(2)
      println("Enqueue 2")

      val job1 = launch {
        delay(500)
        println("Suspending to enqueue 3")
        queue.enqueue(3)
        println("3")
      }

      val job2 = launch(Dispatchers.Default) {
        delay(1000)
        println("Dequeuing 2")
        queue.dequeue()
        println("Dequeued value: 2")
      }

      job1.join()
      job2.join()

      assertThat(queue.dequeue(), `is`(2))
      assertThat(queue.dequeue(), `is`(3))
    }
  }
}
Prints:
Copy code
Enqueue 1
Enqueue 2
Suspending to enqueue 3
Dequeuing 2
Dequeued value: 2
But should be enqueing 3 after 2 is dequeued since there is a new slot available and the active wait of
enqueue
is waiting for that.
u
1st looks like you are burning CPU cycles and blocking a thread while waiting for space in the queue. 2nd access to
elements
is not thread safe, but used from different coroutines
j
In this implementation blocking is intentional, you don't want callers to resume their execution until there is space. Actually that's how Channel is implemented and allows for coroutine sincronization Indeed I suspected there's a concurrency issue here since the deadlock, I've tried a couple things but not been able to fix it so far
Sry, my mistake. Looks like using a synchronized list is enough. I.e:
Copy code
(`java.util.Collections.synchronizedList(mutableListOf<T>())
z
I haven’t had coffee yet this morning and it’s been a while since I read the channel code, but I am pretty sure this is not how channel works because it would defeat the purpose. Suspending just to block is pointless - might as well just block without suspending, but then there’s no point to even being a suspend function because you’re just a regular blocking function. Channel will enqueue the send operation itself, not just the sent value, if the channel is full when the send happens. So really there are two queues in a channel: the queue of values that have been successfully sent (where the senders have been resumed), and the queue of continuations waiting to send a value. If you actually want a blocking queue that actually blocks and is thread safe, the standard library (at least on jvm) has an implementation of that. If you want a suspending queue, are you really sure you can’t just use channel? It’s probably gonna be really hard to write a completely safe and correct implementation that is also performant, and definitely much harder than using channel. You can always wrap channel with your own api (ie use it internally inside this class you’re writing) if that’s what you want to do.
j
It was only as an exercise
The rationale I had in mind was to do like those promises you call get() on and if there is not a value (promise is not complete) it will suspend until a value is available
The implementation might definitely not be correct though
Doesn’t Channel also suspendCancellableCoroutine and while(true) to enqueue the send action and enforce await until it can be consumed? (So it will resume via the continuation) Seems like the same concept
z
It does a loop to enqueue the send operation, but it returns from that loop as soon as the operation has been enqueued, even though the caller is still suspended. In practice, I think that loop probably almost never runs more than a single iteration unless there’s a lot of contention on the channel.
The code in the OP of this thread uses suspendCoroutine, but it wouldn’t need to - blocking the thread inside a suspend function is the exact same thing as suspending the coroutine, blocking the thread, then immediately resuming (at least in the case where the coroutine isn’t cancelled). Using suspendCoroutine doesn’t mean anything if there is no code path where you return from its lambda without having resumed the continuation.
The OP has written a non-thread-safe version of a simple blocking queue. As it is, it doesn’t need to use any coroutine features or suspend functions at all, because it’s never actually suspending.
u
send only ever loops for `enqueueSend`returning `ENQUEUE_FAILED`and of type Receivve:
Copy code
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
              enqueueResult is Receive<*> -> {} // try to offer instead
And both should, as already proposed, not happen:
Copy code
ENQUEUE_FAILED -- buffer is not full (should not enqueue)
ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
As I understand it, it would be a bug if
enqueueSend
(i.e.
sendSuspend
) was called by
send
if, instead
offer
could have been called. And that is exactly what you find down the call stack in `send`:
Copy code
public final override suspend fun send(element: E) {
        // fast path -- try offer non-blocking
        if (offerInternal(element) === OFFER_SUCCESS) return
        // slow-path does suspend or throws exception
        return sendSuspend(element)
    }
sendSuspend
->
enqueueSend
is only called if offer did not work.
j
Yeah makes sense. Thanks for the explanations, my brain is a bit overloaded today 😅 I definitely need to avoid blocking by suspending, not both. Will have a deeper look to channel. Just trying to understand how these data structures work internally