Jorge Castillo
09/04/2021, 10:52 AM
`Channel` does this, but I'm trying to manually write a Queue that suspends when enqueuing an element while the queue is full, and also suspends when dequeuing while the queue is empty
class Queue<T>(private val limit: Int = 5) {
    private val elements: MutableList<T> = mutableListOf()

    suspend fun enqueue(t: T): Unit = suspendCancellableCoroutine { cont ->
        var enqueued = false
        while (!enqueued) {
            if (elements.size < limit) {
                elements.add(t)
                enqueued = true
                cont.resume(Unit)
            }
        }
    }

    suspend fun dequeue(): T = suspendCancellableCoroutine { cont ->
        var dequeued = false
        while (!dequeued) {
            if (elements.isNotEmpty()) {
                val element = elements.first()
                elements.remove(element)
                dequeued = true
                cont.resume(element)
            }
        }
    }
}
Adding a question about how to test this in a thread.
Jorge Castillo
09/04/2021, 10:52 AM
Jorge Castillo
09/04/2021, 10:52 AM
@Test
fun `enqueuing over limit suspends until a slot is available`() {
    runBlocking {
        val queue = Queue<Int>(limit = 2)
        queue.enqueue(1)
        println("Enqueue 1")
        queue.enqueue(2)
        println("Enqueue 2")
        val job1 = launch {
            delay(500)
            println("Suspending to enqueue 3")
            queue.enqueue(3)
            println("3")
        }
        val job2 = launch(Dispatchers.Default) {
            delay(1000)
            println("Dequeuing 2")
            queue.dequeue()
            println("Dequeued value: 2")
        }
        job1.join()
        job2.join()
        assertThat(queue.dequeue(), `is`(2))
        assertThat(queue.dequeue(), `is`(3))
    }
}
}
Jorge Castillo
09/04/2021, 10:53 AM
Enqueue 1
Enqueue 2
Suspending to enqueue 3
Dequeuing 2
Dequeued value: 2
But it should be enqueuing 3 after 2 is dequeued, since a new slot is available and the active wait in `enqueue` is waiting for that.
uli
09/04/2021, 12:14 PM
`elements` is not thread safe, but it is used from different coroutines
Jorge Castillo
09/04/2021, 12:54 PM
Jorge Castillo
09/04/2021, 1:03 PM
(`java.util.Collections.synchronizedList(mutableListOf<T>())`)
Zach Klippenstein (he/him) [MOD]
09/04/2021, 2:13 PM
Jorge Castillo
09/04/2021, 5:29 PM
Jorge Castillo
09/04/2021, 5:32 PM
Jorge Castillo
09/04/2021, 5:35 PM
Jorge Castillo
09/04/2021, 5:37 PM
Zach Klippenstein (he/him) [MOD]
09/04/2021, 6:53 PM
Zach Klippenstein (he/him) [MOD]
09/04/2021, 6:56 PM
Zach Klippenstein (he/him) [MOD]
09/04/2021, 6:58 PM
uli
09/04/2021, 7:11 PM
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
enqueueResult is Receive<*> -> {} // try to offer instead
And both should, as already proposed, not happen:
ENQUEUE_FAILED -- buffer is not full (should not enqueue)
ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
As I understand it, it would be a bug if `enqueueSend` (i.e. `sendSuspend`) were called by `send` when `offer` could have been called instead.
And that is exactly what you find down the call stack in `send`:
public final override suspend fun send(element: E) {
    // fast path -- try offer non-blocking
    if (offerInternal(element) === OFFER_SUCCESS) return
    // slow-path does suspend or throws exception
    return sendSuspend(element)
}
`sendSuspend` -> `enqueueSend` is only called if offer did not work.
Jorge Castillo
09/04/2021, 7:53 PM
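The fast-path/slow-path split in the `send` snippet quoted above can be observed from outside using only the public `Channel` API. This is a rough sketch, not the library's internal code: it assumes `trySend` (the public non-suspending attempt in kotlinx.coroutines 1.5+) as a stand-in for `offerInternal`. The helper `sendWithFastPath` is a hypothetical name, and it returns `true` only when it had to fall back to the suspending `send`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns true if the slow (suspending) path was taken.
suspend fun <E> sendWithFastPath(channel: SendChannel<E>, element: E): Boolean {
    // Fast path: non-suspending attempt, succeeds while the buffer has room.
    if (channel.trySend(element).isSuccess) return false
    // Slow path: suspends until a receiver frees a slot (or throws if closed).
    channel.send(element)
    return true
}

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2)
    check(!sendWithFastPath(channel, 1)) // buffer has room: fast path
    check(!sendWithFastPath(channel, 2)) // buffer has room: fast path
    val producer = async { sendWithFastPath(channel, 3) }
    yield() // let the producer run until it suspends on the full buffer
    check(!producer.isCompleted)
    check(channel.receive() == 1) // frees a slot, resuming the producer
    check(producer.await())       // the third element needed the slow path
    check(channel.receive() == 2)
    check(channel.receive() == 3)
    println("ok")
}
```

In real code there is no need for such a wrapper, since `send` already tries the non-suspending path first, as the quoted source shows.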