Lilly
12/11/2020, 9:01 PMprivate val packetChannel: Channel<ResponsePacket> = Channel(Channel.UNLIMITED)
suspend fun sendPacketWithBatchResponse(packet: RequestPacket): List<ResponsePacket> {
sendPacketWithAcknowledgement(packet)
val packets = mutableListOf<ResponsePacket>()
for (item in packetChannel) {
Timber.e("here")
packets.add(item)
}
// Unreached code
return packets
}
// Another non-suspending function which is decoupled from the consuming function
fun produce() {
...
packetChannel.offer(packet)
}
The goal is to offer multiple packets (~20) to the channel and when finished jump to the sendPacketWithBatchResponse
and consume these packets all at once but sendPacketWithBatchResponse
never returns and I don't know whytravis
12/11/2020, 9:10 PMoffer
call in produce
returned true
?Channel
then the for
loop will go forever.
You need some criteria in which to stop consuming the Channel
.
Perhaps something like take(n)
if you know how many packets you want to consume?Lilly
12/11/2020, 10:32 PMtake
is deprecated but what works is:
val packets = mutableListOf<ResponsePacket>()
repeat(20) {
packets.add(packetChannel.receive())
}
But why does this work and something like packetChannel.toList()
not?packetChannel.offer
returns always truetravis
12/11/2020, 10:50 PMreceive
will receive a single time. It suspends until an item is available and takes it from the Channel
.
So with the repeat
you're essentially saying you want to pull 20
items from the Channel
(one after another), then you're done.
toList
will wait until the Channel
closes before it is considered "done". In other words, it needs some way to know how many items to put in the List
it is returning, Channel
termination is how it decides it is time to return the list of items.take
in front of toList
to get a set number of items returned as a list:
packetChannel.take(20).toList()
Channel
are deprecated and will go away soon. You should operate on a Flow
.
private val packetChannel: Channel<ResponsePacket> = Channel(Channel.UNLIMITED)
private val packetFlow = packetChannel.receiveAsFlow()
Then you can push values into your data stream with offer
on packetChannel
and pull values from the stream via the packetFlow
.Lilly
12/12/2020, 12:14 AM