Lilly
03/19/2021, 1:37 AM
private val packetChannel: Channel<ResponsePacket> by lazy { Channel(Channel.UNLIMITED) }
that receives ResponsePacket, which is just a data class that holds a ByteArray. After making a request to the API that produces the packets, I offer each packet via the channel (packets are only offered on request and communication is sequential, so a request waits until its packet is offered). I would like to consume these packets via a flow, but some packets differ and require different actions on collection, see screenshot. I'm clueless how to achieve this. For case 1 I would have to request 3 packets: when packet A is received I would parse it, but then I would have to wait until packets B and C are received and parsed, merge the results, and emit the merged result; otherwise case 2 applies. That's the part I can't get my head around. Some help is much appreciated 🙏
Erik
03/19/2021, 6:58 AM
transform operator. When a packet arrives, you can check its contents/type and start processing it with strategy one or two. Once processed, you can emit the result, which can then be consumed downstream.
Erik
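A minimal sketch of the transform idea, under loud assumptions: the thread only says that ResponsePacket holds a ByteArray, so the type byte in position 0, the constants TYPE_B, TYPE_C and TYPE_Y, and the String results are all hypothetical. It shows one way to buffer packet B until packet C arrives (strategy one) while emitting each Y packet on its own (strategy two).

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical model: the thread only states that the packet holds a ByteArray.
class ResponsePacket(val bytes: ByteArray)

// Assumption: the first byte identifies the packet type.
const val TYPE_B: Byte = 1
const val TYPE_C: Byte = 2
const val TYPE_Y: Byte = 3

fun Flow<ResponsePacket>.parsePackets(): Flow<String> {
    // State lives outside the lambda; this is only safe if the returned flow is collected once.
    var pendingB: ResponsePacket? = null
    return transform<ResponsePacket, String> { packet ->
        when (packet.bytes[0]) {
            // Strategy one, part 1: remember B and emit nothing yet.
            TYPE_B -> { pendingB = packet }
            // Strategy one, part 2: C completes the pair, so merge and emit a single result.
            TYPE_C -> {
                val b = pendingB ?: error("C arrived before B")
                pendingB = null
                emit("legacy:${b.bytes[1] + packet.bytes[1]}")
            }
            // Strategy two: every Y packet is processed on its own.
            TYPE_Y -> emit("y:${packet.bytes[1]}")
            // Unknown packet types are ignored in this sketch.
            else -> Unit
        }
    }
}

fun demoTransform(): List<String> = runBlocking {
    flowOf(
        ResponsePacket(byteArrayOf(TYPE_B, 10)),
        ResponsePacket(byteArrayOf(TYPE_C, 20)),
        ResponsePacket(byteArrayOf(TYPE_Y, 7))
    ).parsePackets().toList()
}
```

With the three sample packets, demoTransform() yields one merged result for B and C followed by one result for Y. Downstream collectors never see the intermediate B packet.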
03/19/2021, 7:00 AM
Lilly
03/20/2021, 12:35 AM
in repo class
override suspend fun fetchParameter() = flow {
    // I have to send a request for packet A synchronously because I need some bytes in this packet to determine whether isLegacy is true or not
    dataSource.sendPacketWithResponse(A) { A ->
        version = A.bytes[0].also { version ->
            isLegacy = version < 800
        }
    }
    if (isLegacy) dataSource.sendPacketWithResponseAsync(listOf(B, C)) // send request for B, C asynchronously --> channel will receive response for B, C
    else dataSource.sendPacketWithResponseAsync(Y) // send request for Y asynchronously --> channel will receive 20x response for Y
    // Emit all received packets to caller (use case)
    dataSource.subscribeToPacketChannel().collect { responsePacket -> emit(responsePacket) } // this line feels not done the right way
}
I guess this would work, but it doesn't feel like the right way yet. In the use case I would consume the packets and apply some business logic (transform). This leads to the next problem I can't solve. Let's say packet A arrives in the use case and gets transformed: how would I wait for packets B and C to arrive and get transformed, and then transform them all together?
Lilly
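One possible way to "wait for B and C" is to call receive() on the channel from inside the flow builder, since receive() suspends until a packet is available. The sketch below assumes responses arrive in request order (the thread says communication is sequential); parse() and the List<Int> result type are hypothetical stand-ins for the real business logic.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical model: the thread only states that the packet holds a ByteArray.
class ResponsePacket(val bytes: ByteArray)

// Hypothetical parser: turns the raw bytes into a list of Ints.
fun parse(packet: ResponsePacket): List<Int> = packet.bytes.map { it.toInt() }

// Emits exactly one merged result once both B and C have been received and parsed.
fun legacyParameters(packets: ReceiveChannel<ResponsePacket>): Flow<List<Int>> = flow {
    val b = parse(packets.receive()) // suspends until B arrives
    val c = parse(packets.receive()) // suspends until C arrives
    emit(b + c)                      // merge the two parsed results and emit once
}

fun demoLegacy(): List<List<Int>> = runBlocking {
    val channel = Channel<ResponsePacket>(Channel.UNLIMITED)
    // Simulated data source: the responses show up after collection has started.
    launch {
        channel.send(ResponsePacket(byteArrayOf(1, 2))) // plays the role of B
        channel.send(ResponsePacket(byteArrayOf(3, 4))) // plays the role of C
    }
    legacyParameters(channel).toList()
}
```

The collector only ever sees the merged value; nothing is emitted between B and C. The same pattern could be extended with a branch for the non-legacy case.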
03/20/2021, 1:03 AM
Erik
03/20/2021, 9:00 AM
Erik
03/20/2021, 9:01 AM
Lilly
03/23/2021, 5:00 PM
mutableListOf<ResponsePacket>().apply {
    while (!packetChannel.isEmpty) {
        add(packetChannel.receive())
    }
}
But it's a synchronous approach. Now I would like to do it asynchronously, i.e. get one packet, parse it, repeat. The result of each parse is put into a collection. Wait until all packets are received and parsed, and only then emit the collection.
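A hedged sketch of the "parse each packet as it arrives, emit the collection once" idea, assuming the number of expected packets is known up front (for example the 20 responses for Y mentioned earlier). parsePacket() and the Int result are hypothetical. Unlike a loop on isEmpty, which stops as soon as the channel is momentarily empty, take(expected) keeps suspending until the expected count has been reached.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical model: the thread only states that the packet holds a ByteArray.
class ResponsePacket(val bytes: ByteArray)

// Hypothetical parser: here the first byte is the parsed value.
fun parsePacket(packet: ResponsePacket): Int = packet.bytes[0].toInt()

// Parses packets one by one as they arrive and emits a single list when all are done.
fun parsedBatch(packets: ReceiveChannel<ResponsePacket>, expected: Int): Flow<List<Int>> = flow {
    val parsed = packets.receiveAsFlow() // does not cancel the channel when collection stops
        .take(expected)                  // stop after the expected number of packets
        .map { parsePacket(it) }         // parse each packet as soon as it is received
        .toList()                        // suspend until all of them are parsed
    emit(parsed)
}

fun demoBatch(): List<Int> = runBlocking {
    val channel = Channel<ResponsePacket>(Channel.UNLIMITED)
    repeat(3) { i -> channel.send(ResponsePacket(byteArrayOf((i + 1).toByte()))) }
    parsedBatch(channel, expected = 3).single()
}
```

receiveAsFlow() is used rather than consumeAsFlow() so the channel stays usable for the next request after this batch has been collected.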