bj0
07/09/2019, 11:45 PM
Flow of data that has separators in it, is there an easy way to split it into a flow of separated chunks? i.e. "@some data@more data@" -> [some data, more data]
jw
07/09/2019, 11:49 PM
flatMapConcat(item -> split(item).toFlow())
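A minimal sketch of that suggestion, assuming the elements are strings and `@` is the separator. It uses `asFlow()`, which is the kotlinx.coroutines name (the `toFlow()` in the message reads as shorthand), and `splitEach` is a hypothetical helper name. It splits each element on its own, so it only works if no message straddles two elements.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: splits every element on the separator and flattens the pieces.
// Assumes each element contains only whole messages.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun Flow<String>.splitEach(separator: Char = '@'): Flow<String> =
    flatMapConcat { item ->
        item.split(separator)
            .filter { it.isNotEmpty() } // drop the empty pieces around leading/trailing separators
            .asFlow()
    }

fun main() = runBlocking {
    val chunks = flowOf("@some data@more data@").splitEach().toList()
    println(chunks) // [some data, more data]
}
```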
bj0
07/09/2019, 11:59 PM
bj0
07/09/2019, 11:59 PM
jw
07/09/2019, 11:59 PM
bj0
07/10/2019, 12:00 AM
jw
07/10/2019, 12:01 AM
jw
07/10/2019, 12:01 AM
bj0
07/10/2019, 12:02 AM
bj0
07/10/2019, 12:02 AM
bj0
07/10/2019, 12:02 AM
bj0
07/10/2019, 12:04 AM
fun Flow<ByteArray>.parse() = flow {
    // message separator
    val eom = 0x7e.toByte()
    var data = flatMapConcat { it.asList().asFlow() }
    while (true) {
        data = data.dropWhile { it != eom }.drop(1)
        val msg = data.takeWhile { it != eom }
            .toList().toByteArray()
        if (msg.isEmpty())
            break
        emit(msg)
        data.drop(msg.size)
    }
}
bj0
07/10/2019, 12:05 AM
msg.isEmpty() is a problem: if there's an empty message it will stop early, but if it's not there the loop never ends.
Zach Klippenstein (he/him) [MOD]
07/10/2019, 12:39 AM
val eom = 0x7e.toByte()
fun Flow<ByteArray>.parse() = flow {
    val data = flatMapConcat {
        it.toList().asFlow()
    }
    val buffer = mutableListOf<Byte>()
    data.collect { byte ->
        if (byte == eom) {
            emit(buffer.toList())
            buffer.clear()
        } else {
            buffer += byte
        }
    }
    // Emit the last buffer.
    emit(buffer.toList())
}
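To check how this kind of buffering behaves when a message is split across two chunks, here is a self-contained sketch of the same idea. `frames` and `EOM` are hypothetical names, and two things differ from the snippet above by assumption: it emits `ByteArray` instead of `List<Byte>`, and it skips empty buffers. It also assumes a kotlinx.coroutines version where `collect` accepts a lambda without an extra import.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

const val EOM: Byte = 0x7e // message separator ('~')

// Hypothetical variant of parse(): buffers bytes across chunk boundaries
// and emits one ByteArray per separator-delimited message.
fun Flow<ByteArray>.frames(): Flow<ByteArray> = flow {
    val buffer = mutableListOf<Byte>()
    this@frames.collect { chunk ->
        for (byte in chunk) {
            if (byte == EOM) {
                // Assumption: empty messages (e.g. before a leading separator) are skipped.
                if (buffer.isNotEmpty()) emit(buffer.toByteArray())
                buffer.clear()
            } else {
                buffer += byte
            }
        }
    }
    // Emit whatever is left if the stream ended without a trailing separator.
    if (buffer.isNotEmpty()) emit(buffer.toByteArray())
}

fun main() = runBlocking {
    // "ab" is split across two chunks; '~' is 0x7e.
    val chunks = flowOf("~a".toByteArray(), "b~cd~".toByteArray())
    val messages = chunks.frames().toList().map { String(it) }
    println(messages) // [ab, cd]
}
```

Because the buffer is filled inside a single `collect`, nothing has to re-collect the upstream flow, which is what made the loop-based version hard to terminate.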
Zach Klippenstein (he/him) [MOD]
07/10/2019, 12:43 AM
Buffer instead of a simple mutable list, since it’s probably more efficient, but same idea.
ursus
07/10/2019, 3:28 AM
buffer?
louiscad
07/10/2019, 6:42 AM
ursus
07/10/2019, 6:43 AM
async { }? then it would be needed, right?
louiscad
07/10/2019, 6:47 AM
emit from inside them because the CoroutineContext would not be the same (different Job). Calls to emit must be made on the same CoroutineContext; it's specified in the documentation and checked at runtime.
ursus
07/10/2019, 6:51 AM
louiscad
07/10/2019, 6:52 AM
ursus
07/10/2019, 6:53 AM
louiscad
07/10/2019, 6:55 AM
Mutex or actor when you still need it.
ursus
07/10/2019, 6:57 AM
jw
07/10/2019, 11:09 AM
louiscad
07/10/2019, 11:42 AM
Deferred into a List<Deferred> and call awaitAll() on it. You shouldn't need extra synchronization if you don't have shared mutable state, as I said.
bj0
07/10/2019, 2:59 PM
Zach Klippenstein (he/him) [MOD]
07/10/2019, 11:57 PM
bj0
07/11/2019, 8:41 PM
.split method
Zach Klippenstein (he/him) [MOD]
07/11/2019, 9:16 PM
split is probably going to use some mutable data structure, or a mutable reference to a persistent immutable data structure, under the hood anyway though.
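As a rough illustration of that last point, a split-style operator can keep its mutable state entirely inside the `flow { }` block, so callers never see it. This is only a sketch: `splitOn` is a hypothetical extension, not something kotlinx.coroutines ships, and dropping empty groups is an assumption.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not part of kotlinx.coroutines): groups elements between separators.
// The mutable list is created inside flow { }, so each collection gets its own buffer.
fun <T> Flow<T>.splitOn(separator: T): Flow<List<T>> = flow {
    val current = mutableListOf<T>()
    this@splitOn.collect { element ->
        if (element == separator) {
            if (current.isNotEmpty()) emit(current.toList()) // emit a copy, not the live buffer
            current.clear()
        } else {
            current += element
        }
    }
    // Emit the trailing group if the flow did not end with a separator.
    if (current.isNotEmpty()) emit(current.toList())
}

fun main() = runBlocking {
    val groups = flowOf(0, 1, 2, 0, 3, 0).splitOn(0)
    println(groups.toList()) // [[1, 2], [3]]
    println(groups.toList()) // [[1, 2], [3]] again: no state is shared between collections
}
```

From the outside the operator looks purely functional, since the list never escapes the builder and every emission is a copy.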