https://kotlinlang.org logo
#coroutines
Title
# coroutines
l

Lilly

03/19/2021, 1:37 AM
Hi, I'm struggling with an specific implementation of my use case. I have a simple channel
Copy code
private val packetChannel: Channel<ResponsePacket> by lazy { Channel(Channel.UNLIMITED) }
that receives
ResponsePacket
which is just a data class which holds a
ByteArray
. After making a request to the API that produces the packets, I offer it via the channel (packets are only offered on request and communication is sequential, so request waits until packet is offered). I would like to consume these packets via flow but some packets are different which require different actions on collection, see screenshot. I'm clueless how to achieve this. For case 1 I would have to request 3 packets and while packet A is received I would parse it but then would have to wait until packet B + C is received and parsed and then merge the results of these and emit the result or otherwise case 2. That's the part I can't get my head around. Some help is much appreciated 🙏
e

Erik

03/19/2021, 6:58 AM
Your can likely achieve this with the
transform
operator. When a packet arrives, you can check its contents/type and start processing it with strategy one or two. Once processed you can emit the result. That can be consumed downstream.
I'm not sure if you know the order of packets once you received packet A? Are you sure you will always receive B and C next? Sounds unlikely if network IO is involved...
l

Lilly

03/20/2021, 12:35 AM
@Erik Thanks for your reply. Unfortunately it's a bit more complex. The communication to the data source is synchronous by nature so the order is ensured. It's a bluetooth classic device btw. Maybe some code snippets will enlighten this situation:
Copy code
in repo class

    override suspend fun fetchParameter() = flow {
        // I have to send a request for packet A synchronously because I need some bytes in this packet to determine if isLagacy is true or not
        dataSource.sendPacketWithResponse(A) { A ->
                version = A.bytes[0].also { version ->
                    isLegacy = version < 800
                }
            }

            if (isLegacy) dataSource.sendPacketWithResponseAsync(listOf(B, C)) // send request for B, C asynchronously --> channel will receive response for B, C
            else dataSource.sendPacketWithResponseAsync(Y) // send request for Y asynchronously --> channel will receive 20x response for Y
        }
        // Emit all received packets to caller (use case)
        dataSource.subscribeToPacketChannel().collect { responsePacket -> emit(responsePacket) } // this line feels not done the right way
    }
I guess this would work but it feels not done the right way yet. In use case I would consume the packets and apply some business logic (transform). This leads to the next problem I can't solve. Lets say packet A arrives in use case and it gets transformed, how would I wait for packet B and C to arrive + get transformed and then transform altogehter?
So at the end I need some mechanism to wait for all packets, in case 1 3 packets in case 2 20 packets. Edit: I guess I don't have to wait for them but rather accumulate the results of transformation in a Map. A Map should be the final result. So receive packet -> transform -> push result into Map. Might be achievable with fold??!!
e

Erik

03/20/2021, 9:00 AM
Maybe just Transform: Put packet in some collection (e.g. a map like you suggested). Increment counter If old packet and counter is three packets, Then emit collection Else it's new, and if counter is twenty packets Then emit collection So you could track collection and counter state (size of the collection) outside of the transformation operation.
There's likely a more efficient or more functional way of doing this, but this is a start
l

Lilly

03/23/2021, 5:00 PM
@Erik The packet counts aren't fixed, it was just an example. In my old approach I'm using this approach to know when all values are collected:
Copy code
mutableListOf<ResponsePacket>().apply {
                while (!packetChannel.isEmpty) {
                    add(packetChannel.receive())
                }
            }
But it's an synchronous approach. Now I would like to do it asynchronously, i.e. get one packet, parse it, repeat. The result of parsing is put into a collection. Wait until all packets received and parsed and only emit the collection.
2 Views