Is there an equivalent for ```while (!packetChanne...
# coroutines
l
Is there an equivalent for
Copy code
while (!packetChannel.isEmpty) {
	packetChannel.receive()
}
in flows when I convert the channel to a flow:
Copy code
packetChannel.receiveAsFlow()
Edit: I would like to consume the channel as a flow, but instead of collect all values then parse I would like to collect one value, parse it, wait for next one then emit final result
d
Copy code
packetChannel.receiveAsFlow().toList()
l
@Dominaezzz hmm I see I didn't elaborate my question properly. I would like to receive one value and parse it rather than wait for all all values then parse it. It want to make a shift from synchronous processing to asynchronous.
e
Copy code
packetFlow.collect {
    // it: Packet
}
l
Basically I need a condiition when all values have been arrived to emit the final result to the caller.
Copy code
fun reutnResult() = flow {
  val destination = mutableMapOf()
  subscribeToPacketChannel().collect { value ->
     // do some heavy work
     // put result of work into destination
     // when done close this -> how to notice?
   }
   ...
   emit(destination) // result
}
e
Close the channel at the sender side when all packets sent
l
Is there another option. The channel has to stay open
e
Send a token that indicates that you've received enough items to be emitted?
l
The producer on the other side is a bluetooth classic input stream and is kept open until socket is closed
Would be a solution. Is there no possibility to check if
Copy code
packetChannel.isEmpty
in flow?
l
But I'm not able to check for it in flow?!
Send a token that indicates that you've received enough items to be emitted?
The problem here I currently noticed is that I don't collect prmitves but instead a
ResponsePacket
so I would have to fake one to indicate last value
e
Or send null, or some other wrapped value?
l
I guess there is no other solution right?
e
well maybe you could create and use some debounce-like operator, there was one in this channel just a little while ago
but honestly this seems like doing it the hard way. if you want to operate on groups of packets, can't you group them before stuffing them in the flow?
l
That's what my current approach is doing.
Copy code
internal suspend fun sendPacketWithBatchResponse(
        packet: RequestPacket
        onResponse: suspend (List<ResponsePacket>) -> Unit
    ) {
        sendPacketWithConfirmation(packet)
        onResponse(
            mutableListOf<ResponsePacket>().apply {
                while (!packetChannel.isEmpty) {
                    add(packetChannel.receive())
                }
            }
        )
    }
TBH I don't even know why this
while
is working when I think about it now. But it works and it returns a group. Then I do my processing. But this performs synchronous. You have to know that I make a request to the device and the device could reponse with 1, 2,3 or 30 packets depending on the request I send. I would like to shift to an asynchronously approach, that says I collect one packet and while others are on the way to arrive I can process the arrived packets
e
Maybe it's better to just send a request to the device and receive all packets from it and treat that as one transaction. And repeat that for every request. Or don't you know when the last packet was responded?
l
And here begins the story ^^. How the device responds is handled by a protcol which is out of my control. It's more than 20 years old and it's a huge bunch of shit unfortunately.
We have 2 possible solutions to indicate, transaction is done 1. close channel 2. offer an indicator What about a timer? If within 2 seconds nothing arrives/is collected, close flow and emit collection to caller?
e
not sure that will give you what you need, but you could start experimenting from there
l
I will have a look, thanks!
n
will
fold
fits your needs?
Copy code
val result = flowOf(1,2,3,4).fold(init) { acc,ele ->
  // do what ever with ele and acc, return the desired acc if you want to use it next iteration 
}

println(result) // flow has finished, the last result you returned will print here
l
@Natsuki(开元米粉实力代购) Thanks for your time. I had the same idea. This would work and all the other solutions too. But what I'm missing is an appropriate cancel condition. My flow is a channel, i.e. it won't close until I close the channel (no option) or cancel the flow manually (can't determine this properly). But I found a way which I can live with