I wanted a flow I could send values to that would ...
# flow
d
I wanted a flow that I could send values to, with multiple workers collecting those values and performing slow tasks based on them, and with the workers pulling tasks in LIFO order. I made this. Basic tests suggest it works, but I'm a total novice, so I would appreciate feedback.
Copy code
/**
 * A last-in-first-out work queue that can be drained by one or more collectors.
 *
 * [enqueue] pushes an item onto the front of an in-memory deque and posts one
 * wake-up signal on an unlimited channel. Each collector started via [collect]
 * iterates that same channel; for every signal it receives it pops the current
 * front of the deque and hands it to its block. Because a signal is received by
 * only one collector, each item is delivered to at most one collector.
 *
 * All access to the deque is guarded by a private lock, so [enqueue] may be
 * called from any thread while collectors run on a multi-threaded dispatcher.
 *
 * Note: a `null` element (possible when [T] is a nullable type) is removed from
 * the deque but never emitted, because `null` is also used as the "nothing to
 * take" marker.
 */
class LIFOQueue<T> {
    // Guards every read and write of [queue]; kotlin.collections.ArrayDeque is not thread-safe.
    private val lock = Any()
    private val queue = ArrayDeque<T>()

    // One Unit per enqueued item; collectors suspend on this until there is work.
    private val shouldRecheckQueue = Channel<Unit>(Channel.UNLIMITED)

    private val flow = flow<T> {
        for (msg in shouldRecheckQueue) {
            currentCoroutineContext().ensureActive()

            // Pop under the lock; emit outside it so a slow collector never blocks producers.
            val item = synchronized(lock) { queue.removeFirstOrNull() }
            if (item != null) {
                emit(item)
            }
        }
    }

    /**
     * Adds [item] at the head of the queue and wakes one waiting collector.
     *
     * @throws IllegalStateException if the signal could not be posted, which
     * should not happen because the signal channel is unlimited.
     */
    fun enqueue(item: T) {
        // The item is made visible before the signal is posted, so whichever
        // collector receives the signal will find something to take.
        synchronized(lock) { queue.addFirst(item) }
        check(shouldRecheckQueue.offer(Unit)) {
            "shouldRecheckQueue should be unlimited, so offer should never fail"
        }
    }

    /**
     * Suspends while feeding items, newest first, to [block].
     *
     * This call does not return on its own while the signal channel stays open;
     * cancel the calling coroutine to stop collecting.
     */
    suspend fun collect(block: suspend (T) -> Unit) {
        flow.collect(block)
    }
}
Copy code
/**
 * Tests for [LIFOQueue]: enqueueing, LIFO delivery order, draining a large
 * number of enqueued items, and fan-out across two collectors.
 *
 * Collectors never complete on their own, so each test stops them either by
 * cancelling the enclosing coroutine or by wrapping them in `withTimeout`.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class LIFOQueueTest {
    /** Enqueueing without any collector must not throw. */
    @Test
    fun `enqueues accepts values`(): Unit = runBlockingTest {
        val subject = LIFOQueue<Int>()
        subject.enqueue(1)
        subject.enqueue(2)
    }

    /** Items enqueued as 1, 2, 3 must be collected as 3, 2, 1. */
    @Test
    fun `collect receives values in correct order`() = runBlockingTest {
        launch {
            val values = mutableListOf<Int>()
            val subject = LIFOQueue<Int>()

            subject.enqueue(1)
            subject.enqueue(2)
            subject.enqueue(3)

            subject.collect {
                values.add(it)

                // 1 was enqueued first, so it is expected last; check the full order and stop.
                if (it == 1) {
                    assertThat(values, `is`(listOf(3, 2, 1)))
                    cancel("Done")
                }
            }
        }
    }

    /** Enqueues from many child coroutines and drains every item. */
    @Test
    fun `enqueue handles concurrent calls`() = runBlockingTest {
        // NOTE(review): runBlockingTest appears to run these child coroutines on a
        // single thread, so this likely does not exercise truly parallel enqueues - confirm.
        launch {
            val subject = LIFOQueue<Int>()
            val callCount = 10_000

            // Exactly callCount enqueues (0 until callCount), matching the number
            // of items the collector below waits for before stopping.
            for (n in 0 until callCount) {
                launch {
                    subject.enqueue(n)
                }
            }

            var count = 0
            subject.collect {
                if (count == 0) {
                    // The head has a big number
                    assertThat(it, greaterThan(9_000))
                }

                count++

                if (count == callCount) {
                    cancel("Done")
                }
            }
        }
    }

    /** Two collectors on the same queue must each receive a different item. */
    @Test
    fun `multiple consumers get different items`(): Unit = runBlocking {
        // NOTE: We don't use runBlockingTest because we need a realistic clock

        launch {
            val subject = LIFOQueue<Int>()

            subject.enqueue(1)
            subject.enqueue(2)

            var totalCallCount = 0

            // First collector: expected to take the newest item (2), exactly once.
            launch {
                withTimeout(100) {
                    var callCount = 0
                    subject.collect {
                        callCount++
                        totalCallCount++
                        assertThat(callCount, `is`(1))

                        assertThat(it, `is`(2))

                        delay(10) // give the other collector time to get the next item
                    }
                }
            }

            // Second collector: expected to take the remaining item (1), exactly once.
            launch {
                withTimeout(100) {
                    var callCount = 0
                    subject.collect {
                        callCount++
                        totalCallCount++
                        assertThat(callCount, `is`(1))

                        assertThat(it, `is`(1))
                    }
                }
            }

            delay(150) // wait for the timeouts
            assertThat(totalCallCount, `is`(2))
        }
    }
}