Lilly
12/11/2020, 9:01 PM
private val packetChannel: Channel<ResponsePacket> = Channel(Channel.UNLIMITED)
suspend fun sendPacketWithBatchResponse(packet: RequestPacket): List<ResponsePacket> {
    sendPacketWithAcknowledgement(packet)
    val packets = mutableListOf<ResponsePacket>()
    for (item in packetChannel) {
        Timber.e("here")
        packets.add(item)
    }
    // Unreached code
    return packets
}
// Another non-suspending function which is decoupled from the consuming function
fun produce() {
    ...
    packetChannel.offer(packet)
}
The goal is to offer multiple packets (~20) to the channel and, when finished, jump to sendPacketWithBatchResponse and consume these packets all at once. But sendPacketWithBatchResponse never returns and I don't know why.
travis
12/11/2020, 9:10 PM
Has the offer call in produce returned true?
travis
12/11/2020, 9:12 PM
If you never close the Channel, then the for loop will go forever.
You need some criteria in which to stop consuming the Channel.
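To illustrate the point about the loop never ending, here is a minimal sketch. The helper name drain and the Int payload are assumptions for illustration, not part of the original code. It shows that a for loop over a Channel only finishes once the channel has been closed and emptied.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects every item until the channel is closed.
// If nobody closes the channel, this function suspends forever.
suspend fun <T> drain(channel: ReceiveChannel<T>): List<T> {
    val items = mutableListOf<T>()
    for (item in channel) {
        items.add(item)
    }
    return items
}

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) } // never suspends on an UNLIMITED channel
    channel.close()                // this is the stop criterion for the loop
    println(drain(channel))        // prints [0, 1, 2]
}
```

Closing only works as a stop criterion if the channel is not needed afterwards, since a closed channel cannot be reopened.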
Perhaps something like take(n) if you know how many packets you want to consume?
Lilly
12/11/2020, 10:32 PM
take is deprecated, but what works is:
val packets = mutableListOf<ResponsePacket>()
repeat(20) {
    packets.add(packetChannel.receive())
}
But why does this work and something like packetChannel.toList() does not?
Lilly
12/11/2020, 10:46 PM
packetChannel.offer always returns true
travis
12/11/2020, 10:50 PM
receive will receive a single time. It suspends until an item is available and takes it from the Channel.
So with the repeat you're essentially saying you want to pull 20 items from the Channel (one after another), then you're done.
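One caveat with a fixed count is that receive suspends forever if fewer items arrive than expected. A possible sketch of a bounded variant follows. The helper name receiveBatch and its parameters are assumptions for illustration, and it uses withTimeoutOrNull to give up after a deadline and return whatever was collected so far.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: receives up to `count` items, one after another,
// and stops early if `timeoutMillis` elapses while waiting.
suspend fun <T> receiveBatch(
    channel: ReceiveChannel<T>,
    count: Int,
    timeoutMillis: Long
): List<T> {
    val items = mutableListOf<T>()
    withTimeoutOrNull(timeoutMillis) {
        repeat(count) {
            items.add(channel.receive()) // suspends until an item is available
        }
    }
    return items
}

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(5) { channel.send(it) }
    println(receiveBatch(channel, count = 3, timeoutMillis = 1_000)) // prints [0, 1, 2]
    println(receiveBatch(channel, count = 3, timeoutMillis = 100))   // prints [3, 4] after the timeout
}
```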
toList will wait until the Channel closes before it is considered "done". In other words, it needs some way to know how many items to put in the List it is returning; Channel termination is how it decides it is time to return the list of items.
travis
12/11/2020, 10:52 PM
You could put a take in front of toList to get a set number of items returned as a list:
packetChannel.take(20).toList()
travis
12/11/2020, 10:52 PM
Operators on Channel are deprecated and will go away soon. You should operate on a Flow.
private val packetChannel: Channel<ResponsePacket> = Channel(Channel.UNLIMITED)
private val packetFlow = packetChannel.receiveAsFlow()
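A minimal sketch of how such a channel and flow pair might be consumed in batches is shown here. The helper name takeBatch and the String payload standing in for ResponsePacket are assumptions for illustration. Because take(n) completes the flow after n items, toList() returns without the channel being closed.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects exactly `count` items from the flow.
// It still suspends until `count` items have been emitted.
suspend fun <T> takeBatch(flow: Flow<T>, count: Int): List<T> =
    flow.take(count).toList()

fun main() = runBlocking {
    // String is a stand-in for the ResponsePacket type from the discussion.
    val packetChannel = Channel<String>(Channel.UNLIMITED)
    val packetFlow = packetChannel.receiveAsFlow()

    repeat(5) { packetChannel.send("packet-$it") }

    println(takeBatch(packetFlow, 3)) // prints [packet-0, packet-1, packet-2]
    // receiveAsFlow does not cancel the channel when a collector stops,
    // so the remaining items can be collected later.
    println(takeBatch(packetFlow, 2)) // prints [packet-3, packet-4]
}
```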
Then you can push values into your data stream with offer on packetChannel and pull values from the stream via the packetFlow.
Lilly
12/12/2020, 12:14 AM