dkhusainov
01/14/2020, 4:32 PM
Flow? With callbackFlow or a channel and a separate coroutine that reads from socket and sends to channel I end up with a Flow which isn't cancelable because socket.receive is a blocking call (which supposedly never returns) inside a while loop.

Dominaezzz
01/14/2020, 4:40 PM
callbackFlow. If there's no way to cancel socket.receive, then there's not much you can do about it.

Adam Powell
01/14/2020, 4:41 PM

dkhusainov
01/14/2020, 5:06 PM
onCompletion, which is not called until the blocking call is finished

Dominaezzz
01/14/2020, 5:08 PM
channelFlow
I'd expect you to be closing the socket in awaitClose.

dkhusainov
01/14/2020, 5:18 PM

louiscad
01/14/2020, 5:28 PM
launch(Dispatchers.IO) in callbackFlow

hmole
01/14/2020, 6:19 PM
fun receiveUdpBroadcasts(
    port: Int,
    bufferSize: Int = DEFAULT_BUFFER_SIZE
): Flow<UdpPacket> = callbackFlow {
    val socket = DatagramSocket(port)

    suspend fun receiveAndSend() {
        val packet = DatagramPacket(ByteArray(bufferSize), bufferSize)
        // Blocks until a datagram arrives or the socket is closed.
        socket.receive(packet)
        if (isActive && !channel.isClosedForSend) {
            channel.send(
                UdpPacket(
                    // Keep only the bytes actually received, not the whole buffer.
                    data = packet.data.copyOf(packet.length).toByteString(),
                    // socket.inetAddress is null on an unconnected socket; the sender is on the packet.
                    sender = packet.address
                )
            )
        }
    }

    launch(Dispatchers.IO) {
        while (isActive) {
            try {
                receiveAndSend()
            } catch (socketClosed: SocketException) {
                // Closing the socket in awaitClose makes receive() throw, which ends the loop.
                break
            }
        }
    }

    awaitClose(socket::close)
}
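
The snippet above depends on one JDK behavior: closing a DatagramSocket from another thread makes a blocked receive() throw a SocketException. That is why closing the socket in awaitClose lets the flow be cancelled. Below is a minimal sketch of that behavior alone, using only the JDK and the Kotlin standard library. The helper name closeUnblocksReceive and its closeAfterMillis parameter are made up for illustration and are not from the thread. The sketch assumes no datagram happens to arrive on the temporary port.

```kotlin
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketException
import kotlin.concurrent.thread

// Hypothetical helper (not from the thread): blocks in receive() on a free port,
// closes the socket from a second thread, and reports whether receive() was
// interrupted by a SocketException.
fun closeUnblocksReceive(closeAfterMillis: Long = 200): Boolean {
    val socket = DatagramSocket(0) // port 0 asks the OS for any free port
    val closer = thread {
        Thread.sleep(closeAfterMillis)
        socket.close() // plays the role of awaitClose(socket::close)
    }
    return try {
        socket.receive(DatagramPacket(ByteArray(16), 16))
        false // a datagram actually arrived, which this sketch assumes will not happen
    } catch (e: SocketException) {
        true // receive() was unblocked by close()
    } finally {
        closer.join()
        socket.close() // closing twice is harmless
    }
}

fun main() {
    println(closeUnblocksReceive())
}
```

In the callbackFlow version, the same thing happens when the collector is cancelled: awaitClose runs socket::close, the receive() call blocked on Dispatchers.IO throws, and the loop exits.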