// Daniel
// 10/24/2020, 11:30 AM
/**
 * A last-in-first-out queue whose contents are consumed through [collect].
 *
 * Items are pushed onto the front of an in-memory deque by [enqueue]; every push also
 * posts one signal to an unlimited channel. A collector waits for a signal, pops the
 * front of the deque (the most recently enqueued item still present) and hands it to
 * the caller's block. When several collectors run at once, each signal is received by
 * only one of them, so each item is delivered to a single collector.
 *
 * All access to the deque is guarded by `synchronized(queue)`, so [enqueue] may be
 * called from any thread while collectors are running.
 *
 * Note: when [T] is a nullable type, an enqueued `null` cannot be told apart from an
 * empty deque and is therefore never emitted.
 */
class LIFOQueue<T> {
    // Backing storage; the front of the deque is the newest item.
    private val queue = ArrayDeque<T>()

    // One signal per enqueued item; collectors suspend on this channel while idle.
    private val shouldRecheckQueue = Channel<Unit>(Channel.UNLIMITED)

    private val flow = flow<T> {
        // A plain `for` loop (rather than consumeEach) is used on purpose: it leaves the
        // channel open when one collector stops, so other collectors keep working.
        for (signal in shouldRecheckQueue) {
            currentCoroutineContext().ensureActive()
            // No suspension happens inside the lock, so a monitor is safe here.
            val item = synchronized(queue) { queue.removeFirstOrNull() }
            if (item != null) {
                emit(item)
            }
        }
    }

    /**
     * Adds [item] to the front of the queue and wakes up one waiting collector.
     *
     * @throws IllegalStateException if the signal could not be posted to the channel.
     */
    fun enqueue(item: T) {
        synchronized(queue) { queue.addFirst(item) }
        // `offer` is kept (instead of `trySend`) to stay compatible with the
        // kotlinx.coroutines version this file appears to target.
        check(shouldRecheckQueue.offer(Unit)) {
            "shouldRecheckQueue should be unlimited, so offer should never fail"
        }
    }

    /**
     * Suspends and invokes [block] for each item taken from the queue, newest first.
     *
     * The underlying loop only ends when the collecting coroutine is cancelled or
     * [block] throws.
     */
    suspend fun collect(block: suspend (T) -> Unit) {
        // Wrapping in a lambda works both with the older function-typed `collect`
        // extension and with the newer FlowCollector-based `collect` member.
        flow.collect { block(it) }
    }
}
/**
 * Tests for [LIFOQueue]: accepting items, last-in-first-out delivery order, many
 * enqueue calls issued from separate coroutines, and delivery of distinct items to
 * multiple collectors.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class LIFOQueueTest {
    /** Enqueuing without any collector must simply succeed. */
    @Test
    fun `enqueues accepts values`(): Unit = runBlockingTest {
        val subject = LIFOQueue<Int>()
        subject.enqueue(1)
        subject.enqueue(2)
    }

    /** Items enqueued as 1, 2, 3 must be collected as 3, 2, 1. */
    @Test
    fun `collect receives values in correct order`() = runBlockingTest {
        // The collector never finishes on its own, so it runs in a child coroutine
        // that cancels itself once the last expected item has been seen.
        launch {
            val values = mutableListOf<Int>()
            val subject = LIFOQueue<Int>()
            subject.enqueue(1)
            subject.enqueue(2)
            subject.enqueue(3)
            subject.collect {
                values.add(it)
                if (it == 1) {
                    assertThat(values, `is`(listOf(3, 2, 1)))
                    cancel("Done")
                }
            }
        }
    }

    /** Enqueues from many coroutines, then checks that every item is collected. */
    @Test
    fun `enqueue handles concurrent calls`() = runBlockingTest {
        launch {
            val subject = LIFOQueue<Int>()
            val callCount = 10_000
            // Exactly callCount enqueues (values 0 until callCount), matching the number
            // of items the collector below waits for.
            repeat(callCount) { n ->
                launch {
                    subject.enqueue(n)
                }
            }
            var count = 0
            subject.collect {
                if (count == 0) {
                    // The head has a big number
                    assertThat(it, greaterThan(9_000))
                }
                count++
                if (count == callCount) {
                    cancel("Done")
                }
            }
        }
    }

    /** Two simultaneous collectors must each receive exactly one, different, item. */
    @Test
    fun `multiple consumers get different items`(): Unit = runBlocking {
        // NOTE: We don't use runBlockingTest because we need a realistic clock
        launch {
            val subject = LIFOQueue<Int>()
            subject.enqueue(1)
            subject.enqueue(2)
            var totalCallCount = 0
            // First collector: expected to receive 2, the most recently enqueued item.
            launch {
                withTimeout(100) {
                    var callCount = 0
                    subject.collect {
                        callCount++
                        totalCallCount++
                        assertThat(callCount, `is`(1))
                        assertThat(it, `is`(2))
                        delay(10) // give the other collector time to get the next item
                    }
                }
            }
            // Second collector: expected to receive the remaining item, 1.
            launch {
                withTimeout(100) {
                    var callCount = 0
                    subject.collect {
                        callCount++
                        totalCallCount++
                        assertThat(callCount, `is`(1))
                        assertThat(it, `is`(1))
                    }
                }
            }
            delay(150) // wait for the timeouts
            assertThat(totalCallCount, `is`(2))
        }
    }
}