# coroutines

dkhusainov

01/14/2020, 4:32 PM
How can I wrap reading data packets from a socket inside a `Flow`? With `callbackFlow`, or with a channel and a separate coroutine that reads from the socket and sends to the channel, I end up with a `Flow` which isn't cancelable, because `socket.receive` is a blocking call (which supposedly never returns) inside a while loop.

Dominaezzz

01/14/2020, 4:40 PM
Yeah, `callbackFlow`. If there's no way to cancel `socket.receive`, then there's not much you can do about it.

Adam Powell

01/14/2020, 4:41 PM
you can close the socket
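
A minimal sketch of this suggestion, assuming a plain JDK `DatagramSocket` and no coroutines at all. The function name `closeUnblocksReceive` is made up for illustration. It blocks a thread in `receive()` and then closes the socket from another thread, which the JDK documents as making the blocked call throw a `SocketException`:

```kotlin
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketException
import kotlin.concurrent.thread

// Hypothetical helper: returns true if closing the socket from another
// thread made the blocked receive() throw a SocketException.
fun closeUnblocksReceive(): Boolean {
    val socket = DatagramSocket(0) // port 0: let the OS pick any free port
    var unblockedByClose = false
    val reader = thread {
        try {
            // Nothing is ever sent to this port, so this call blocks.
            socket.receive(DatagramPacket(ByteArray(16), 16))
        } catch (e: SocketException) {
            unblockedByClose = true // reached once close() is called elsewhere
        }
    }
    Thread.sleep(200)   // give the reader time to block in receive()
    socket.close()      // the only way to interrupt the blocking call
    reader.join(2_000)  // wait for the reader thread to finish
    return unblockedByClose
}

fun main() {
    println(closeUnblocksReceive())
}
```

So the blocking call itself cannot be cancelled, but whoever owns the socket can end it by closing the socket from a different thread or coroutine.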

dkhusainov

01/14/2020, 5:06 PM
Closing the socket is done in `onCompletion`, which is not called until the blocking call has finished.

Dominaezzz

01/14/2020, 5:08 PM
If you're using `channelFlow`, I'd expect you to be closing the socket in `awaitClose`.
☝️ 1

dkhusainov

01/14/2020, 5:18 PM
Yes, but either the code never even reaches the point of registering the callback, because the suspending lambda is running a blocking loop, or the callback is never called, because another coroutine that is running the blocking loop can't complete.
In short, I don't see a way of implementing this in a self-contained way.

louiscad

01/14/2020, 5:28 PM
You can use `launch(Dispatchers.IO)` in `callbackFlow`.
☝🏼 1

hmole

01/14/2020, 6:19 PM
Working variant, if anyone is interested:
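import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

// UdpPacket and toByteString() are not shown in this thread; they come from the poster's project.
fun receiveUdpBroadcasts(
    port: Int,
    bufferSize: Int = DEFAULT_BUFFER_SIZE
): Flow<UdpPacket> = callbackFlow {
    val socket = DatagramSocket(port)

    suspend fun receiveAndSend() {
        val packet = DatagramPacket(ByteArray(bufferSize), bufferSize)

        // Blocks until a datagram arrives, or throws SocketException once the socket is closed.
        socket.receive(packet)

        if (isActive && !channel.isClosedForSend) {
            channel.send(
                UdpPacket(
                    // Keep only the bytes actually received, not the whole buffer.
                    data = packet.data.copyOf(packet.length).toByteString(),
                    // The sender comes from the packet; socket.inetAddress is null on an unconnected socket.
                    sender = packet.address
                )
            )
        }
    }

    // Run the blocking loop on the IO dispatcher so the flow body can reach awaitClose.
    launch(Dispatchers.IO) {
        while (isActive) {
            try {
                receiveAndSend()
            } catch (socketClosed: SocketException) {
                break
            }
        }
    }

    // Runs when the collector cancels or the channel is closed; closing the socket unblocks receive().
    awaitClose(socket::close)
}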
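
To illustrate why this variant is cancelable from the collector's side, here is a self-contained sketch under a few assumptions: kotlinx.coroutines is on the classpath, the loopback interface is available, and the names `udpPayloads` and `firstPayloadThenCancel` are hypothetical stand-ins rather than code from this thread. It uses the same pattern (blocking loop in `launch(Dispatchers.IO)`, socket closed in `awaitClose`), takes one datagram with `first()`, and then checks that cancelling the collection closed the socket:

```kotlin
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import java.net.SocketException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical simplified flow: emits raw payloads from an already-bound socket.
fun udpPayloads(socket: DatagramSocket, bufferSize: Int = 1024): Flow<ByteArray> = callbackFlow {
    launch(Dispatchers.IO) {
        while (isActive) {
            val packet = DatagramPacket(ByteArray(bufferSize), bufferSize)
            try {
                socket.receive(packet) // blocking; ends with SocketException on close
            } catch (closed: SocketException) {
                break
            }
            send(packet.data.copyOf(packet.length)) // only the received bytes
        }
    }
    // Runs when the collector cancels; closing the socket unblocks receive().
    awaitClose { socket.close() }
}

// Sends one datagram to ourselves, collects it, and reports whether the
// socket was closed once the collection had been cancelled by first().
fun firstPayloadThenCancel(): Pair<String?, Boolean> = runBlocking {
    val loopback = InetAddress.getLoopbackAddress()
    val socket = DatagramSocket(0, loopback) // port 0: any free port
    DatagramSocket().use { sender ->
        sender.send(DatagramPacket("ping".toByteArray(), 4, loopback, socket.localPort))
    }
    // The timeout is only a safety net in case the datagram never arrives.
    val payload = withTimeoutOrNull(2_000) { udpPayloads(socket).first() }
    payload?.decodeToString() to socket.isClosed
}

fun main() {
    println(firstPayloadThenCancel())
}
```

The design point is that the flow body never blocks: the blocking loop lives in a child coroutine, so `awaitClose` is always reached and its cleanup can interrupt `receive()` by closing the socket.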