#coroutines

louiscad

08/14/2018, 2:06 PM
It's been closed because it's overbooked

withoutclass

08/14/2018, 2:06 PM
While loop probably, since the for loop will suspend waiting on items as I recall

Martin Devillers

08/14/2018, 2:12 PM
That’s kind of the point, since it needs to suspend until at least one item has been received

withoutclass

08/14/2018, 2:34 PM
What I’m saying is your if condition there will probably never trigger because the for loop will not run if it’s empty
The only way I’ve found to do batching is to use a suspending function with two while loops. The outer loop runs with some delay (or not); the inner loop keeps receiving items and adding them to a list for as long as the channel you want to batch is not empty. Once it is empty, you can send/process all the items you received.
Something like this:
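internal suspend fun batchEvents(
    inChannel: ReceiveChannel<Unit>,
    outChannel: SendChannel<List<Unit>>
) {
    // In a plain suspend function, isActive is read from the coroutine context.
    while (coroutineContext.isActive) {
        delay(UPDATE_INTERVAL)
        // Fresh list per batch, so a list already sent is never mutated afterwards.
        val accumulator = mutableListOf<Unit>()
        // Drain whatever is buffered right now.
        while (!inChannel.isEmpty) {
            val event = inChannel.receive()
            accumulator.add(event)
        }

        outChannel.send(accumulator)
    }
}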

Martin Devillers

08/14/2018, 2:46 PM
I’m not trying to batch the channel based on a time period; I’m trying to batch on each call that receives elements from the channel. Basically, I’m going to have a class doing a foreach loop on the resulting ReceiveChannel<List<T>>. That loop should suspend until at least one item is available, then return the batch of all the elements received from the ReceiveChannel<T> since the last iteration. So regarding your comment:
“What I’m saying is your if condition there will probably never trigger because the for loop will not run if it’s empty”
I don’t want the for loop to run if it’s empty, because in that case there’s nothing to handle in a batch. It should wait until a non-empty batch is available.
I hope that makes things clearer.

withoutclass

08/14/2018, 2:54 PM
I understand what you’re trying to do: you’re trying to read all the items out of the original channel whenever it receives an item, then send a single batched event down the producer channel. My code above was merely an example, not a solution. What I am saying is that in your example, your if condition, as far as I can tell, will never be true. You can take the inner while loop I posted above, replace the for loop with it, and then move your send out of the loop.
You don’t need the delay from my example for it to work, but you’ll probably want a yield() so that it cooperates with other things running on the dispatcher.
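For readers following along, here is a minimal sketch of the approach discussed so far: suspend until one element arrives, drain whatever else is already buffered without suspending, then send the whole batch. It is not the `batched` implementation from this thread (that code is not shown here). The names `batchedSketch` and `demoBatches` are hypothetical, and it assumes a recent kotlinx.coroutines (1.5 or later) where `receiveCatching` and `tryReceive` are available.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical operator, named differently from the thread's own `batched`.
// T is non-null so that a null from getOrNull() always means "no element".
@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> ReceiveChannel<T>.batchedSketch(scope: CoroutineScope): ReceiveChannel<List<T>> =
    // Rendezvous output: a batch is only handed over when a consumer asks for it.
    scope.produce(capacity = Channel.RENDEZVOUS) {
        while (true) {
            // Suspend until at least one element is available; stop once the source is closed.
            val first = this@batchedSketch.receiveCatching().getOrNull() ?: break
            val batch = mutableListOf(first)
            // Drain everything that is already buffered, without suspending.
            while (true) {
                val next = this@batchedSketch.tryReceive().getOrNull() ?: break
                batch.add(next)
            }
            send(batch)
        }
    }

// Hypothetical demo: two bursts of sends, each followed by one receive on the batched channel.
fun demoBatches(): List<List<Int>> = runBlocking {
    val source = Channel<Int>(Channel.UNLIMITED)
    val batches = source.batchedSketch(this)
    val seen = mutableListOf<List<Int>>()
    for (i in 1..3) source.send(i)
    seen.add(batches.receive())
    for (i in 4..6) source.send(i)
    seen.add(batches.receive())
    source.close() // lets the producer loop finish so runBlocking can return
    seen
}
```

Calling `demoBatches()` under `runBlocking` should give one batch per burst, since the producer only runs while the caller is suspended in `receive()`. With an unconfined dispatcher the producer can run as soon as the first element is sent, so the batch boundaries may differ.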

Martin Devillers

08/14/2018, 3:21 PM
I tried running a test
@Test
fun test() {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val batched = channel.batched(Unconfined)
    val batches = mutableListOf<List<Int>>()
    launch(Unconfined) {
        for (it in 1..3) { channel.send(it) }
        batches += batched.receive()
        for (it in 4..6) { channel.send(it) }
        batches += batched.receive()
        batches += batched.receive()
    }
    assertEquals(listOf(listOf(1), listOf(2, 3), listOf(4, 5, 6)), batches)
}
It passes. I actually had to fix one thing, because the batched channel needed to be a rendezvous channel. So the for loop doesn’t seem to be an issue. Your example should also work, I think, but I’m not sure that it improves anything (I was looking for maybe a more “direct” solution).

withoutclass

08/14/2018, 3:29 PM
Ah sweet, I was wrong then!
Sorry I wasn’t much help!

Martin Devillers

08/14/2018, 3:30 PM
🙂 Thanks for your input