https://kotlinlang.org logo
Title
l

Lilly

03/23/2021, 4:54 PM
Is there an equivalent for
while (!packetChannel.isEmpty) {
	packetChannel.receive()
}
in flows when I convert the channel to a flow:
packetChannel.receiveAsFlow()
Edit: I would like to consume the channel as a flow, but instead of collect all values then parse I would like to collect one value, parse it, wait for next one then emit final result
d

Dominaezzz

03/23/2021, 4:56 PM
packetChannel.receiveAsFlow().toList()
l

Lilly

03/23/2021, 5:06 PM
@Dominaezzz hmm I see I didn't elaborate my question properly. I would like to receive one value and parse it rather than wait for all all values then parse it. It want to make a shift from synchronous processing to asynchronous.
e

ephemient

03/23/2021, 5:07 PM
packetFlow.collect {
    // it: Packet
}
l

Lilly

03/23/2021, 5:29 PM
Basically I need a condiition when all values have been arrived to emit the final result to the caller.
fun reutnResult() = flow {
  val destination = mutableMapOf()
  subscribeToPacketChannel().collect { value ->
     // do some heavy work
     // put result of work into destination
     // when done close this -> how to notice?
   }
   ...
   emit(destination) // result
}
e

Erik

03/23/2021, 5:30 PM
Close the channel at the sender side when all packets sent
l

Lilly

03/23/2021, 5:31 PM
Is there another option. The channel has to stay open
e

Erik

03/23/2021, 5:32 PM
Send a token that indicates that you've received enough items to be emitted?
l

Lilly

03/23/2021, 5:32 PM
The producer on the other side is a bluetooth classic input stream and is kept open until socket is closed
Would be a solution. Is there no possibility to check if
packetChannel.isEmpty
in flow?
l

Lilly

03/23/2021, 5:40 PM
But I'm not able to check for it in flow?!
Send a token that indicates that you've received enough items to be emitted?
The problem here I currently noticed is that I don't collect prmitves but instead a
ResponsePacket
so I would have to fake one to indicate last value
e

Erik

03/23/2021, 5:42 PM
Or send null, or some other wrapped value?
l

Lilly

03/23/2021, 5:57 PM
I guess there is no other solution right?
e

ephemient

03/23/2021, 5:58 PM
well maybe you could create and use some debounce-like operator, there was one in this channel just a little while ago
but honestly this seems like doing it the hard way. if you want to operate on groups of packets, can't you group them before stuffing them in the flow?
l

Lilly

03/23/2021, 6:06 PM
That's what my current approach is doing.
internal suspend fun sendPacketWithBatchResponse(
        packet: RequestPacket
        onResponse: suspend (List<ResponsePacket>) -> Unit
    ) {
        sendPacketWithConfirmation(packet)
        onResponse(
            mutableListOf<ResponsePacket>().apply {
                while (!packetChannel.isEmpty) {
                    add(packetChannel.receive())
                }
            }
        )
    }
TBH I don't even know why this
while
is working when I think about it now. But it works and it returns a group. Then I do my processing. But this performs synchronous. You have to know that I make a request to the device and the device could reponse with 1, 2,3 or 30 packets depending on the request I send. I would like to shift to an asynchronously approach, that says I collect one packet and while others are on the way to arrive I can process the arrived packets
e

Erik

03/23/2021, 6:14 PM
Maybe it's better to just send a request to the device and receive all packets from it and treat that as one transaction. And repeat that for every request. Or don't you know when the last packet was responded?
l

Lilly

03/23/2021, 6:18 PM
And here begins the story ^^. How the device responds is handled by a protcol which is out of my control. It's more than 20 years old and it's a huge bunch of shit unfortunately.
We have 2 possible solutions to indicate, transaction is done 1. close channel 2. offer an indicator What about a timer? If within 2 seconds nothing arrives/is collected, close flow and emit collection to caller?
e

ephemient

03/23/2021, 6:26 PM
not sure that will give you what you need, but you could start experimenting from there
l

Lilly

03/23/2021, 6:31 PM
I will have a look, thanks!
n

Natsuki(开元米粉实力代购)

03/24/2021, 6:09 AM
will
fold
fits your needs?
val result = flowOf(1,2,3,4).fold(init) { acc,ele ->
  // do what ever with ele and acc, return the desired acc if you want to use it next iteration 
}

println(result) // flow has finished, the last result you returned will print here
l

Lilly

03/24/2021, 11:37 AM
@Natsuki(开元米粉实力代购) Thanks for your time. I had the same idea. This would work and all the other solutions too. But what I'm missing is an appropriate cancel condition. My flow is a channel, i.e. it won't close until I close the channel (no option) or cancel the flow manually (can't determine this properly). But I found a way which I can live with