How can I wrap reading data packets from socket in...
# coroutines
d
How can I wrap reading data packets from socket inside a
Flow
? With
callbackFlow
or a channel and a separate coroutine that reads from socket and sends to channel I end up with a
Flow
which isn't cancelable because
socket.receive
is a blocking call(which supposedly never returns) inside a while loop.
d
Yeah,
callbackFlow
. If there's no way to cancel
socket.receive
, then there's not much you can do about it.
a
you can close the socket
d
Closing socket is called from
onCompletion
, which is not called until the blocking call is finished
d
If you're using
channelFlow
I'd expect you to be closing the socket in
awaitClose
.
☝️ 1
d
Yes, but the code is either not going to even reach registering the callback because the suspending lamda is doing a blocking loop, or the callback is not called because another coroutine which is running the blocking loop can't complete.
In short - I don't see a way of implementing this in a self contained way
l
You can use
launch(<http://Dispatchers.IO|Dispatchers.IO>)
in
callbackFlow
☝🏼 1
h
Working variant if anyone intersted
Copy code
fun receiveUdpBroadcasts(
    port: Int,
    bufferSize: Int = DEFAULT_BUFFER_SIZE
): Flow<UdpPacket> = callbackFlow {
    val socket = DatagramSocket(port)

    suspend fun receiveAndSend() {
        val packet = DatagramPacket(ByteArray(bufferSize), bufferSize)

        socket.receive(packet)

        if (isActive && !channel.isClosedForSend) {
            channel.send(
                UdpPacket(
                    data = packet.data.toByteString(),
                    sender = socket.inetAddress
                )
            )
        }
    }

    launch(<http://Dispatchers.IO|Dispatchers.IO>) {
        while (isActive) {
            try {
                receiveAndSend()
            } catch (socketClosed: SocketException) {
                break
            }
        }
    }

    awaitClose(socket::close)
}